lightning_background_processor/
lib.rs

// This file is Copyright its original authors, visible in version control
// history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
// You may not use this file except in accordance with one or both of these
// licenses.

//! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
//! running properly, and (2) either can or should be run in the background.
#![cfg_attr(feature = "std", doc = "See docs for [`BackgroundProcessor`] for more details.")]
#![deny(rustdoc::broken_intra_doc_links)]
#![deny(rustdoc::private_intra_doc_links)]
#![deny(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]

#[cfg(any(test, feature = "std"))]
extern crate core;

#[cfg(not(feature = "std"))]
extern crate alloc;

#[macro_use]
extern crate lightning;
extern crate lightning_rapid_gossip_sync;

mod fwd_batch;

use fwd_batch::BatchDelay;

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
#[cfg(feature = "std")]
use lightning::events::EventHandler;
#[cfg(feature = "std")]
use lightning::events::EventsProvider;
use lightning::events::ReplayEvent;
use lightning::events::{Event, PathFailure};
use lightning::util::ser::Writeable;

use lightning::ln::channelmanager::AChannelManager;
use lightning::ln::msgs::OnionMessageHandler;
use lightning::ln::peer_handler::APeerManager;
use lightning::onion_message::messenger::AOnionMessenger;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
use lightning::routing::utxo::UtxoLookup;
use lightning::sign::{
	ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
};
use lightning::util::logger::Logger;
use lightning::util::persist::{
	KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
	CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
	NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
	NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
	SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
#[cfg(feature = "std")]
use lightning::util::wakers::Sleeper;
use lightning_rapid_gossip_sync::RapidGossipSync;

use lightning_liquidity::ALiquidityManager;
#[cfg(feature = "std")]
use lightning_liquidity::ALiquidityManagerSync;

use core::ops::Deref;
use core::time::Duration;

#[cfg(feature = "std")]
use core::sync::atomic::{AtomicBool, Ordering};
#[cfg(feature = "std")]
use std::sync::Arc;
#[cfg(feature = "std")]
use std::thread::{self, JoinHandle};
#[cfg(feature = "std")]
use std::time::Instant;

#[cfg(not(feature = "std"))]
use alloc::boxed::Box;
#[cfg(all(not(c_bindings), not(feature = "std")))]
use alloc::sync::Arc;

/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be
/// relied upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
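///
/// # Example
///
/// A minimal sketch of typical usage, assuming the referenced objects were constructed
/// elsewhere (see [`BackgroundProcessor::start`] for the full parameter list and constraints):
///
/// ```ignore
/// let background_processor = BackgroundProcessor::start(
/// 	kv_store, event_handler, chain_monitor, channel_manager, None, gossip_sync,
/// 	peer_manager, None, None, logger, Some(scorer),
/// );
/// // ... run the node ...
/// // Stop the processor and surface any persistence error it hit.
/// background_processor.stop().expect("background processing failed");
/// ```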
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	stop_thread: Arc<AtomicBool>,
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}

#[cfg(not(test))]
const FRESHNESS_TIMER: Duration = Duration::from_secs(60);
#[cfg(test)]
const FRESHNESS_TIMER: Duration = Duration::from_secs(1);

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: Duration = Duration::from_secs(10);
/// Signature operations take a lot longer without compiler optimisations, so we use a longer
/// ping timer in debug builds to allow for this. Slower devices will still be disconnected if
/// the timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const PING_TIMER: Duration = Duration::from_secs(1);

#[cfg(not(test))]
const ONION_MESSAGE_HANDLER_TIMER: Duration = Duration::from_secs(10);
#[cfg(test)]
const ONION_MESSAGE_HANDLER_TIMER: Duration = Duration::from_secs(1);

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(60 * 60);

#[cfg(not(test))]
const SCORER_PERSIST_TIMER: Duration = Duration::from_secs(60 * 5);
#[cfg(test)]
const SCORER_PERSIST_TIMER: Duration = Duration::from_secs(1);

#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(60);
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(1);

#[cfg(not(test))]
const REBROADCAST_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const REBROADCAST_TIMER: Duration = Duration::from_secs(1);

#[cfg(not(test))]
const SWEEPER_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const SWEEPER_TIMER: Duration = Duration::from_secs(1);

/// `core::cmp::min` is not currently `const`, so we define a trivial (and equivalent) replacement.
const fn min_duration(a: Duration, b: Duration) -> Duration {
	if a.as_nanos() < b.as_nanos() {
		a
	} else {
		b
	}
}
const FASTEST_TIMER: Duration = min_duration(
	min_duration(FRESHNESS_TIMER, PING_TIMER),
	min_duration(SCORER_PERSIST_TIMER, min_duration(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)),
);
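
// A small sanity test (an illustrative sketch, not part of the public API): `min_duration`
// must agree with `core::cmp::min`, and `FASTEST_TIMER` must never exceed any of the timers
// it was derived from.
#[cfg(test)]
#[test]
fn min_duration_matches_core_min() {
	assert_eq!(min_duration(FRESHNESS_TIMER, PING_TIMER), core::cmp::min(FRESHNESS_TIMER, PING_TIMER));
	assert!(FASTEST_TIMER <= FRESHNESS_TIMER && FASTEST_TIMER <= PING_TIMER);
	assert!(FASTEST_TIMER <= SCORER_PERSIST_TIMER && FASTEST_TIMER <= REBROADCAST_TIMER);
	assert!(FASTEST_TIMER <= FIRST_NETWORK_PRUNE_TIMER);
}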

/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
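///
/// For example, to use P2P gossip (a sketch; `p2p_sync` is assumed to be a reference or smart
/// pointer to a configured [`P2PGossipSync`]):
///
/// ```ignore
/// let gossip_sync = GossipSync::p2p(p2p_sync);
/// ```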
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, U, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	U: Deref,
	L: Deref,
> where
	U::Target: UtxoLookup,
	L::Target: Logger,
{
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}

impl<
		P: Deref<Target = P2PGossipSync<G, U, L>>,
		R: Deref<Target = RapidGossipSync<G, L>>,
		G: Deref<Target = NetworkGraph<L>>,
		U: Deref,
		L: Deref,
	> GossipSync<P, R, G, U, L>
where
	U::Target: UtxoLookup,
	L::Target: Logger,
{
	fn network_graph(&self) -> Option<&G> {
		match self {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::None => None,
		}
	}

	fn prunable_network_graph(&self) -> Option<&G> {
		match self {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => {
				if gossip_sync.is_initial_sync_complete() {
					Some(gossip_sync.network_graph())
				} else {
					None
				}
			},
			GossipSync::None => None,
		}
	}
}

/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<
		P: Deref<Target = P2PGossipSync<G, U, L>>,
		G: Deref<Target = NetworkGraph<L>>,
		U: Deref,
		L: Deref,
	> GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
where
	U::Target: UtxoLookup,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}

/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<
		'a,
		R: Deref<Target = RapidGossipSync<G, L>>,
		G: Deref<Target = NetworkGraph<L>>,
		L: Deref,
	>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
		R,
		G,
		&'a (dyn UtxoLookup + Send + Sync),
		L,
	> where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}

/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn UtxoLookup + Send + Sync),
		L,
	> where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
		GossipSync::None
	}
}

fn handle_network_graph_update<L: Deref>(network_graph: &NetworkGraph<L>, event: &Event)
where
	L::Target: Logger,
{
	if let Event::PaymentPathFailed {
		failure: PathFailure::OnPath { network_update: Some(ref upd) },
		..
	} = event
	{
		network_graph.handle_network_update(upd);
	}
}

/// Updates the scorer based on the given event and returns whether an update occurred, so we
/// can decide whether to persist.
fn update_scorer<'a, S: Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
	scorer: &'a S, event: &Event, duration_since_epoch: Duration,
) -> bool {
	match event {
		Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
			let mut score = scorer.write_lock();
			score.payment_path_failed(path, *scid, duration_since_epoch);
		},
		Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
			// Reached if the destination explicitly failed it back. We treat this as a successful probe
			// because the payment made it all the way to the destination with sufficient liquidity.
			let mut score = scorer.write_lock();
			score.probe_successful(path, duration_since_epoch);
		},
		Event::PaymentPathSuccessful { path, .. } => {
			let mut score = scorer.write_lock();
			score.payment_path_successful(path, duration_since_epoch);
		},
		Event::ProbeSuccessful { path, .. } => {
			let mut score = scorer.write_lock();
			score.probe_successful(path, duration_since_epoch);
		},
		Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
			let mut score = scorer.write_lock();
			score.probe_failed(path, *scid, duration_since_epoch);
		},
		_ => return false,
	}
	true
}

#[cfg(all(not(c_bindings), feature = "std"))]
type ScorerWrapper<T> = std::sync::RwLock<T>;

#[cfg(all(not(c_bindings), not(feature = "std")))]
type ScorerWrapper<T> = core::cell::RefCell<T>;

#[cfg(not(c_bindings))]
type DynRouter = lightning::routing::router::DefaultRouter<
	&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
	&'static (dyn Logger + Send + Sync),
	&'static (dyn EntropySource + Send + Sync),
	&'static ScorerWrapper<
		lightning::routing::scoring::ProbabilisticScorer<
			&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
			&'static (dyn Logger + Send + Sync),
		>,
	>,
	lightning::routing::scoring::ProbabilisticScoringFeeParameters,
	lightning::routing::scoring::ProbabilisticScorer<
		&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
		&'static (dyn Logger + Send + Sync),
	>,
>;

#[cfg(not(c_bindings))]
type DynMessageRouter = lightning::onion_message::messenger::DefaultMessageRouter<
	&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
	&'static (dyn Logger + Send + Sync),
	&'static (dyn EntropySource + Send + Sync),
>;

#[cfg(all(not(c_bindings), not(taproot)))]
type DynSignerProvider = dyn lightning::sign::SignerProvider<EcdsaSigner = lightning::sign::InMemorySigner>
	+ Send
	+ Sync;

#[cfg(all(not(c_bindings), taproot))]
type DynSignerProvider = (dyn lightning::sign::SignerProvider<
	EcdsaSigner = lightning::sign::InMemorySigner,
	TaprootSigner = lightning::sign::InMemorySigner,
> + Send
     + Sync);

#[cfg(not(c_bindings))]
type DynChannelManager = lightning::ln::channelmanager::ChannelManager<
	&'static (dyn chain::Watch<lightning::sign::InMemorySigner> + Send + Sync),
	&'static (dyn BroadcasterInterface + Send + Sync),
	&'static (dyn EntropySource + Send + Sync),
	&'static (dyn lightning::sign::NodeSigner + Send + Sync),
	&'static DynSignerProvider,
	&'static (dyn FeeEstimator + Send + Sync),
	&'static DynRouter,
	&'static DynMessageRouter,
	&'static (dyn Logger + Send + Sync),
>;

/// When initializing a background processor without an onion messenger, this can be used to avoid
/// specifying a concrete `OnionMessenger` type.
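///
/// For example (a sketch, with the remaining arguments elided), it can be passed directly as
/// the `onion_messenger` argument: `process_events_async(kv_store, handler, chain_monitor,
/// channel_manager, NO_ONION_MESSENGER, ...)`.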
#[cfg(not(c_bindings))]
pub const NO_ONION_MESSENGER: Option<
	Arc<
		dyn AOnionMessenger<
				EntropySource = dyn EntropySource + Send + Sync,
				ES = &(dyn EntropySource + Send + Sync),
				NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
				NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
				Logger = dyn Logger + Send + Sync,
				L = &'static (dyn Logger + Send + Sync),
				NodeIdLookUp = DynChannelManager,
				NL = &'static DynChannelManager,
				MessageRouter = DynMessageRouter,
				MR = &'static DynMessageRouter,
				OffersMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
				OMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
				AsyncPaymentsMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
				APH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
				DNSResolverMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
				DRH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
				CustomOnionMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
				CMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
			> + Send
			+ Sync,
	>,
> = None;

/// When initializing a background processor without a liquidity manager, this can be used to avoid
/// specifying a concrete `LiquidityManager` type.
#[cfg(not(c_bindings))]
pub const NO_LIQUIDITY_MANAGER: Option<
	Arc<
		dyn ALiquidityManager<
				EntropySource = dyn EntropySource + Send + Sync,
				ES = &(dyn EntropySource + Send + Sync),
				NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
				NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
				AChannelManager = DynChannelManager,
				CM = &DynChannelManager,
				Filter = dyn chain::Filter + Send + Sync,
				C = &(dyn chain::Filter + Send + Sync),
				KVStore = dyn lightning::util::persist::KVStore + Send + Sync,
				K = &(dyn lightning::util::persist::KVStore + Send + Sync),
				TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
				TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
				BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
				                           + Send
				                           + Sync,
				T = &(dyn BroadcasterInterface + Send + Sync),
			> + Send
			+ Sync,
	>,
> = None;

/// When initializing a background processor without a liquidity manager, this can be used to avoid
/// specifying a concrete `LiquidityManagerSync` type.
#[cfg(all(not(c_bindings), feature = "std"))]
pub const NO_LIQUIDITY_MANAGER_SYNC: Option<
	Arc<
		dyn ALiquidityManagerSync<
				EntropySource = dyn EntropySource + Send + Sync,
				ES = &(dyn EntropySource + Send + Sync),
				NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
				NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
				AChannelManager = DynChannelManager,
				CM = &DynChannelManager,
				Filter = dyn chain::Filter + Send + Sync,
				C = &(dyn chain::Filter + Send + Sync),
				KVStoreSync = dyn lightning::util::persist::KVStoreSync + Send + Sync,
				KS = &(dyn lightning::util::persist::KVStoreSync + Send + Sync),
				TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
				TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
				BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
				                           + Send
				                           + Sync,
				T = &(dyn BroadcasterInterface + Send + Sync),
			> + Send
			+ Sync,
	>,
> = None;

pub(crate) mod futures_util {
	use core::future::Future;
	use core::marker::Unpin;
	use core::pin::Pin;
	use core::task::{Poll, RawWaker, RawWakerVTable, Waker};
	pub(crate) struct Selector<
		A: Future<Output = ()> + Unpin,
		B: Future<Output = ()> + Unpin,
		C: Future<Output = ()> + Unpin,
		D: Future<Output = ()> + Unpin,
		E: Future<Output = bool> + Unpin,
	> {
		pub a: A,
		pub b: B,
		pub c: C,
		pub d: D,
		pub e: E,
	}

	pub(crate) enum SelectorOutput {
		A,
		B,
		C,
		D,
		E(bool),
	}

	impl<
			A: Future<Output = ()> + Unpin,
			B: Future<Output = ()> + Unpin,
			C: Future<Output = ()> + Unpin,
			D: Future<Output = ()> + Unpin,
			E: Future<Output = bool> + Unpin,
		> Future for Selector<A, B, C, D, E>
	{
		type Output = SelectorOutput;
		fn poll(
			mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>,
		) -> Poll<SelectorOutput> {
			match Pin::new(&mut self.a).poll(ctx) {
				Poll::Ready(()) => {
					return Poll::Ready(SelectorOutput::A);
				},
				Poll::Pending => {},
			}
			match Pin::new(&mut self.b).poll(ctx) {
				Poll::Ready(()) => {
					return Poll::Ready(SelectorOutput::B);
				},
				Poll::Pending => {},
			}
			match Pin::new(&mut self.c).poll(ctx) {
				Poll::Ready(()) => {
					return Poll::Ready(SelectorOutput::C);
				},
				Poll::Pending => {},
			}
			match Pin::new(&mut self.d).poll(ctx) {
				Poll::Ready(()) => {
					return Poll::Ready(SelectorOutput::D);
				},
				Poll::Pending => {},
			}
			match Pin::new(&mut self.e).poll(ctx) {
				Poll::Ready(res) => {
					return Poll::Ready(SelectorOutput::E(res));
				},
				Poll::Pending => {},
			}
			Poll::Pending
		}
	}

	/// A selector that wraps an optional future: the inner future is polled if it is `Some`,
	/// and the selector is always pending otherwise.
	pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
		pub optional_future: Option<F>,
	}

	impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
		type Output = ();
		fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
			match self.optional_future.as_mut() {
				Some(f) => match Pin::new(f).poll(ctx) {
					Poll::Ready(()) => {
						self.optional_future.take();
						Poll::Ready(())
					},
					Poll::Pending => Poll::Pending,
				},
				None => Poll::Pending,
			}
		}
	}

	// If we want to poll a future outside of an async context to figure out whether it has
	// completed, without awaiting it, we need a Waker, which in turn needs a vtable. We fill
	// the vtable with dummy values, but sadly there's a good bit of boilerplate involved.
	fn dummy_waker_clone(_: *const ()) -> RawWaker {
		RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
	}
	fn dummy_waker_action(_: *const ()) {}

	const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
		dummy_waker_clone,
		dummy_waker_action,
		dummy_waker_action,
		dummy_waker_action,
	);
	pub(crate) fn dummy_waker() -> Waker {
		unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
	}
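
	// An illustrative check (a test-only sketch): the dummy waker is sufficient to poll a
	// future that is already ready, which is exactly how it is used by the background
	// processor below.
	#[cfg(test)]
	#[test]
	fn dummy_waker_polls_ready_future() {
		let mut fut = Box::pin(async { 42 });
		let waker = dummy_waker();
		let mut ctx = core::task::Context::from_waker(&waker);
		assert!(matches!(fut.as_mut().poll(&mut ctx), Poll::Ready(42)));
	}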

	enum JoinerResult<ERR, F: Future<Output = Result<(), ERR>> + Unpin> {
		Pending(Option<F>),
		Ready(Result<(), ERR>),
	}

	pub(crate) struct Joiner<
		ERR,
		A: Future<Output = Result<(), ERR>> + Unpin,
		B: Future<Output = Result<(), ERR>> + Unpin,
		C: Future<Output = Result<(), ERR>> + Unpin,
		D: Future<Output = Result<(), ERR>> + Unpin,
		E: Future<Output = Result<(), ERR>> + Unpin,
	> {
		a: JoinerResult<ERR, A>,
		b: JoinerResult<ERR, B>,
		c: JoinerResult<ERR, C>,
		d: JoinerResult<ERR, D>,
		e: JoinerResult<ERR, E>,
	}

	impl<
			ERR,
			A: Future<Output = Result<(), ERR>> + Unpin,
			B: Future<Output = Result<(), ERR>> + Unpin,
			C: Future<Output = Result<(), ERR>> + Unpin,
			D: Future<Output = Result<(), ERR>> + Unpin,
			E: Future<Output = Result<(), ERR>> + Unpin,
		> Joiner<ERR, A, B, C, D, E>
	{
		pub(crate) fn new() -> Self {
			Self {
				a: JoinerResult::Pending(None),
				b: JoinerResult::Pending(None),
				c: JoinerResult::Pending(None),
				d: JoinerResult::Pending(None),
				e: JoinerResult::Pending(None),
			}
		}

		pub(crate) fn set_a(&mut self, fut: A) {
			self.a = JoinerResult::Pending(Some(fut));
		}
		pub(crate) fn set_a_res(&mut self, res: Result<(), ERR>) {
			self.a = JoinerResult::Ready(res);
		}
		pub(crate) fn set_b(&mut self, fut: B) {
			self.b = JoinerResult::Pending(Some(fut));
		}
		pub(crate) fn set_c(&mut self, fut: C) {
			self.c = JoinerResult::Pending(Some(fut));
		}
		pub(crate) fn set_d(&mut self, fut: D) {
			self.d = JoinerResult::Pending(Some(fut));
		}
		pub(crate) fn set_e(&mut self, fut: E) {
			self.e = JoinerResult::Pending(Some(fut));
		}
	}

	impl<
			ERR,
			A: Future<Output = Result<(), ERR>> + Unpin,
			B: Future<Output = Result<(), ERR>> + Unpin,
			C: Future<Output = Result<(), ERR>> + Unpin,
			D: Future<Output = Result<(), ERR>> + Unpin,
			E: Future<Output = Result<(), ERR>> + Unpin,
		> Future for Joiner<ERR, A, B, C, D, E>
	where
		Joiner<ERR, A, B, C, D, E>: Unpin,
	{
		type Output = [Result<(), ERR>; 5];
		fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
			let mut all_complete = true;
			macro_rules! handle {
				($val: ident) => {
					match &mut (self.$val) {
						JoinerResult::Pending(None) => {
							self.$val = JoinerResult::Ready(Ok(()));
						},
						JoinerResult::<ERR, _>::Pending(Some(ref mut val)) => {
							match Pin::new(val).poll(ctx) {
								Poll::Ready(res) => {
									self.$val = JoinerResult::Ready(res);
								},
								Poll::Pending => {
									all_complete = false;
								},
							}
						},
						JoinerResult::Ready(_) => {},
					}
				};
			}
			handle!(a);
			handle!(b);
			handle!(c);
			handle!(d);
			handle!(e);

			if all_complete {
				let mut res = [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())];
				if let JoinerResult::Ready(ref mut val) = &mut self.a {
					core::mem::swap(&mut res[0], val);
				}
				if let JoinerResult::Ready(ref mut val) = &mut self.b {
					core::mem::swap(&mut res[1], val);
				}
				if let JoinerResult::Ready(ref mut val) = &mut self.c {
					core::mem::swap(&mut res[2], val);
				}
				if let JoinerResult::Ready(ref mut val) = &mut self.d {
					core::mem::swap(&mut res[3], val);
				}
				if let JoinerResult::Ready(ref mut val) = &mut self.e {
					core::mem::swap(&mut res[4], val);
				}
				Poll::Ready(res)
			} else {
				Poll::Pending
			}
		}
	}
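
	// A compact illustration (a test-only sketch) of `Joiner`'s semantics: slots that were
	// never set complete immediately with `Ok(())`, and futures that were set have their
	// results collected in order.
	#[cfg(test)]
	#[test]
	fn empty_joiner_slots_complete_immediately() {
		type Fut = core::future::Ready<Result<(), ()>>;
		let mut joiner: Joiner<(), Fut, Fut, Fut, Fut, Fut> = Joiner::new();
		joiner.set_b(core::future::ready(Err(())));
		let waker = dummy_waker();
		let mut ctx = core::task::Context::from_waker(&waker);
		match Pin::new(&mut joiner).poll(&mut ctx) {
			Poll::Ready(res) => assert_eq!(res, [Ok(()), Err(()), Ok(()), Ok(()), Ok(())]),
			Poll::Pending => panic!("a joiner with no pending futures should be ready"),
		}
	}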
}
use core::task;
use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutput};

/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs `true`, the loop will exit and this function's future will complete.
/// The `sleeper` future is free to return early after it has triggered the exit condition.
///
#[cfg_attr(
	feature = "std",
	doc = " See [`BackgroundProcessor::start`] for information on which actions this handles.\n"
)]
/// The `mobile_interruptable_platform` flag should be set if we're currently running on a
/// mobile device, where we may need to check for interruption of the application regularly. If
/// you are unsure, you should set the flag, as its performance impact is minimal unless there
/// are hundreds or thousands of simultaneous process calls running.
///
/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
/// no time is available, some features may be disabled; however, the node will still operate fine.
///
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
/// could set up `process_events_async` like this:
/// ```
/// # use lightning::io;
/// # use lightning::events::ReplayEvent;
/// # use std::sync::{Arc, RwLock};
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use std::time::SystemTime;
/// # use lightning_background_processor::{process_events_async, GossipSync};
/// # use core::future::Future;
/// # use core::pin::Pin;
/// # use lightning_liquidity::utils::time::TimeProvider;
/// # struct Logger {}
/// # impl lightning::util::logger::Logger for Logger {
/// #     fn log(&self, _record: lightning::util::logger::Record) {}
/// # }
/// # struct StoreSync {}
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
/// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
/// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> io::Result<()> { Ok(()) }
/// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<()> { Ok(()) }
/// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
/// # }
/// # struct Store {}
/// # impl lightning::util::persist::KVStore for Store {
/// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
/// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
/// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
/// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
/// # }
/// # use core::time::Duration;
/// # struct DefaultTimeProvider;
/// #
/// # impl TimeProvider for DefaultTimeProvider {
/// #    fn duration_since_epoch(&self) -> Duration {
/// #        use std::time::{SystemTime, UNIX_EPOCH};
/// #        SystemTime::now().duration_since(UNIX_EPOCH).expect("system time before Unix epoch")
/// #    }
/// # }
/// # struct EventHandler {}
/// # impl EventHandler {
/// #     async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
/// # }
/// # #[derive(Eq, PartialEq, Clone, Hash)]
/// # struct SocketDescriptor {}
/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
/// #     fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
/// #     fn disconnect_socket(&mut self) {}
/// # }
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>, Arc<Store>, Arc<DefaultTimeProvider>, Arc<B>>;
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
/// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>;
///
/// # struct Node<
/// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
/// #     F: lightning::chain::Filter + Send + Sync + 'static,
/// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
/// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
/// #     D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
/// #     O: lightning::sign::OutputSpender + Send + Sync + 'static,
/// # > {
/// #     peer_manager: Arc<PeerManager<B, F, FE, UL>>,
/// #     event_handler: Arc<EventHandler>,
/// #     channel_manager: Arc<ChannelManager<B, F, FE>>,
/// #     onion_messenger: Arc<OnionMessenger<B, F, FE>>,
/// #     liquidity_manager: Arc<LiquidityManager<B, F, FE>>,
/// #     chain_monitor: Arc<ChainMonitor<B, F, FE>>,
/// #     gossip_sync: Arc<P2PGossipSync<UL>>,
/// #     persister: Arc<Store>,
/// #     logger: Arc<Logger>,
/// #     scorer: Arc<Scorer>,
/// #     sweeper: Arc<OutputSweeper<B, D, FE, F, O>>,
/// # }
/// #
/// # async fn setup_background_processing<
/// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
/// #     F: lightning::chain::Filter + Send + Sync + 'static,
/// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
/// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
/// #     D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
/// #     O: lightning::sign::OutputSpender + Send + Sync + 'static,
/// # >(node: Node<B, F, FE, UL, D, O>) {
///	let background_persister = Arc::clone(&node.persister);
///	let background_event_handler = Arc::clone(&node.event_handler);
///	let background_chain_mon = Arc::clone(&node.chain_monitor);
///	let background_chan_man = Arc::clone(&node.channel_manager);
///	let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
///	let background_peer_man = Arc::clone(&node.peer_manager);
///	let background_onion_messenger = Arc::clone(&node.onion_messenger);
///	let background_liquidity_manager = Arc::clone(&node.liquidity_manager);
///	let background_logger = Arc::clone(&node.logger);
///	let background_scorer = Arc::clone(&node.scorer);
///	let background_sweeper = Arc::clone(&node.sweeper);
///	// Setup the sleeper.
#[cfg_attr(
	feature = "std",
	doc = "	let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());"
)]
#[cfg_attr(feature = "std", doc = "")]
///	let sleeper = move |d| {
#[cfg_attr(feature = "std", doc = "		let mut receiver = stop_receiver.clone();")]
///		Box::pin(async move {
///			tokio::select!{
///				_ = tokio::time::sleep(d) => false,
#[cfg_attr(feature = "std", doc = "				_ = receiver.changed() => true,")]
///			}
///		})
///	};
///
///	let mobile_interruptable_platform = false;
///
#[cfg_attr(feature = "std", doc = "	let handle = tokio::spawn(async move {")]
#[cfg_attr(
	not(feature = "std"),
	doc = "	let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();"
)]
#[cfg_attr(not(feature = "std"), doc = "	rt.block_on(async move {")]
///		process_events_async(
///			background_persister,
///			|e| background_event_handler.handle_event(e),
///			background_chain_mon,
///			background_chan_man,
///			Some(background_onion_messenger),
///			background_gossip_sync,
///			background_peer_man,
///			Some(background_liquidity_manager),
///			Some(background_sweeper),
///			background_logger,
///			Some(background_scorer),
///			sleeper,
///			mobile_interruptable_platform,
///			|| Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
///		)
///		.await
///		.expect("Failed to process events");
///	});
///
///	// Stop the background processing.
#[cfg_attr(feature = "std", doc = "	stop_sender.send(()).unwrap();")]
#[cfg_attr(feature = "std", doc = "	handle.await.unwrap()")]
///	# }
/// ```
pub async fn process_events_async<
	'a,
	UL: Deref,
	CF: Deref,
	T: Deref,
	F: Deref,
	G: Deref<Target = NetworkGraph<L>>,
	L: Deref,
	P: Deref,
	EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	ES: Deref + Send,
	M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
		+ Send
		+ Sync,
	CM: Deref,
	OM: Deref,
	PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	PM: Deref,
	LM: Deref,
	D: Deref,
	O: Deref,
	K: Deref,
	OS: Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
	S: Deref<Target = SC> + Send + Sync,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture,
	FetchTime: Fn() -> Option<Duration>,
>(
	kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
	liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
	sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
	UL::Target: UtxoLookup,
	CF::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
	ES::Target: EntropySource,
	CM::Target: AChannelManager,
	OM::Target: AOnionMessenger,
	PM::Target: APeerManager,
	LM::Target: ALiquidityManager,
	O::Target: OutputSpender,
	D::Target: ChangeDestinationSource,
	K::Target: KVStore,
{
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		let scorer = &scorer;
		let logger = &logger;
		let kv_store = &kv_store;
		let fetch_time = &fetch_time;
		// We should be able to drop the Box once our MSRV is 1.68
		Box::pin(async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			if let Some(ref scorer) = scorer {
				if let Some(duration_since_epoch) = fetch_time() {
					if update_scorer(scorer, &event, duration_since_epoch) {
						log_trace!(logger, "Persisting scorer after update");
						if let Err(e) = kv_store
							.write(
								SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
								SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
								SCORER_PERSISTENCE_KEY,
								scorer.encode(),
							)
							.await
						{
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
							// We opt not to abort early on persistence failure here as persisting
							// the scorer is non-critical and we still hope that it will have
							// resolved itself when it is potentially critical in event handling
							// below.
						}
					}
				}
			}
			event_handler(event).await
		})
	};
	let mut batch_delay = BatchDelay::new();

	log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
	channel_manager.get_cm().timer_tick_occurred();
	log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
	chain_monitor.rebroadcast_pending_claims();

	let mut last_freshness_call = sleeper(FRESHNESS_TIMER);
	let mut last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
	let mut last_ping_call = sleeper(PING_TIMER);
	let mut last_prune_call = sleeper(FIRST_NETWORK_PRUNE_TIMER);
	let mut last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
	let mut last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
	let mut last_sweeper_call = sleeper(SWEEPER_TIMER);
	let mut have_pruned = false;
	let mut have_decayed_scorer = false;

	let mut last_forwards_processing_call = sleeper(batch_delay.get());

	loop {
		channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
		chain_monitor.process_pending_events_async(async_event_handler).await;
		if let Some(om) = &onion_messenger {
			om.get_om().process_pending_events_async(async_event_handler).await
		}

		// Note that the PeerManager::process_events may block on ChannelManager's locks,
		// hence it comes last here. When the ChannelManager finishes whatever it's doing,
		// we want to ensure we get into `persist_manager` as quickly as we can, especially
		// without running the normal event processing above and handing events to users.
		//
		// Specifically, on an *extremely* slow machine, we may see ChannelManager start
		// processing a message effectively at any point during this loop. In order to
		// minimize the time between such processing completing and persisting the updated
		// ChannelManager, we want to minimize methods blocking on a ChannelManager
		// generally, and as a fallback place such blocking only immediately before
		// persistence.
		peer_manager.as_ref().process_events();
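
		// If the batch-delay sleeper has fired, process any pending HTLC forwards and arm a
		// new sleeper with the next batch delay.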
		match check_and_reset_sleeper(&mut last_forwards_processing_call, || {
			sleeper(batch_delay.next())
		}) {
			Some(false) => {
				channel_manager.get_cm().process_pending_htlc_forwards();
			},
			Some(true) => break,
			None => {},
		}

		// We wait up to 100ms, but track how long it takes to detect being put to sleep;
		// see `await_start`'s use below.
		let mut await_start = None;
		if mobile_interruptable_platform {
			await_start = Some(sleeper(Duration::from_secs(1)));
		}
		let om_fut = if let Some(om) = onion_messenger.as_ref() {
			let fut = om.get_om().get_update_future();
			OptionalSelector { optional_future: Some(fut) }
		} else {
			OptionalSelector { optional_future: None }
		};
		let lm_fut = if let Some(lm) = liquidity_manager.as_ref() {
			let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future();
			OptionalSelector { optional_future: Some(fut) }
		} else {
			OptionalSelector { optional_future: None }
		};
		let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing();
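		// Cap how long we sleep: if HTLC forwards are pending, wake up in time for the next
		// batch; otherwise use the platform-appropriate maximum (100ms on mobile so that
		// interruptions are noticed quickly, the fastest timer interval elsewhere).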
		let sleep_delay = match (needs_processing, mobile_interruptable_platform) {
			(true, true) => batch_delay.get().min(Duration::from_millis(100)),
			(true, false) => batch_delay.get().min(FASTEST_TIMER),
			(false, true) => Duration::from_millis(100),
			(false, false) => FASTEST_TIMER,
		};
		let fut = Selector {
			a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
			b: chain_monitor.get_update_future(),
			c: om_fut,
			d: lm_fut,
			e: sleeper(sleep_delay),
		};
		match fut.await {
			SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {},
			SelectorOutput::E(exit) => {
				if exit {
					break;
				}
			},
		}

		let await_slow = if mobile_interruptable_platform {
			// Specify a zero new sleeper timeout because we won't use the new sleeper. It is re-initialized in the next
			// loop iteration.
			match check_and_reset_sleeper(&mut await_start.unwrap(), || sleeper(Duration::ZERO)) {
				Some(true) => break,
				Some(false) => true,
				None => false,
			}
		} else {
			false
		};
		match check_and_reset_sleeper(&mut last_freshness_call, || sleeper(FRESHNESS_TIMER)) {
			Some(false) => {
				log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
				channel_manager.get_cm().timer_tick_occurred();
			},
			Some(true) => break,
			None => {},
		}

		let mut futures = Joiner::new();

		if channel_manager.get_cm().get_and_clear_needs_persistence() {
			log_trace!(logger, "Persisting ChannelManager...");

			let fut = async {
				kv_store
					.write(
						CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_KEY,
						channel_manager.get_cm().encode(),
					)
					.await
			};
			// TODO: Once our MSRV is 1.68 we should be able to drop the Box
			let mut fut = Box::pin(fut);

			// Because persisting the ChannelManager is important to avoid accidental
			// force-closures, go ahead and poll the future once before we do slightly more
			// CPU-intensive tasks in the form of NetworkGraph pruning or scorer time-stepping
			// below. This will get it moving but won't block us for too long if the underlying
			// future is actually async.
			use core::future::Future;
			let mut waker = dummy_waker();
			let mut ctx = task::Context::from_waker(&mut waker);
			match core::pin::Pin::new(&mut fut).poll(&mut ctx) {
				task::Poll::Ready(res) => futures.set_a_res(res),
				task::Poll::Pending => futures.set_a(fut),
			}

			log_trace!(logger, "Done persisting ChannelManager.");
		}

		// Note that we want to run a graph prune once not long after startup before
		// falling back to our usual hourly prunes. This avoids short-lived clients never
		// pruning their network graph. We run once 60 seconds after startup before
		// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
		// we prune after an initial sync completes.
		let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
			NETWORK_PRUNE_TIMER
		} else {
			FIRST_NETWORK_PRUNE_TIMER
		};
		let prune_timer_elapsed = {
			match check_and_reset_sleeper(&mut last_prune_call, || sleeper(prune_timer)) {
				Some(false) => true,
				Some(true) => break,
				None => false,
			}
		};

		let should_prune = match gossip_sync {
			GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
			_ => prune_timer_elapsed,
		};
		if should_prune {
			// The network graph must not be pruned while rapid sync completion is pending
			if let Some(network_graph) = gossip_sync.prunable_network_graph() {
				if let Some(duration_since_epoch) = fetch_time() {
					log_trace!(logger, "Pruning and persisting network graph.");
					network_graph.remove_stale_channels_and_tracking_with_time(
						duration_since_epoch.as_secs(),
					);
				} else {
					log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
					log_trace!(logger, "Persisting network graph.");
				}
				let fut = async {
					if let Err(e) = kv_store
						.write(
							NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
							NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
							NETWORK_GRAPH_PERSISTENCE_KEY,
							network_graph.encode(),
						)
						.await
					{
						log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e);
					}

					Ok(())
				};

				// TODO: Once our MSRV is 1.68 we should be able to drop the Box
				futures.set_b(Box::pin(fut));

				have_pruned = true;
			}
		}
		if !have_decayed_scorer {
			if let Some(ref scorer) = scorer {
				if let Some(duration_since_epoch) = fetch_time() {
					log_trace!(logger, "Calling time_passed on scorer at startup");
					scorer.write_lock().time_passed(duration_since_epoch);
				}
			}
			have_decayed_scorer = true;
		}
		match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
			sleeper(SCORER_PERSIST_TIMER)
		}) {
			Some(false) => {
				if let Some(ref scorer) = scorer {
					if let Some(duration_since_epoch) = fetch_time() {
						log_trace!(logger, "Calling time_passed and persisting scorer");
						scorer.write_lock().time_passed(duration_since_epoch);
					} else {
						log_trace!(logger, "Persisting scorer");
					}
					let fut = async {
						if let Err(e) = kv_store
							.write(
								SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
								SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
								SCORER_PERSISTENCE_KEY,
								scorer.encode(),
							)
							.await
						{
							log_error!(
								logger,
								"Error: Failed to persist scorer, check your disk and permissions {}",
								e
							);
						}

						Ok(())
					};

					// TODO: Once our MSRV is 1.68 we should be able to drop the Box
					futures.set_c(Box::pin(fut));
				}
			},
			Some(true) => break,
			None => {},
		}
		match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
			Some(false) => {
				log_trace!(logger, "Regenerating sweeper spends if necessary");
				if let Some(ref sweeper) = sweeper {
					let fut = async {
						let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;

						Ok(())
					};

					// TODO: Once our MSRV is 1.68 we should be able to drop the Box
					futures.set_d(Box::pin(fut));
				}
			},
			Some(true) => break,
			None => {},
		}

		if let Some(liquidity_manager) = liquidity_manager.as_ref() {
			log_trace!(logger, "Persisting LiquidityManager...");
			let fut = async {
				liquidity_manager.get_lm().persist().await.map_err(|e| {
					log_error!(logger, "Persisting LiquidityManager failed: {}", e);
					e
				})
			};
			futures.set_e(Box::pin(fut));
		}

		// Run persistence tasks in parallel and exit if any of them returns an error.
		for res in futures.await {
			res?;
		}

		match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
			sleeper(ONION_MESSAGE_HANDLER_TIMER)
		}) {
			Some(false) => {
				if let Some(om) = &onion_messenger {
					log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
					om.get_om().timer_tick_occurred();
				}
			},
			Some(true) => break,
			None => {},
		}

		// Peer manager timer tick. If we were interrupted on a mobile platform, we disconnect all peers.
		if await_slow {
			// On various platforms, we may be starved of CPU cycles for several reasons.
			// E.g. on iOS, if we've been in the background, we will be entirely paused.
			// Similarly, if we're on a desktop platform and the device has been asleep, we
			// may not get any cycles.
			// We detect this by checking if our max-100ms-sleep, above, ran longer than a
			// full second, at which point we assume sockets may have been killed (they
			// appear to be at least on some platforms, even if it has only been a second).
			// Note that we have to take care to not get here just because user event
			// processing was slow at the top of the loop. For example, the sample client
			// may call Bitcoin Core RPCs during event handling, which very often takes
			// more than a handful of seconds to complete, and shouldn't disconnect all our
			// peers.
			log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
			peer_manager.as_ref().disconnect_all_peers();
			last_ping_call = sleeper(PING_TIMER);
		} else {
			match check_and_reset_sleeper(&mut last_ping_call, || sleeper(PING_TIMER)) {
				Some(false) => {
					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
					peer_manager.as_ref().timer_tick_occurred();
				},
				Some(true) => break,
				_ => {},
			}
		}

		// Rebroadcast pending claims.
		match check_and_reset_sleeper(&mut last_rebroadcast_call, || sleeper(REBROADCAST_TIMER)) {
			Some(false) => {
				log_trace!(logger, "Rebroadcasting monitor's pending claims");
				chain_monitor.rebroadcast_pending_claims();
			},
			Some(true) => break,
			None => {},
		}
	}
	log_trace!(logger, "Terminating background processor.");

	// After we exit, ensure we persist the ChannelManager one final time - this avoids
	// some races where users quit while channel updates were in-flight, with
	// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
	kv_store
		.write(
			CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
			CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
			CHANNEL_MANAGER_PERSISTENCE_KEY,
			channel_manager.get_cm().encode(),
		)
		.await?;
	if let Some(ref scorer) = scorer {
		kv_store
			.write(
				SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
				SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
				SCORER_PERSISTENCE_KEY,
				scorer.encode(),
			)
			.await?;
	}
	if let Some(network_graph) = gossip_sync.network_graph() {
		kv_store
			.write(
				NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
				NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
				NETWORK_GRAPH_PERSISTENCE_KEY,
				network_graph.encode(),
			)
			.await?;
	}
	Ok(())
}

fn check_and_reset_sleeper<
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
>(
	fut: &mut SleepFuture, mut new_sleeper: impl FnMut() -> SleepFuture,
) -> Option<bool> {
	let mut waker = dummy_waker();
	let mut ctx = task::Context::from_waker(&mut waker);
	match core::pin::Pin::new(&mut *fut).poll(&mut ctx) {
		task::Poll::Ready(exit) => {
			*fut = new_sleeper();
			Some(exit)
		},
		task::Poll::Pending => None,
	}
}
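
// A tiny illustrative test (a sketch): a completed sleeper reports its exit flag exactly
// once and is replaced via `new_sleeper`; the replacement is what gets polled on the next
// call.
#[cfg(test)]
#[test]
fn check_and_reset_sleeper_replaces_completed_future() {
	use core::future::Future;
	use core::pin::Pin;
	use core::task::{Context, Poll};

	// A future that is immediately ready with the given exit flag.
	struct Ready(bool);
	impl Future for Ready {
		type Output = bool;
		fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<bool> {
			Poll::Ready(self.0)
		}
	}

	let mut fut = Ready(false);
	// The completed sleeper reports `false` (no exit requested) and is replaced.
	assert_eq!(check_and_reset_sleeper(&mut fut, || Ready(true)), Some(false));
	// The replacement sleeper is polled on the next call and requests an exit.
	assert_eq!(check_and_reset_sleeper(&mut fut, || Ready(false)), Some(true));
}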

/// Async events processor that is based on [`process_events_async`] but allows for
/// [`KVStoreSync`] to be used for synchronous background persistence.
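///
/// Apart from wrapping the provided [`KVStoreSync`] (and the optional [`OutputSweeperSync`])
/// in their async counterparts before delegating, this behaves identically to
/// [`process_events_async`]; see its documentation for details and a usage example.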
pub async fn process_events_async_with_kv_store_sync<
	UL: Deref,
	CF: Deref,
	T: Deref,
	F: Deref,
	G: Deref<Target = NetworkGraph<L>>,
	L: Deref + Send + Sync,
	P: Deref,
	EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	ES: Deref + Send,
	M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
		+ Send
		+ Sync,
	CM: Deref + Send + Sync,
	OM: Deref,
	PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	PM: Deref,
	LM: Deref,
	D: Deref,
	O: Deref,
	K: Deref,
	OS: Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>>,
	S: Deref<Target = SC> + Send + Sync,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture,
	FetchTime: Fn() -> Option<Duration>,
>(
	kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
	liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
	sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
	UL::Target: UtxoLookup,
	CF::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
	ES::Target: EntropySource,
	CM::Target: AChannelManager,
	OM::Target: AOnionMessenger,
	PM::Target: APeerManager,
	LM::Target: ALiquidityManager,
	O::Target: OutputSpender,
	D::Target: ChangeDestinationSourceSync,
	K::Target: KVStoreSync,
{
	let kv_store = KVStoreSyncWrapper(kv_store);
	process_events_async(
		kv_store,
		event_handler,
		chain_monitor,
		channel_manager,
		onion_messenger,
		gossip_sync,
		peer_manager,
		liquidity_manager,
		sweeper.as_ref().map(|os| os.sweeper_async()),
		logger,
		scorer,
		sleeper,
		mobile_interruptable_platform,
		fetch_time,
	)
	.await
}

#[cfg(feature = "std")]
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`KVStoreSync`] returns an error. In case of an error, the error can be retrieved by
	/// calling either [`join`] or [`stop`].
1432	///
1433	/// # Data Persistence
1434	///
1435	/// [`KVStoreSync`] is responsible for writing out the [`ChannelManager`] to disk, and/or
1436	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
1437	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
1438	/// provided implementation.
1439	///
1440	/// [`KVStoreSync`] is also responsible for writing out the [`NetworkGraph`] to disk, if
1441	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
1442	/// See the `lightning-persister` crate for LDK's provided implementation.
1443	///
1444	/// Typically, users should either implement [`KVStoreSync`] to never return an
1445	/// error or call [`join`] and handle any error that may arise. For the latter case,
1446	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
1447	///
1448	/// # Event Handling
1449	///
1450	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
1451	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
1452	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`], if given, will update the [`NetworkGraph`] based on payment failures.
1454	///
1455	/// # Rapid Gossip Sync
1456	///
1457	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
1458	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
1459	/// until the [`RapidGossipSync`] instance completes its first sync.
1460	///
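	/// # Example
	///
	/// A minimal sketch of starting and stopping the processor. The helper `build_node_objects`
	/// is a hypothetical stand-in for real initialization of the various LDK objects:
	///
	/// ```ignore
	/// let (kv_store, event_handler, chain_monitor, channel_manager, peer_manager, logger) =
	/// 	build_node_objects();
	/// let bg_processor = BackgroundProcessor::start(
	/// 	kv_store,
	/// 	event_handler,
	/// 	chain_monitor,
	/// 	channel_manager,
	/// 	None, // onion messenger
	/// 	GossipSync::none(),
	/// 	peer_manager,
	/// 	None, // liquidity manager
	/// 	None, // sweeper
	/// 	logger,
	/// 	None, // scorer
	/// );
	///
	/// // ... run the node ...
	///
	/// // On shutdown, stop the background thread and surface any persistence error.
	/// bg_processor.stop().expect("background processing failed");
	/// ```
	///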
1461	/// [top-level documentation]: BackgroundProcessor
1462	/// [`join`]: Self::join
1463	/// [`stop`]: Self::stop
1464	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
1465	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
1466	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
1467	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
1468	pub fn start<
1469		'a,
1470		UL: 'static + Deref,
1471		CF: 'static + Deref,
1472		T: 'static + Deref,
1473		F: 'static + Deref + Send,
1474		G: 'static + Deref<Target = NetworkGraph<L>>,
1475		L: 'static + Deref + Send,
1476		P: 'static + Deref,
1477		EH: 'static + EventHandler + Send,
1478		ES: 'static + Deref + Send,
1479		M: 'static
1480			+ Deref<
1481				Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1482			>
1483			+ Send
1484			+ Sync,
1485		CM: 'static + Deref + Send,
1486		OM: 'static + Deref + Send,
1487		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send,
1488		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
1489		PM: 'static + Deref + Send,
1490		LM: 'static + Deref + Send,
1491		S: 'static + Deref<Target = SC> + Send + Sync,
1492		SC: for<'b> WriteableScore<'b>,
1493		D: 'static + Deref,
1494		O: 'static + Deref,
1495		K: 'static + Deref + Send,
1496		OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
1497	>(
1498		kv_store: K, event_handler: EH, chain_monitor: M, channel_manager: CM,
1499		onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
1500		liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
1501	) -> Self
1502	where
1503		UL::Target: 'static + UtxoLookup,
1504		CF::Target: 'static + chain::Filter,
1505		T::Target: 'static + BroadcasterInterface,
1506		F::Target: 'static + FeeEstimator,
1507		L::Target: 'static + Logger,
1508		P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1509		ES::Target: 'static + EntropySource,
1510		CM::Target: AChannelManager,
1511		OM::Target: AOnionMessenger,
1512		PM::Target: APeerManager,
1513		LM::Target: ALiquidityManagerSync,
1514		D::Target: ChangeDestinationSourceSync,
1515		O::Target: 'static + OutputSpender,
1516		K::Target: 'static + KVStoreSync,
1517	{
1518		let stop_thread = Arc::new(AtomicBool::new(false));
1519		let stop_thread_clone = Arc::clone(&stop_thread);
1520		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
1521			let event_handler = |event| {
1522				let network_graph = gossip_sync.network_graph();
1523				if let Some(network_graph) = network_graph {
1524					handle_network_graph_update(network_graph, &event)
1525				}
1526				if let Some(ref scorer) = scorer {
1527					use std::time::SystemTime;
1528					let duration_since_epoch = SystemTime::now()
1529						.duration_since(SystemTime::UNIX_EPOCH)
1530						.expect("Time should be sometime after 1970");
1531					if update_scorer(scorer, &event, duration_since_epoch) {
1532						log_trace!(logger, "Persisting scorer after update");
1533						if let Err(e) = kv_store.write(
1534							SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1535							SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1536							SCORER_PERSISTENCE_KEY,
1537							scorer.encode(),
1538						) {
1539							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
1540						}
1541					}
1542				}
1543				event_handler.handle_event(event)
1544			};
1545			let mut batch_delay = BatchDelay::new();
1546
1547			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
1548			channel_manager.get_cm().timer_tick_occurred();
1549			log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
1550			chain_monitor.rebroadcast_pending_claims();
1551
1552			let mut last_freshness_call = Instant::now();
1553			let mut last_onion_message_handler_call = Instant::now();
1554			let mut last_ping_call = Instant::now();
1555			let mut last_prune_call = Instant::now();
1556			let mut last_scorer_persist_call = Instant::now();
1557			let mut last_rebroadcast_call = Instant::now();
1558			let mut last_sweeper_call = Instant::now();
1559			let mut have_pruned = false;
1560			let mut have_decayed_scorer = false;
1561
1562			let mut cur_batch_delay = batch_delay.get();
1563			let mut last_forwards_processing_call = Instant::now();
1564
1565			loop {
1566				channel_manager.get_cm().process_pending_events(&event_handler);
1567				chain_monitor.process_pending_events(&event_handler);
1568				if let Some(om) = &onion_messenger {
1569					om.get_om().process_pending_events(&event_handler)
1570				};
1571
				// Note that the PeerManager::process_events may block on ChannelManager's locks,
				// hence it comes last here. When the ChannelManager finishes whatever it's doing,
				// we want to ensure we get to persisting it below as quickly as we can, especially
				// without running the normal event processing above and handing events to users.
1576				//
1577				// Specifically, on an *extremely* slow machine, we may see ChannelManager start
1578				// processing a message effectively at any point during this loop. In order to
1579				// minimize the time between such processing completing and persisting the updated
1580				// ChannelManager, we want to minimize methods blocking on a ChannelManager
1581				// generally, and as a fallback place such blocking only immediately before
1582				// persistence.
1583				peer_manager.as_ref().process_events();
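				// Batch pending HTLC forwards: once the current batch delay has elapsed, forward
				// everything queued so far and draw the next delay from `BatchDelay`.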
1584				if last_forwards_processing_call.elapsed() > cur_batch_delay {
1585					channel_manager.get_cm().process_pending_htlc_forwards();
1586					cur_batch_delay = batch_delay.next();
1587					last_forwards_processing_call = Instant::now();
1588				}
1589				if stop_thread.load(Ordering::Acquire) {
1590					log_trace!(logger, "Terminating background processor.");
1591					break;
1592				}
1593				let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
1594					(Some(om), Some(lm)) => Sleeper::from_four_futures(
1595						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1596						&chain_monitor.get_update_future(),
1597						&om.get_om().get_update_future(),
1598						&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
1599					),
1600					(Some(om), None) => Sleeper::from_three_futures(
1601						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1602						&chain_monitor.get_update_future(),
1603						&om.get_om().get_update_future(),
1604					),
1605					(None, Some(lm)) => Sleeper::from_three_futures(
1606						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1607						&chain_monitor.get_update_future(),
1608						&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
1609					),
1610					(None, None) => Sleeper::from_two_futures(
1611						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1612						&chain_monitor.get_update_future(),
1613					),
1614				};
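				// Only let the batch delay bound the sleep when HTLC forwards are actually
				// pending; otherwise the 100ms cap below determines how long we wait.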
1615				let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() {
1616					batch_delay.get()
1617				} else {
1618					Duration::MAX
1619				};
1620				let fastest_timeout = batch_delay.min(Duration::from_millis(100));
1621				sleeper.wait_timeout(fastest_timeout);
1622				if stop_thread.load(Ordering::Acquire) {
1623					log_trace!(logger, "Terminating background processor.");
1624					break;
1625				}
1626				if last_freshness_call.elapsed() > FRESHNESS_TIMER {
1627					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
1628					channel_manager.get_cm().timer_tick_occurred();
1629					last_freshness_call = Instant::now();
1630				}
1631				if channel_manager.get_cm().get_and_clear_needs_persistence() {
1632					log_trace!(logger, "Persisting ChannelManager...");
					kv_store.write(
						CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_KEY,
						channel_manager.get_cm().encode(),
					)?;
1639					log_trace!(logger, "Done persisting ChannelManager.");
1640				}
1641
1642				if let Some(liquidity_manager) = liquidity_manager.as_ref() {
1643					log_trace!(logger, "Persisting LiquidityManager...");
1644					let _ = liquidity_manager.get_lm().persist().map_err(|e| {
1645						log_error!(logger, "Persisting LiquidityManager failed: {}", e);
1646					});
1647				}
1648
				// Note that we want to run a graph prune once shortly after startup before
				// falling back to our usual hourly prunes. This avoids short-lived clients never
				// pruning their network graph. We run once 60 seconds after startup before
				// continuing at our normal cadence. For RGS, since 60 seconds is likely too long,
				// we prune as soon as an initial sync completes.
1654				let prune_timer =
1655					if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
1656				let prune_timer_elapsed = last_prune_call.elapsed() > prune_timer;
1657				let should_prune = match gossip_sync {
1658					GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
1659					_ => prune_timer_elapsed,
1660				};
1661				if should_prune {
1662					// The network graph must not be pruned while rapid sync completion is pending
1663					if let Some(network_graph) = gossip_sync.prunable_network_graph() {
1664						let duration_since_epoch = std::time::SystemTime::now()
1665							.duration_since(std::time::SystemTime::UNIX_EPOCH)
1666							.expect("Time should be sometime after 1970");
1667
1668						log_trace!(logger, "Pruning and persisting network graph.");
1669						network_graph.remove_stale_channels_and_tracking_with_time(
1670							duration_since_epoch.as_secs(),
1671						);
1672						if let Err(e) = kv_store.write(
1673							NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1674							NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1675							NETWORK_GRAPH_PERSISTENCE_KEY,
1676							network_graph.encode(),
1677						) {
1678							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e);
1679						}
1680						have_pruned = true;
1681					}
1682					last_prune_call = Instant::now();
1683				}
1684				if !have_decayed_scorer {
1685					if let Some(ref scorer) = scorer {
1686						let duration_since_epoch = std::time::SystemTime::now()
1687							.duration_since(std::time::SystemTime::UNIX_EPOCH)
1688							.expect("Time should be sometime after 1970");
1689						log_trace!(logger, "Calling time_passed on scorer at startup");
1690						scorer.write_lock().time_passed(duration_since_epoch);
1691					}
1692					have_decayed_scorer = true;
1693				}
1694				if last_scorer_persist_call.elapsed() > SCORER_PERSIST_TIMER {
1695					if let Some(ref scorer) = scorer {
1696						let duration_since_epoch = std::time::SystemTime::now()
1697							.duration_since(std::time::SystemTime::UNIX_EPOCH)
1698							.expect("Time should be sometime after 1970");
1699						log_trace!(logger, "Calling time_passed and persisting scorer");
1700						scorer.write_lock().time_passed(duration_since_epoch);
1701						if let Err(e) = kv_store.write(
1702							SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1703							SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1704							SCORER_PERSISTENCE_KEY,
1705							scorer.encode(),
1706						) {
1707							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
1708						}
1709					}
1710					last_scorer_persist_call = Instant::now();
1711				}
1712				if last_sweeper_call.elapsed() > SWEEPER_TIMER {
1713					log_trace!(logger, "Regenerating sweeper spends if necessary");
1714					if let Some(ref sweeper) = sweeper {
1715						let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
1716					}
1717					last_sweeper_call = Instant::now();
1718				}
1719				if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER {
1720					if let Some(om) = &onion_messenger {
1721						log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
1722						om.get_om().timer_tick_occurred();
1723					}
1724					last_onion_message_handler_call = Instant::now();
1725				}
1726				if last_ping_call.elapsed() > PING_TIMER {
1727					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
1728					peer_manager.as_ref().timer_tick_occurred();
1729					last_ping_call = Instant::now();
1730				}
1731				if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER {
1732					log_trace!(logger, "Rebroadcasting monitor's pending claims");
1733					chain_monitor.rebroadcast_pending_claims();
1734					last_rebroadcast_call = Instant::now();
1735				}
1736			}
1737
1738			// After we exit, ensure we persist the ChannelManager one final time - this avoids
1739			// some races where users quit while channel updates were in-flight, with
1740			// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
1741			kv_store.write(
1742				CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1743				CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1744				CHANNEL_MANAGER_PERSISTENCE_KEY,
1745				channel_manager.get_cm().encode(),
1746			)?;
1747			if let Some(ref scorer) = scorer {
1748				kv_store.write(
1749					SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1750					SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1751					SCORER_PERSISTENCE_KEY,
1752					scorer.encode(),
1753				)?;
1754			}
1755			if let Some(network_graph) = gossip_sync.network_graph() {
1756				kv_store.write(
1757					NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1758					NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1759					NETWORK_GRAPH_PERSISTENCE_KEY,
1760					network_graph.encode(),
1761				)?;
1762			}
1763			Ok(())
1764		});
1765		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
1766	}
1767
1768	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
1769	/// [`ChannelManager`].
1770	///
1771	/// # Panics
1772	///
1773	/// This function panics if the background thread has panicked such as while persisting or
1774	/// handling events.
1775	///
1776	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
1777	pub fn join(mut self) -> Result<(), std::io::Error> {
1778		assert!(self.thread_handle.is_some());
1779		self.join_thread()
1780	}
1781
1782	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
1783	/// [`ChannelManager`].
1784	///
1785	/// # Panics
1786	///
1787	/// This function panics if the background thread has panicked such as while persisting or
1788	/// handling events.
1789	///
1790	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
1791	pub fn stop(mut self) -> Result<(), std::io::Error> {
1792		assert!(self.thread_handle.is_some());
1793		self.stop_and_join_thread()
1794	}
1795
1796	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
1797		self.stop_thread.store(true, Ordering::Release);
1798		self.join_thread()
1799	}
1800
1801	fn join_thread(&mut self) -> Result<(), std::io::Error> {
1802		match self.thread_handle.take() {
1803			Some(handle) => handle.join().unwrap(),
1804			None => Ok(()),
1805		}
1806	}
1807}
1808
1809#[cfg(feature = "std")]
1810impl Drop for BackgroundProcessor {
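	// Stopping and joining here means any persistence error hit by the background thread
	// surfaces as a panic on drop; call `stop` or `join` explicitly to handle such errors
	// instead.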
1811	fn drop(&mut self) {
1812		self.stop_and_join_thread().unwrap();
1813	}
1814}
1815
1816#[cfg(all(feature = "std", test))]
1817mod tests {
1818	use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
1819	use bitcoin::constants::{genesis_block, ChainHash};
1820	use bitcoin::hashes::Hash;
1821	use bitcoin::locktime::absolute::LockTime;
1822	use bitcoin::network::Network;
1823	use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
1824	use bitcoin::transaction::Version;
1825	use bitcoin::transaction::{Transaction, TxOut};
1826	use bitcoin::{Amount, ScriptBuf, Txid};
1827	use core::sync::atomic::{AtomicBool, Ordering};
1828	use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
1829	use lightning::chain::transaction::OutPoint;
1830	use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1831	use lightning::events::{Event, PathFailure, ReplayEvent};
1832	use lightning::ln::channelmanager;
1833	use lightning::ln::channelmanager::{
1834		ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
1835	};
1836	use lightning::ln::functional_test_utils::*;
1837	use lightning::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, Init, MessageSendEvent};
1838	use lightning::ln::peer_handler::{
1839		IgnoringMessageHandler, MessageHandler, PeerManager, SocketDescriptor,
1840	};
1841	use lightning::ln::types::ChannelId;
1842	use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
1843	use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
1844	use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
1845	use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
1846	use lightning::sign::{ChangeDestinationSourceSync, InMemorySigner, KeysManager, NodeSigner};
1847	use lightning::types::features::{ChannelFeatures, NodeFeatures};
1848	use lightning::types::payment::PaymentHash;
1849	use lightning::util::config::UserConfig;
1850	use lightning::util::persist::{
1851		KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
1852		CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1853		CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
1854		NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1855		SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1856		SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1857	};
1858	use lightning::util::ser::Writeable;
1859	use lightning::util::sweep::{
1860		OutputSpendStatus, OutputSweeper, OutputSweeperSync, PRUNE_DELAY_BLOCKS,
1861	};
1862	use lightning::util::test_utils;
1863	use lightning::{get_event, get_event_msg};
1864	use lightning_liquidity::utils::time::DefaultTimeProvider;
1865	use lightning_liquidity::{ALiquidityManagerSync, LiquidityManager, LiquidityManagerSync};
1866	use lightning_persister::fs_store::FilesystemStore;
1867	use lightning_rapid_gossip_sync::RapidGossipSync;
1868	use std::collections::VecDeque;
1869	use std::path::PathBuf;
1870	use std::sync::mpsc::SyncSender;
1871	use std::sync::Arc;
1872	use std::time::Duration;
1873	use std::{env, fs};
1874
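	// Give the background processor up to five freshness-timer periods to surface an expected
	// event before a test gives up waiting for it.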
1875	const EVENT_DEADLINE: Duration =
1876		Duration::from_millis(5 * (FRESHNESS_TIMER.as_millis() as u64));
1877
1878	#[derive(Clone, Hash, PartialEq, Eq)]
1879	struct TestDescriptor {}
1880	impl SocketDescriptor for TestDescriptor {
1881		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
1882			0
1883		}
1884
1885		fn disconnect_socket(&mut self) {}
1886	}
1887
1888	#[cfg(c_bindings)]
1889	type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
1890	#[cfg(not(c_bindings))]
1891	type LockingWrapper<T> = std::sync::Mutex<T>;
1892
1893	type ChannelManager = channelmanager::ChannelManager<
1894		Arc<ChainMonitor>,
1895		Arc<test_utils::TestBroadcaster>,
1896		Arc<KeysManager>,
1897		Arc<KeysManager>,
1898		Arc<KeysManager>,
1899		Arc<test_utils::TestFeeEstimator>,
1900		Arc<
1901			DefaultRouter<
1902				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1903				Arc<test_utils::TestLogger>,
1904				Arc<KeysManager>,
1905				Arc<LockingWrapper<TestScorer>>,
1906				(),
1907				TestScorer,
1908			>,
1909		>,
1910		Arc<
1911			DefaultMessageRouter<
1912				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1913				Arc<test_utils::TestLogger>,
1914				Arc<KeysManager>,
1915			>,
1916		>,
1917		Arc<test_utils::TestLogger>,
1918	>;
1919
1920	type ChainMonitor = chainmonitor::ChainMonitor<
1921		InMemorySigner,
1922		Arc<test_utils::TestChainSource>,
1923		Arc<test_utils::TestBroadcaster>,
1924		Arc<test_utils::TestFeeEstimator>,
1925		Arc<test_utils::TestLogger>,
1926		Arc<Persister>,
1927		Arc<KeysManager>,
1928	>;
1929
1930	type PGS = Arc<
1931		P2PGossipSync<
1932			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1933			Arc<test_utils::TestChainSource>,
1934			Arc<test_utils::TestLogger>,
1935		>,
1936	>;
1937	type RGS = Arc<
1938		RapidGossipSync<
1939			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1940			Arc<test_utils::TestLogger>,
1941		>,
1942	>;
1943
1944	type OM = OnionMessenger<
1945		Arc<KeysManager>,
1946		Arc<KeysManager>,
1947		Arc<test_utils::TestLogger>,
1948		Arc<ChannelManager>,
1949		Arc<
1950			DefaultMessageRouter<
1951				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1952				Arc<test_utils::TestLogger>,
1953				Arc<KeysManager>,
1954			>,
1955		>,
1956		IgnoringMessageHandler,
1957		Arc<ChannelManager>,
1958		IgnoringMessageHandler,
1959		IgnoringMessageHandler,
1960	>;
1961
1962	type LM = LiquidityManagerSync<
1963		Arc<KeysManager>,
1964		Arc<KeysManager>,
1965		Arc<ChannelManager>,
1966		Arc<dyn Filter + Sync + Send>,
1967		Arc<Persister>,
1968		DefaultTimeProvider,
1969		Arc<test_utils::TestBroadcaster>,
1970	>;
1971
1972	struct Node {
1973		node: Arc<ChannelManager>,
1974		messenger: Arc<OM>,
1975		p2p_gossip_sync: PGS,
1976		rapid_gossip_sync: RGS,
1977		peer_manager: Arc<
1978			PeerManager<
1979				TestDescriptor,
1980				Arc<test_utils::TestChannelMessageHandler>,
1981				Arc<test_utils::TestRoutingMessageHandler>,
1982				Arc<OM>,
1983				Arc<test_utils::TestLogger>,
1984				IgnoringMessageHandler,
1985				Arc<KeysManager>,
1986				IgnoringMessageHandler,
1987			>,
1988		>,
1989		liquidity_manager: Arc<LM>,
1990		chain_monitor: Arc<ChainMonitor>,
1991		kv_store: Arc<Persister>,
1992		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
1993		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1994		logger: Arc<test_utils::TestLogger>,
1995		best_block: BestBlock,
1996		scorer: Arc<LockingWrapper<TestScorer>>,
1997		sweeper: Arc<
1998			OutputSweeperSync<
1999				Arc<test_utils::TestBroadcaster>,
2000				Arc<TestWallet>,
2001				Arc<test_utils::TestFeeEstimator>,
2002				Arc<test_utils::TestChainSource>,
2003				Arc<Persister>,
2004				Arc<test_utils::TestLogger>,
2005				Arc<KeysManager>,
2006			>,
2007		>,
2008	}
2009
2010	impl Node {
2011		fn p2p_gossip_sync(
2012			&self,
2013		) -> GossipSync<
2014			PGS,
2015			RGS,
2016			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2017			Arc<test_utils::TestChainSource>,
2018			Arc<test_utils::TestLogger>,
2019		> {
2020			GossipSync::P2P(Arc::clone(&self.p2p_gossip_sync))
2021		}
2022
2023		fn rapid_gossip_sync(
2024			&self,
2025		) -> GossipSync<
2026			PGS,
2027			RGS,
2028			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2029			Arc<test_utils::TestChainSource>,
2030			Arc<test_utils::TestLogger>,
2031		> {
2032			GossipSync::Rapid(Arc::clone(&self.rapid_gossip_sync))
2033		}
2034
2035		fn no_gossip_sync(
2036			&self,
2037		) -> GossipSync<
2038			PGS,
2039			RGS,
2040			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2041			Arc<test_utils::TestChainSource>,
2042			Arc<test_utils::TestLogger>,
2043		> {
2044			GossipSync::None
2045		}
2046	}
2047
2048	impl Drop for Node {
2049		fn drop(&mut self) {
2050			let data_dir = self.kv_store.get_data_dir();
			if let Err(e) = fs::remove_dir_all(&data_dir) {
				println!("Failed to remove test store directory {}: {}", data_dir.display(), e)
			}
2057		}
2058	}
2059
2060	struct Persister {
2061		graph_error: Option<(std::io::ErrorKind, &'static str)>,
2062		graph_persistence_notifier: Option<SyncSender<()>>,
2063		manager_error: Option<(std::io::ErrorKind, &'static str)>,
2064		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
2065		kv_store: FilesystemStore,
2066	}
2067
2068	impl Persister {
2069		fn new(data_dir: PathBuf) -> Self {
2070			let kv_store = FilesystemStore::new(data_dir);
2071			Self {
2072				graph_error: None,
2073				graph_persistence_notifier: None,
2074				manager_error: None,
2075				scorer_error: None,
2076				kv_store,
2077			}
2078		}
2079
2080		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2081			Self { graph_error: Some((error, message)), ..self }
2082		}
2083
2084		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
2085			Self { graph_persistence_notifier: Some(sender), ..self }
2086		}
2087
2088		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2089			Self { manager_error: Some((error, message)), ..self }
2090		}
2091
2092		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2093			Self { scorer_error: Some((error, message)), ..self }
2094		}
2095
2096		pub fn get_data_dir(&self) -> PathBuf {
2097			self.kv_store.get_data_dir()
2098		}
2099	}
2100
2101	impl KVStoreSync for Persister {
2102		fn read(
2103			&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
2104		) -> lightning::io::Result<Vec<u8>> {
2105			self.kv_store.read(primary_namespace, secondary_namespace, key)
2106		}
2107
2108		fn write(
2109			&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
2110		) -> lightning::io::Result<()> {
2111			if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
2112				&& secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
2113				&& key == CHANNEL_MANAGER_PERSISTENCE_KEY
2114			{
2115				if let Some((error, message)) = self.manager_error {
2116					return Err(std::io::Error::new(error, message).into());
2117				}
2118			}
2119
2120			if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE
2121				&& secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE
2122				&& key == NETWORK_GRAPH_PERSISTENCE_KEY
2123			{
2124				if let Some(sender) = &self.graph_persistence_notifier {
2125					match sender.send(()) {
2126						Ok(()) => {},
2127						Err(std::sync::mpsc::SendError(())) => {
2128							println!("Persister failed to notify as receiver went away.")
2129						},
2130					}
2131				};
2132
2133				if let Some((error, message)) = self.graph_error {
2134					return Err(std::io::Error::new(error, message).into());
2135				}
2136			}
2137
2138			if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE
2139				&& secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE
2140				&& key == SCORER_PERSISTENCE_KEY
2141			{
2142				if let Some((error, message)) = self.scorer_error {
2143					return Err(std::io::Error::new(error, message).into());
2144				}
2145			}
2146
2147			self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
2148		}
2149
2150		fn remove(
2151			&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
2152		) -> lightning::io::Result<()> {
2153			self.kv_store.remove(primary_namespace, secondary_namespace, key)
2154		}
2155
2156		fn list(
2157			&self, primary_namespace: &str, secondary_namespace: &str,
2158		) -> lightning::io::Result<Vec<String>> {
2159			self.kv_store.list(primary_namespace, secondary_namespace)
2160		}
2161	}
2162
2163	struct TestScorer {
2164		event_expectations: Option<VecDeque<TestResult>>,
2165	}
2166
2167	#[derive(Debug)]
2168	enum TestResult {
2169		PaymentFailure { path: Path, short_channel_id: u64 },
2170		PaymentSuccess { path: Path },
2171		ProbeFailure { path: Path },
2172		ProbeSuccess { path: Path },
2173	}
2174
2175	impl TestScorer {
2176		fn new() -> Self {
2177			Self { event_expectations: None }
2178		}
2179
2180		fn expect(&mut self, expectation: TestResult) {
2181			self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
2182		}
2183	}
2184
2185	impl lightning::util::ser::Writeable for TestScorer {
2186		fn write<W: lightning::util::ser::Writer>(
2187			&self, _: &mut W,
2188		) -> Result<(), lightning::io::Error> {
2189			Ok(())
2190		}
2191	}
2192
2193	impl ScoreLookUp for TestScorer {
2194		type ScoreParams = ();
2195		fn channel_penalty_msat(
2196			&self, _candidate: &CandidateRouteHop, _usage: ChannelUsage,
2197			_score_params: &Self::ScoreParams,
2198		) -> u64 {
2199			unimplemented!();
2200		}
2201	}
2202
2203	impl ScoreUpdate for TestScorer {
2204		fn payment_path_failed(
2205			&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration,
2206		) {
2207			if let Some(expectations) = &mut self.event_expectations {
2208				match expectations.pop_front().unwrap() {
2209					TestResult::PaymentFailure { path, short_channel_id } => {
2210						assert_eq!(actual_path, &path);
2211						assert_eq!(actual_short_channel_id, short_channel_id);
2212					},
2213					TestResult::PaymentSuccess { path } => {
2214						panic!("Unexpected successful payment path: {:?}", path)
2215					},
2216					TestResult::ProbeFailure { path } => {
2217						panic!("Unexpected probe failure: {:?}", path)
2218					},
2219					TestResult::ProbeSuccess { path } => {
2220						panic!("Unexpected probe success: {:?}", path)
2221					},
2222				}
2223			}
2224		}
2225
2226		fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
2227			if let Some(expectations) = &mut self.event_expectations {
2228				match expectations.pop_front().unwrap() {
2229					TestResult::PaymentFailure { path, .. } => {
2230						panic!("Unexpected payment path failure: {:?}", path)
2231					},
2232					TestResult::PaymentSuccess { path } => {
2233						assert_eq!(actual_path, &path);
2234					},
2235					TestResult::ProbeFailure { path } => {
2236						panic!("Unexpected probe failure: {:?}", path)
2237					},
2238					TestResult::ProbeSuccess { path } => {
2239						panic!("Unexpected probe success: {:?}", path)
2240					},
2241				}
2242			}
2243		}
2244
2245		fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
2246			if let Some(expectations) = &mut self.event_expectations {
2247				match expectations.pop_front().unwrap() {
2248					TestResult::PaymentFailure { path, .. } => {
2249						panic!("Unexpected payment path failure: {:?}", path)
2250					},
2251					TestResult::PaymentSuccess { path } => {
2252						panic!("Unexpected payment path success: {:?}", path)
2253					},
2254					TestResult::ProbeFailure { path } => {
2255						assert_eq!(actual_path, &path);
2256					},
2257					TestResult::ProbeSuccess { path } => {
2258						panic!("Unexpected probe success: {:?}", path)
2259					},
2260				}
2261			}
2262		}
2263		fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
2264			if let Some(expectations) = &mut self.event_expectations {
2265				match expectations.pop_front().unwrap() {
2266					TestResult::PaymentFailure { path, .. } => {
2267						panic!("Unexpected payment path failure: {:?}", path)
2268					},
2269					TestResult::PaymentSuccess { path } => {
2270						panic!("Unexpected payment path success: {:?}", path)
2271					},
2272					TestResult::ProbeFailure { path } => {
2273						panic!("Unexpected probe failure: {:?}", path)
2274					},
2275					TestResult::ProbeSuccess { path } => {
2276						assert_eq!(actual_path, &path);
2277					},
2278				}
2279			}
2280		}
2281		fn time_passed(&mut self, _: Duration) {}
2282	}
2283
2284	#[cfg(c_bindings)]
2285	impl lightning::routing::scoring::Score for TestScorer {}
2286
2287	impl Drop for TestScorer {
2288		fn drop(&mut self) {
2289			if std::thread::panicking() {
2290				return;
2291			}
2292
2293			if let Some(event_expectations) = &self.event_expectations {
2294				if !event_expectations.is_empty() {
2295					panic!("Unsatisfied event expectations: {:?}", event_expectations);
2296				}
2297			}
2298		}
2299	}
2300
2301	struct TestWallet {}
2302
2303	impl ChangeDestinationSourceSync for TestWallet {
2304		fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
2305			Ok(ScriptBuf::new())
2306		}
2307	}
2308
2309	fn get_full_filepath(filepath: String, filename: String) -> String {
2310		let mut path = PathBuf::from(filepath);
2311		path.push(filename);
2312		path.to_str().unwrap().to_string()
2313	}
2314
2315	fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
2316		let persist_temp_path = env::temp_dir().join(persist_dir);
2317		let persist_dir = persist_temp_path.to_string_lossy().to_string();
2318		let network = Network::Bitcoin;
2319		let mut nodes = Vec::new();
2320		for i in 0..num_nodes {
2321			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
2322			let fee_estimator = Arc::new(test_utils::TestFeeEstimator::new(253));
2323			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
2324			let genesis_block = genesis_block(network);
2325			let network_graph = Arc::new(NetworkGraph::new(network, Arc::clone(&logger)));
2326			let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
2327			let now = Duration::from_secs(genesis_block.header.time as u64);
2328			let seed = [i as u8; 32];
2329			let keys_manager =
2330				Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
2331			let router = Arc::new(DefaultRouter::new(
2332				Arc::clone(&network_graph),
2333				Arc::clone(&logger),
2334				Arc::clone(&keys_manager),
2335				Arc::clone(&scorer),
2336				Default::default(),
2337			));
2338			let msg_router = Arc::new(DefaultMessageRouter::new(
2339				Arc::clone(&network_graph),
2340				Arc::clone(&keys_manager),
2341			));
2342			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
2343			let kv_store =
2344				Arc::new(Persister::new(format!("{}_persister_{}", &persist_dir, i).into()));
2348			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
2349				Some(Arc::clone(&chain_source)),
2350				Arc::clone(&tx_broadcaster),
2351				Arc::clone(&logger),
2352				Arc::clone(&fee_estimator),
2353				Arc::clone(&kv_store),
2354				Arc::clone(&keys_manager),
2355				keys_manager.get_peer_storage_key(),
2356			));
2357			let best_block = BestBlock::from_network(network);
2358			let params = ChainParameters { network, best_block };
2359			let manager = Arc::new(ChannelManager::new(
2360				Arc::clone(&fee_estimator),
2361				Arc::clone(&chain_monitor),
2362				Arc::clone(&tx_broadcaster),
2363				Arc::clone(&router),
2364				Arc::clone(&msg_router),
2365				Arc::clone(&logger),
2366				Arc::clone(&keys_manager),
2367				Arc::clone(&keys_manager),
2368				Arc::clone(&keys_manager),
2369				UserConfig::default(),
2370				params,
2371				genesis_block.header.time,
2372			));
2373			let messenger = Arc::new(OnionMessenger::new(
2374				Arc::clone(&keys_manager),
2375				Arc::clone(&keys_manager),
2376				Arc::clone(&logger),
2377				Arc::clone(&manager),
2378				Arc::clone(&msg_router),
2379				IgnoringMessageHandler {},
2380				Arc::clone(&manager),
2381				IgnoringMessageHandler {},
2382				IgnoringMessageHandler {},
2383			));
2384			let wallet = Arc::new(TestWallet {});
2385			let sweeper = Arc::new(OutputSweeperSync::new(
2386				best_block,
2387				Arc::clone(&tx_broadcaster),
2388				Arc::clone(&fee_estimator),
2389				None::<Arc<test_utils::TestChainSource>>,
2390				Arc::clone(&keys_manager),
2391				wallet,
2392				Arc::clone(&kv_store),
2393				Arc::clone(&logger),
2394			));
2395			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(
2396				Arc::clone(&network_graph),
2397				Some(Arc::clone(&chain_source)),
2398				Arc::clone(&logger),
2399			));
2400			let rapid_gossip_sync =
2401				Arc::new(RapidGossipSync::new(Arc::clone(&network_graph), Arc::clone(&logger)));
2402			let msg_handler = MessageHandler {
2403				chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(
2404					ChainHash::using_genesis_block(Network::Testnet),
2405				)),
2406				route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
2407				onion_message_handler: Arc::clone(&messenger),
2408				custom_message_handler: IgnoringMessageHandler {},
2409				send_only_message_handler: IgnoringMessageHandler {},
2410			};
2411			let peer_manager = Arc::new(PeerManager::new(
2412				msg_handler,
2413				0,
2414				&seed,
2415				Arc::clone(&logger),
2416				Arc::clone(&keys_manager),
2417			));
2418			let liquidity_manager = Arc::new(
2419				LiquidityManagerSync::new(
2420					Arc::clone(&keys_manager),
2421					Arc::clone(&keys_manager),
2422					Arc::clone(&manager),
2423					None,
2424					None,
2425					Arc::clone(&kv_store),
2426					Arc::clone(&tx_broadcaster),
2427					None,
2428					None,
2429				)
2430				.unwrap(),
2431			);
2432			let node = Node {
2433				node: manager,
2434				p2p_gossip_sync,
2435				rapid_gossip_sync,
2436				peer_manager,
2437				liquidity_manager,
2438				chain_monitor,
2439				kv_store,
2440				tx_broadcaster,
2441				network_graph,
2442				logger,
2443				best_block,
2444				scorer,
2445				sweeper,
2446				messenger,
2447			};
2448			nodes.push(node);
2449		}
2450
2451		for i in 0..num_nodes {
2452			for j in (i + 1)..num_nodes {
2453				let init_i = Init {
2454					features: nodes[j].node.init_features(),
2455					networks: None,
2456					remote_network_address: None,
2457				};
2458				nodes[i]
2459					.node
2460					.peer_connected(nodes[j].node.get_our_node_id(), &init_i, true)
2461					.unwrap();
2462				let init_j = Init {
2463					features: nodes[i].node.init_features(),
2464					networks: None,
2465					remote_network_address: None,
2466				};
2467				nodes[j]
2468					.node
2469					.peer_connected(nodes[i].node.get_our_node_id(), &init_j, false)
2470					.unwrap();
2471			}
2472		}
2473
2474		(persist_dir, nodes)
2475	}
2476
2477	macro_rules! open_channel {
2478		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
2479			begin_open_channel!($node_a, $node_b, $channel_value);
2480			let events = $node_a.node.get_and_clear_pending_events();
2481			assert_eq!(events.len(), 1);
2482			let (temporary_channel_id, tx) =
2483				handle_funding_generation_ready!(events[0], $channel_value);
2484			$node_a
2485				.node
2486				.funding_transaction_generated(
2487					temporary_channel_id,
2488					$node_b.node.get_our_node_id(),
2489					tx.clone(),
2490				)
2491				.unwrap();
2492			let msg_a = get_event_msg!(
2493				$node_a,
2494				MessageSendEvent::SendFundingCreated,
2495				$node_b.node.get_our_node_id()
2496			);
2497			$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
2498			get_event!($node_b, Event::ChannelPending);
2499			let msg_b = get_event_msg!(
2500				$node_b,
2501				MessageSendEvent::SendFundingSigned,
2502				$node_a.node.get_our_node_id()
2503			);
2504			$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
2505			get_event!($node_a, Event::ChannelPending);
2506			tx
2507		}};
2508	}
2509
2510	macro_rules! begin_open_channel {
2511		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
2512			$node_a
2513				.node
2514				.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None)
2515				.unwrap();
2516			let msg_a = get_event_msg!(
2517				$node_a,
2518				MessageSendEvent::SendOpenChannel,
2519				$node_b.node.get_our_node_id()
2520			);
2521			$node_b.node.handle_open_channel($node_a.node.get_our_node_id(), &msg_a);
2522			let msg_b = get_event_msg!(
2523				$node_b,
2524				MessageSendEvent::SendAcceptChannel,
2525				$node_a.node.get_our_node_id()
2526			);
2527			$node_a.node.handle_accept_channel($node_b.node.get_our_node_id(), &msg_b);
2528		}};
2529	}
2530
2531	macro_rules! handle_funding_generation_ready {
2532		($event: expr, $channel_value: expr) => {{
2533			match $event {
2534				Event::FundingGenerationReady {
2535					temporary_channel_id,
2536					channel_value_satoshis,
2537					ref output_script,
2538					user_channel_id,
2539					..
2540				} => {
2541					assert_eq!(channel_value_satoshis, $channel_value);
2542					assert_eq!(user_channel_id, 42);
2543
2544					let tx = Transaction {
2545						version: Version::ONE,
2546						lock_time: LockTime::ZERO,
2547						input: Vec::new(),
2548						output: vec![TxOut {
2549							value: Amount::from_sat(channel_value_satoshis),
2550							script_pubkey: output_script.clone(),
2551						}],
2552					};
2553					(temporary_channel_id, tx)
2554				},
2555				_ => panic!("Unexpected event"),
2556			}
2557		}};
2558	}
2559
2560	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
2561		for i in 1..=depth {
2562			let prev_blockhash = node.best_block.block_hash;
2563			let height = node.best_block.height + 1;
2564			let header = create_dummy_header(prev_blockhash, height);
2565			let txdata = vec![(0, tx)];
2566			node.best_block = BestBlock::new(header.block_hash(), height);
2567			match i {
2568				1 => {
2569					node.node.transactions_confirmed(&header, &txdata, height);
2570					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
2571					node.sweeper.transactions_confirmed(&header, &txdata, height);
2572				},
2573				x if x == depth => {
2574					// We need the TestBroadcaster to know about the new height so that it doesn't think
2575					// we're violating the time lock requirements of transactions broadcasted at that
2576					// point.
2577					let block = (genesis_block(Network::Bitcoin), height);
2578					node.tx_broadcaster.blocks.lock().unwrap().push(block);
2579					node.node.best_block_updated(&header, height);
2580					node.chain_monitor.best_block_updated(&header, height);
2581					node.sweeper.best_block_updated(&header, height);
2582				},
2583				_ => {},
2584			}
2585		}
2586	}
2587
2588	fn advance_chain(node: &mut Node, num_blocks: u32) {
2589		for i in 1..=num_blocks {
2590			let prev_blockhash = node.best_block.block_hash;
2591			let height = node.best_block.height + 1;
2592			let header = create_dummy_header(prev_blockhash, height);
2593			node.best_block = BestBlock::new(header.block_hash(), height);
2594			if i == num_blocks {
2595				// We need the TestBroadcaster to know about the new height so that it doesn't think
2596				// we're violating the time lock requirements of transactions broadcasted at that
2597				// point.
2598				let block = (genesis_block(Network::Bitcoin), height);
2599				node.tx_broadcaster.blocks.lock().unwrap().push(block);
2600				node.node.best_block_updated(&header, height);
2601				node.chain_monitor.best_block_updated(&header, height);
2602				node.sweeper.best_block_updated(&header, height);
2603			}
2604		}
2605	}
2606
2607	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
2608		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
2609	}
2610
2611	#[test]
2612	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager is persisted with the
		// resulting updates. Also test that when new updates are available, the manager signals
		// that it needs re-persistence and is successfully re-persisted.
2616		let (persist_dir, nodes) = create_nodes(2, "test_background_processor");
2617
2618		// Go through the channel creation process so that each node has something to persist. Since
2619		// open_channel consumes events, it must complete before starting BackgroundProcessor to
2620		// avoid a race with processing events.
2621		let tx = open_channel!(nodes[0], nodes[1], 100000);
2622
2623		// Initiate the background processors to watch each node.
2624		let data_dir = nodes[0].kv_store.get_data_dir();
2625		let persister = Arc::new(Persister::new(data_dir));
2626		let event_handler = |_: _| Ok(());
2627		let bg_processor = BackgroundProcessor::start(
2628			persister,
2629			event_handler,
2630			Arc::clone(&nodes[0].chain_monitor),
2631			Arc::clone(&nodes[0].node),
2632			Some(Arc::clone(&nodes[0].messenger)),
2633			nodes[0].p2p_gossip_sync(),
2634			Arc::clone(&nodes[0].peer_manager),
2635			Some(Arc::clone(&nodes[0].liquidity_manager)),
2636			Some(Arc::clone(&nodes[0].sweeper)),
2637			Arc::clone(&nodes[0].logger),
2638			Some(Arc::clone(&nodes[0].scorer)),
2639		);
2640
2641		macro_rules! check_persisted_data {
2642			($node: expr, $filepath: expr) => {
2643				let mut expected_bytes = Vec::new();
2644				loop {
2645					expected_bytes.clear();
2646					match $node.write(&mut expected_bytes) {
2647						Ok(()) => match std::fs::read($filepath) {
2648							Ok(bytes) => {
2649								if bytes == expected_bytes {
2650									break;
2651								} else {
2652									continue;
2653								}
2654							},
2655							Err(_) => continue,
2656						},
2657						Err(e) => panic!("Unexpected error: {}", e),
2658					}
2659				}
2660			};
2661		}
2662
2663		// Check that the initial channel manager data is persisted as expected.
2664		let filepath =
2665			get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
2666		check_persisted_data!(nodes[0].node, filepath.clone());
2667
2668		loop {
2669			if !nodes[0].node.get_event_or_persist_condvar_value() {
2670				break;
2671			}
2672		}
2673
2674		// Force-close the channel.
2675		let error_message = "Channel force-closed";
2676		nodes[0]
2677			.node
2678			.force_close_broadcasting_latest_txn(
2679				&ChannelId::v1_from_funding_outpoint(OutPoint {
2680					txid: tx.compute_txid(),
2681					index: 0,
2682				}),
2683				&nodes[1].node.get_our_node_id(),
2684				error_message.to_string(),
2685			)
2686			.unwrap();
2687
2688		// Check that the force-close updates are persisted.
2689		check_persisted_data!(nodes[0].node, filepath.clone());
2690		loop {
2691			if !nodes[0].node.get_event_or_persist_condvar_value() {
2692				break;
2693			}
2694		}
2695
2696		// Check network graph is persisted
2697		let filepath =
2698			get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
2699		check_persisted_data!(nodes[0].network_graph, filepath.clone());
2700
2701		// Check scorer is persisted
2702		let filepath =
2703			get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
2704		check_persisted_data!(nodes[0].scorer, filepath.clone());
2705
2706		if !std::thread::panicking() {
2707			bg_processor.stop().unwrap();
2708		}
2709	}
2710
2711	#[test]
2712	fn test_timer_tick_called() {
2713		// Test that:
2714		// - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
2715		// - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
2716		// - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
2717		// - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
2718		let (_, nodes) = create_nodes(1, "test_timer_tick_called");
2719		let data_dir = nodes[0].kv_store.get_data_dir();
2720		let persister = Arc::new(Persister::new(data_dir));
2721		let event_handler = |_: _| Ok(());
2722		let bg_processor = BackgroundProcessor::start(
2723			persister,
2724			event_handler,
2725			Arc::clone(&nodes[0].chain_monitor),
2726			Arc::clone(&nodes[0].node),
2727			Some(Arc::clone(&nodes[0].messenger)),
2728			nodes[0].no_gossip_sync(),
2729			Arc::clone(&nodes[0].peer_manager),
2730			Some(Arc::clone(&nodes[0].liquidity_manager)),
2731			Some(Arc::clone(&nodes[0].sweeper)),
2732			Arc::clone(&nodes[0].logger),
2733			Some(Arc::clone(&nodes[0].scorer)),
2734		);
2735		loop {
2736			let log_entries = nodes[0].logger.lines.lock().unwrap();
2737			let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
2738			let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
2739			let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
2740			let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
2741			if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some()
2742				&& log_entries.get(&("lightning_background_processor", desired_log_2)).is_some()
2743				&& log_entries.get(&("lightning_background_processor", desired_log_3)).is_some()
2744				&& log_entries.get(&("lightning_background_processor", desired_log_4)).is_some()
2745			{
2746				break;
2747			}
2748		}
2749
2750		if !std::thread::panicking() {
2751			bg_processor.stop().unwrap();
2752		}
2753	}
2754
2755	#[test]
2756	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the error is returned
		// when joining the background processor.
2758		let (_, nodes) = create_nodes(2, "test_persist_error");
2759		open_channel!(nodes[0], nodes[1], 100000);
2760
2761		let data_dir = nodes[0].kv_store.get_data_dir();
2762		let persister = Arc::new(
2763			Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
2764		);
2765		let event_handler = |_: _| Ok(());
2766		let bg_processor = BackgroundProcessor::start(
2767			persister,
2768			event_handler,
2769			Arc::clone(&nodes[0].chain_monitor),
2770			Arc::clone(&nodes[0].node),
2771			Some(Arc::clone(&nodes[0].messenger)),
2772			nodes[0].no_gossip_sync(),
2773			Arc::clone(&nodes[0].peer_manager),
2774			Some(Arc::clone(&nodes[0].liquidity_manager)),
2775			Some(Arc::clone(&nodes[0].sweeper)),
2776			Arc::clone(&nodes[0].logger),
2777			Some(Arc::clone(&nodes[0].scorer)),
2778		);
2779		match bg_processor.join() {
2780			Ok(_) => panic!("Expected error persisting manager"),
2781			Err(e) => {
2782				assert_eq!(e.kind(), std::io::ErrorKind::Other);
2783				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2784			},
2785		}
2786	}
2787
2788	#[tokio::test]
2789	async fn test_channel_manager_persist_error_async() {
		// Test that if we encounter an error during manager persistence, the error is returned
		// from the background processing future.
		let (_, nodes) = create_nodes(2, "test_persist_error_async");
2792		open_channel!(nodes[0], nodes[1], 100000);
2793
2794		let data_dir = nodes[0].kv_store.get_data_dir();
2795		let kv_store_sync = Arc::new(
2796			Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
2797		);
2798		let kv_store = KVStoreSyncWrapper(kv_store_sync);
2799
2800		// Yes, you can unsafe { turn off the borrow checker }
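		// These casts launder the borrows of the async LiquidityManager and OutputSweeper into
		// 'static references so the background processing future can capture them. That is only
		// acceptable here because the test keeps `nodes` alive until the future is dropped.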
2801		let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
2802			&*(nodes[0].liquidity_manager.get_lm_async()
2803				as *const LiquidityManager<_, _, _, _, _, _, _>)
2804				as &'static LiquidityManager<_, _, _, _, _, _, _>
2805		};
2806		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
2807			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
2808				as &'static OutputSweeper<_, _, _, _, _, _, _>
2809		};
2810
2811		let bp_future = super::process_events_async(
2812			kv_store,
2813			|_: _| async { Ok(()) },
2814			Arc::clone(&nodes[0].chain_monitor),
2815			Arc::clone(&nodes[0].node),
2816			Some(Arc::clone(&nodes[0].messenger)),
2817			nodes[0].rapid_gossip_sync(),
2818			Arc::clone(&nodes[0].peer_manager),
2819			Some(lm_async),
2820			Some(sweeper_async),
2821			Arc::clone(&nodes[0].logger),
2822			Some(Arc::clone(&nodes[0].scorer)),
2823			move |dur: Duration| {
2824				Box::pin(async move {
2825					tokio::time::sleep(dur).await;
2826					false // Never exit
2827				})
2828			},
2829			false,
2830			|| Some(Duration::ZERO),
2831		);
2832		match bp_future.await {
2833			Ok(_) => panic!("Expected error persisting manager"),
2834			Err(e) => {
2835				assert_eq!(e.kind(), lightning::io::ErrorKind::Other);
2836				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2837			},
2838		}
2839	}
2840
2841	#[test]
2842	fn test_network_graph_persist_error() {
2843		// Test that if we encounter an error during network graph persistence, an error gets returned.
2844		let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
2845		let data_dir = nodes[0].kv_store.get_data_dir();
2846		let persister =
2847			Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
2848		let event_handler = |_: _| Ok(());
2849		let bg_processor = BackgroundProcessor::start(
2850			persister,
2851			event_handler,
2852			Arc::clone(&nodes[0].chain_monitor),
2853			Arc::clone(&nodes[0].node),
2854			Some(Arc::clone(&nodes[0].messenger)),
2855			nodes[0].p2p_gossip_sync(),
2856			Arc::clone(&nodes[0].peer_manager),
2857			Some(Arc::clone(&nodes[0].liquidity_manager)),
2858			Some(Arc::clone(&nodes[0].sweeper)),
2859			Arc::clone(&nodes[0].logger),
2860			Some(Arc::clone(&nodes[0].scorer)),
2861		);
2862
2863		match bg_processor.stop() {
2864			Ok(_) => panic!("Expected error persisting network graph"),
2865			Err(e) => {
2866				assert_eq!(e.kind(), std::io::ErrorKind::Other);
2867				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2868			},
2869		}
2870	}
2871
2872	#[test]
2873	fn test_scorer_persist_error() {
2874		// Test that if we encounter an error during scorer persistence, an error gets returned.
2875		let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
2876		let data_dir = nodes[0].kv_store.get_data_dir();
2877		let persister =
2878			Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
2879		let event_handler = |_: _| Ok(());
2880		let bg_processor = BackgroundProcessor::start(
2881			persister,
2882			event_handler,
2883			Arc::clone(&nodes[0].chain_monitor),
2884			Arc::clone(&nodes[0].node),
2885			Some(Arc::clone(&nodes[0].messenger)),
2886			nodes[0].no_gossip_sync(),
2887			Arc::clone(&nodes[0].peer_manager),
2888			Some(Arc::clone(&nodes[0].liquidity_manager)),
2889			Some(Arc::clone(&nodes[0].sweeper)),
2890			Arc::clone(&nodes[0].logger),
2891			Some(Arc::clone(&nodes[0].scorer)),
2892		);
2893
2894		match bg_processor.stop() {
2895			Ok(_) => panic!("Expected error persisting scorer"),
2896			Err(e) => {
2897				assert_eq!(e.kind(), std::io::ErrorKind::Other);
2898				assert_eq!(e.get_ref().unwrap().to_string(), "test");
2899			},
2900		}
2901	}
2902
2903	#[test]
2904	fn test_background_event_handling() {
2905		let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
2906		let node_0_id = nodes[0].node.get_our_node_id();
2907		let node_1_id = nodes[1].node.get_our_node_id();
2908
2909		let channel_value = 100000;
2910		let data_dir = nodes[0].kv_store.get_data_dir();
2911		let persister = Arc::new(Persister::new(data_dir.clone()));
2912
2913		// Set up a background event handler for FundingGenerationReady events.
2914		let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
2915		let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| {
			match event {
				Event::FundingGenerationReady { .. } => funding_generation_send
					.send(handle_funding_generation_ready!(event, channel_value))
					.unwrap(),
				Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
				Event::ChannelReady { .. } => {},
				_ => panic!("Unexpected event: {:?}", event),
			}
			Ok(())
		};

		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = funding_generation_recv
			.recv_timeout(EVENT_DEADLINE)
			.expect("FundingGenerationReady not handled within deadline");
		nodes[0]
			.node
			.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
			.unwrap();
		let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
		nodes[1].node.handle_funding_created(node_0_id, &msg_0);
		get_event!(nodes[1], Event::ChannelPending);
		let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
		nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
		channel_pending_recv
			.recv_timeout(EVENT_DEADLINE)
			.expect("ChannelPending not handled within deadline");

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, node_1_id);
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, node_0_id);
		nodes[0].node.handle_channel_ready(node_1_id, &bs_funding);
		let _as_channel_update =
			get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_1_id);
		nodes[1].node.handle_channel_ready(node_0_id, &as_funding);
		let _bs_channel_update =
			get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_0_id);
		let broadcast_funding =
			nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		assert_eq!(broadcast_funding.compute_txid(), funding_tx.compute_txid());
		assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| {
			match event {
				Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
				Event::ChannelReady { .. } => {},
				Event::ChannelClosed { .. } => {},
				_ => panic!("Unexpected event: {:?}", event),
			}
			Ok(())
		};
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		// Force close the channel and check that the SpendableOutputs event was handled.
		let error_message = "Channel force-closed";
		nodes[0]
			.node
			.force_close_broadcasting_latest_txn(
				&nodes[0].node.list_channels()[0].channel_id,
				&node_1_id,
				error_message.to_string(),
			)
			.unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

		let event =
			receiver.recv_timeout(EVENT_DEADLINE).expect("Events not handled within deadline");
		match event {
			Event::SpendableOutputs { outputs, channel_id } => {
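				// Hand the outputs to the sweeper; `Some(153)` delays the initial sweep broadcast
				// until block height 153, which the status check below relies on.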
				nodes[0]
					.sweeper
					.track_spendable_outputs(outputs, channel_id, false, Some(153))
					.unwrap();
			},
			_ => panic!("Unexpected event: {:?}", event),
		}

		// Check we don't generate an initial sweeping tx until we reach the required height.
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() {
			assert!(!tracked_output.is_spent_in(&sweep_tx_0));
			match tracked_output.status {
				OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
					assert_eq!(delayed_until_height, Some(153));
				},
				_ => panic!("Unexpected status"),
			}
		}

		advance_chain(&mut nodes[0], 3);

		let tx_broadcaster = Arc::clone(&nodes[0].tx_broadcaster);
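		// The sweep is broadcast from the background processor's thread, so poll the broadcaster
		// rather than asserting immediately.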
		let wait_for_sweep_tx = || -> Transaction {
			loop {
				let sweep_tx = tx_broadcaster.txn_broadcasted.lock().unwrap().pop();
				if let Some(sweep_tx) = sweep_tx {
					return sweep_tx;
				}

				std::thread::sleep(Duration::from_millis(10));
			}
		};

		// Check we generate an initial sweeping tx.
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let sweep_tx_0 = wait_for_sweep_tx();
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}

		// Check we regenerate and rebroadcast the sweeping tx each block.
		advance_chain(&mut nodes[0], 1);
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let sweep_tx_1 = wait_for_sweep_tx();
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}
		assert_ne!(sweep_tx_0, sweep_tx_1);

		advance_chain(&mut nodes[0], 1);
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let sweep_tx_2 = wait_for_sweep_tx();
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}
		assert_ne!(sweep_tx_0, sweep_tx_2);
		assert_ne!(sweep_tx_1, sweep_tx_2);

		// Check we still track the spendable outputs up to ANTI_REORG_DELAY confirmations.
		confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5);
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}

		// Check we still see the transaction as confirmed if we unconfirm any untracked
		// transaction. (We previously had a bug that would mark tracked transactions as
		// unconfirmed if any transaction at an unknown block height was unconfirmed.)
		let unconf_txid = Txid::from_slice(&[0; 32]).unwrap();
		nodes[0].sweeper.transaction_unconfirmed(&unconf_txid);

		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}

		// Check we stop tracking the spendable outputs when one of the txs reaches
		// PRUNE_DELAY_BLOCKS confirmations.
		confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, PRUNE_DELAY_BLOCKS);
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0);

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}
	}

	#[test]
	fn test_event_handling_failures_are_replayed() {
		let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
		let channel_value = 100000;
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
		let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
		let should_fail_event_handling = Arc::new(AtomicBool::new(true));
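		// Fail the first handling attempt by returning `ReplayEvent`; the background processor
		// should then redeliver the same event, which the handler accepts on the second attempt.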
		let event_handler = move |event: Event| {
			if let Ok(true) = should_fail_event_handling.compare_exchange(
				true,
				false,
				Ordering::Acquire,
				Ordering::Relaxed,
			) {
				first_event_send.send(event).unwrap();
				return Err(ReplayEvent());
			}

			second_event_send.send(event).unwrap();
			Ok(())
		};

		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		begin_open_channel!(nodes[0], nodes[1], channel_value);
		assert_eq!(
			first_event_recv.recv_timeout(EVENT_DEADLINE).unwrap(),
			second_event_recv.recv_timeout(EVENT_DEADLINE).unwrap()
		);

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}
	}

	#[test]
	fn test_scorer_persistence() {
		let (_, nodes) = create_nodes(2, "test_scorer_persistence");
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| Ok(());
		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

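		// Busy-wait until the background processor logs that it has called `time_passed` and
		// persisted the scorer at least once.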
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Calling time_passed and persisting scorer".to_string();
			if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
				break;
			}
		}

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}
	}

	macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
		($nodes: expr, $receive: expr, $sleep: expr) => {
			let features = ChannelFeatures::empty();
			$nodes[0]
				.network_graph
				.add_channel_from_partial_announcement(
					42,
					None,
					53,
					features,
					$nodes[0].node.get_our_node_id().into(),
					$nodes[1].node.get_our_node_id().into(),
				)
				.expect("Failed to update channel from partial announcement");
			let original_graph_description = $nodes[0].network_graph.to_string();
			assert!(original_graph_description.contains("42: features: 0000, node_one:"));
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);

			loop {
				$sleep;
				let log_entries = $nodes[0].logger.lines.lock().unwrap();
				let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
				if *log_entries.get(&("lightning_background_processor", loop_counter)).unwrap_or(&0)
					> 1
				{
					// Wait until the loop has gone around at least twice.
					break;
				}
			}

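			// A hard-coded rapid-gossip-sync snapshot carrying two channel announcements;
			// applying it completes the graph sync, which allows pruning to proceed.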
			let initialization_input = vec![
				76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99,
				247, 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227,
				98, 218, 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61,
				250, 251, 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6,
				67, 2, 36, 125, 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47,
				115, 172, 63, 136, 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158,
				1, 242, 121, 152, 106, 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95,
				65, 3, 83, 185, 58, 138, 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136,
				149, 185, 226, 156, 137, 175, 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219,
				175, 168, 77, 4, 143, 38, 128, 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2,
				27, 0, 0, 0, 1, 0, 0, 255, 2, 68, 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0,
				0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255,
				8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6,
				11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
			];
			$nodes[0]
				.rapid_gossip_sync
				.update_network_graph_no_std(&initialization_input[..], Some(1642291930))
				.unwrap();

			// This should have added two channels and pruned the previous one.
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);

			$receive.expect("Network graph not pruned within deadline");

			// All channels should now be pruned.
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
		};
	}

	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);

		let (_, nodes) =
			create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));

		let event_handler = |_: _| Ok(());
		let background_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].rapid_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		do_test_not_pruning_network_graph_until_graph_sync_completion!(
			nodes,
			receiver.recv_timeout(super::FIRST_NETWORK_PRUNE_TIMER * 5),
			std::thread::sleep(Duration::from_millis(1))
		);

		background_processor.stop().unwrap();
	}

	#[tokio::test]
	async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);

		let (_, nodes) =
			create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
		let data_dir = nodes[0].kv_store.get_data_dir();
		let kv_store_sync =
			Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
		let kv_store = KVStoreSyncWrapper(kv_store_sync);
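		// `KVStoreSyncWrapper` adapts the synchronous test persister to the async `KVStore`
		// interface expected by `process_events_async`.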

		// Yes, you can unsafe { turn off the borrow checker }
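		// SAFETY: extending these borrows to 'static is sound in this test only because the
		// background-processor future is driven to completion (via `tokio::join!` below) before
		// the referenced managers are dropped.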
		let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
			&*(nodes[0].liquidity_manager.get_lm_async()
				as *const LiquidityManager<_, _, _, _, _, _, _>)
				as &'static LiquidityManager<_, _, _, _, _, _, _>
		};
		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
				as &'static OutputSweeper<_, _, _, _, _, _, _>
		};

		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
		let bp_future = super::process_events_async(
			kv_store,
			|_: _| async { Ok(()) },
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].rapid_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(lm_async),
			Some(sweeper_async),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
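			// The sleeper future: resolves to `true` to tell the processor to exit once
			// `exit_sender` fires, `false` on an ordinary timer tick.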
			move |dur: Duration| {
				let mut exit_receiver = exit_receiver.clone();
				Box::pin(async move {
					tokio::select! {
						_ = tokio::time::sleep(dur) => false,
						_ = exit_receiver.changed() => true,
					}
				})
			},
			false,
			|| Some(Duration::from_secs(1696300000)),
		);

		let t1 = tokio::spawn(bp_future);
		let t2 = tokio::spawn(async move {
			do_test_not_pruning_network_graph_until_graph_sync_completion!(
				nodes,
				{
					let mut i = 0;
					loop {
						tokio::time::sleep(super::FIRST_NETWORK_PRUNE_TIMER).await;
						if let Ok(()) = receiver.try_recv() {
							break Ok::<(), ()>(());
						}
						assert!(i < 5);
						i += 1;
					}
				},
				tokio::time::sleep(Duration::from_millis(1)).await
			);
			exit_sender.send(()).unwrap();
		});
		let (r1, r2) = tokio::join!(t1, t2);
		r1.unwrap().unwrap();
		r2.unwrap()
	}

	macro_rules! do_test_payment_path_scoring {
		($nodes: expr, $receive: expr) => {
			// Ensure that we update the scorer when relevant events are processed. In this case,
			// we ensure that we update the scorer upon a payment path succeeding (note that the
			// channel must be public or else we won't score it).
			// An event handler must be hooked up to a running background processor so that the
			// events pushed below are processed and fed to the scorer.
			let scored_scid = 4242;
			let secp_ctx = Secp256k1::new();
			let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
			let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);

			let path = Path { hops: vec![RouteHop {
				pubkey: node_1_id,
				node_features: NodeFeatures::empty(),
				short_channel_id: scored_scid,
				channel_features: ChannelFeatures::empty(),
				fee_msat: 0,
				cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
				maybe_announced_channel: true,
			}], blinded_tail: None };
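			// A single-hop path over the scored channel. Each of the five events pushed below
			// should trigger exactly one scorer update (and one scorer persist).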

			$nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
			$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
				payment_id: None,
				payment_hash: PaymentHash([42; 32]),
				payment_failed_permanently: false,
				failure: PathFailure::OnPath { network_update: None },
				path: path.clone(),
				short_channel_id: Some(scored_scid),
				error_code: None,
				error_data: None,
				hold_times: Vec::new(),
			});
			let event = $receive.expect("PaymentPathFailed not handled within deadline");
			match event {
				Event::PaymentPathFailed { .. } => {},
				_ => panic!("Unexpected event"),
			}

			// Ensure we'll score payments that were explicitly failed back by the destination as
			// ProbeSuccess.
			$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
				payment_id: None,
				payment_hash: PaymentHash([42; 32]),
				payment_failed_permanently: true,
				failure: PathFailure::OnPath { network_update: None },
				path: path.clone(),
				short_channel_id: None,
				error_code: None,
				error_data: None,
				hold_times: Vec::new(),
			});
			let event = $receive.expect("PaymentPathFailed not handled within deadline");
			match event {
				Event::PaymentPathFailed { .. } => {},
				_ => panic!("Unexpected event"),
			}

			$nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
				payment_id: PaymentId([42; 32]),
				payment_hash: None,
				path: path.clone(),
				hold_times: Vec::new(),
			});
			let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
			match event {
				Event::PaymentPathSuccessful { .. } => {},
				_ => panic!("Unexpected event"),
			}

			$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::ProbeSuccessful {
				payment_id: PaymentId([42; 32]),
				payment_hash: PaymentHash([42; 32]),
				path: path.clone(),
			});
			let event = $receive.expect("ProbeSuccessful not handled within deadline");
			match event {
				Event::ProbeSuccessful { .. } => {},
				_ => panic!("Unexpected event"),
			}

			$nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::ProbeFailed {
				payment_id: PaymentId([42; 32]),
				payment_hash: PaymentHash([42; 32]),
				path,
				short_channel_id: Some(scored_scid),
			});
			let event = $receive.expect("ProbeFailure not handled within deadline");
			match event {
				Event::ProbeFailed { .. } => {},
				_ => panic!("Unexpected event"),
			}
		}
	}

	#[test]
	fn test_payment_path_scoring() {
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| {
			match event {
				Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
				Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
				Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
				Event::ProbeFailed { .. } => sender.send(event).unwrap(),
				_ => panic!("Unexpected event: {:?}", event),
			}
			Ok(())
		};

		let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		do_test_payment_path_scoring!(nodes, receiver.recv_timeout(EVENT_DEADLINE));

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}

		let log_entries = nodes[0].logger.lines.lock().unwrap();
		let expected_log = "Persisting scorer after update".to_string();
		assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
	}

	#[tokio::test]
	async fn test_payment_path_scoring_async() {
		let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
		let event_handler = move |event: Event| {
			let sender_ref = sender.clone();
			async move {
				match event {
					Event::PaymentPathFailed { .. } => sender_ref.send(event).await.unwrap(),
					Event::PaymentPathSuccessful { .. } => sender_ref.send(event).await.unwrap(),
					Event::ProbeSuccessful { .. } => sender_ref.send(event).await.unwrap(),
					Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
					_ => panic!("Unexpected event: {:?}", event),
				}
				Ok(())
			}
		};

		let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
		let data_dir = nodes[0].kv_store.get_data_dir();
		let kv_store_sync = Arc::new(Persister::new(data_dir));
		let kv_store = KVStoreSyncWrapper(kv_store_sync);

		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());

		// Yes, you can unsafe { turn off the borrow checker }
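		// SAFETY: as in the async pruning test above, the 'static borrows are sound only because
		// the background-processor future is joined before the referenced managers are dropped.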
		let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
			&*(nodes[0].liquidity_manager.get_lm_async()
				as *const LiquidityManager<_, _, _, _, _, _, _>)
				as &'static LiquidityManager<_, _, _, _, _, _, _>
		};
		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
				as &'static OutputSweeper<_, _, _, _, _, _, _>
		};

		let bp_future = super::process_events_async(
			kv_store,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(lm_async),
			Some(sweeper_async),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
			move |dur: Duration| {
				let mut exit_receiver = exit_receiver.clone();
				Box::pin(async move {
					tokio::select! {
						_ = tokio::time::sleep(dur) => false,
						_ = exit_receiver.changed() => true,
					}
				})
			},
			false,
			|| Some(Duration::ZERO),
		);
		let t1 = tokio::spawn(bp_future);
		let t2 = tokio::spawn(async move {
			do_test_payment_path_scoring!(nodes, receiver.recv().await);
			exit_sender.send(()).unwrap();

			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer after update".to_string();
			assert_eq!(
				*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(),
				5
			);
		});

		let (r1, r2) = tokio::join!(t1, t2);
		r1.unwrap().unwrap();
		r2.unwrap()
	}

	#[tokio::test]
	#[cfg(not(c_bindings))]
	async fn test_no_consts() {
		// Compile-test that the NO_* constants can be used.
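		// (A bare `None` would leave the optional generic type parameters unconstrained; the
		// constants pin down concrete types for inference.)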
		let (_, nodes) = create_nodes(1, "test_no_consts");
		let bg_processor = BackgroundProcessor::start(
			Arc::clone(&nodes[0].kv_store),
			move |_: Event| Ok(()),
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			crate::NO_ONION_MESSENGER,
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			crate::NO_LIQUIDITY_MANAGER_SYNC,
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}

		let kv_store = KVStoreSyncWrapper(Arc::clone(&nodes[0].kv_store));
		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
				as &'static OutputSweeper<_, _, _, _, _, _, _>
		};
		let bp_future = super::process_events_async(
			kv_store,
			move |_: Event| async move { Ok(()) },
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			crate::NO_ONION_MESSENGER,
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			crate::NO_LIQUIDITY_MANAGER,
			Some(sweeper_async),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
			move |dur: Duration| {
				let mut exit_receiver = exit_receiver.clone();
				Box::pin(async move {
					tokio::select! {
						_ = tokio::time::sleep(dur) => false,
						_ = exit_receiver.changed() => true,
					}
				})
			},
			false,
			|| Some(Duration::ZERO),
		);
		let t1 = tokio::spawn(bp_future);
		exit_sender.send(()).unwrap();
		t1.await.unwrap().unwrap();
	}
}