1#![cfg_attr(feature = "std", doc = "See docs for [`BackgroundProcessor`] for more details.")]
13#![deny(rustdoc::broken_intra_doc_links)]
14#![deny(rustdoc::private_intra_doc_links)]
15#![deny(missing_docs)]
16#![cfg_attr(docsrs, feature(doc_cfg))]
17#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
18
19#[cfg(any(test, feature = "std"))]
20extern crate core;
21
22#[cfg(not(feature = "std"))]
23extern crate alloc;
24
25#[macro_use]
26extern crate lightning;
27extern crate lightning_rapid_gossip_sync;
28
29mod fwd_batch;
30
31use fwd_batch::BatchDelay;
32
33use lightning::chain;
34use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
35use lightning::chain::chainmonitor::{ChainMonitor, Persist};
36#[cfg(feature = "std")]
37use lightning::events::EventHandler;
38#[cfg(feature = "std")]
39use lightning::events::EventsProvider;
40use lightning::events::ReplayEvent;
41use lightning::events::{Event, PathFailure};
42use lightning::util::ser::Writeable;
43
44use lightning::ln::channelmanager::AChannelManager;
45use lightning::ln::msgs::OnionMessageHandler;
46use lightning::ln::peer_handler::APeerManager;
47use lightning::onion_message::messenger::AOnionMessenger;
48use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
49use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
50use lightning::routing::utxo::UtxoLookup;
51use lightning::sign::{
52 ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
53};
54use lightning::util::logger::Logger;
55use lightning::util::persist::{
56 KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
57 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
58 NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
59 NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
60 SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
61};
62use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
63#[cfg(feature = "std")]
64use lightning::util::wakers::Sleeper;
65use lightning_rapid_gossip_sync::RapidGossipSync;
66
67use lightning_liquidity::ALiquidityManager;
68#[cfg(feature = "std")]
69use lightning_liquidity::ALiquidityManagerSync;
70
71use core::ops::Deref;
72use core::time::Duration;
73
74#[cfg(feature = "std")]
75use core::sync::atomic::{AtomicBool, Ordering};
76#[cfg(feature = "std")]
77use std::sync::Arc;
78#[cfg(feature = "std")]
79use std::thread::{self, JoinHandle};
80#[cfg(feature = "std")]
81use std::time::Instant;
82
83#[cfg(not(feature = "std"))]
84use alloc::boxed::Box;
85#[cfg(all(not(c_bindings), not(feature = "std")))]
86use alloc::sync::Arc;
87
/// Handle to the background processing thread.
///
/// As the `must_use` message below states, processing stops as soon as this value is dropped, so
/// it should be kept alive until shutdown.
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Atomic flag behind an `Arc`. NOTE(review): presumably shared with the background thread
	// and set to request a stop -- the code that sets/reads it is outside this view, confirm.
	stop_thread: Arc<AtomicBool>,
	// Join handle of the background thread, which finishes with an I/O `Result`.
	// NOTE(review): the `Option` presumably allows the handle to be taken out when joining --
	// confirm against the join/stop implementation.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
122
// Periodic timer intervals used by the processing loop. Every interval that has a `cfg(test)`
// variant is shortened to one second in test builds.

/// Interval between calls to the `ChannelManager`'s `timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: Duration = Duration::from_secs(60);
#[cfg(test)]
const FRESHNESS_TIMER: Duration = Duration::from_secs(1);

/// Interval between calls to the `PeerManager`'s `timer_tick_occurred` (optimised builds).
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: Duration = Duration::from_secs(10);
/// Interval between calls to the `PeerManager`'s `timer_tick_occurred` when debug assertions
/// are enabled. NOTE(review): presumably longer because unoptimised builds are slower --
/// confirm.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const PING_TIMER: Duration = Duration::from_secs(1);

/// Interval between calls to the onion messenger's `timer_tick_occurred`.
#[cfg(not(test))]
const ONION_MESSAGE_HANDLER_TIMER: Duration = Duration::from_secs(10);
#[cfg(test)]
const ONION_MESSAGE_HANDLER_TIMER: Duration = Duration::from_secs(1);

/// Interval between network graph prunes once the graph is prunable (one hour).
const NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(60 * 60);

/// Interval between scorer `time_passed` updates followed by scorer persistence.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: Duration = Duration::from_secs(60 * 5);
#[cfg(test)]
const SCORER_PERSIST_TIMER: Duration = Duration::from_secs(1);

/// Prune interval used at startup and for as long as the graph is not yet prunable.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(60);
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(1);

/// Interval between calls to the `ChainMonitor`'s `rebroadcast_pending_claims`.
#[cfg(not(test))]
const REBROADCAST_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const REBROADCAST_TIMER: Duration = Duration::from_secs(1);

/// Interval between calls to the sweeper's `regenerate_and_broadcast_spend_if_necessary`.
#[cfg(not(test))]
const SWEEPER_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const SWEEPER_TIMER: Duration = Duration::from_secs(1);
165
/// Returns the shorter of two durations.
///
/// Usable in `const` context, which is why the comparison is done on the nanosecond counts.
/// When both durations are equal, `b` is returned.
const fn min_duration(a: Duration, b: Duration) -> Duration {
	if b.as_nanos() <= a.as_nanos() {
		return b;
	}
	a
}
174const FASTEST_TIMER: Duration = min_duration(
175 min_duration(FRESHNESS_TIMER, PING_TIMER),
176 min_duration(SCORER_PERSIST_TIMER, min_duration(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)),
177);
178
/// The gossip source in use: a [`P2PGossipSync`], a [`RapidGossipSync`], or none at all.
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, U, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	U: Deref,
	L: Deref,
> where
	U::Target: UtxoLookup,
	L::Target: Logger,
{
	/// Gossip is handled by the wrapped [`P2PGossipSync`].
	P2P(P),
	/// Gossip is handled by the wrapped [`RapidGossipSync`].
	Rapid(R),
	/// No gossip sync is in use, so there is no network graph to update, prune or persist.
	None,
}
197
198impl<
199 P: Deref<Target = P2PGossipSync<G, U, L>>,
200 R: Deref<Target = RapidGossipSync<G, L>>,
201 G: Deref<Target = NetworkGraph<L>>,
202 U: Deref,
203 L: Deref,
204 > GossipSync<P, R, G, U, L>
205where
206 U::Target: UtxoLookup,
207 L::Target: Logger,
208{
209 fn network_graph(&self) -> Option<&G> {
210 match self {
211 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
212 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
213 GossipSync::None => None,
214 }
215 }
216
217 fn prunable_network_graph(&self) -> Option<&G> {
218 match self {
219 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
220 GossipSync::Rapid(gossip_sync) => {
221 if gossip_sync.is_initial_sync_complete() {
222 Some(gossip_sync.network_graph())
223 } else {
224 None
225 }
226 },
227 GossipSync::None => None,
228 }
229 }
230}
231
232impl<
234 P: Deref<Target = P2PGossipSync<G, U, L>>,
235 G: Deref<Target = NetworkGraph<L>>,
236 U: Deref,
237 L: Deref,
238 > GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
239where
240 U::Target: UtxoLookup,
241 L::Target: Logger,
242{
243 pub fn p2p(gossip_sync: P) -> Self {
245 GossipSync::P2P(gossip_sync)
246 }
247}
248
249impl<
251 'a,
252 R: Deref<Target = RapidGossipSync<G, L>>,
253 G: Deref<Target = NetworkGraph<L>>,
254 L: Deref,
255 >
256 GossipSync<
257 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
258 R,
259 G,
260 &'a (dyn UtxoLookup + Send + Sync),
261 L,
262 > where
263 L::Target: Logger,
264{
265 pub fn rapid(gossip_sync: R) -> Self {
267 GossipSync::Rapid(gossip_sync)
268 }
269}
270
271impl<'a, L: Deref>
273 GossipSync<
274 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
275 &RapidGossipSync<&'a NetworkGraph<L>, L>,
276 &'a NetworkGraph<L>,
277 &'a (dyn UtxoLookup + Send + Sync),
278 L,
279 > where
280 L::Target: Logger,
281{
282 pub fn none() -> Self {
284 GossipSync::None
285 }
286}
287
288fn handle_network_graph_update<L: Deref>(network_graph: &NetworkGraph<L>, event: &Event)
289where
290 L::Target: Logger,
291{
292 if let Event::PaymentPathFailed {
293 failure: PathFailure::OnPath { network_update: Some(ref upd) },
294 ..
295 } = event
296 {
297 network_graph.handle_network_update(upd);
298 }
299}
300
301fn update_scorer<'a, S: Deref<Target = SC>, SC: 'a + WriteableScore<'a>>(
304 scorer: &'a S, event: &Event, duration_since_epoch: Duration,
305) -> bool {
306 match event {
307 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
308 let mut score = scorer.write_lock();
309 score.payment_path_failed(path, *scid, duration_since_epoch);
310 },
311 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
312 let mut score = scorer.write_lock();
315 score.probe_successful(path, duration_since_epoch);
316 },
317 Event::PaymentPathSuccessful { path, .. } => {
318 let mut score = scorer.write_lock();
319 score.payment_path_successful(path, duration_since_epoch);
320 },
321 Event::ProbeSuccessful { path, .. } => {
322 let mut score = scorer.write_lock();
323 score.probe_successful(path, duration_since_epoch);
324 },
325 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
326 let mut score = scorer.write_lock();
327 score.probe_failed(path, *scid, duration_since_epoch);
328 },
329 _ => return false,
330 }
331 true
332}
333
// The aliases below spell out fully trait-object-based ("dyn") instantiations of the generic
// LDK types. Within this file they are only used to name the type parameters of the `NO_*`
// constants further down; none of them exist when building with `c_bindings`.

/// Lock type wrapped around the scorer in [`DynRouter`]: an `RwLock` when `std` is available.
#[cfg(all(not(c_bindings), feature = "std"))]
type ScorerWrapper<T> = std::sync::RwLock<T>;

/// Lock type wrapped around the scorer in [`DynRouter`]: a `RefCell` without `std`.
#[cfg(all(not(c_bindings), not(feature = "std")))]
type ScorerWrapper<T> = core::cell::RefCell<T>;

/// A `DefaultRouter` whose graph, logger, entropy source and scorer are all `'static`
/// references, using a `ProbabilisticScorer` behind a [`ScorerWrapper`].
#[cfg(not(c_bindings))]
type DynRouter = lightning::routing::router::DefaultRouter<
	&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
	&'static (dyn Logger + Send + Sync),
	&'static (dyn EntropySource + Send + Sync),
	&'static ScorerWrapper<
		lightning::routing::scoring::ProbabilisticScorer<
			&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
			&'static (dyn Logger + Send + Sync),
		>,
	>,
	lightning::routing::scoring::ProbabilisticScoringFeeParameters,
	lightning::routing::scoring::ProbabilisticScorer<
		&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
		&'static (dyn Logger + Send + Sync),
	>,
>;

/// A `DefaultMessageRouter` over `'static` references to the graph, logger and entropy source.
#[cfg(not(c_bindings))]
type DynMessageRouter = lightning::onion_message::messenger::DefaultMessageRouter<
	&'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
	&'static (dyn Logger + Send + Sync),
	&'static (dyn EntropySource + Send + Sync),
>;

/// A `SignerProvider` trait object whose ECDSA signer is `InMemorySigner` (non-taproot builds).
#[cfg(all(not(c_bindings), not(taproot)))]
type DynSignerProvider = dyn lightning::sign::SignerProvider<EcdsaSigner = lightning::sign::InMemorySigner>
	+ Send
	+ Sync;

/// A `SignerProvider` trait object whose ECDSA and taproot signers are both `InMemorySigner`
/// (taproot builds).
#[cfg(all(not(c_bindings), taproot))]
type DynSignerProvider = (dyn lightning::sign::SignerProvider<
	EcdsaSigner = lightning::sign::InMemorySigner,
	TaprootSigner = lightning::sign::InMemorySigner,
> + Send
	+ Sync);

/// A `ChannelManager` whose every component is a `'static` reference to a trait object or to
/// one of the `Dyn*` aliases above.
#[cfg(not(c_bindings))]
type DynChannelManager = lightning::ln::channelmanager::ChannelManager<
	&'static (dyn chain::Watch<lightning::sign::InMemorySigner> + Send + Sync),
	&'static (dyn BroadcasterInterface + Send + Sync),
	&'static (dyn EntropySource + Send + Sync),
	&'static (dyn lightning::sign::NodeSigner + Send + Sync),
	&'static DynSignerProvider,
	&'static (dyn FeeEstimator + Send + Sync),
	&'static DynRouter,
	&'static DynMessageRouter,
	&'static (dyn Logger + Send + Sync),
>;
389
/// A `None` with a fully spelled-out onion messenger type.
///
/// Pass this as the `onion_messenger` argument when running without an onion messenger, so
/// that the otherwise uninferable messenger type parameter does not have to be named by hand.
#[cfg(not(c_bindings))]
pub const NO_ONION_MESSENGER: Option<
	Arc<
		dyn AOnionMessenger<
				EntropySource = dyn EntropySource + Send + Sync,
				ES = &(dyn EntropySource + Send + Sync),
				NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
				NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
				Logger = dyn Logger + Send + Sync,
				L = &'static (dyn Logger + Send + Sync),
				NodeIdLookUp = DynChannelManager,
				NL = &'static DynChannelManager,
				MessageRouter = DynMessageRouter,
				MR = &'static DynMessageRouter,
				OffersMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
				OMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
				AsyncPaymentsMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
				APH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
				DNSResolverMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
				DRH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
				CustomOnionMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
				CMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
			> + Send
			+ Sync,
	>,
> = None;
418
/// A `None` with a fully spelled-out (async) liquidity manager type.
///
/// Pass this as the `liquidity_manager` argument when running without a liquidity manager, so
/// that the otherwise uninferable manager type parameter does not have to be named by hand.
#[cfg(not(c_bindings))]
pub const NO_LIQUIDITY_MANAGER: Option<
	Arc<
		dyn ALiquidityManager<
				EntropySource = dyn EntropySource + Send + Sync,
				ES = &(dyn EntropySource + Send + Sync),
				NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
				NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
				AChannelManager = DynChannelManager,
				CM = &DynChannelManager,
				Filter = dyn chain::Filter + Send + Sync,
				C = &(dyn chain::Filter + Send + Sync),
				KVStore = dyn lightning::util::persist::KVStore + Send + Sync,
				K = &(dyn lightning::util::persist::KVStore + Send + Sync),
				TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
				TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
				BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
				                           + Send
				                           + Sync,
				T = &(dyn BroadcasterInterface + Send + Sync),
			> + Send
			+ Sync,
	>,
> = None;
445
/// A `None` with a fully spelled-out synchronous liquidity manager type
/// ([`ALiquidityManagerSync`], backed by a `KVStoreSync`).
///
/// NOTE(review): presumably the counterpart of [`NO_LIQUIDITY_MANAGER`] for the `std`-only,
/// thread-based processor -- its consumer is outside this view, confirm.
#[cfg(all(not(c_bindings), feature = "std"))]
pub const NO_LIQUIDITY_MANAGER_SYNC: Option<
	Arc<
		dyn ALiquidityManagerSync<
				EntropySource = dyn EntropySource + Send + Sync,
				ES = &(dyn EntropySource + Send + Sync),
				NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
				NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
				AChannelManager = DynChannelManager,
				CM = &DynChannelManager,
				Filter = dyn chain::Filter + Send + Sync,
				C = &(dyn chain::Filter + Send + Sync),
				KVStoreSync = dyn lightning::util::persist::KVStoreSync + Send + Sync,
				KS = &(dyn lightning::util::persist::KVStoreSync + Send + Sync),
				TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
				TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
				BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
				                           + Send
				                           + Sync,
				T = &(dyn BroadcasterInterface + Send + Sync),
			> + Send
			+ Sync,
	>,
> = None;
472
/// Minimal, dependency-free future combinators used by the async processing loop.
pub(crate) mod futures_util {
	use core::future::Future;
	use core::marker::Unpin;
	use core::pin::Pin;
	use core::task::{Poll, RawWaker, RawWakerVTable, Waker};

	/// Races five futures and completes with whichever is found ready first.
	///
	/// The futures are polled in field order (`a` through `e`), so `a` always wins a tie.
	pub(crate) struct Selector<A, B, C, D, E>
	where
		A: Future<Output = bool> + Unpin,
		B: Future<Output = ()> + Unpin,
		C: Future<Output = ()> + Unpin,
		D: Future<Output = ()> + Unpin,
		E: Future<Output = ()> + Unpin,
	{
		pub a: A,
		pub b: B,
		pub c: C,
		pub d: D,
		pub e: E,
	}

	/// Identifies which branch of a [`Selector`] completed; `A` carries that future's output.
	pub(crate) enum SelectorOutput {
		A(bool),
		B,
		C,
		D,
		E,
	}

	impl<A, B, C, D, E> Future for Selector<A, B, C, D, E>
	where
		A: Future<Output = bool> + Unpin,
		B: Future<Output = ()> + Unpin,
		C: Future<Output = ()> + Unpin,
		D: Future<Output = ()> + Unpin,
		E: Future<Output = ()> + Unpin,
	{
		type Output = SelectorOutput;

		fn poll(
			mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>,
		) -> Poll<SelectorOutput> {
			// `a` is checked before anything else so that its boolean result is never masked
			// by another branch being ready at the same time.
			if let Poll::Ready(flag) = Pin::new(&mut self.a).poll(ctx) {
				return Poll::Ready(SelectorOutput::A(flag));
			}
			if Pin::new(&mut self.b).poll(ctx).is_ready() {
				return Poll::Ready(SelectorOutput::B);
			}
			if Pin::new(&mut self.c).poll(ctx).is_ready() {
				return Poll::Ready(SelectorOutput::C);
			}
			if Pin::new(&mut self.d).poll(ctx).is_ready() {
				return Poll::Ready(SelectorOutput::D);
			}
			if Pin::new(&mut self.e).poll(ctx).is_ready() {
				return Poll::Ready(SelectorOutput::E);
			}
			Poll::Pending
		}
	}

	/// A future wrapper that stays pending forever when it holds no inner future.
	///
	/// Once the inner future completes it is dropped, so later polls are pending as well.
	pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
		pub optional_future: Option<F>,
	}

	impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
		type Output = ();

		fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
			let finished = match self.optional_future.as_mut() {
				Some(inner) => Pin::new(inner).poll(ctx).is_ready(),
				None => false,
			};
			if finished {
				// Drop the completed future so that it is never polled again.
				self.optional_future = None;
				Poll::Ready(())
			} else {
				Poll::Pending
			}
		}
	}

	/// Builds the raw form of the no-op waker: a null data pointer plus the no-op vtable.
	fn dummy_raw_waker() -> RawWaker {
		RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
	}

	// Cloning a dummy waker simply produces another dummy waker.
	fn dummy_waker_clone(_: *const ()) -> RawWaker {
		dummy_raw_waker()
	}

	// Shared no-op used for the wake, wake-by-ref and drop vtable entries.
	fn dummy_waker_action(_: *const ()) {}

	const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
		dummy_waker_clone,
		dummy_waker_action,
		dummy_waker_action,
		dummy_waker_action,
	);

	/// Returns a waker that does nothing when woken, for polling a future once by hand.
	pub(crate) fn dummy_waker() -> Waker {
		// SAFETY: none of the vtable functions ever reads the (null) data pointer, and none of
		// them owns any resource, so every requirement of the `RawWaker` contract holds
		// trivially.
		unsafe { Waker::from_raw(dummy_raw_waker()) }
	}

	/// State of one slot of a [`Joiner`].
	enum JoinerResult<ERR, F: Future<Output = Result<(), ERR>> + Unpin> {
		/// Still waiting on the contained future; `None` means no future was ever set.
		Pending(Option<F>),
		/// The slot's final result.
		Ready(Result<(), ERR>),
	}

	impl<ERR, F: Future<Output = Result<(), ERR>> + Unpin> JoinerResult<ERR, F> {
		/// Advances the slot, returning `true` once it holds its final result.
		///
		/// An empty slot completes immediately with `Ok(())`; a finished slot is not polled
		/// again.
		fn drive(&mut self, ctx: &mut core::task::Context<'_>) -> bool {
			let outcome = match &mut *self {
				JoinerResult::Ready(_) => return true,
				JoinerResult::Pending(None) => Ok(()),
				JoinerResult::Pending(Some(fut)) => match Pin::new(fut).poll(ctx) {
					Poll::Ready(res) => res,
					Poll::Pending => return false,
				},
			};
			*self = JoinerResult::Ready(outcome);
			true
		}

		/// Moves the final result out of the slot, leaving `Ok(())` behind. A slot that is
		/// not finished yields `Ok(())`.
		fn take_result(&mut self) -> Result<(), ERR> {
			match self {
				JoinerResult::Ready(res) => core::mem::replace(res, Ok(())),
				JoinerResult::Pending(_) => Ok(()),
			}
		}
	}

	/// Runs up to five fallible futures concurrently and completes once all of them have.
	///
	/// Slots without a future count as having succeeded.
	pub(crate) struct Joiner<ERR, A, B, C, D, E>
	where
		A: Future<Output = Result<(), ERR>> + Unpin,
		B: Future<Output = Result<(), ERR>> + Unpin,
		C: Future<Output = Result<(), ERR>> + Unpin,
		D: Future<Output = Result<(), ERR>> + Unpin,
		E: Future<Output = Result<(), ERR>> + Unpin,
	{
		a: JoinerResult<ERR, A>,
		b: JoinerResult<ERR, B>,
		c: JoinerResult<ERR, C>,
		d: JoinerResult<ERR, D>,
		e: JoinerResult<ERR, E>,
	}

	impl<ERR, A, B, C, D, E> Joiner<ERR, A, B, C, D, E>
	where
		A: Future<Output = Result<(), ERR>> + Unpin,
		B: Future<Output = Result<(), ERR>> + Unpin,
		C: Future<Output = Result<(), ERR>> + Unpin,
		D: Future<Output = Result<(), ERR>> + Unpin,
		E: Future<Output = Result<(), ERR>> + Unpin,
	{
		/// Creates a joiner with all five slots empty.
		pub(crate) fn new() -> Self {
			Joiner {
				a: JoinerResult::Pending(None),
				b: JoinerResult::Pending(None),
				c: JoinerResult::Pending(None),
				d: JoinerResult::Pending(None),
				e: JoinerResult::Pending(None),
			}
		}

		/// Puts `fut` into slot `a`.
		pub(crate) fn set_a(&mut self, fut: A) {
			self.a = JoinerResult::Pending(Some(fut));
		}

		/// Stores an already-known result in slot `a`, for a future that finished when it
		/// was polled ahead of time.
		pub(crate) fn set_a_res(&mut self, res: Result<(), ERR>) {
			self.a = JoinerResult::Ready(res);
		}

		/// Puts `fut` into slot `b`.
		pub(crate) fn set_b(&mut self, fut: B) {
			self.b = JoinerResult::Pending(Some(fut));
		}

		/// Puts `fut` into slot `c`.
		pub(crate) fn set_c(&mut self, fut: C) {
			self.c = JoinerResult::Pending(Some(fut));
		}

		/// Puts `fut` into slot `d`.
		pub(crate) fn set_d(&mut self, fut: D) {
			self.d = JoinerResult::Pending(Some(fut));
		}

		/// Puts `fut` into slot `e`.
		pub(crate) fn set_e(&mut self, fut: E) {
			self.e = JoinerResult::Pending(Some(fut));
		}
	}

	impl<ERR, A, B, C, D, E> Future for Joiner<ERR, A, B, C, D, E>
	where
		A: Future<Output = Result<(), ERR>> + Unpin,
		B: Future<Output = Result<(), ERR>> + Unpin,
		C: Future<Output = Result<(), ERR>> + Unpin,
		D: Future<Output = Result<(), ERR>> + Unpin,
		E: Future<Output = Result<(), ERR>> + Unpin,
		Joiner<ERR, A, B, C, D, E>: Unpin,
	{
		/// The results of slots `a` through `e`, in that order.
		type Output = [Result<(), ERR>; 5];

		fn poll(self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
			let this = self.get_mut();
			// Every unfinished slot is driven on every call, in order, regardless of whether
			// an earlier slot is still pending.
			let settled = [
				this.a.drive(ctx),
				this.b.drive(ctx),
				this.c.drive(ctx),
				this.d.drive(ctx),
				this.e.drive(ctx),
			];
			if settled.iter().any(|done| !*done) {
				return Poll::Pending;
			}
			Poll::Ready([
				this.a.take_result(),
				this.b.take_result(),
				this.c.take_result(),
				this.d.take_result(),
				this.e.take_result(),
			])
		}
	}
}
711use core::task;
712use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutput};
713
/// Runs the background processing loop as a future, for callers that bring their own async
/// runtime.
///
/// Each pass of the loop processes pending events from the `ChannelManager`, the `ChainMonitor`
/// and (if given) the onion messenger, calls the `PeerManager`'s `process_events`, sleeps until
/// something needs attention, fires whichever periodic timers have elapsed, and persists the
/// `ChannelManager`, network graph, scorer and liquidity manager through `kv_store` as needed.
///
/// * `event_handler` receives every event after the network graph and scorer have been updated
///   from it.
/// * `sleeper` must return a future for the given `Duration` that resolves to a `bool`; a
///   `true` result makes the loop exit.
/// * `fetch_time` returns the current duration since the epoch, or `None` if no clock is
///   available, in which case scorer updates from events, scorer decay and graph pruning are
///   skipped.
/// * `mobile_interruptable_platform` caps the idle sleep at 100ms and disconnects all peers
///   when such a sleep is found to have taken more than a second.
///
/// After the loop exits, the `ChannelManager`, the scorer and the network graph are persisted one
/// final time. An error is returned as soon as persisting the `ChannelManager` or the liquidity
/// manager fails inside the loop, or when any of the final writes fails; scorer and network
/// graph write failures inside the loop are only logged.
///
#[cfg_attr(
	feature = "std",
	doc = " See [`BackgroundProcessor::start`] for information on which actions this handles.\n"
)]
#[cfg_attr(
	feature = "std",
	doc = " let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());"
)]
#[cfg_attr(feature = "std", doc = "")]
#[cfg_attr(feature = "std", doc = " let mut receiver = stop_receiver.clone();")]
#[cfg_attr(feature = "std", doc = " _ = receiver.changed() => true,")]
#[cfg_attr(feature = "std", doc = " let handle = tokio::spawn(async move {")]
#[cfg_attr(
	not(feature = "std"),
	doc = " let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();"
)]
#[cfg_attr(not(feature = "std"), doc = " rt.block_on(async move {")]
#[cfg_attr(feature = "std", doc = " stop_sender.send(()).unwrap();")]
#[cfg_attr(feature = "std", doc = " handle.await.unwrap()")]
pub async fn process_events_async<
	'a,
	UL: Deref,
	CF: Deref,
	T: Deref,
	F: Deref,
	G: Deref<Target = NetworkGraph<L>>,
	L: Deref,
	P: Deref,
	EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	ES: Deref,
	M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
	CM: Deref,
	OM: Deref,
	PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	PM: Deref,
	LM: Deref,
	D: Deref,
	O: Deref,
	K: Deref,
	OS: Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
	S: Deref<Target = SC>,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture,
	FetchTime: Fn() -> Option<Duration>,
>(
	kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
	liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
	sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
	UL::Target: UtxoLookup,
	CF::Target: chain::Filter,
	T::Target: BroadcasterInterface,
	F::Target: FeeEstimator,
	L::Target: Logger,
	P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
	ES::Target: EntropySource,
	CM::Target: AChannelManager,
	OM::Target: AOnionMessenger,
	PM::Target: APeerManager,
	LM::Target: ALiquidityManager,
	O::Target: OutputSpender,
	D::Target: ChangeDestinationSource,
	K::Target: KVStore,
{
	// Wraps the user's event handler: before an event reaches it, the network graph and the
	// scorer are updated from the event, and the scorer is persisted if it changed.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		let scorer = &scorer;
		let logger = &logger;
		let kv_store = &kv_store;
		let fetch_time = &fetch_time;
		Box::pin(async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			if let Some(ref scorer) = scorer {
				// Without a clock the scorer cannot be updated, so the event is passed on
				// untouched.
				if let Some(duration_since_epoch) = fetch_time() {
					if update_scorer(scorer, &event, duration_since_epoch) {
						log_trace!(logger, "Persisting scorer after update");
						if let Err(e) = kv_store
							.write(
								SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
								SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
								SCORER_PERSISTENCE_KEY,
								scorer.encode(),
							)
							.await
						{
							// A failed scorer write is only logged; event handling still
							// proceeds below.
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
						}
					}
				}
			}
			event_handler(event).await
		})
	};
	let mut batch_delay = BatchDelay::new();

	// Run the ChannelManager tick and the claim rebroadcast once right away instead of waiting
	// for their first timer period.
	log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
	channel_manager.get_cm().timer_tick_occurred();
	log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
	chain_monitor.rebroadcast_pending_claims();

	// One sleeper future per periodic task. Each is checked via `check_and_reset_sleeper`
	// (defined later in this file); as its results are handled throughout this function,
	// `Some(true)` means "exit requested", `Some(false)` means "timer fired" and `None` means
	// "not elapsed yet".
	let mut last_freshness_call = sleeper(FRESHNESS_TIMER);
	let mut last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
	let mut last_ping_call = sleeper(PING_TIMER);
	let mut last_prune_call = sleeper(FIRST_NETWORK_PRUNE_TIMER);
	let mut last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
	let mut last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
	let mut last_sweeper_call = sleeper(SWEEPER_TIMER);
	let mut have_pruned = false;
	let mut have_decayed_scorer = false;

	let mut last_forwards_processing_call = sleeper(batch_delay.get());

	loop {
		// Hand all pending events to the wrapped handler.
		channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
		chain_monitor.process_pending_events_async(async_event_handler).await;
		if let Some(om) = &onion_messenger {
			om.get_om().process_pending_events_async(async_event_handler).await
		}

		peer_manager.as_ref().process_events();
		// Forward pending HTLCs whenever the batching delay has elapsed; the next delay comes
		// from `batch_delay.next()`.
		match check_and_reset_sleeper(&mut last_forwards_processing_call, || {
			sleeper(batch_delay.next())
		}) {
			Some(false) => {
				channel_manager.get_cm().process_pending_htlc_forwards();
			},
			Some(true) => break,
			None => {},
		}

		// On interruptible platforms, start a one-second sleeper alongside the (at most 100ms)
		// wait below; if it has already completed once the wait returns, the wait took far
		// longer than requested (see `await_slow`).
		let mut await_start = None;
		if mobile_interruptable_platform {
			await_start = Some(sleeper(Duration::from_secs(1)));
		}
		let om_fut = if let Some(om) = onion_messenger.as_ref() {
			let fut = om.get_om().get_update_future();
			OptionalSelector { optional_future: Some(fut) }
		} else {
			OptionalSelector { optional_future: None }
		};
		let lm_fut = if let Some(lm) = liquidity_manager.as_ref() {
			let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future();
			OptionalSelector { optional_future: Some(fut) }
		} else {
			OptionalSelector { optional_future: None }
		};
		// Pick how long to sleep at most: never longer than the forwarding batch delay while
		// HTLCs are waiting, and never longer than 100ms on interruptible platforms.
		let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing();
		let sleep_delay = match (needs_processing, mobile_interruptable_platform) {
			(true, true) => batch_delay.get().min(Duration::from_millis(100)),
			(true, false) => batch_delay.get().min(FASTEST_TIMER),
			(false, true) => Duration::from_millis(100),
			(false, false) => FASTEST_TIMER,
		};
		// Wait for the sleep to end or for any component to signal that it has work.
		let fut = Selector {
			a: sleeper(sleep_delay),
			b: channel_manager.get_cm().get_event_or_persistence_needed_future(),
			c: chain_monitor.get_update_future(),
			d: om_fut,
			e: lm_fut,
		};
		match fut.await {
			SelectorOutput::B | SelectorOutput::C | SelectorOutput::D | SelectorOutput::E => {},
			SelectorOutput::A(exit) => {
				// The sleeper resolving to `true` is the caller's request to stop.
				if exit {
					break;
				}
			},
		}

		let await_slow = if mobile_interruptable_platform {
			// The replacement sleeper is never used (`await_start` is re-created on the next
			// iteration), hence the zero duration.
			match check_and_reset_sleeper(&mut await_start.unwrap(), || sleeper(Duration::ZERO)) {
				Some(true) => break,
				Some(false) => true,
				None => false,
			}
		} else {
			false
		};
		match check_and_reset_sleeper(&mut last_freshness_call, || sleeper(FRESHNESS_TIMER)) {
			Some(false) => {
				log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
				channel_manager.get_cm().timer_tick_occurred();
			},
			Some(true) => break,
			None => {},
		}

		// Collects this iteration's persistence futures so they can be awaited together.
		let mut futures = Joiner::new();

		if channel_manager.get_cm().get_and_clear_needs_persistence() {
			log_trace!(logger, "Persisting ChannelManager...");

			let fut = async {
				kv_store
					.write(
						CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_KEY,
						channel_manager.get_cm().encode(),
					)
					.await
			};
			let mut fut = Box::pin(fut);

			// Poll the write once right away with a no-op waker so that it is started before
			// the pruning/scorer work below. If it already finished, keep the result;
			// otherwise let the joiner drive it to completion.
			use core::future::Future;
			let mut waker = dummy_waker();
			let mut ctx = task::Context::from_waker(&mut waker);
			match core::pin::Pin::new(&mut fut).poll(&mut ctx) {
				task::Poll::Ready(res) => futures.set_a_res(res),
				task::Poll::Pending => futures.set_a(fut),
			}

			// NOTE(review): this is logged even when the write is still pending at this point.
			log_trace!(logger, "Done persisting ChannelManager.");
		}

		// While the graph is not prunable yet the short first-prune interval is used; once it
		// is, the regular (hourly) interval applies.
		let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
			NETWORK_PRUNE_TIMER
		} else {
			FIRST_NETWORK_PRUNE_TIMER
		};
		let prune_timer_elapsed = {
			match check_and_reset_sleeper(&mut last_prune_call, || sleeper(prune_timer)) {
				Some(false) => true,
				Some(true) => break,
				None => false,
			}
		};

		// With rapid gossip sync the first prune does not wait for the timer: it is attempted
		// on every iteration until it has happened once.
		let should_prune = match gossip_sync {
			GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
			_ => prune_timer_elapsed,
		};
		if should_prune {
			// `prunable_network_graph` yields nothing while a rapid sync is still in its
			// initial sync, in which case neither pruning nor persisting happens.
			if let Some(network_graph) = gossip_sync.prunable_network_graph() {
				if let Some(duration_since_epoch) = fetch_time() {
					log_trace!(logger, "Pruning and persisting network graph.");
					network_graph.remove_stale_channels_and_tracking_with_time(
						duration_since_epoch.as_secs(),
					);
				} else {
					log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
					log_trace!(logger, "Persisting network graph.");
				}
				let fut = async {
					if let Err(e) = kv_store
						.write(
							NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
							NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
							NETWORK_GRAPH_PERSISTENCE_KEY,
							network_graph.encode(),
						)
						.await
					{
						log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
					}

					// A failed graph write is logged above but never aborts the loop.
					Ok(())
				};

				futures.set_b(Box::pin(fut));

				have_pruned = true;
			}
		}
		// Apply scorer time decay once on the first iteration (only if a clock is available).
		if !have_decayed_scorer {
			if let Some(ref scorer) = scorer {
				if let Some(duration_since_epoch) = fetch_time() {
					log_trace!(logger, "Calling time_passed on scorer at startup");
					scorer.write_lock().time_passed(duration_since_epoch);
				}
			}
			have_decayed_scorer = true;
		}
		match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
			sleeper(SCORER_PERSIST_TIMER)
		}) {
			Some(false) => {
				if let Some(ref scorer) = scorer {
					if let Some(duration_since_epoch) = fetch_time() {
						log_trace!(logger, "Calling time_passed and persisting scorer");
						scorer.write_lock().time_passed(duration_since_epoch);
					} else {
						log_trace!(logger, "Persisting scorer");
					}
					let fut = async {
						if let Err(e) = kv_store
							.write(
								SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
								SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
								SCORER_PERSISTENCE_KEY,
								scorer.encode(),
							)
							.await
						{
							log_error!(
								logger,
								"Error: Failed to persist scorer, check your disk and permissions {}",
								e
							);
						}

						// A failed scorer write is logged above but never aborts the loop.
						Ok(())
					};

					futures.set_c(Box::pin(fut));
				}
			},
			Some(true) => break,
			None => {},
		}
		match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
			Some(false) => {
				log_trace!(logger, "Regenerating sweeper spends if necessary");
				if let Some(ref sweeper) = sweeper {
					let fut = async {
						// The sweeper's result is deliberately discarded.
						let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;

						Ok(())
					};

					futures.set_d(Box::pin(fut));
				}
			},
			Some(true) => break,
			None => {},
		}

		// The liquidity manager is asked to persist on every iteration; its `bool` result
		// says whether anything was actually written.
		if let Some(liquidity_manager) = liquidity_manager.as_ref() {
			let fut = async {
				liquidity_manager
					.get_lm()
					.persist()
					.await
					.map(|did_persist| {
						if did_persist {
							log_trace!(logger, "Persisted LiquidityManager.");
						}
					})
					.map_err(|e| {
						log_error!(logger, "Persisting LiquidityManager failed: {}", e);
						e
					})
			};
			futures.set_e(Box::pin(fut));
		}

		// Drive all queued persistence futures to completion and return on the first error.
		for res in futures.await {
			res?;
		}

		match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
			sleeper(ONION_MESSAGE_HANDLER_TIMER)
		}) {
			Some(false) => {
				if let Some(om) = &onion_messenger {
					log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
					om.get_om().timer_tick_occurred();
				}
			},
			Some(true) => break,
			None => {},
		}

		if await_slow {
			// The short sleep took over a second: drop all peer connections and restart the
			// ping timer instead of running the regular PeerManager tick.
			log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
			peer_manager.as_ref().disconnect_all_peers();
			last_ping_call = sleeper(PING_TIMER);
		} else {
			match check_and_reset_sleeper(&mut last_ping_call, || sleeper(PING_TIMER)) {
				Some(false) => {
					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
					peer_manager.as_ref().timer_tick_occurred();
				},
				Some(true) => break,
				_ => {},
			}
		}

		match check_and_reset_sleeper(&mut last_rebroadcast_call, || sleeper(REBROADCAST_TIMER)) {
			Some(false) => {
				log_trace!(logger, "Rebroadcasting monitor's pending claims");
				chain_monitor.rebroadcast_pending_claims();
			},
			Some(true) => break,
			None => {},
		}
	}
	log_trace!(logger, "Terminating background processor.");

	// Persist everything one final time on the way out; unlike inside the loop, any failure
	// here (including scorer and graph writes) is returned to the caller.
	kv_store
		.write(
			CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
			CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
			CHANNEL_MANAGER_PERSISTENCE_KEY,
			channel_manager.get_cm().encode(),
		)
		.await?;
	if let Some(ref scorer) = scorer {
		kv_store
			.write(
				SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
				SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
				SCORER_PERSISTENCE_KEY,
				scorer.encode(),
			)
			.await?;
	}
	if let Some(network_graph) = gossip_sync.network_graph() {
		kv_store
			.write(
				NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
				NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
				NETWORK_GRAPH_PERSISTENCE_KEY,
				network_graph.encode(),
			)
			.await?;
	}
	Ok(())
}
1342
1343fn check_and_reset_sleeper<
1344 SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1345>(
1346 fut: &mut SleepFuture, mut new_sleeper: impl FnMut() -> SleepFuture,
1347) -> Option<bool> {
1348 let mut waker = dummy_waker();
1349 let mut ctx = task::Context::from_waker(&mut waker);
1350 match core::pin::Pin::new(&mut *fut).poll(&mut ctx) {
1351 task::Poll::Ready(exit) => {
1352 *fut = new_sleeper();
1353 Some(exit)
1354 },
1355 task::Poll::Pending => None,
1356 }
1357}
1358
1359pub async fn process_events_async_with_kv_store_sync<
1362 UL: Deref,
1363 CF: Deref,
1364 T: Deref,
1365 F: Deref,
1366 G: Deref<Target = NetworkGraph<L>>,
1367 L: Deref,
1368 P: Deref,
1369 EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
1370 EventHandler: Fn(Event) -> EventHandlerFuture,
1371 ES: Deref,
1372 M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
1373 CM: Deref,
1374 OM: Deref,
1375 PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
1376 RGS: Deref<Target = RapidGossipSync<G, L>>,
1377 PM: Deref,
1378 LM: Deref,
1379 D: Deref,
1380 O: Deref,
1381 K: Deref,
1382 OS: Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>>,
1383 S: Deref<Target = SC>,
1384 SC: for<'b> WriteableScore<'b>,
1385 SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1386 Sleeper: Fn(Duration) -> SleepFuture,
1387 FetchTime: Fn() -> Option<Duration>,
1388>(
1389 kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
1390 onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
1391 liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
1392 sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
1393) -> Result<(), lightning::io::Error>
1394where
1395 UL::Target: UtxoLookup,
1396 CF::Target: chain::Filter,
1397 T::Target: BroadcasterInterface,
1398 F::Target: FeeEstimator,
1399 L::Target: Logger,
1400 P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
1401 ES::Target: EntropySource,
1402 CM::Target: AChannelManager,
1403 OM::Target: AOnionMessenger,
1404 PM::Target: APeerManager,
1405 LM::Target: ALiquidityManager,
1406 O::Target: OutputSpender,
1407 D::Target: ChangeDestinationSourceSync,
1408 K::Target: KVStoreSync,
1409{
1410 let kv_store = KVStoreSyncWrapper(kv_store);
1411 process_events_async(
1412 kv_store,
1413 event_handler,
1414 chain_monitor,
1415 channel_manager,
1416 onion_messenger,
1417 gossip_sync,
1418 peer_manager,
1419 liquidity_manager,
1420 sweeper.as_ref().map(|os| os.sweeper_async()),
1421 logger,
1422 scorer,
1423 sleeper,
1424 mobile_interruptable_platform,
1425 fetch_time,
1426 )
1427 .await
1428}
1429
#[cfg(feature = "std")]
impl BackgroundProcessor {
	/// Spawns a thread which drives the given objects in the background and returns a handle
	/// that can be used to stop or join it.
	///
	/// Once on startup the thread calls the `ChannelManager`'s `timer_tick_occurred` and
	/// rebroadcasts the `ChainMonitor`'s pending claims. It then loops, on every iteration:
	///
	/// * processing pending events of the `ChannelManager`, the `ChainMonitor` and, if given,
	///   the onion messenger through a wrapper around `event_handler` which first applies the
	///   event to the network graph (if any) and to `scorer` (if any),
	/// * calling `process_events` on the `PeerManager`,
	/// * processing pending HTLC forwards once the current batch delay has elapsed,
	/// * sleeping for at most 100ms or until one of the objects signals that it needs
	///   attention,
	/// * persisting the `ChannelManager` when it reports that persistence is needed,
	/// * persisting the liquidity manager, if given,
	/// * running the periodic, timer-driven tasks: `ChannelManager` ticks, network graph
	///   pruning and persistence, scorer decay and persistence, sweeper spend regeneration,
	///   onion message handler ticks, `PeerManager` ticks and claim rebroadcasts.
	///
	/// The loop ends when the returned object is stopped (see `stop`) or dropped. After the
	/// loop, the `ChannelManager`, the scorer (if any) and the network graph (if any) are
	/// written to `kv_store` one final time.
	///
	/// # Errors
	///
	/// Nothing is returned from this function directly. A failure to persist the
	/// `ChannelManager` inside the loop, or any failure of the final writes, terminates the
	/// thread with that error, which is then surfaced by `join` or `stop`. Failures to persist
	/// the scorer, network graph or liquidity manager inside the loop are only logged.
	pub fn start<
		'a,
		UL: 'static + Deref,
		CF: 'static + Deref,
		T: 'static + Deref,
		F: 'static + Deref + Send,
		G: 'static + Deref<Target = NetworkGraph<L>>,
		L: 'static + Deref + Send,
		P: 'static + Deref,
		EH: 'static + EventHandler + Send,
		ES: 'static + Deref + Send,
		M: 'static
			+ Deref<
				Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
			>
			+ Send
			+ Sync,
		CM: 'static + Deref + Send,
		OM: 'static + Deref + Send,
		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		PM: 'static + Deref + Send,
		LM: 'static + Deref + Send,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: for<'b> WriteableScore<'b>,
		D: 'static + Deref,
		O: 'static + Deref,
		K: 'static + Deref + Send,
		OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
	>(
		kv_store: K, event_handler: EH, chain_monitor: M, channel_manager: CM,
		onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
		liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
	) -> Self
	where
		UL::Target: 'static + UtxoLookup,
		CF::Target: 'static + chain::Filter,
		T::Target: 'static + BroadcasterInterface,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
		ES::Target: 'static + EntropySource,
		CM::Target: AChannelManager,
		OM::Target: AOnionMessenger,
		PM::Target: APeerManager,
		LM::Target: ALiquidityManagerSync,
		D::Target: ChangeDestinationSourceSync,
		O::Target: 'static + OutputSpender,
		K::Target: 'static + KVStoreSync,
	{
		// One flag shared between the returned handle (writer) and the thread (reader).
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = Arc::clone(&stop_thread);
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Wrap the user-provided handler so that every event is first applied to the
			// network graph and the scorer before the user gets to see it.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				if let Some(ref scorer) = scorer {
					use std::time::SystemTime;
					let duration_since_epoch = SystemTime::now()
						.duration_since(SystemTime::UNIX_EPOCH)
						.expect("Time should be sometime after 1970");
					// Only hit the store when `update_scorer` reports that it changed something.
					if update_scorer(scorer, &event, duration_since_epoch) {
						log_trace!(logger, "Persisting scorer after update");
						// Best effort: a failed scorer write is logged, not propagated.
						if let Err(e) = kv_store.write(
							SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
							SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
							SCORER_PERSISTENCE_KEY,
							scorer.encode(),
						) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
						}
					}
				}
				event_handler.handle_event(event)
			};
			// Produces the delays between successive rounds of HTLC forward processing.
			let mut batch_delay = BatchDelay::new();

			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
			channel_manager.get_cm().timer_tick_occurred();
			log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
			chain_monitor.rebroadcast_pending_claims();

			// Timestamps of the last run of each periodic task, all starting at "now".
			let mut last_freshness_call = Instant::now();
			let mut last_onion_message_handler_call = Instant::now();
			let mut last_ping_call = Instant::now();
			let mut last_prune_call = Instant::now();
			let mut last_scorer_persist_call = Instant::now();
			let mut last_rebroadcast_call = Instant::now();
			let mut last_sweeper_call = Instant::now();
			// One-shot flags for work that is handled specially the first time around.
			let mut have_pruned = false;
			let mut have_decayed_scorer = false;

			let mut cur_batch_delay = batch_delay.get();
			let mut last_forwards_processing_call = Instant::now();

			loop {
				channel_manager.get_cm().process_pending_events(&event_handler);
				chain_monitor.process_pending_events(&event_handler);
				if let Some(om) = &onion_messenger {
					om.get_om().process_pending_events(&event_handler)
				};

				peer_manager.as_ref().process_events();
				// Forward pending HTLCs only once the current batch delay has passed, then
				// draw the delay to use for the next batch.
				if last_forwards_processing_call.elapsed() > cur_batch_delay {
					channel_manager.get_cm().process_pending_htlc_forwards();
					cur_batch_delay = batch_delay.next();
					last_forwards_processing_call = Instant::now();
				}
				// Check for a stop request before going to sleep...
				if stop_thread.load(Ordering::Acquire) {
					log_trace!(logger, "Terminating background processor.");
					break;
				}
				// Build a sleeper over the wake-up futures of every object that is present.
				let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
					(Some(om), Some(lm)) => Sleeper::from_four_futures(
						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
						&chain_monitor.get_update_future(),
						&om.get_om().get_update_future(),
						&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
					),
					(Some(om), None) => Sleeper::from_three_futures(
						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
						&chain_monitor.get_update_future(),
						&om.get_om().get_update_future(),
					),
					(None, Some(lm)) => Sleeper::from_three_futures(
						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
						&chain_monitor.get_update_future(),
						&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
					),
					(None, None) => Sleeper::from_two_futures(
						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
						&chain_monitor.get_update_future(),
					),
				};
				// NOTE: this shadows the `BatchDelay` above with a plain `Duration` for the
				// remainder of this iteration only. With no HTLCs waiting, `Duration::MAX`
				// makes the 100ms cap below the effective timeout.
				let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() {
					batch_delay.get()
				} else {
					Duration::MAX
				};
				let fastest_timeout = batch_delay.min(Duration::from_millis(100));
				sleeper.wait_timeout(fastest_timeout);
				// ...and again right after waking up, before doing any further work.
				if stop_thread.load(Ordering::Acquire) {
					log_trace!(logger, "Terminating background processor.");
					break;
				}
				if last_freshness_call.elapsed() > FRESHNESS_TIMER {
					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
					channel_manager.get_cm().timer_tick_occurred();
					last_freshness_call = Instant::now();
				}
				if channel_manager.get_cm().get_and_clear_needs_persistence() {
					log_trace!(logger, "Persisting ChannelManager...");
					// Unlike the other in-loop writes, a failure here aborts the thread and is
					// reported to whoever joins it.
					(kv_store.write(
						CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_KEY,
						channel_manager.get_cm().encode(),
					))?;
					log_trace!(logger, "Done persisting ChannelManager.");
				}

				if let Some(liquidity_manager) = liquidity_manager.as_ref() {
					log_trace!(logger, "Persisting LiquidityManager...");
					// Best effort: the error is logged and then discarded.
					let _ = liquidity_manager.get_lm().persist().map_err(|e| {
						log_error!(logger, "Persisting LiquidityManager failed: {}", e);
					});
				}

				// The first prune uses its own timer; later prunes use the regular one.
				let prune_timer =
					if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
				let prune_timer_elapsed = last_prune_call.elapsed() > prune_timer;
				// With rapid gossip sync the first prune is attempted on every iteration until
				// it succeeds, i.e. until `prunable_network_graph` yields a graph.
				let should_prune = match gossip_sync {
					GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
					_ => prune_timer_elapsed,
				};
				if should_prune {
					if let Some(network_graph) = gossip_sync.prunable_network_graph() {
						let duration_since_epoch = std::time::SystemTime::now()
							.duration_since(std::time::SystemTime::UNIX_EPOCH)
							.expect("Time should be sometime after 1970");

						log_trace!(logger, "Pruning and persisting network graph.");
						network_graph.remove_stale_channels_and_tracking_with_time(
							duration_since_epoch.as_secs(),
						);
						// Best effort: a failed graph write is logged, not propagated.
						if let Err(e) = kv_store.write(
							NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
							NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
							NETWORK_GRAPH_PERSISTENCE_KEY,
							network_graph.encode(),
						) {
							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e);
						}
						have_pruned = true;
					}
					// Reset the timer even when no prunable graph was available.
					last_prune_call = Instant::now();
				}
				// Bring the scorer up to the current time once, on the first loop iteration.
				if !have_decayed_scorer {
					if let Some(ref scorer) = scorer {
						let duration_since_epoch = std::time::SystemTime::now()
							.duration_since(std::time::SystemTime::UNIX_EPOCH)
							.expect("Time should be sometime after 1970");
						log_trace!(logger, "Calling time_passed on scorer at startup");
						scorer.write_lock().time_passed(duration_since_epoch);
					}
					have_decayed_scorer = true;
				}
				if last_scorer_persist_call.elapsed() > SCORER_PERSIST_TIMER {
					if let Some(ref scorer) = scorer {
						let duration_since_epoch = std::time::SystemTime::now()
							.duration_since(std::time::SystemTime::UNIX_EPOCH)
							.expect("Time should be sometime after 1970");
						log_trace!(logger, "Calling time_passed and persisting scorer");
						scorer.write_lock().time_passed(duration_since_epoch);
						// Best effort: a failed scorer write is logged, not propagated.
						if let Err(e) = kv_store.write(
							SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
							SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
							SCORER_PERSISTENCE_KEY,
							scorer.encode(),
						) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
						}
					}
					last_scorer_persist_call = Instant::now();
				}
				if last_sweeper_call.elapsed() > SWEEPER_TIMER {
					log_trace!(logger, "Regenerating sweeper spends if necessary");
					if let Some(ref sweeper) = sweeper {
						// The sweeper's result is deliberately ignored here.
						let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
					}
					last_sweeper_call = Instant::now();
				}
				if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER {
					if let Some(om) = &onion_messenger {
						log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
						om.get_om().timer_tick_occurred();
					}
					last_onion_message_handler_call = Instant::now();
				}
				if last_ping_call.elapsed() > PING_TIMER {
					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
					peer_manager.as_ref().timer_tick_occurred();
					last_ping_call = Instant::now();
				}
				if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER {
					log_trace!(logger, "Rebroadcasting monitor's pending claims");
					chain_monitor.rebroadcast_pending_claims();
					last_rebroadcast_call = Instant::now();
				}
			}

			// Shutdown path: write everything one last time. Here every failure is
			// propagated, including scorer and network graph write failures.
			kv_store.write(
				CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
				CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
				CHANNEL_MANAGER_PERSISTENCE_KEY,
				channel_manager.get_cm().encode(),
			)?;
			if let Some(ref scorer) = scorer {
				kv_store.write(
					SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
					SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
					SCORER_PERSISTENCE_KEY,
					scorer.encode(),
				)?;
			}
			if let Some(network_graph) = gossip_sync.network_graph() {
				kv_store.write(
					NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
					NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
					NETWORK_GRAPH_PERSISTENCE_KEY,
					network_graph.encode(),
				)?;
			}
			Ok(())
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Waits for the background thread to finish and returns its result.
	///
	/// This does not ask the thread to stop. Without a stop request the thread's loop is only
	/// left when persisting the `ChannelManager` fails, so this call blocks until that
	/// happens.
	///
	/// # Panics
	///
	/// Panics if the background thread itself panicked.
	pub fn join(mut self) -> Result<(), std::io::Error> {
		// The handle is only ever taken by `join_thread`, which cannot have run yet on a
		// value we still own.
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Requests the background thread to stop, waits for it to finish and returns its result,
	/// which includes any error from the final persistence performed on shutdown.
	///
	/// # Panics
	///
	/// Panics if the background thread itself panicked.
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	/// Sets the stop flag read by the background thread, then joins it.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	/// Joins the background thread if it has not been joined yet.
	///
	/// The handle is taken out of `self`, so a second call is a no-op returning `Ok(())`.
	/// Panics if the thread panicked.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}
1814
/// Dropping the processor stops the background thread and waits for it to exit.
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// If `join` or `stop` already ran, the handle is gone and this returns `Ok(())`.
		// Otherwise an error returned by the thread (or a panic in it) panics here.
		self.stop_and_join_thread().unwrap();
	}
}
1821
1822#[cfg(all(feature = "std", test))]
1823mod tests {
1824 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
1825 use bitcoin::constants::{genesis_block, ChainHash};
1826 use bitcoin::hashes::Hash;
1827 use bitcoin::locktime::absolute::LockTime;
1828 use bitcoin::network::Network;
1829 use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
1830 use bitcoin::transaction::Version;
1831 use bitcoin::transaction::{Transaction, TxOut};
1832 use bitcoin::{Amount, ScriptBuf, Txid};
1833 use core::sync::atomic::{AtomicBool, Ordering};
1834 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
1835 use lightning::chain::transaction::OutPoint;
1836 use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1837 use lightning::events::{Event, PathFailure, ReplayEvent};
1838 use lightning::ln::channelmanager;
1839 use lightning::ln::channelmanager::{
1840 ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
1841 };
1842 use lightning::ln::functional_test_utils::*;
1843 use lightning::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, Init, MessageSendEvent};
1844 use lightning::ln::peer_handler::{
1845 IgnoringMessageHandler, MessageHandler, PeerManager, SocketDescriptor,
1846 };
1847 use lightning::ln::types::ChannelId;
1848 use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
1849 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
1850 use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
1851 use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
1852 use lightning::sign::{ChangeDestinationSourceSync, InMemorySigner, KeysManager, NodeSigner};
1853 use lightning::types::features::{ChannelFeatures, NodeFeatures};
1854 use lightning::types::payment::PaymentHash;
1855 use lightning::util::config::UserConfig;
1856 use lightning::util::persist::{
1857 KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
1858 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1859 CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
1860 NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1861 SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1862 SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1863 };
1864 use lightning::util::ser::Writeable;
1865 use lightning::util::sweep::{
1866 OutputSpendStatus, OutputSweeper, OutputSweeperSync, PRUNE_DELAY_BLOCKS,
1867 };
1868 use lightning::util::test_utils;
1869 use lightning::{get_event, get_event_msg};
1870 use lightning_liquidity::utils::time::DefaultTimeProvider;
1871 use lightning_liquidity::{ALiquidityManagerSync, LiquidityManager, LiquidityManagerSync};
1872 use lightning_persister::fs_store::FilesystemStore;
1873 use lightning_rapid_gossip_sync::RapidGossipSync;
1874 use std::collections::VecDeque;
1875 use std::path::PathBuf;
1876 use std::sync::mpsc::SyncSender;
1877 use std::sync::Arc;
1878 use std::time::Duration;
1879 use std::{env, fs};
1880
	// Five times `FRESHNESS_TIMER`, converted via whole milliseconds.
	// NOTE(review): presumably the upper bound tests wait for an expected event; the uses
	// are outside this part of the file, so confirm there.
	const EVENT_DEADLINE: Duration =
		Duration::from_millis(5 * (FRESHNESS_TIMER.as_millis() as u64));
1883
	/// Stateless stand-in socket for the test `PeerManager`; no data is ever transmitted.
	#[derive(Clone, Hash, PartialEq, Eq)]
	struct TestDescriptor {}
	impl SocketDescriptor for TestDescriptor {
		/// Ignores `_data` and always returns 0.
		fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize {
			0
		}

		/// No-op: there is no underlying socket to close.
		fn disconnect_socket(&mut self) {}
	}
1893
	// Lock type wrapped around the test scorer: `MultiThreadedLockableScore` when built with
	// `c_bindings`, a plain `std::sync::Mutex` otherwise.
	#[cfg(c_bindings)]
	type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
	#[cfg(not(c_bindings))]
	type LockingWrapper<T> = std::sync::Mutex<T>;

	// Concrete `ChannelManager` used by the tests, wired up with the test chain monitor,
	// broadcaster, fee estimator and logger, `KeysManager`, and the default payment and
	// onion-message routers (the payment router scoring through `TestScorer`).
	type ChannelManager = channelmanager::ChannelManager<
		Arc<ChainMonitor>,
		Arc<test_utils::TestBroadcaster>,
		Arc<KeysManager>,
		Arc<KeysManager>,
		Arc<KeysManager>,
		Arc<test_utils::TestFeeEstimator>,
		Arc<
			DefaultRouter<
				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
				Arc<test_utils::TestLogger>,
				Arc<KeysManager>,
				Arc<LockingWrapper<TestScorer>>,
				(),
				TestScorer,
			>,
		>,
		Arc<
			DefaultMessageRouter<
				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
				Arc<test_utils::TestLogger>,
				Arc<KeysManager>,
			>,
		>,
		Arc<test_utils::TestLogger>,
	>;

	// Concrete `ChainMonitor` used by the tests; persists through the test `Persister`.
	type ChainMonitor = chainmonitor::ChainMonitor<
		InMemorySigner,
		Arc<test_utils::TestChainSource>,
		Arc<test_utils::TestBroadcaster>,
		Arc<test_utils::TestFeeEstimator>,
		Arc<test_utils::TestLogger>,
		Arc<Persister>,
		Arc<KeysManager>,
	>;

	// Shared P2P gossip sync over the test network graph.
	type PGS = Arc<
		P2PGossipSync<
			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
			Arc<test_utils::TestChainSource>,
			Arc<test_utils::TestLogger>,
		>,
	>;
	// Shared rapid gossip sync over the test network graph.
	type RGS = Arc<
		RapidGossipSync<
			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
			Arc<test_utils::TestLogger>,
		>,
	>;

	// Concrete `OnionMessenger` used by the tests. The test `ChannelManager` fills two of its
	// handler slots; the remaining handler slots use `IgnoringMessageHandler`.
	type OM = OnionMessenger<
		Arc<KeysManager>,
		Arc<KeysManager>,
		Arc<test_utils::TestLogger>,
		Arc<ChannelManager>,
		Arc<
			DefaultMessageRouter<
				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
				Arc<test_utils::TestLogger>,
				Arc<KeysManager>,
			>,
		>,
		IgnoringMessageHandler,
		Arc<ChannelManager>,
		IgnoringMessageHandler,
		IgnoringMessageHandler,
	>;

	// Concrete synchronous liquidity manager used by the tests, backed by the test
	// `Persister` and the default time provider.
	type LM = LiquidityManagerSync<
		Arc<KeysManager>,
		Arc<KeysManager>,
		Arc<ChannelManager>,
		Arc<dyn Filter + Sync + Send>,
		Arc<Persister>,
		DefaultTimeProvider,
		Arc<test_utils::TestBroadcaster>,
	>;
1977
	/// Everything belonging to one test node: the objects handed to the background processor
	/// plus the handles the tests use to inspect them afterwards.
	struct Node {
		// The node's `ChannelManager`.
		node: Arc<ChannelManager>,
		// The node's onion messenger.
		messenger: Arc<OM>,
		// Both gossip sync flavours are kept around; the `Node` helper methods pick one.
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		// Peer manager over `TestDescriptor` sockets and test message handlers.
		peer_manager: Arc<
			PeerManager<
				TestDescriptor,
				Arc<test_utils::TestChannelMessageHandler>,
				Arc<test_utils::TestRoutingMessageHandler>,
				Arc<OM>,
				Arc<test_utils::TestLogger>,
				IgnoringMessageHandler,
				Arc<KeysManager>,
				IgnoringMessageHandler,
			>,
		>,
		liquidity_manager: Arc<LM>,
		chain_monitor: Arc<ChainMonitor>,
		// Store backing this node; its data directory is removed when the node is dropped.
		kv_store: Arc<Persister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		// Scorer behind the configured lock wrapper (see `LockingWrapper`).
		scorer: Arc<LockingWrapper<TestScorer>>,
		// Synchronous output sweeper funded through `TestWallet`.
		sweeper: Arc<
			OutputSweeperSync<
				Arc<test_utils::TestBroadcaster>,
				Arc<TestWallet>,
				Arc<test_utils::TestFeeEstimator>,
				Arc<test_utils::TestChainSource>,
				Arc<Persister>,
				Arc<test_utils::TestLogger>,
				Arc<KeysManager>,
			>,
		>,
	}
2015
2016 impl Node {
2017 fn p2p_gossip_sync(
2018 &self,
2019 ) -> GossipSync<
2020 PGS,
2021 RGS,
2022 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2023 Arc<test_utils::TestChainSource>,
2024 Arc<test_utils::TestLogger>,
2025 > {
2026 GossipSync::P2P(Arc::clone(&self.p2p_gossip_sync))
2027 }
2028
2029 fn rapid_gossip_sync(
2030 &self,
2031 ) -> GossipSync<
2032 PGS,
2033 RGS,
2034 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2035 Arc<test_utils::TestChainSource>,
2036 Arc<test_utils::TestLogger>,
2037 > {
2038 GossipSync::Rapid(Arc::clone(&self.rapid_gossip_sync))
2039 }
2040
2041 fn no_gossip_sync(
2042 &self,
2043 ) -> GossipSync<
2044 PGS,
2045 RGS,
2046 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2047 Arc<test_utils::TestChainSource>,
2048 Arc<test_utils::TestLogger>,
2049 > {
2050 GossipSync::None
2051 }
2052 }
2053
2054 impl Drop for Node {
2055 fn drop(&mut self) {
2056 let data_dir = self.kv_store.get_data_dir();
2057 match fs::remove_dir_all(data_dir.clone()) {
2058 Err(e) => {
2059 println!("Failed to remove test store directory {}: {}", data_dir.display(), e)
2060 },
2061 _ => {},
2062 }
2063 }
2064 }
2065
	/// Filesystem-backed store for tests which can be told to fail specific writes and to
	/// signal network graph writes.
	struct Persister {
		// When set, every network graph write fails with this error kind and message.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, receives a `()` on every network graph write, before the error check.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// When set, every channel manager write fails with this error kind and message.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, every scorer write fails with this error kind and message.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		// The real store all other operations are delegated to.
		kv_store: FilesystemStore,
	}
2073
2074 impl Persister {
2075 fn new(data_dir: PathBuf) -> Self {
2076 let kv_store = FilesystemStore::new(data_dir);
2077 Self {
2078 graph_error: None,
2079 graph_persistence_notifier: None,
2080 manager_error: None,
2081 scorer_error: None,
2082 kv_store,
2083 }
2084 }
2085
2086 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2087 Self { graph_error: Some((error, message)), ..self }
2088 }
2089
2090 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
2091 Self { graph_persistence_notifier: Some(sender), ..self }
2092 }
2093
2094 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2095 Self { manager_error: Some((error, message)), ..self }
2096 }
2097
2098 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2099 Self { scorer_error: Some((error, message)), ..self }
2100 }
2101
2102 pub fn get_data_dir(&self) -> PathBuf {
2103 self.kv_store.get_data_dir()
2104 }
2105 }
2106
2107 impl KVStoreSync for Persister {
2108 fn read(
2109 &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
2110 ) -> lightning::io::Result<Vec<u8>> {
2111 self.kv_store.read(primary_namespace, secondary_namespace, key)
2112 }
2113
2114 fn write(
2115 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
2116 ) -> lightning::io::Result<()> {
2117 if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
2118 && secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
2119 && key == CHANNEL_MANAGER_PERSISTENCE_KEY
2120 {
2121 if let Some((error, message)) = self.manager_error {
2122 return Err(std::io::Error::new(error, message).into());
2123 }
2124 }
2125
2126 if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE
2127 && secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE
2128 && key == NETWORK_GRAPH_PERSISTENCE_KEY
2129 {
2130 if let Some(sender) = &self.graph_persistence_notifier {
2131 match sender.send(()) {
2132 Ok(()) => {},
2133 Err(std::sync::mpsc::SendError(())) => {
2134 println!("Persister failed to notify as receiver went away.")
2135 },
2136 }
2137 };
2138
2139 if let Some((error, message)) = self.graph_error {
2140 return Err(std::io::Error::new(error, message).into());
2141 }
2142 }
2143
2144 if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE
2145 && secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE
2146 && key == SCORER_PERSISTENCE_KEY
2147 {
2148 if let Some((error, message)) = self.scorer_error {
2149 return Err(std::io::Error::new(error, message).into());
2150 }
2151 }
2152
2153 self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
2154 }
2155
2156 fn remove(
2157 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
2158 ) -> lightning::io::Result<()> {
2159 self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
2160 }
2161
2162 fn list(
2163 &self, primary_namespace: &str, secondary_namespace: &str,
2164 ) -> lightning::io::Result<Vec<String>> {
2165 self.kv_store.list(primary_namespace, secondary_namespace)
2166 }
2167 }
2168
	/// Scorer for tests which, instead of scoring, checks the updates it receives against a
	/// queue of expected results.
	struct TestScorer {
		// `None` until the first `expect` call; while `None`, updates are accepted unchecked.
		event_expectations: Option<VecDeque<TestResult>>,
	}
2172
	/// A scorer update a test expects `TestScorer` to receive.
	#[derive(Debug)]
	enum TestResult {
		// Expect `payment_path_failed` with this path and short channel id.
		PaymentFailure { path: Path, short_channel_id: u64 },
		// Expect `payment_path_successful` with this path.
		PaymentSuccess { path: Path },
		// Expect `probe_failed` with this path.
		ProbeFailure { path: Path },
		// Expect `probe_successful` with this path.
		ProbeSuccess { path: Path },
	}
2180
2181 impl TestScorer {
2182 fn new() -> Self {
2183 Self { event_expectations: None }
2184 }
2185
2186 fn expect(&mut self, expectation: TestResult) {
2187 self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
2188 }
2189 }
2190
	/// The test scorer serializes to nothing: `write` emits no bytes and always succeeds.
	impl lightning::util::ser::Writeable for TestScorer {
		fn write<W: lightning::util::ser::Writer>(
			&self, _: &mut W,
		) -> Result<(), lightning::io::Error> {
			Ok(())
		}
	}
2198
	/// Present only to satisfy trait bounds; looking up a penalty on the test scorer panics.
	impl ScoreLookUp for TestScorer {
		type ScoreParams = ();
		fn channel_penalty_msat(
			&self, _candidate: &CandidateRouteHop, _usage: ChannelUsage,
			_score_params: &Self::ScoreParams,
		) -> u64 {
			// Deliberately unimplemented: calling this panics.
			unimplemented!();
		}
	}
2208
2209 impl ScoreUpdate for TestScorer {
2210 fn payment_path_failed(
2211 &mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration,
2212 ) {
2213 if let Some(expectations) = &mut self.event_expectations {
2214 match expectations.pop_front().unwrap() {
2215 TestResult::PaymentFailure { path, short_channel_id } => {
2216 assert_eq!(actual_path, &path);
2217 assert_eq!(actual_short_channel_id, short_channel_id);
2218 },
2219 TestResult::PaymentSuccess { path } => {
2220 panic!("Unexpected successful payment path: {:?}", path)
2221 },
2222 TestResult::ProbeFailure { path } => {
2223 panic!("Unexpected probe failure: {:?}", path)
2224 },
2225 TestResult::ProbeSuccess { path } => {
2226 panic!("Unexpected probe success: {:?}", path)
2227 },
2228 }
2229 }
2230 }
2231
2232 fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
2233 if let Some(expectations) = &mut self.event_expectations {
2234 match expectations.pop_front().unwrap() {
2235 TestResult::PaymentFailure { path, .. } => {
2236 panic!("Unexpected payment path failure: {:?}", path)
2237 },
2238 TestResult::PaymentSuccess { path } => {
2239 assert_eq!(actual_path, &path);
2240 },
2241 TestResult::ProbeFailure { path } => {
2242 panic!("Unexpected probe failure: {:?}", path)
2243 },
2244 TestResult::ProbeSuccess { path } => {
2245 panic!("Unexpected probe success: {:?}", path)
2246 },
2247 }
2248 }
2249 }
2250
2251 fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
2252 if let Some(expectations) = &mut self.event_expectations {
2253 match expectations.pop_front().unwrap() {
2254 TestResult::PaymentFailure { path, .. } => {
2255 panic!("Unexpected payment path failure: {:?}", path)
2256 },
2257 TestResult::PaymentSuccess { path } => {
2258 panic!("Unexpected payment path success: {:?}", path)
2259 },
2260 TestResult::ProbeFailure { path } => {
2261 assert_eq!(actual_path, &path);
2262 },
2263 TestResult::ProbeSuccess { path } => {
2264 panic!("Unexpected probe success: {:?}", path)
2265 },
2266 }
2267 }
2268 }
2269 fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
2270 if let Some(expectations) = &mut self.event_expectations {
2271 match expectations.pop_front().unwrap() {
2272 TestResult::PaymentFailure { path, .. } => {
2273 panic!("Unexpected payment path failure: {:?}", path)
2274 },
2275 TestResult::PaymentSuccess { path } => {
2276 panic!("Unexpected payment path success: {:?}", path)
2277 },
2278 TestResult::ProbeFailure { path } => {
2279 panic!("Unexpected probe failure: {:?}", path)
2280 },
2281 TestResult::ProbeSuccess { path } => {
2282 assert_eq!(actual_path, &path);
2283 },
2284 }
2285 }
2286 }
2287 fn time_passed(&mut self, _: Duration) {}
2288 }
2289
	// Only compiled with `c_bindings`: implements `Score` for the test scorer with no
	// methods of its own.
	// NOTE(review): presumably required by `MultiThreadedLockableScore`, which `LockingWrapper`
	// aliases under `c_bindings` — confirm against the trait definition.
	#[cfg(c_bindings)]
	impl lightning::routing::scoring::Score for TestScorer {}
2292
2293 impl Drop for TestScorer {
2294 fn drop(&mut self) {
2295 if std::thread::panicking() {
2296 return;
2297 }
2298
2299 if let Some(event_expectations) = &self.event_expectations {
2300 if !event_expectations.is_empty() {
2301 panic!("Unsatisfied event expectations: {:?}", event_expectations);
2302 }
2303 }
2304 }
2305 }
2306
	/// Field-less stand-in wallet; `create_nodes` hands one to each node's output sweeper.
	struct TestWallet {}

	impl ChangeDestinationSourceSync for TestWallet {
		/// Always succeeds and returns an empty script as the change destination.
		fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
			Ok(ScriptBuf::new())
		}
	}
2314
2315 fn get_full_filepath(filepath: String, filename: String) -> String {
2316 let mut path = PathBuf::from(filepath);
2317 path.push(filename);
2318 path.to_str().unwrap().to_string()
2319 }
2320
	/// Builds `num_nodes` fully wired test nodes and marks every pair of them as connected.
	///
	/// `persist_dir` is joined onto the system temp directory; node `i` persists through a
	/// `Persister` rooted at `"{persist_dir}_persister_{i}"`.
	///
	/// Returns the resolved persist directory prefix (as a `String`) together with the nodes.
	fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
		let persist_temp_path = env::temp_dir().join(persist_dir);
		let persist_dir = persist_temp_path.to_string_lossy().to_string();
		let network = Network::Bitcoin;
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator::new(253));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let genesis_block = genesis_block(network);
			let network_graph = Arc::new(NetworkGraph::new(network, Arc::clone(&logger)));
			let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
			// Key material is derived from the node index and the genesis block timestamp, so
			// the inputs are the same on every run.
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let seed = [i as u8; 32];
			let keys_manager =
				Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
			let router = Arc::new(DefaultRouter::new(
				Arc::clone(&network_graph),
				Arc::clone(&logger),
				Arc::clone(&keys_manager),
				Arc::clone(&scorer),
				Default::default(),
			));
			let msg_router = Arc::new(DefaultMessageRouter::new(
				Arc::clone(&network_graph),
				Arc::clone(&keys_manager),
			));
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
			let kv_store =
				Arc::new(Persister::new(format!("{}_persister_{}", &persist_dir, i).into()));
			// NOTE(review): `now` and `keys_manager` are rebuilt here with the same arguments as
			// above, so `router`/`msg_router` hold a different `KeysManager` instance than every
			// component constructed below — confirm whether this is intentional.
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager =
				Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
				Some(Arc::clone(&chain_source)),
				Arc::clone(&tx_broadcaster),
				Arc::clone(&logger),
				Arc::clone(&fee_estimator),
				Arc::clone(&kv_store),
				Arc::clone(&keys_manager),
				keys_manager.get_peer_storage_key(),
			));
			let best_block = BestBlock::from_network(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(
				Arc::clone(&fee_estimator),
				Arc::clone(&chain_monitor),
				Arc::clone(&tx_broadcaster),
				Arc::clone(&router),
				Arc::clone(&msg_router),
				Arc::clone(&logger),
				Arc::clone(&keys_manager),
				Arc::clone(&keys_manager),
				Arc::clone(&keys_manager),
				UserConfig::default(),
				params,
				genesis_block.header.time,
			));
			// The channel manager is passed to the messenger in two handler positions; the
			// remaining handler slots are filled with `IgnoringMessageHandler`.
			let messenger = Arc::new(OnionMessenger::new(
				Arc::clone(&keys_manager),
				Arc::clone(&keys_manager),
				Arc::clone(&logger),
				Arc::clone(&manager),
				Arc::clone(&msg_router),
				IgnoringMessageHandler {},
				Arc::clone(&manager),
				IgnoringMessageHandler {},
				IgnoringMessageHandler {},
			));
			let wallet = Arc::new(TestWallet {});
			// The sweeper is created without a chain source (`None`).
			let sweeper = Arc::new(OutputSweeperSync::new(
				best_block,
				Arc::clone(&tx_broadcaster),
				Arc::clone(&fee_estimator),
				None::<Arc<test_utils::TestChainSource>>,
				Arc::clone(&keys_manager),
				wallet,
				Arc::clone(&kv_store),
				Arc::clone(&logger),
			));
			// Both gossip sync flavours are built over the same network graph so each test can
			// pick the one it needs.
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(
				Arc::clone(&network_graph),
				Some(Arc::clone(&chain_source)),
				Arc::clone(&logger),
			));
			let rapid_gossip_sync =
				Arc::new(RapidGossipSync::new(Arc::clone(&network_graph), Arc::clone(&logger)));
			let msg_handler = MessageHandler {
				// NOTE(review): this handler is built for `Network::Testnet` while the rest of
				// the node uses `Network::Bitcoin` — confirm this mismatch is intended.
				chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(
					ChainHash::using_genesis_block(Network::Testnet),
				)),
				route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
				onion_message_handler: Arc::clone(&messenger),
				custom_message_handler: IgnoringMessageHandler {},
				send_only_message_handler: IgnoringMessageHandler {},
			};
			let peer_manager = Arc::new(PeerManager::new(
				msg_handler,
				0,
				&seed,
				Arc::clone(&logger),
				Arc::clone(&keys_manager),
			));
			// All optional arguments of the liquidity manager are left as `None`.
			let liquidity_manager = Arc::new(
				LiquidityManagerSync::new(
					Arc::clone(&keys_manager),
					Arc::clone(&keys_manager),
					Arc::clone(&manager),
					None,
					None,
					Arc::clone(&kv_store),
					Arc::clone(&tx_broadcaster),
					None,
					None,
				)
				.unwrap(),
			);
			let node = Node {
				node: manager,
				p2p_gossip_sync,
				rapid_gossip_sync,
				peer_manager,
				liquidity_manager,
				chain_monitor,
				kv_store,
				tx_broadcaster,
				network_graph,
				logger,
				best_block,
				scorer,
				sweeper,
				messenger,
			};
			nodes.push(node);
		}

		// Tell each unordered pair (i, j) with i < j about the other side. Node `i` is notified
		// with `true` and node `j` with `false` as the last argument (presumably the
		// inbound/outbound flag — confirm against `peer_connected`).
		for i in 0..num_nodes {
			for j in (i + 1)..num_nodes {
				let init_i = Init {
					features: nodes[j].node.init_features(),
					networks: None,
					remote_network_address: None,
				};
				nodes[i]
					.node
					.peer_connected(nodes[j].node.get_our_node_id(), &init_i, true)
					.unwrap();
				let init_j = Init {
					features: nodes[i].node.init_features(),
					networks: None,
					remote_network_address: None,
				};
				nodes[j]
					.node
					.peer_connected(nodes[i].node.get_our_node_id(), &init_j, false)
					.unwrap();
			}
		}

		(persist_dir, nodes)
	}
2482
	// Drives the channel-open exchange between `$node_a` (initiator) and `$node_b` until both
	// sides have emitted `Event::ChannelPending`, and evaluates to the funding transaction.
	//
	// Expects `$node_a` to have exactly one pending event after the open/accept exchange, which
	// must be the `FundingGenerationReady` event for `$channel_value`.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) =
				handle_funding_generation_ready!(events[0], $channel_value);
			// Hand the generated funding transaction to the initiator.
			$node_a
				.node
				.funding_transaction_generated(
					temporary_channel_id,
					$node_b.node.get_our_node_id(),
					tx.clone(),
				)
				.unwrap();
			// funding_created: a -> b
			let msg_a = get_event_msg!(
				$node_a,
				MessageSendEvent::SendFundingCreated,
				$node_b.node.get_our_node_id()
			);
			$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
			get_event!($node_b, Event::ChannelPending);
			// funding_signed: b -> a
			let msg_b = get_event_msg!(
				$node_b,
				MessageSendEvent::SendFundingSigned,
				$node_a.node.get_our_node_id()
			);
			$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
			get_event!($node_a, Event::ChannelPending);
			tx
		}};
	}
2515
	// Starts a channel open from `$node_a` to `$node_b` for `$channel_value` and relays the
	// open_channel / accept_channel messages between them.
	//
	// The channel is created with the literal arguments `100` and `42`; the latter is the
	// user channel id that `handle_funding_generation_ready!` asserts on.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a
				.node
				.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None)
				.unwrap();
			// open_channel: a -> b
			let msg_a = get_event_msg!(
				$node_a,
				MessageSendEvent::SendOpenChannel,
				$node_b.node.get_our_node_id()
			);
			$node_b.node.handle_open_channel($node_a.node.get_our_node_id(), &msg_a);
			// accept_channel: b -> a
			let msg_b = get_event_msg!(
				$node_b,
				MessageSendEvent::SendAcceptChannel,
				$node_a.node.get_our_node_id()
			);
			$node_a.node.handle_accept_channel($node_b.node.get_our_node_id(), &msg_b);
		}};
	}
2536
	// Turns an `Event::FundingGenerationReady` into `(temporary_channel_id, funding_tx)`.
	//
	// Asserts that the event's channel value equals `$channel_value` and that its user channel
	// id is `42`, then builds a transaction with no inputs and a single output paying the
	// channel value to the event's output script. Panics on any other event.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				Event::FundingGenerationReady {
					temporary_channel_id,
					channel_value_satoshis,
					ref output_script,
					user_channel_id,
					..
				} => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					// Input-less transaction with exactly one output: the funding output.
					let tx = Transaction {
						version: Version::ONE,
						lock_time: LockTime::ZERO,
						input: Vec::new(),
						output: vec![TxOut {
							value: Amount::from_sat(channel_value_satoshis),
							script_pubkey: output_script.clone(),
						}],
					};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}};
	}
2565
2566 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
2567 for i in 1..=depth {
2568 let prev_blockhash = node.best_block.block_hash;
2569 let height = node.best_block.height + 1;
2570 let header = create_dummy_header(prev_blockhash, height);
2571 let txdata = vec![(0, tx)];
2572 node.best_block = BestBlock::new(header.block_hash(), height);
2573 match i {
2574 1 => {
2575 node.node.transactions_confirmed(&header, &txdata, height);
2576 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
2577 node.sweeper.transactions_confirmed(&header, &txdata, height);
2578 },
2579 x if x == depth => {
2580 let block = (genesis_block(Network::Bitcoin), height);
2584 node.tx_broadcaster.blocks.lock().unwrap().push(block);
2585 node.node.best_block_updated(&header, height);
2586 node.chain_monitor.best_block_updated(&header, height);
2587 node.sweeper.best_block_updated(&header, height);
2588 },
2589 _ => {},
2590 }
2591 }
2592 }
2593
2594 fn advance_chain(node: &mut Node, num_blocks: u32) {
2595 for i in 1..=num_blocks {
2596 let prev_blockhash = node.best_block.block_hash;
2597 let height = node.best_block.height + 1;
2598 let header = create_dummy_header(prev_blockhash, height);
2599 node.best_block = BestBlock::new(header.block_hash(), height);
2600 if i == num_blocks {
2601 let block = (genesis_block(Network::Bitcoin), height);
2605 node.tx_broadcaster.blocks.lock().unwrap().push(block);
2606 node.node.best_block_updated(&header, height);
2607 node.chain_monitor.best_block_updated(&header, height);
2608 node.sweeper.best_block_updated(&header, height);
2609 }
2610 }
2611 }
2612
	/// Confirms `tx` on `node` to a depth of `ANTI_REORG_DELAY` blocks by delegating to
	/// `confirm_transaction_depth`.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
2616
	// Checks that the background processor writes the channel manager, network graph and
	// scorer to disk, including after the manager changes through a force-close.
	#[test]
	fn test_background_processor() {
		let (persist_dir, nodes) = create_nodes(2, "test_background_processor");

		// Open a channel so the channel manager has state worth persisting.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Start the background processor for node 0, persisting into that node's data dir.
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| Ok(());
		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].p2p_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		// Busy-polls until the bytes stored at `$filepath` equal `$node`'s current
		// serialization. A missing or unreadable file and a content mismatch both retry;
		// only a serialization error panics. There is no timeout.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => match std::fs::read($filepath) {
							Ok(bytes) => {
								if bytes == expected_bytes {
									break;
								} else {
									continue;
								}
							},
							Err(_) => continue,
						},
						Err(e) => panic!("Unexpected error: {}", e),
					}
				}
			};
		}

		// The initial channel manager state must reach disk.
		let filepath =
			get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Spin until the manager's event-or-persist flag reads `false` (presumably meaning the
		// background thread has consumed the pending notification — confirm).
		loop {
			if !nodes[0].node.get_event_or_persist_condvar_value() {
				break;
			}
		}

		// Force-close the channel to change the manager's state.
		let error_message = "Channel force-closed";
		nodes[0]
			.node
			.force_close_broadcasting_latest_txn(
				&ChannelId::v1_from_funding_outpoint(OutPoint {
					txid: tx.compute_txid(),
					index: 0,
				}),
				&nodes[1].node.get_our_node_id(),
				error_message.to_string(),
			)
			.unwrap();

		// The updated manager state must reach disk as well, after which the flag is awaited
		// again.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_event_or_persist_condvar_value() {
				break;
			}
		}

		// The network graph must be persisted under the "network_graph" key.
		let filepath =
			get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// The scorer must be persisted under the "scorer" key.
		let filepath =
			get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}
	}
2716
2717 #[test]
2718 fn test_timer_tick_called() {
2719 let (_, nodes) = create_nodes(1, "test_timer_tick_called");
2725 let data_dir = nodes[0].kv_store.get_data_dir();
2726 let persister = Arc::new(Persister::new(data_dir));
2727 let event_handler = |_: _| Ok(());
2728 let bg_processor = BackgroundProcessor::start(
2729 persister,
2730 event_handler,
2731 Arc::clone(&nodes[0].chain_monitor),
2732 Arc::clone(&nodes[0].node),
2733 Some(Arc::clone(&nodes[0].messenger)),
2734 nodes[0].no_gossip_sync(),
2735 Arc::clone(&nodes[0].peer_manager),
2736 Some(Arc::clone(&nodes[0].liquidity_manager)),
2737 Some(Arc::clone(&nodes[0].sweeper)),
2738 Arc::clone(&nodes[0].logger),
2739 Some(Arc::clone(&nodes[0].scorer)),
2740 );
2741 loop {
2742 let log_entries = nodes[0].logger.lines.lock().unwrap();
2743 let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
2744 let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
2745 let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
2746 let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
2747 if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some()
2748 && log_entries.get(&("lightning_background_processor", desired_log_2)).is_some()
2749 && log_entries.get(&("lightning_background_processor", desired_log_3)).is_some()
2750 && log_entries.get(&("lightning_background_processor", desired_log_4)).is_some()
2751 {
2752 break;
2753 }
2754 }
2755
2756 if !std::thread::panicking() {
2757 bg_processor.stop().unwrap();
2758 }
2759 }
2760
2761 #[test]
2762 fn test_channel_manager_persist_error() {
2763 let (_, nodes) = create_nodes(2, "test_persist_error");
2765 open_channel!(nodes[0], nodes[1], 100000);
2766
2767 let data_dir = nodes[0].kv_store.get_data_dir();
2768 let persister = Arc::new(
2769 Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
2770 );
2771 let event_handler = |_: _| Ok(());
2772 let bg_processor = BackgroundProcessor::start(
2773 persister,
2774 event_handler,
2775 Arc::clone(&nodes[0].chain_monitor),
2776 Arc::clone(&nodes[0].node),
2777 Some(Arc::clone(&nodes[0].messenger)),
2778 nodes[0].no_gossip_sync(),
2779 Arc::clone(&nodes[0].peer_manager),
2780 Some(Arc::clone(&nodes[0].liquidity_manager)),
2781 Some(Arc::clone(&nodes[0].sweeper)),
2782 Arc::clone(&nodes[0].logger),
2783 Some(Arc::clone(&nodes[0].scorer)),
2784 );
2785 match bg_processor.join() {
2786 Ok(_) => panic!("Expected error persisting manager"),
2787 Err(e) => {
2788 assert_eq!(e.kind(), std::io::ErrorKind::Other);
2789 assert_eq!(e.get_ref().unwrap().to_string(), "test");
2790 },
2791 }
2792 }
2793
	// Async counterpart of `test_channel_manager_persist_error`: checks that an error while
	// persisting the channel manager is returned from the `process_events_async` future.
	#[tokio::test]
	async fn test_channel_manager_persist_error_async() {
		let (_, nodes) = create_nodes(2, "test_persist_error_sync");
		open_channel!(nodes[0], nodes[1], 100000);

		// Persister configured to fail manager writes, wrapped so it can be used as an async
		// store.
		let data_dir = nodes[0].kv_store.get_data_dir();
		let kv_store_sync = Arc::new(
			Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
		);
		let kv_store = KVStoreSyncWrapper(kv_store_sync);

		// SAFETY (review note): the two blocks below cast borrows of objects owned by `nodes`
		// to `'static` references. This is only sound while `nodes` outlives every use of the
		// references; here `bp_future` is awaited to completion inside this function, before
		// `nodes` goes out of scope — confirm nothing retains the references beyond that.
		let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
			&*(nodes[0].liquidity_manager.get_lm_async()
				as *const LiquidityManager<_, _, _, _, _, _, _>)
				as &'static LiquidityManager<_, _, _, _, _, _, _>
		};
		let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
			&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
				as &'static OutputSweeper<_, _, _, _, _, _, _>
		};

		let bp_future = super::process_events_async(
			kv_store,
			|_: _| async { Ok(()) },
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].rapid_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(lm_async),
			Some(sweeper_async),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
			// Sleeper: waits for the requested duration and then yields `false` (presumably
			// the "should exit" flag, so the loop never exits on its own — confirm).
			move |dur: Duration| {
				Box::pin(async move {
					tokio::time::sleep(dur).await;
					false
				})
			},
			false,
			|| Some(Duration::ZERO),
		);
		// The future must resolve to the injected persistence error.
		match bp_future.await {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), lightning::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
2846
2847 #[test]
2848 fn test_network_graph_persist_error() {
2849 let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
2851 let data_dir = nodes[0].kv_store.get_data_dir();
2852 let persister =
2853 Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
2854 let event_handler = |_: _| Ok(());
2855 let bg_processor = BackgroundProcessor::start(
2856 persister,
2857 event_handler,
2858 Arc::clone(&nodes[0].chain_monitor),
2859 Arc::clone(&nodes[0].node),
2860 Some(Arc::clone(&nodes[0].messenger)),
2861 nodes[0].p2p_gossip_sync(),
2862 Arc::clone(&nodes[0].peer_manager),
2863 Some(Arc::clone(&nodes[0].liquidity_manager)),
2864 Some(Arc::clone(&nodes[0].sweeper)),
2865 Arc::clone(&nodes[0].logger),
2866 Some(Arc::clone(&nodes[0].scorer)),
2867 );
2868
2869 match bg_processor.stop() {
2870 Ok(_) => panic!("Expected error persisting network graph"),
2871 Err(e) => {
2872 assert_eq!(e.kind(), std::io::ErrorKind::Other);
2873 assert_eq!(e.get_ref().unwrap().to_string(), "test");
2874 },
2875 }
2876 }
2877
2878 #[test]
2879 fn test_scorer_persist_error() {
2880 let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
2882 let data_dir = nodes[0].kv_store.get_data_dir();
2883 let persister =
2884 Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
2885 let event_handler = |_: _| Ok(());
2886 let bg_processor = BackgroundProcessor::start(
2887 persister,
2888 event_handler,
2889 Arc::clone(&nodes[0].chain_monitor),
2890 Arc::clone(&nodes[0].node),
2891 Some(Arc::clone(&nodes[0].messenger)),
2892 nodes[0].no_gossip_sync(),
2893 Arc::clone(&nodes[0].peer_manager),
2894 Some(Arc::clone(&nodes[0].liquidity_manager)),
2895 Some(Arc::clone(&nodes[0].sweeper)),
2896 Arc::clone(&nodes[0].logger),
2897 Some(Arc::clone(&nodes[0].scorer)),
2898 );
2899
2900 match bg_processor.stop() {
2901 Ok(_) => panic!("Expected error persisting scorer"),
2902 Err(e) => {
2903 assert_eq!(e.kind(), std::io::ErrorKind::Other);
2904 assert_eq!(e.get_ref().unwrap().to_string(), "test");
2905 },
2906 }
2907 }
2908
	// Checks that events are delivered to the handler given to the background processor, in
	// two phases: first the channel-funding events, then (with a fresh processor) the
	// `SpendableOutputs` event after a force-close, followed by the sweeper's handling of the
	// tracked output across new blocks.
	#[test]
	fn test_background_event_handling() {
		let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
		let node_0_id = nodes[0].node.get_our_node_id();
		let node_1_id = nodes[1].node.get_our_node_id();

		let channel_value = 100000;
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Phase 1 handler: forwards the funding data and the channel-pending notification to
		// the test thread, ignores `ChannelReady`, and panics on anything else.
		let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
		let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| {
			match event {
				Event::FundingGenerationReady { .. } => funding_generation_send
					.send(handle_funding_generation_ready!(event, channel_value))
					.unwrap(),
				Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
				Event::ChannelReady { .. } => {},
				_ => panic!("Unexpected event: {:?}", event),
			}
			Ok(())
		};

		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		// Begin opening a channel and wait for the handler to report the funding data.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = funding_generation_recv
			.recv_timeout(EVENT_DEADLINE)
			.expect("FundingGenerationReady not handled within deadline");
		nodes[0]
			.node
			.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
			.unwrap();
		// Relay funding_created / funding_signed by hand. Node 1's `ChannelPending` is read
		// directly, while node 0's must arrive through the background handler.
		let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
		nodes[1].node.handle_funding_created(node_0_id, &msg_0);
		get_event!(nodes[1], Event::ChannelPending);
		let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
		nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
		channel_pending_recv
			.recv_timeout(EVENT_DEADLINE)
			.expect("ChannelPending not handled within deadline");

		// Confirm the funding transaction on both nodes and exchange channel_ready.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, node_1_id);
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, node_0_id);
		nodes[0].node.handle_channel_ready(node_1_id, &bs_funding);
		let _as_channel_update =
			get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_1_id);
		nodes[1].node.handle_channel_ready(node_0_id, &as_funding);
		let _bs_channel_update =
			get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_0_id);
		// Node 0 must have broadcast exactly the funding transaction and nothing else.
		let broadcast_funding =
			nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		assert_eq!(broadcast_funding.compute_txid(), funding_tx.compute_txid());
		assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}

		// Phase 2 handler: forwards `SpendableOutputs` to the test thread, ignores
		// `ChannelReady` and `ChannelClosed`, and panics on anything else.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| {
			match event {
				Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
				Event::ChannelReady { .. } => {},
				Event::ChannelClosed { .. } => {},
				_ => panic!("Unexpected event: {:?}", event),
			}
			Ok(())
		};
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(
			persister,
			event_handler,
			Arc::clone(&nodes[0].chain_monitor),
			Arc::clone(&nodes[0].node),
			Some(Arc::clone(&nodes[0].messenger)),
			nodes[0].no_gossip_sync(),
			Arc::clone(&nodes[0].peer_manager),
			Some(Arc::clone(&nodes[0].liquidity_manager)),
			Some(Arc::clone(&nodes[0].sweeper)),
			Arc::clone(&nodes[0].logger),
			Some(Arc::clone(&nodes[0].scorer)),
		);

		// Force-close the channel and confirm the broadcast commitment transaction to a depth
		// of `BREAKDOWN_TIMEOUT` blocks.
		let error_message = "Channel force-closed";
		nodes[0]
			.node
			.force_close_broadcasting_latest_txn(
				&nodes[0].node.list_channels()[0].channel_id,
				&node_1_id,
				error_message.to_string(),
			)
			.unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

		// The handler must deliver `SpendableOutputs`; hand the outputs to the sweeper with a
		// delay height of 153.
		let event =
			receiver.recv_timeout(EVENT_DEADLINE).expect("Events not handled within deadline");
		match event {
			Event::SpendableOutputs { outputs, channel_id } => {
				nodes[0]
					.sweeper
					.track_spendable_outputs(outputs, channel_id, false, Some(153))
					.unwrap();
			},
			_ => panic!("Unexpected event: {:?}", event),
		}

		// One output is tracked. If some transaction has been broadcast already, it must not
		// spend that output, and the output must still be waiting for height 153.
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() {
			assert!(!tracked_output.is_spent_in(&sweep_tx_0));
			match tracked_output.status {
				OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
					assert_eq!(delayed_until_height, Some(153));
				},
				_ => panic!("Unexpected status"),
			}
		}

		advance_chain(&mut nodes[0], 3);

		// Polls the broadcaster every 10ms until a transaction can be popped. No timeout.
		let tx_broadcaster = Arc::clone(&nodes[0].tx_broadcaster);
		let wait_for_sweep_tx = || -> Transaction {
			loop {
				let sweep_tx = tx_broadcaster.txn_broadcasted.lock().unwrap().pop();
				if let Some(sweep_tx) = sweep_tx {
					return sweep_tx;
				}

				std::thread::sleep(Duration::from_millis(10));
			}
		};

		// After advancing the chain a first sweep transaction is broadcast and recorded as the
		// latest spending transaction.
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let sweep_tx_0 = wait_for_sweep_tx();
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}

		// Each further block yields a new, different sweep transaction.
		advance_chain(&mut nodes[0], 1);
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let sweep_tx_1 = wait_for_sweep_tx();
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}
		assert_ne!(sweep_tx_0, sweep_tx_1);

		advance_chain(&mut nodes[0], 1);
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let sweep_tx_2 = wait_for_sweep_tx();
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}
		assert_ne!(sweep_tx_0, sweep_tx_2);
		assert_ne!(sweep_tx_1, sweep_tx_2);

		// After confirming the latest sweep to depth 5 the output is still tracked, now as
		// pending threshold confirmations.
		confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5);
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}

		// Unconfirming an all-zero txid, which matches none of the sweep transactions, must
		// leave the tracked output's status unchanged.
		let unconf_txid = Txid::from_slice(&[0; 32]).unwrap();
		nodes[0].sweeper.transaction_unconfirmed(&unconf_txid);

		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
		let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
		match tracked_output.status {
			OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
				assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
			},
			_ => panic!("Unexpected status"),
		}

		// Confirming one of the sweep transactions to `PRUNE_DELAY_BLOCKS` depth makes the
		// sweeper stop tracking the output.
		confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, PRUNE_DELAY_BLOCKS);
		assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0);

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}
	}
3136
3137 #[test]
3138 fn test_event_handling_failures_are_replayed() {
3139 let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
3140 let channel_value = 100000;
3141 let data_dir = nodes[0].kv_store.get_data_dir();
3142 let persister = Arc::new(Persister::new(data_dir.clone()));
3143
3144 let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
3145 let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
3146 let should_fail_event_handling = Arc::new(AtomicBool::new(true));
3147 let event_handler = move |event: Event| {
3148 if let Ok(true) = should_fail_event_handling.compare_exchange(
3149 true,
3150 false,
3151 Ordering::Acquire,
3152 Ordering::Relaxed,
3153 ) {
3154 first_event_send.send(event).unwrap();
3155 return Err(ReplayEvent());
3156 }
3157
3158 second_event_send.send(event).unwrap();
3159 Ok(())
3160 };
3161
3162 let bg_processor = BackgroundProcessor::start(
3163 persister,
3164 event_handler,
3165 Arc::clone(&nodes[0].chain_monitor),
3166 Arc::clone(&nodes[0].node),
3167 Some(Arc::clone(&nodes[0].messenger)),
3168 nodes[0].no_gossip_sync(),
3169 Arc::clone(&nodes[0].peer_manager),
3170 Some(Arc::clone(&nodes[0].liquidity_manager)),
3171 Some(Arc::clone(&nodes[0].sweeper)),
3172 Arc::clone(&nodes[0].logger),
3173 Some(Arc::clone(&nodes[0].scorer)),
3174 );
3175
3176 begin_open_channel!(nodes[0], nodes[1], channel_value);
3177 assert_eq!(
3178 first_event_recv.recv_timeout(EVENT_DEADLINE).unwrap(),
3179 second_event_recv.recv_timeout(EVENT_DEADLINE).unwrap()
3180 );
3181
3182 if !std::thread::panicking() {
3183 bg_processor.stop().unwrap();
3184 }
3185 }
3186
3187 #[test]
3188 fn test_scorer_persistence() {
3189 let (_, nodes) = create_nodes(2, "test_scorer_persistence");
3190 let data_dir = nodes[0].kv_store.get_data_dir();
3191 let persister = Arc::new(Persister::new(data_dir));
3192 let event_handler = |_: _| Ok(());
3193 let bg_processor = BackgroundProcessor::start(
3194 persister,
3195 event_handler,
3196 Arc::clone(&nodes[0].chain_monitor),
3197 Arc::clone(&nodes[0].node),
3198 Some(Arc::clone(&nodes[0].messenger)),
3199 nodes[0].no_gossip_sync(),
3200 Arc::clone(&nodes[0].peer_manager),
3201 Some(Arc::clone(&nodes[0].liquidity_manager)),
3202 Some(Arc::clone(&nodes[0].sweeper)),
3203 Arc::clone(&nodes[0].logger),
3204 Some(Arc::clone(&nodes[0].scorer)),
3205 );
3206
3207 loop {
3208 let log_entries = nodes[0].logger.lines.lock().unwrap();
3209 let expected_log = "Calling time_passed and persisting scorer".to_string();
3210 if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
3211 break;
3212 }
3213 }
3214
3215 if !std::thread::panicking() {
3216 bg_processor.stop().unwrap();
3217 }
3218 }
3219
	// Shared body for the "graph is not pruned until sync completes" tests.
	//
	// `$nodes` is the node list, `$receive` an expression yielding the result of waiting for
	// the graph-persistence notification (`.expect` is called on it), and `$sleep` an
	// expression evaluated once per polling iteration.
	macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
		($nodes: expr, $receive: expr, $sleep: expr) => {
			// Seed the graph with one channel (short channel id 42) from a partial
			// announcement.
			let features = ChannelFeatures::empty();
			$nodes[0]
				.network_graph
				.add_channel_from_partial_announcement(
					42,
					None,
					53,
					features,
					$nodes[0].node.get_our_node_id().into(),
					$nodes[1].node.get_our_node_id().into(),
				)
				.expect("Failed to update channel from partial announcement");
			let original_graph_description = $nodes[0].network_graph.to_string();
			assert!(original_graph_description.contains("42: features: 0000, node_one:"));
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);

			// Wait until the timer-tick log line has been recorded more than once.
			loop {
				$sleep;
				let log_entries = $nodes[0].logger.lines.lock().unwrap();
				let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
				if *log_entries.get(&("lightning_background_processor", loop_counter)).unwrap_or(&0)
					> 1
				{
					break;
				}
			}

			// Raw rapid-gossip-sync payload; the first three bytes are ASCII "LDK".
			let initialization_input = vec![
				76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99,
				247, 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227,
				98, 218, 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61,
				250, 251, 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6,
				67, 2, 36, 125, 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47,
				115, 172, 63, 136, 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158,
				1, 242, 121, 152, 106, 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95,
				65, 3, 83, 185, 58, 138, 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136,
				149, 185, 226, 156, 137, 175, 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219,
				175, 168, 77, 4, 143, 38, 128, 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2,
				27, 0, 0, 0, 1, 0, 0, 255, 2, 68, 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0,
				0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255,
				8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6,
				11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
			];
			$nodes[0]
				.rapid_gossip_sync
				.update_network_graph_no_std(&initialization_input[..], Some(1642291930))
				.unwrap();

			// Immediately after applying the payload the graph must hold two channels.
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);

			// Block on the caller-supplied notification.
			$receive.expect("Network graph not pruned within deadline");

			// Once notified, the graph must be empty.
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
		};
	}
3280
// Runs the graph-pruning scenario against the `BackgroundProcessor` started via
// `BackgroundProcessor::start`, with node 0 configured for rapid gossip sync.
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
	let (_, nodes) =
		create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");

	// The sending half is installed on the persister as its graph-persistence notifier; the
	// receiving half is what the shared macro waits on.
	let (graph_persisted_tx, graph_persisted_rx) = std::sync::mpsc::sync_channel(1);
	let store_dir = nodes[0].kv_store.get_data_dir();
	let store =
		Arc::new(Persister::new(store_dir).with_graph_persistence_notifier(graph_persisted_tx));

	// Events are irrelevant to this test, so each one is acknowledged without inspection.
	let ignore_events = |_: _| Ok(());
	let bg_processor = BackgroundProcessor::start(
		store,
		ignore_events,
		Arc::clone(&nodes[0].chain_monitor),
		Arc::clone(&nodes[0].node),
		Some(Arc::clone(&nodes[0].messenger)),
		nodes[0].rapid_gossip_sync(),
		Arc::clone(&nodes[0].peer_manager),
		Some(Arc::clone(&nodes[0].liquidity_manager)),
		Some(Arc::clone(&nodes[0].sweeper)),
		Arc::clone(&nodes[0].logger),
		Some(Arc::clone(&nodes[0].scorer)),
	);

	// Wait at most five times `FIRST_NETWORK_PRUNE_TIMER` for the notification, and sleep one
	// millisecond between log polls during start-up.
	do_test_not_pruning_network_graph_until_graph_sync_completion!(
		nodes,
		graph_persisted_rx.recv_timeout(super::FIRST_NETWORK_PRUNE_TIMER * 5),
		std::thread::sleep(Duration::from_millis(1))
	);

	bg_processor.stop().unwrap();
}
3313
3314 #[tokio::test]
3315 async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
3316 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
3317
3318 let (_, nodes) =
3319 create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
3320 let data_dir = nodes[0].kv_store.get_data_dir();
3321 let kv_store_sync =
3322 Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
3323 let kv_store = KVStoreSyncWrapper(kv_store_sync);
3324
3325 let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
3327 &*(nodes[0].liquidity_manager.get_lm_async()
3328 as *const LiquidityManager<_, _, _, _, _, _, _>)
3329 as &'static LiquidityManager<_, _, _, _, _, _, _>
3330 };
3331 let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3332 &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3333 as &'static OutputSweeper<_, _, _, _, _, _, _>
3334 };
3335
3336 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
3337 let bp_future = super::process_events_async(
3338 kv_store,
3339 |_: _| async { Ok(()) },
3340 Arc::clone(&nodes[0].chain_monitor),
3341 Arc::clone(&nodes[0].node),
3342 Some(Arc::clone(&nodes[0].messenger)),
3343 nodes[0].rapid_gossip_sync(),
3344 Arc::clone(&nodes[0].peer_manager),
3345 Some(lm_async),
3346 Some(sweeper_async),
3347 Arc::clone(&nodes[0].logger),
3348 Some(Arc::clone(&nodes[0].scorer)),
3349 move |dur: Duration| {
3350 let mut exit_receiver = exit_receiver.clone();
3351 Box::pin(async move {
3352 tokio::select! {
3353 _ = tokio::time::sleep(dur) => false,
3354 _ = exit_receiver.changed() => true,
3355 }
3356 })
3357 },
3358 false,
3359 || Some(Duration::from_secs(1696300000)),
3360 );
3361
3362 let t1 = tokio::spawn(bp_future);
3363 let t2 = tokio::spawn(async move {
3364 do_test_not_pruning_network_graph_until_graph_sync_completion!(
3365 nodes,
3366 {
3367 let mut i = 0;
3368 loop {
3369 tokio::time::sleep(super::FIRST_NETWORK_PRUNE_TIMER).await;
3370 if let Ok(()) = receiver.try_recv() {
3371 break Ok::<(), ()>(());
3372 }
3373 assert!(i < 5);
3374 i += 1;
3375 }
3376 },
3377 tokio::time::sleep(Duration::from_millis(1)).await
3378 );
3379 exit_sender.send(()).unwrap();
3380 });
3381 let (r1, r2) = tokio::join!(t1, t2);
3382 r1.unwrap().unwrap();
3383 r2.unwrap()
3384 }
3385
// Shared body of the sync and async `test_payment_path_scoring` tests.
//
// Five path-result events are pushed onto node 0's pending-event queue, one at a time. Before
// each push a `TestResult` is handed to the scorer's `expect` (presumably the update the test
// scorer should then observe), and after each push `$receive` must yield an event of the same
// variant.
//
// * `$nodes`   - indexable collection of test nodes; only entry 0 is used.
// * `$receive` - expression evaluated once per event; its `expect` must return the `Event`.
macro_rules! do_test_payment_path_scoring {
	($nodes: expr, $receive: expr) => {
		let scid = 4242;
		let secp = Secp256k1::new();
		let hop_secret = SecretKey::from_slice(&[42; 32]).unwrap();
		let hop_node_id = PublicKey::from_secret_key(&secp, &hop_secret);

		// Single-hop, unblinded path over `scid`, reused by every event below.
		let only_hop = RouteHop {
			pubkey: hop_node_id,
			node_features: NodeFeatures::empty(),
			short_channel_id: scid,
			channel_features: ChannelFeatures::empty(),
			fee_msat: 0,
			cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
			maybe_announced_channel: true,
		};
		let path = Path { hops: vec![only_hop], blinded_tail: None };

		// 1. Non-permanent failure that names the failing channel: expect `PaymentFailure`.
		$nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure {
			path: path.clone(),
			short_channel_id: scid,
		});
		let retryable_failure = Event::PaymentPathFailed {
			payment_id: None,
			payment_hash: PaymentHash([42; 32]),
			payment_failed_permanently: false,
			failure: PathFailure::OnPath { network_update: None },
			path: path.clone(),
			short_channel_id: Some(scid),
			error_code: None,
			error_data: None,
			hold_times: Vec::new(),
		};
		$nodes[0].node.push_pending_event(retryable_failure);
		let handled = $receive.expect("PaymentPathFailed not handled within deadline");
		if !matches!(handled, Event::PaymentPathFailed { .. }) {
			panic!("Unexpected event");
		}

		// 2. Permanent failure that names no failing channel: expect `ProbeSuccess`.
		$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
		let permanent_failure = Event::PaymentPathFailed {
			payment_id: None,
			payment_hash: PaymentHash([42; 32]),
			payment_failed_permanently: true,
			failure: PathFailure::OnPath { network_update: None },
			path: path.clone(),
			short_channel_id: None,
			error_code: None,
			error_data: None,
			hold_times: Vec::new(),
		};
		$nodes[0].node.push_pending_event(permanent_failure);
		let handled = $receive.expect("PaymentPathFailed not handled within deadline");
		if !matches!(handled, Event::PaymentPathFailed { .. }) {
			panic!("Unexpected event");
		}

		// 3. Successful payment path: expect `PaymentSuccess`.
		$nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
		let path_success = Event::PaymentPathSuccessful {
			payment_id: PaymentId([42; 32]),
			payment_hash: None,
			path: path.clone(),
			hold_times: Vec::new(),
		};
		$nodes[0].node.push_pending_event(path_success);
		let handled = $receive.expect("PaymentPathSuccessful not handled within deadline");
		if !matches!(handled, Event::PaymentPathSuccessful { .. }) {
			panic!("Unexpected event");
		}

		// 4. Successful probe: expect `ProbeSuccess`.
		$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
		let probe_success = Event::ProbeSuccessful {
			payment_id: PaymentId([42; 32]),
			payment_hash: PaymentHash([42; 32]),
			path: path.clone(),
		};
		$nodes[0].node.push_pending_event(probe_success);
		let handled = $receive.expect("ProbeSuccessful not handled within deadline");
		if !matches!(handled, Event::ProbeSuccessful { .. }) {
			panic!("Unexpected event");
		}

		// 5. Failed probe: expect `ProbeFailure`. This is the last use of `path`, so it is
		// moved into the event instead of cloned.
		$nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
		let probe_failure = Event::ProbeFailed {
			payment_id: PaymentId([42; 32]),
			payment_hash: PaymentHash([42; 32]),
			path,
			short_channel_id: Some(scid),
		};
		$nodes[0].node.push_pending_event(probe_failure);
		let handled = $receive.expect("ProbeFailure not handled within deadline");
		if !matches!(handled, Event::ProbeFailed { .. }) {
			panic!("Unexpected event");
		}
	}
}
3485
// Runs the path-scoring scenario against the `BackgroundProcessor` started via
// `BackgroundProcessor::start` and checks that the scorer-persistence log line was emitted
// exactly five times.
#[test]
fn test_payment_path_scoring() {
	// The handler forwards the four scoring-related event kinds to the test body through a
	// bounded channel and panics on anything else.
	let (event_tx, event_rx) = std::sync::mpsc::sync_channel(1);
	let forward_scoring_events = move |event: Event| {
		match event {
			Event::PaymentPathFailed { .. }
			| Event::PaymentPathSuccessful { .. }
			| Event::ProbeSuccessful { .. }
			| Event::ProbeFailed { .. } => event_tx.send(event).unwrap(),
			_ => panic!("Unexpected event: {:?}", event),
		}
		Ok(())
	};

	let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
	let store = Arc::new(Persister::new(nodes[0].kv_store.get_data_dir()));
	let processor = BackgroundProcessor::start(
		store,
		forward_scoring_events,
		Arc::clone(&nodes[0].chain_monitor),
		Arc::clone(&nodes[0].node),
		Some(Arc::clone(&nodes[0].messenger)),
		nodes[0].no_gossip_sync(),
		Arc::clone(&nodes[0].peer_manager),
		Some(Arc::clone(&nodes[0].liquidity_manager)),
		Some(Arc::clone(&nodes[0].sweeper)),
		Arc::clone(&nodes[0].logger),
		Some(Arc::clone(&nodes[0].scorer)),
	);

	do_test_payment_path_scoring!(nodes, event_rx.recv_timeout(EVENT_DEADLINE));

	if !std::thread::panicking() {
		processor.stop().unwrap();
	}

	// One persistence log line is expected for each of the five events pushed by the macro.
	let logged = nodes[0].logger.lines.lock().unwrap();
	let persist_line = "Persisting scorer after update".to_string();
	let persist_count = logged.get(&("lightning_background_processor", persist_line)).unwrap();
	assert_eq!(*persist_count, 5);
}
3527
3528 #[tokio::test]
3529 async fn test_payment_path_scoring_async() {
3530 let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
3531 let event_handler = move |event: Event| {
3532 let sender_ref = sender.clone();
3533 async move {
3534 match event {
3535 Event::PaymentPathFailed { .. } => sender_ref.send(event).await.unwrap(),
3536 Event::PaymentPathSuccessful { .. } => sender_ref.send(event).await.unwrap(),
3537 Event::ProbeSuccessful { .. } => sender_ref.send(event).await.unwrap(),
3538 Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
3539 _ => panic!("Unexpected event: {:?}", event),
3540 }
3541 Ok(())
3542 }
3543 };
3544
3545 let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
3546 let data_dir = nodes[0].kv_store.get_data_dir();
3547 let kv_store_sync = Arc::new(Persister::new(data_dir));
3548 let kv_store = KVStoreSyncWrapper(kv_store_sync);
3549
3550 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
3551
3552 let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
3554 &*(nodes[0].liquidity_manager.get_lm_async()
3555 as *const LiquidityManager<_, _, _, _, _, _, _>)
3556 as &'static LiquidityManager<_, _, _, _, _, _, _>
3557 };
3558 let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3559 &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3560 as &'static OutputSweeper<_, _, _, _, _, _, _>
3561 };
3562
3563 let bp_future = super::process_events_async(
3564 kv_store,
3565 event_handler,
3566 Arc::clone(&nodes[0].chain_monitor),
3567 Arc::clone(&nodes[0].node),
3568 Some(Arc::clone(&nodes[0].messenger)),
3569 nodes[0].no_gossip_sync(),
3570 Arc::clone(&nodes[0].peer_manager),
3571 Some(lm_async),
3572 Some(sweeper_async),
3573 Arc::clone(&nodes[0].logger),
3574 Some(Arc::clone(&nodes[0].scorer)),
3575 move |dur: Duration| {
3576 let mut exit_receiver = exit_receiver.clone();
3577 Box::pin(async move {
3578 tokio::select! {
3579 _ = tokio::time::sleep(dur) => false,
3580 _ = exit_receiver.changed() => true,
3581 }
3582 })
3583 },
3584 false,
3585 || Some(Duration::ZERO),
3586 );
3587 let t1 = tokio::spawn(bp_future);
3588 let t2 = tokio::spawn(async move {
3589 do_test_payment_path_scoring!(nodes, receiver.recv().await);
3590 exit_sender.send(()).unwrap();
3591
3592 let log_entries = nodes[0].logger.lines.lock().unwrap();
3593 let expected_log = "Persisting scorer after update".to_string();
3594 assert_eq!(
3595 *log_entries.get(&("lightning_background_processor", expected_log)).unwrap(),
3596 5
3597 );
3598 });
3599
3600 let (r1, r2) = tokio::join!(t1, t2);
3601 r1.unwrap().unwrap();
3602 r2.unwrap()
3603 }
3604
3605 #[tokio::test]
3606 #[cfg(not(c_bindings))]
3607 async fn test_no_consts() {
3608 let (_, nodes) = create_nodes(1, "test_no_consts");
3610 let bg_processor = BackgroundProcessor::start(
3611 Arc::clone(&nodes[0].kv_store),
3612 move |_: Event| Ok(()),
3613 Arc::clone(&nodes[0].chain_monitor),
3614 Arc::clone(&nodes[0].node),
3615 crate::NO_ONION_MESSENGER,
3616 nodes[0].no_gossip_sync(),
3617 Arc::clone(&nodes[0].peer_manager),
3618 crate::NO_LIQUIDITY_MANAGER_SYNC,
3619 Some(Arc::clone(&nodes[0].sweeper)),
3620 Arc::clone(&nodes[0].logger),
3621 Some(Arc::clone(&nodes[0].scorer)),
3622 );
3623
3624 if !std::thread::panicking() {
3625 bg_processor.stop().unwrap();
3626 }
3627
3628 let kv_store = KVStoreSyncWrapper(Arc::clone(&nodes[0].kv_store));
3629 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
3630 let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3631 &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3632 as &'static OutputSweeper<_, _, _, _, _, _, _>
3633 };
3634 let bp_future = super::process_events_async(
3635 kv_store,
3636 move |_: Event| async move { Ok(()) },
3637 Arc::clone(&nodes[0].chain_monitor),
3638 Arc::clone(&nodes[0].node),
3639 crate::NO_ONION_MESSENGER,
3640 nodes[0].no_gossip_sync(),
3641 Arc::clone(&nodes[0].peer_manager),
3642 crate::NO_LIQUIDITY_MANAGER,
3643 Some(sweeper_async),
3644 Arc::clone(&nodes[0].logger),
3645 Some(Arc::clone(&nodes[0].scorer)),
3646 move |dur: Duration| {
3647 let mut exit_receiver = exit_receiver.clone();
3648 Box::pin(async move {
3649 tokio::select! {
3650 _ = tokio::time::sleep(dur) => false,
3651 _ = exit_receiver.changed() => true,
3652 }
3653 })
3654 },
3655 false,
3656 || Some(Duration::ZERO),
3657 );
3658 let t1 = tokio::spawn(bp_future);
3659 exit_sender.send(()).unwrap();
3660 t1.await.unwrap().unwrap();
3661 }
3662}