1#![cfg_attr(feature = "std", doc = "See docs for [`BackgroundProcessor`] for more details.")]
13#![deny(rustdoc::broken_intra_doc_links)]
14#![deny(rustdoc::private_intra_doc_links)]
15#![deny(missing_docs)]
16#![cfg_attr(docsrs, feature(doc_cfg))]
17#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
18
19#[cfg(any(test, feature = "std"))]
20extern crate core;
21
22#[cfg(not(feature = "std"))]
23extern crate alloc;
24
25#[macro_use]
26extern crate lightning;
27extern crate lightning_rapid_gossip_sync;
28
29mod fwd_batch;
30
31use fwd_batch::BatchDelay;
32
33use lightning::chain;
34use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
35use lightning::chain::chainmonitor::{ChainMonitor, Persist};
36#[cfg(feature = "std")]
37use lightning::events::EventHandler;
38#[cfg(feature = "std")]
39use lightning::events::EventsProvider;
40use lightning::events::ReplayEvent;
41use lightning::events::{Event, PathFailure};
42use lightning::util::ser::Writeable;
43
44use lightning::ln::channelmanager::AChannelManager;
45use lightning::ln::msgs::OnionMessageHandler;
46use lightning::ln::peer_handler::APeerManager;
47use lightning::onion_message::messenger::AOnionMessenger;
48use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
49use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
50use lightning::routing::utxo::UtxoLookup;
51use lightning::sign::{
52 ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
53};
54use lightning::util::logger::Logger;
55use lightning::util::persist::{
56 KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
57 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
58 NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
59 NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
60 SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
61};
62use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
63#[cfg(feature = "std")]
64use lightning::util::wakers::Sleeper;
65use lightning_rapid_gossip_sync::RapidGossipSync;
66
67use lightning_liquidity::ALiquidityManager;
68#[cfg(feature = "std")]
69use lightning_liquidity::ALiquidityManagerSync;
70
71use core::ops::Deref;
72use core::time::Duration;
73
74#[cfg(feature = "std")]
75use core::sync::atomic::{AtomicBool, Ordering};
76#[cfg(feature = "std")]
77use std::sync::Arc;
78#[cfg(feature = "std")]
79use std::thread::{self, JoinHandle};
80#[cfg(feature = "std")]
81use std::time::Instant;
82
83#[cfg(not(feature = "std"))]
84use alloc::boxed::Box;
85#[cfg(all(not(c_bindings), not(feature = "std")))]
86use alloc::sync::Arc;
87
/// Handle to a background processing thread.
///
/// As the `must_use` message below states, the processor stops as soon as this value is dropped,
/// so it must be kept alive for as long as background processing is wanted.
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
    // Flag shared (via `Arc`) with the background thread; presumably set to ask the thread to
    // exit - confirm against the `stop`/`Drop` implementation, which is outside this view.
    stop_thread: Arc<AtomicBool>,
    // Join handle of the background thread. The thread's result is `Ok(())` or an I/O error.
    // Wrapped in `Option`, presumably so the handle can be taken when joining - TODO confirm.
    thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
122
// Interval between calls to the `ChannelManager`'s `timer_tick_occurred` in the processing loop.
#[cfg(not(test))]
const FRESHNESS_TIMER: Duration = Duration::from_secs(60);
#[cfg(test)]
const FRESHNESS_TIMER: Duration = Duration::from_secs(1);

// Interval between calls to the `PeerManager`'s `timer_tick_occurred` in the processing loop.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: Duration = Duration::from_secs(10);
// Builds with debug assertions use a longer ping interval (presumably because unoptimised builds
// are considerably slower - confirm before relying on this rationale).
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const PING_TIMER: Duration = Duration::from_secs(1);

// Interval between calls to the onion messenger's `timer_tick_occurred`.
#[cfg(not(test))]
const ONION_MESSAGE_HANDLER_TIMER: Duration = Duration::from_secs(10);
#[cfg(test)]
const ONION_MESSAGE_HANDLER_TIMER: Duration = Duration::from_secs(1);

// Interval between network graph prunes once a prunable graph is available (one hour).
const NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(60 * 60);

// Interval between periodic `time_passed` calls on, and persistence of, the scorer.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: Duration = Duration::from_secs(60 * 5);
#[cfg(test)]
const SCORER_PERSIST_TIMER: Duration = Duration::from_secs(1);

// Prune interval used at startup and for as long as no prunable network graph is available.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(60);
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(1);

// Interval between calls to the chain monitor's `rebroadcast_pending_claims`.
#[cfg(not(test))]
const REBROADCAST_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const REBROADCAST_TIMER: Duration = Duration::from_secs(1);

// Interval between calls to the sweeper's `regenerate_and_broadcast_spend_if_necessary`.
#[cfg(not(test))]
const SWEEPER_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const SWEEPER_TIMER: Duration = Duration::from_secs(1);

// Returns the smaller of two durations (`b` when they are equal, which is the same value).
//
// Written as a `const fn` so it can be used in the initialiser of `FASTEST_TIMER` below.
const fn min_duration(a: Duration, b: Duration) -> Duration {
    if a.as_nanos() < b.as_nanos() {
        a
    } else {
        b
    }
}

// The shortest of the periodic timers polled on every iteration of the processing loop. It is
// used as the upper bound on how long the loop sleeps, so it must not exceed any timer the loop
// services: a timer shorter than this value would otherwise fire late.
//
// `ONION_MESSAGE_HANDLER_TIMER` and `SWEEPER_TIMER` are included for that reason: in debug builds
// `PING_TIMER` is 30s while the onion message timer is 10s. `NETWORK_PRUNE_TIMER` is omitted
// because it is always longer than `FIRST_NETWORK_PRUNE_TIMER`.
const FASTEST_TIMER: Duration = min_duration(
    min_duration(
        min_duration(FRESHNESS_TIMER, PING_TIMER),
        min_duration(ONION_MESSAGE_HANDLER_TIMER, SWEEPER_TIMER),
    ),
    min_duration(SCORER_PERSIST_TIMER, min_duration(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)),
);
178
/// Selects which gossip synchronisation source, if any, the background processor works with.
///
/// The network graph reachable through the chosen variant is what the processing loop updates,
/// prunes and persists.
pub enum GossipSync<
    P: Deref<Target = P2PGossipSync<G, U, L>>,
    R: Deref<Target = RapidGossipSync<G, L>>,
    G: Deref<Target = NetworkGraph<L>>,
    U: Deref,
    L: Deref,
> where
    U::Target: UtxoLookup,
    L::Target: Logger,
{
    /// Gossip is handled by a `P2PGossipSync`.
    P2P(P),
    /// Gossip is handled by a `RapidGossipSync`.
    Rapid(R),
    /// No gossip sync is in use, so there is no network graph to maintain.
    None,
}
197
198impl<
199 P: Deref<Target = P2PGossipSync<G, U, L>>,
200 R: Deref<Target = RapidGossipSync<G, L>>,
201 G: Deref<Target = NetworkGraph<L>>,
202 U: Deref,
203 L: Deref,
204 > GossipSync<P, R, G, U, L>
205where
206 U::Target: UtxoLookup,
207 L::Target: Logger,
208{
209 fn network_graph(&self) -> Option<&G> {
210 match self {
211 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
212 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
213 GossipSync::None => None,
214 }
215 }
216
217 fn prunable_network_graph(&self) -> Option<&G> {
218 match self {
219 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
220 GossipSync::Rapid(gossip_sync) => {
221 if gossip_sync.is_initial_sync_complete() {
222 Some(gossip_sync.network_graph())
223 } else {
224 None
225 }
226 },
227 GossipSync::None => None,
228 }
229 }
230}
231
232impl<
234 P: Deref<Target = P2PGossipSync<G, U, L>>,
235 G: Deref<Target = NetworkGraph<L>>,
236 U: Deref,
237 L: Deref,
238 > GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
239where
240 U::Target: UtxoLookup,
241 L::Target: Logger,
242{
243 pub fn p2p(gossip_sync: P) -> Self {
245 GossipSync::P2P(gossip_sync)
246 }
247}
248
249impl<
251 'a,
252 R: Deref<Target = RapidGossipSync<G, L>>,
253 G: Deref<Target = NetworkGraph<L>>,
254 L: Deref,
255 >
256 GossipSync<
257 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
258 R,
259 G,
260 &'a (dyn UtxoLookup + Send + Sync),
261 L,
262 > where
263 L::Target: Logger,
264{
265 pub fn rapid(gossip_sync: R) -> Self {
267 GossipSync::Rapid(gossip_sync)
268 }
269}
270
271impl<'a, L: Deref>
273 GossipSync<
274 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
275 &RapidGossipSync<&'a NetworkGraph<L>, L>,
276 &'a NetworkGraph<L>,
277 &'a (dyn UtxoLookup + Send + Sync),
278 L,
279 > where
280 L::Target: Logger,
281{
282 pub fn none() -> Self {
284 GossipSync::None
285 }
286}
287
288fn handle_network_graph_update<L: Deref>(network_graph: &NetworkGraph<L>, event: &Event)
289where
290 L::Target: Logger,
291{
292 if let Event::PaymentPathFailed {
293 failure: PathFailure::OnPath { network_update: Some(ref upd) },
294 ..
295 } = event
296 {
297 network_graph.handle_network_update(upd);
298 }
299}
300
301fn update_scorer<'a, S: Deref<Target = SC>, SC: 'a + WriteableScore<'a>>(
304 scorer: &'a S, event: &Event, duration_since_epoch: Duration,
305) -> bool {
306 match event {
307 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
308 let mut score = scorer.write_lock();
309 score.payment_path_failed(path, *scid, duration_since_epoch);
310 },
311 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
312 let mut score = scorer.write_lock();
315 score.probe_successful(path, duration_since_epoch);
316 },
317 Event::PaymentPathSuccessful { path, .. } => {
318 let mut score = scorer.write_lock();
319 score.payment_path_successful(path, duration_since_epoch);
320 },
321 Event::ProbeSuccessful { path, .. } => {
322 let mut score = scorer.write_lock();
323 score.probe_successful(path, duration_since_epoch);
324 },
325 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
326 let mut score = scorer.write_lock();
327 score.probe_failed(path, *scid, duration_since_epoch);
328 },
329 _ => return false,
330 }
331 true
332}
333
// The aliases below spell out fully type-erased (`dyn ... + Send + Sync`) instantiations of
// LDK's generic types. Within this file they are only used to give the `NO_*` constants a
// concrete type.

// Lock wrapper around the scorer used by `DynRouter`: an `RwLock` when `std` is available.
#[cfg(all(not(c_bindings), feature = "std"))]
type ScorerWrapper<T> = std::sync::RwLock<T>;

// Lock wrapper around the scorer used by `DynRouter`: a `RefCell` in `no_std` builds.
#[cfg(all(not(c_bindings), not(feature = "std")))]
type ScorerWrapper<T> = core::cell::RefCell<T>;

// `DefaultRouter` with a `ProbabilisticScorer` and trait-object logger/entropy source.
#[cfg(not(c_bindings))]
type DynRouter = lightning::routing::router::DefaultRouter<
    &'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
    &'static (dyn Logger + Send + Sync),
    &'static (dyn EntropySource + Send + Sync),
    &'static ScorerWrapper<
        lightning::routing::scoring::ProbabilisticScorer<
            &'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
            &'static (dyn Logger + Send + Sync),
        >,
    >,
    lightning::routing::scoring::ProbabilisticScoringFeeParameters,
    lightning::routing::scoring::ProbabilisticScorer<
        &'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
        &'static (dyn Logger + Send + Sync),
    >,
>;

// `DefaultMessageRouter` with trait-object logger/entropy source.
#[cfg(not(c_bindings))]
type DynMessageRouter = lightning::onion_message::messenger::DefaultMessageRouter<
    &'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
    &'static (dyn Logger + Send + Sync),
    &'static (dyn EntropySource + Send + Sync),
>;

// Trait-object `SignerProvider` producing `InMemorySigner`s (non-taproot builds).
#[cfg(all(not(c_bindings), not(taproot)))]
type DynSignerProvider = dyn lightning::sign::SignerProvider<EcdsaSigner = lightning::sign::InMemorySigner>
    + Send
    + Sync;

// Trait-object `SignerProvider` producing `InMemorySigner`s for both ECDSA and taproot signing.
#[cfg(all(not(c_bindings), taproot))]
type DynSignerProvider = (dyn lightning::sign::SignerProvider<
    EcdsaSigner = lightning::sign::InMemorySigner,
    TaprootSigner = lightning::sign::InMemorySigner,
> + Send
     + Sync);

// `ChannelManager` whose every component is a `'static` reference to a trait object (or to one
// of the `Dyn*` aliases above).
#[cfg(not(c_bindings))]
type DynChannelManager = lightning::ln::channelmanager::ChannelManager<
    &'static (dyn chain::Watch<lightning::sign::InMemorySigner> + Send + Sync),
    &'static (dyn BroadcasterInterface + Send + Sync),
    &'static (dyn EntropySource + Send + Sync),
    &'static (dyn lightning::sign::NodeSigner + Send + Sync),
    &'static DynSignerProvider,
    &'static (dyn FeeEstimator + Send + Sync),
    &'static DynRouter,
    &'static DynMessageRouter,
    &'static (dyn Logger + Send + Sync),
>;
389
/// A `None` onion messenger with a fully spelled-out type.
///
/// Intended to be passed wherever an optional onion messenger is expected (for example the
/// `onion_messenger` argument of `process_events_async`) when none is in use, so that the caller
/// does not have to name a concrete messenger type just to write `None`.
#[cfg(not(c_bindings))]
pub const NO_ONION_MESSENGER: Option<
    Arc<
        dyn AOnionMessenger<
                EntropySource = dyn EntropySource + Send + Sync,
                ES = &(dyn EntropySource + Send + Sync),
                NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
                NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
                Logger = dyn Logger + Send + Sync,
                L = &'static (dyn Logger + Send + Sync),
                NodeIdLookUp = DynChannelManager,
                NL = &'static DynChannelManager,
                MessageRouter = DynMessageRouter,
                MR = &'static DynMessageRouter,
                OffersMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
                OMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
                AsyncPaymentsMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
                APH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
                DNSResolverMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
                DRH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
                CustomOnionMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
                CMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
            > + Send
            + Sync,
    >,
> = None;
418
/// A `None` liquidity manager with a fully spelled-out type.
///
/// Intended to be passed wherever an optional (async) liquidity manager is expected (for example
/// the `liquidity_manager` argument of `process_events_async`) when none is in use, so that the
/// caller does not have to name a concrete manager type just to write `None`.
#[cfg(not(c_bindings))]
pub const NO_LIQUIDITY_MANAGER: Option<
    Arc<
        dyn ALiquidityManager<
                EntropySource = dyn EntropySource + Send + Sync,
                ES = &(dyn EntropySource + Send + Sync),
                NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
                NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
                AChannelManager = DynChannelManager,
                CM = &DynChannelManager,
                Filter = dyn chain::Filter + Send + Sync,
                C = &(dyn chain::Filter + Send + Sync),
                KVStore = dyn lightning::util::persist::KVStore + Send + Sync,
                K = &(dyn lightning::util::persist::KVStore + Send + Sync),
                TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
                TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
                BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
                                           + Send
                                           + Sync,
                T = &(dyn BroadcasterInterface + Send + Sync),
            > + Send
            + Sync,
    >,
> = None;
445
/// A `None` liquidity manager with a fully spelled-out type, for the synchronous
/// (`KVStoreSync`-based) liquidity manager flavour.
///
/// Only available with the `std` feature. Presumably meant for the synchronous background
/// processor entry point, which is outside this part of the file - confirm against its
/// signature.
#[cfg(all(not(c_bindings), feature = "std"))]
pub const NO_LIQUIDITY_MANAGER_SYNC: Option<
    Arc<
        dyn ALiquidityManagerSync<
                EntropySource = dyn EntropySource + Send + Sync,
                ES = &(dyn EntropySource + Send + Sync),
                NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
                NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
                AChannelManager = DynChannelManager,
                CM = &DynChannelManager,
                Filter = dyn chain::Filter + Send + Sync,
                C = &(dyn chain::Filter + Send + Sync),
                KVStoreSync = dyn lightning::util::persist::KVStoreSync + Send + Sync,
                KS = &(dyn lightning::util::persist::KVStoreSync + Send + Sync),
                TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
                TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
                BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
                                           + Send
                                           + Sync,
                T = &(dyn BroadcasterInterface + Send + Sync),
            > + Send
            + Sync,
    >,
> = None;
472
// Small, dependency-free future combinators used by the async processing loop: a five-way
// "first one ready" selector, an optional-future adapter, a no-op waker and a five-way joiner.
pub(crate) mod futures_util {
    use core::future::Future;
    use core::marker::Unpin;
    use core::pin::Pin;
    use core::task::{Poll, RawWaker, RawWakerVTable, Waker};

    // Completes as soon as any one of its five futures completes.
    //
    // The futures are polled in field order (`a` through `e`) and the first one found ready
    // wins; later ones are not polled in that round.
    pub(crate) struct Selector<A, B, C, D, E>
    where
        A: Future<Output = ()> + Unpin,
        B: Future<Output = ()> + Unpin,
        C: Future<Output = ()> + Unpin,
        D: Future<Output = ()> + Unpin,
        E: Future<Output = bool> + Unpin,
    {
        pub a: A,
        pub b: B,
        pub c: C,
        pub d: D,
        pub e: E,
    }

    // Identifies which branch of a `Selector` completed; `E` carries that future's output.
    pub(crate) enum SelectorOutput {
        A,
        B,
        C,
        D,
        E(bool),
    }

    impl<A, B, C, D, E> Future for Selector<A, B, C, D, E>
    where
        A: Future<Output = ()> + Unpin,
        B: Future<Output = ()> + Unpin,
        C: Future<Output = ()> + Unpin,
        D: Future<Output = ()> + Unpin,
        E: Future<Output = bool> + Unpin,
    {
        type Output = SelectorOutput;

        fn poll(
            mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>,
        ) -> Poll<SelectorOutput> {
            if Pin::new(&mut self.a).poll(ctx).is_ready() {
                return Poll::Ready(SelectorOutput::A);
            }
            if Pin::new(&mut self.b).poll(ctx).is_ready() {
                return Poll::Ready(SelectorOutput::B);
            }
            if Pin::new(&mut self.c).poll(ctx).is_ready() {
                return Poll::Ready(SelectorOutput::C);
            }
            if Pin::new(&mut self.d).poll(ctx).is_ready() {
                return Poll::Ready(SelectorOutput::D);
            }
            // Last branch: ready maps to `E(output)`, pending stays pending.
            Pin::new(&mut self.e).poll(ctx).map(SelectorOutput::E)
        }
    }

    // Adapts an optional future into a future: with `None` inside it never completes, with
    // `Some` it completes when the inner future does (after which the inner future is dropped
    // and further polls stay pending).
    pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
        pub optional_future: Option<F>,
    }

    impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
            let outcome = match self.optional_future.as_mut() {
                Some(inner) => Pin::new(inner).poll(ctx),
                None => return Poll::Pending,
            };
            if outcome.is_ready() {
                // Never poll a completed future again.
                self.optional_future = None;
            }
            outcome
        }
    }

    // `clone` entry of the dummy vtable: hands out another null-data waker on the same vtable.
    fn dummy_waker_clone(_: *const ()) -> RawWaker {
        RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
    }

    // Shared no-op used for the `wake`, `wake_by_ref` and `drop` vtable entries.
    fn dummy_waker_noop(_: *const ()) {}

    const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
        dummy_waker_clone,
        dummy_waker_noop,
        dummy_waker_noop,
        dummy_waker_noop,
    );

    // Creates a waker that does nothing when woken, for polling a future once by hand.
    pub(crate) fn dummy_waker() -> Waker {
        // SAFETY: the data pointer is null and every vtable function ignores it, so nothing is
        // ever dereferenced or freed; the functions hold no state and are trivially thread-safe.
        unsafe { Waker::from_raw(dummy_waker_clone(core::ptr::null())) }
    }

    // State of one `Joiner` slot: still pending (with or without a future installed) or
    // finished with a result.
    enum JoinerResult<ERR, F: Future<Output = Result<(), ERR>> + Unpin> {
        Pending(Option<F>),
        Ready(Result<(), ERR>),
    }

    impl<ERR, F: Future<Output = Result<(), ERR>> + Unpin> JoinerResult<ERR, F> {
        // Drives this slot one step and reports whether it is finished. A slot with no future
        // installed finishes immediately with `Ok(())`.
        fn advance(&mut self, ctx: &mut core::task::Context<'_>) -> bool {
            let outcome = match self {
                JoinerResult::Ready(_) => return true,
                JoinerResult::Pending(None) => Ok(()),
                JoinerResult::Pending(Some(fut)) => match Pin::new(fut).poll(ctx) {
                    Poll::Ready(res) => res,
                    Poll::Pending => return false,
                },
            };
            *self = JoinerResult::Ready(outcome);
            true
        }

        // Moves the stored result out, leaving `Ok(())` behind. Unfinished slots yield `Ok(())`.
        fn take_result(&mut self) -> Result<(), ERR> {
            match self {
                JoinerResult::Ready(res) => core::mem::replace(res, Ok(())),
                JoinerResult::Pending(_) => Ok(()),
            }
        }
    }

    // Waits for up to five fallible futures and yields all five results in slot order. Slots
    // that were never filled report `Ok(())`.
    pub(crate) struct Joiner<ERR, A, B, C, D, E>
    where
        A: Future<Output = Result<(), ERR>> + Unpin,
        B: Future<Output = Result<(), ERR>> + Unpin,
        C: Future<Output = Result<(), ERR>> + Unpin,
        D: Future<Output = Result<(), ERR>> + Unpin,
        E: Future<Output = Result<(), ERR>> + Unpin,
    {
        a: JoinerResult<ERR, A>,
        b: JoinerResult<ERR, B>,
        c: JoinerResult<ERR, C>,
        d: JoinerResult<ERR, D>,
        e: JoinerResult<ERR, E>,
    }

    impl<ERR, A, B, C, D, E> Joiner<ERR, A, B, C, D, E>
    where
        A: Future<Output = Result<(), ERR>> + Unpin,
        B: Future<Output = Result<(), ERR>> + Unpin,
        C: Future<Output = Result<(), ERR>> + Unpin,
        D: Future<Output = Result<(), ERR>> + Unpin,
        E: Future<Output = Result<(), ERR>> + Unpin,
    {
        // Creates a joiner with all five slots empty.
        pub(crate) fn new() -> Self {
            Joiner {
                a: JoinerResult::Pending(None),
                b: JoinerResult::Pending(None),
                c: JoinerResult::Pending(None),
                d: JoinerResult::Pending(None),
                e: JoinerResult::Pending(None),
            }
        }

        // Installs the future for slot `a`.
        pub(crate) fn set_a(&mut self, fut: A) {
            self.a = JoinerResult::Pending(Some(fut));
        }
        // Records an already-available result for slot `a`.
        pub(crate) fn set_a_res(&mut self, res: Result<(), ERR>) {
            self.a = JoinerResult::Ready(res);
        }
        // Installs the future for slot `b`.
        pub(crate) fn set_b(&mut self, fut: B) {
            self.b = JoinerResult::Pending(Some(fut));
        }
        // Installs the future for slot `c`.
        pub(crate) fn set_c(&mut self, fut: C) {
            self.c = JoinerResult::Pending(Some(fut));
        }
        // Installs the future for slot `d`.
        pub(crate) fn set_d(&mut self, fut: D) {
            self.d = JoinerResult::Pending(Some(fut));
        }
        // Installs the future for slot `e`.
        pub(crate) fn set_e(&mut self, fut: E) {
            self.e = JoinerResult::Pending(Some(fut));
        }
    }

    impl<ERR, A, B, C, D, E> Future for Joiner<ERR, A, B, C, D, E>
    where
        A: Future<Output = Result<(), ERR>> + Unpin,
        B: Future<Output = Result<(), ERR>> + Unpin,
        C: Future<Output = Result<(), ERR>> + Unpin,
        D: Future<Output = Result<(), ERR>> + Unpin,
        E: Future<Output = Result<(), ERR>> + Unpin,
        Joiner<ERR, A, B, C, D, E>: Unpin,
    {
        type Output = [Result<(), ERR>; 5];

        fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
            let this = &mut *self;
            // Every slot is driven on every poll (no short-circuiting), so that all pending
            // futures make progress together.
            let finished = [
                this.a.advance(ctx),
                this.b.advance(ctx),
                this.c.advance(ctx),
                this.d.advance(ctx),
                this.e.advance(ctx),
            ];

            if finished.iter().all(|done| *done) {
                Poll::Ready([
                    this.a.take_result(),
                    this.b.take_result(),
                    this.c.take_result(),
                    this.d.take_result(),
                    this.e.take_result(),
                ])
            } else {
                Poll::Pending
            }
        }
    }
}
709use core::task;
710use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutput};
711
/// Runs the background processing loop as a future.
///
/// On startup the `ChannelManager`'s `timer_tick_occurred` and the `ChainMonitor`'s
/// `rebroadcast_pending_claims` are each called once. Every loop iteration then:
///
/// * processes pending events from the `ChannelManager`, the `ChainMonitor` and, if present, the
///   onion messenger, passing each event through the network graph / scorer update before
///   handing it to `event_handler`;
/// * calls `process_events` on the peer manager and, when the batching timer fires, processes
///   pending HTLC forwards;
/// * sleeps until one of the components signals new work or the sleep future completes;
/// * services the periodic timers (channel manager tick, network graph pruning, scorer
///   persistence, sweeper, onion messenger tick, peer manager tick, claim rebroadcast) and
///   persists the `ChannelManager` when it reports that persistence is needed.
///
/// `sleeper` must return a future that resolves after (roughly) the given duration. The boolean
/// it resolves to is the exit signal: as soon as any sleep future yields `true` the loop
/// terminates, after which the `ChannelManager`, the scorer (if any) and the network graph (if
/// any) are persisted one final time.
///
/// `fetch_time` should return the current time as a duration since the epoch, or `None` if no
/// time source is available; with `None`, scorer updates/decay and network graph pruning are
/// skipped.
///
/// When `mobile_interruptable_platform` is set, the loop sleeps for at most 100ms at a time and
/// disconnects all peers if such a sleep is found to have taken more than a second.
///
/// # Errors
///
/// Returns the error of any failed `ChannelManager` or `LiquidityManager` persistence, or of a
/// failed write during the final persistence pass. Failures of the periodic scorer and network
/// graph persistence inside the loop are only logged.
///
#[cfg_attr(
    feature = "std",
    doc = " See [`BackgroundProcessor::start`] for information on which actions this handles.\n"
)]
#[cfg_attr(
    feature = "std",
    doc = " let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());"
)]
#[cfg_attr(feature = "std", doc = "")]
#[cfg_attr(feature = "std", doc = " let mut receiver = stop_receiver.clone();")]
#[cfg_attr(feature = "std", doc = " _ = receiver.changed() => true,")]
#[cfg_attr(feature = "std", doc = " let handle = tokio::spawn(async move {")]
#[cfg_attr(
    not(feature = "std"),
    doc = " let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();"
)]
#[cfg_attr(not(feature = "std"), doc = " rt.block_on(async move {")]
#[cfg_attr(feature = "std", doc = " stop_sender.send(()).unwrap();")]
#[cfg_attr(feature = "std", doc = " handle.await.unwrap()")]
pub async fn process_events_async<
    'a,
    UL: Deref,
    CF: Deref,
    T: Deref,
    F: Deref,
    G: Deref<Target = NetworkGraph<L>>,
    L: Deref,
    P: Deref,
    EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
    EventHandler: Fn(Event) -> EventHandlerFuture,
    ES: Deref,
    M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
    CM: Deref,
    OM: Deref,
    PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
    RGS: Deref<Target = RapidGossipSync<G, L>>,
    PM: Deref,
    LM: Deref,
    D: Deref,
    O: Deref,
    K: Deref,
    OS: Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
    S: Deref<Target = SC>,
    SC: for<'b> WriteableScore<'b>,
    SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
    Sleeper: Fn(Duration) -> SleepFuture,
    FetchTime: Fn() -> Option<Duration>,
>(
    kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
    onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
    liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
    sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
    UL::Target: UtxoLookup,
    CF::Target: chain::Filter,
    T::Target: BroadcasterInterface,
    F::Target: FeeEstimator,
    L::Target: Logger,
    P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
    ES::Target: EntropySource,
    CM::Target: AChannelManager,
    OM::Target: AOnionMessenger,
    PM::Target: APeerManager,
    LM::Target: ALiquidityManager,
    O::Target: OutputSpender,
    D::Target: ChangeDestinationSource,
    K::Target: KVStore,
{
    // Wraps the user's event handler: each event first updates the network graph and the scorer
    // (persisting the scorer if it changed) and is only then forwarded to `event_handler`.
    let async_event_handler = |event| {
        let network_graph = gossip_sync.network_graph();
        let event_handler = &event_handler;
        let scorer = &scorer;
        let logger = &logger;
        let kv_store = &kv_store;
        let fetch_time = &fetch_time;
        Box::pin(async move {
            if let Some(network_graph) = network_graph {
                handle_network_graph_update(network_graph, &event)
            }
            if let Some(ref scorer) = scorer {
                // Without a time source the scorer cannot be updated, so it is left untouched.
                if let Some(duration_since_epoch) = fetch_time() {
                    if update_scorer(scorer, &event, duration_since_epoch) {
                        log_trace!(logger, "Persisting scorer after update");
                        if let Err(e) = kv_store
                            .write(
                                SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
                                SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
                                SCORER_PERSISTENCE_KEY,
                                scorer.encode(),
                            )
                            .await
                        {
                            // A failed scorer write is logged but does not stop event handling.
                            log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
                        }
                    }
                }
            }
            event_handler(event).await
        })
    };
    // Supplies the delay between successive rounds of HTLC forward processing.
    let mut batch_delay = BatchDelay::new();

    log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
    channel_manager.get_cm().timer_tick_occurred();
    log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
    chain_monitor.rebroadcast_pending_claims();

    // One sleep future per periodic task. Each is checked once per loop iteration via
    // `check_and_reset_sleeper`, whose result is used below as: `Some(true)` = exit requested,
    // `Some(false)` = timer elapsed (and was re-armed), `None` = not yet elapsed.
    let mut last_freshness_call = sleeper(FRESHNESS_TIMER);
    let mut last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
    let mut last_ping_call = sleeper(PING_TIMER);
    let mut last_prune_call = sleeper(FIRST_NETWORK_PRUNE_TIMER);
    let mut last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
    let mut last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
    let mut last_sweeper_call = sleeper(SWEEPER_TIMER);
    let mut have_pruned = false;
    let mut have_decayed_scorer = false;

    let mut last_forwards_processing_call = sleeper(batch_delay.get());

    loop {
        // Drain pending events from every event source before doing anything else.
        channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
        chain_monitor.process_pending_events_async(async_event_handler).await;
        if let Some(om) = &onion_messenger {
            om.get_om().process_pending_events_async(async_event_handler).await
        }

        peer_manager.as_ref().process_events();
        // Process HTLC forwards only when the batching timer has fired; it is re-armed with the
        // next batch delay.
        match check_and_reset_sleeper(&mut last_forwards_processing_call, || {
            sleeper(batch_delay.next())
        }) {
            Some(false) => {
                channel_manager.get_cm().process_pending_htlc_forwards();
            },
            Some(true) => break,
            None => {},
        }

        // On interruptible (mobile) platforms, start a one-second sleeper alongside the short
        // sleep below; if it has already completed by the time we wake up, the process was
        // stalled (see `await_slow`).
        let mut await_start = None;
        if mobile_interruptable_platform {
            await_start = Some(sleeper(Duration::from_secs(1)));
        }
        let om_fut = if let Some(om) = onion_messenger.as_ref() {
            let fut = om.get_om().get_update_future();
            OptionalSelector { optional_future: Some(fut) }
        } else {
            OptionalSelector { optional_future: None }
        };
        let lm_fut = if let Some(lm) = liquidity_manager.as_ref() {
            let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future();
            OptionalSelector { optional_future: Some(fut) }
        } else {
            OptionalSelector { optional_future: None }
        };
        // Choose how long to sleep at most: never longer than the fastest periodic timer (or
        // 100ms on mobile), and no longer than the batch delay if HTLCs are waiting.
        let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing();
        let sleep_delay = match (needs_processing, mobile_interruptable_platform) {
            (true, true) => batch_delay.get().min(Duration::from_millis(100)),
            (true, false) => batch_delay.get().min(FASTEST_TIMER),
            (false, true) => Duration::from_millis(100),
            (false, false) => FASTEST_TIMER,
        };
        // Wait for whichever comes first: new work from one of the components or the sleep.
        let fut = Selector {
            a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
            b: chain_monitor.get_update_future(),
            c: om_fut,
            d: lm_fut,
            e: sleeper(sleep_delay),
        };
        match fut.await {
            SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {},
            SelectorOutput::E(exit) => {
                if exit {
                    break;
                }
            },
        }

        let await_slow = if mobile_interruptable_platform {
            // `await_start` is always `Some` here because it is set above under the same
            // condition. The replacement sleeper is never used (hence the zero duration);
            // `await_start` is recreated on the next iteration.
            match check_and_reset_sleeper(&mut await_start.unwrap(), || sleeper(Duration::ZERO)) {
                Some(true) => break,
                Some(false) => true,
                None => false,
            }
        } else {
            false
        };
        match check_and_reset_sleeper(&mut last_freshness_call, || sleeper(FRESHNESS_TIMER)) {
            Some(false) => {
                log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
                channel_manager.get_cm().timer_tick_occurred();
            },
            Some(true) => break,
            None => {},
        }

        // Collects this iteration's persistence futures so they can be awaited together below.
        let mut futures = Joiner::new();

        if channel_manager.get_cm().get_and_clear_needs_persistence() {
            log_trace!(logger, "Persisting ChannelManager...");

            let fut = async {
                kv_store
                    .write(
                        CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
                        CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
                        CHANNEL_MANAGER_PERSISTENCE_KEY,
                        channel_manager.get_cm().encode(),
                    )
                    .await
            };
            let mut fut = Box::pin(fut);

            // Poll the ChannelManager write once right away (with a no-op waker) so that it is
            // started before the graph pruning and scorer work below. If it has not finished,
            // it is completed together with the other futures in the `Joiner`.
            use core::future::Future;
            let mut waker = dummy_waker();
            let mut ctx = task::Context::from_waker(&mut waker);
            match core::pin::Pin::new(&mut fut).poll(&mut ctx) {
                task::Poll::Ready(res) => futures.set_a_res(res),
                task::Poll::Pending => futures.set_a(fut),
            }

            // NOTE(review): this is logged even when the write is still pending and only
            // completes when `futures` is awaited below.
            log_trace!(logger, "Done persisting ChannelManager.");
        }

        // While no prunable graph exists the prune timer keeps re-arming with the short
        // first-prune interval; once one exists it switches to the regular interval.
        let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
            NETWORK_PRUNE_TIMER
        } else {
            FIRST_NETWORK_PRUNE_TIMER
        };
        let prune_timer_elapsed = {
            match check_and_reset_sleeper(&mut last_prune_call, || sleeper(prune_timer)) {
                Some(false) => true,
                Some(true) => break,
                None => false,
            }
        };

        // With rapid gossip sync the first prune is attempted on every iteration until it has
        // happened, rather than waiting for the timer.
        let should_prune = match gossip_sync {
            GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
            _ => prune_timer_elapsed,
        };
        if should_prune {
            // `prunable_network_graph` returns `None` for rapid sync until its initial sync has
            // completed, in which case nothing is pruned or persisted here.
            if let Some(network_graph) = gossip_sync.prunable_network_graph() {
                if let Some(duration_since_epoch) = fetch_time() {
                    log_trace!(logger, "Pruning and persisting network graph.");
                    network_graph.remove_stale_channels_and_tracking_with_time(
                        duration_since_epoch.as_secs(),
                    );
                } else {
                    log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
                    log_trace!(logger, "Persisting network graph.");
                }
                let fut = async {
                    if let Err(e) = kv_store
                        .write(
                            NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
                            NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
                            NETWORK_GRAPH_PERSISTENCE_KEY,
                            network_graph.encode(),
                        )
                        .await
                    {
                        log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
                    }

                    // Graph persistence failures are logged only and never abort the loop.
                    Ok(())
                };

                futures.set_b(Box::pin(fut));

                have_pruned = true;
            }
        }
        // Let the scorer account for elapsed time once, on the first iteration, if a time
        // source is available.
        if !have_decayed_scorer {
            if let Some(ref scorer) = scorer {
                if let Some(duration_since_epoch) = fetch_time() {
                    log_trace!(logger, "Calling time_passed on scorer at startup");
                    scorer.write_lock().time_passed(duration_since_epoch);
                }
            }
            have_decayed_scorer = true;
        }
        match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
            sleeper(SCORER_PERSIST_TIMER)
        }) {
            Some(false) => {
                if let Some(ref scorer) = scorer {
                    if let Some(duration_since_epoch) = fetch_time() {
                        log_trace!(logger, "Calling time_passed and persisting scorer");
                        scorer.write_lock().time_passed(duration_since_epoch);
                    } else {
                        log_trace!(logger, "Persisting scorer");
                    }
                    let fut = async {
                        if let Err(e) = kv_store
                            .write(
                                SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
                                SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
                                SCORER_PERSISTENCE_KEY,
                                scorer.encode(),
                            )
                            .await
                        {
                            log_error!(
                                logger,
                                "Error: Failed to persist scorer, check your disk and permissions {}",
                                e
                            );
                        }

                        // Scorer persistence failures are logged only and never abort the loop.
                        Ok(())
                    };

                    futures.set_c(Box::pin(fut));
                }
            },
            Some(true) => break,
            None => {},
        }
        match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
            Some(false) => {
                log_trace!(logger, "Regenerating sweeper spends if necessary");
                if let Some(ref sweeper) = sweeper {
                    let fut = async {
                        // The sweeper's result is deliberately discarded.
                        let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;

                        Ok(())
                    };

                    futures.set_d(Box::pin(fut));
                }
            },
            Some(true) => break,
            None => {},
        }

        // The liquidity manager, if any, is asked to persist on every iteration; unlike the
        // scorer and graph writes, its error is propagated.
        if let Some(liquidity_manager) = liquidity_manager.as_ref() {
            log_trace!(logger, "Persisting LiquidityManager...");
            let fut = async {
                liquidity_manager.get_lm().persist().await.map_err(|e| {
                    log_error!(logger, "Persisting LiquidityManager failed: {}", e);
                    e
                })
            };
            futures.set_e(Box::pin(fut));
        }

        // Drive all collected persistence futures to completion together and return early on
        // the first error among their results.
        for res in futures.await {
            res?;
        }

        match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
            sleeper(ONION_MESSAGE_HANDLER_TIMER)
        }) {
            Some(false) => {
                if let Some(om) = &onion_messenger {
                    log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
                    om.get_om().timer_tick_occurred();
                }
            },
            Some(true) => break,
            None => {},
        }

        if await_slow {
            // The short sleep above overran by so much that the one-second sleeper completed:
            // drop all peer connections and restart the ping timer instead of ticking it.
            log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
            peer_manager.as_ref().disconnect_all_peers();
            last_ping_call = sleeper(PING_TIMER);
        } else {
            match check_and_reset_sleeper(&mut last_ping_call, || sleeper(PING_TIMER)) {
                Some(false) => {
                    log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
                    peer_manager.as_ref().timer_tick_occurred();
                },
                Some(true) => break,
                _ => {},
            }
        }

        match check_and_reset_sleeper(&mut last_rebroadcast_call, || sleeper(REBROADCAST_TIMER)) {
            Some(false) => {
                log_trace!(logger, "Rebroadcasting monitor's pending claims");
                chain_monitor.rebroadcast_pending_claims();
            },
            Some(true) => break,
            None => {},
        }
    }
    log_trace!(logger, "Terminating background processor.");

    // Final persistence pass after the loop exits: ChannelManager first, then the scorer and
    // the network graph if present. Any failure here is returned to the caller.
    kv_store
        .write(
            CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
            CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
            CHANNEL_MANAGER_PERSISTENCE_KEY,
            channel_manager.get_cm().encode(),
        )
        .await?;
    if let Some(ref scorer) = scorer {
        kv_store
            .write(
                SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
                SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
                SCORER_PERSISTENCE_KEY,
                scorer.encode(),
            )
            .await?;
    }
    if let Some(network_graph) = gossip_sync.network_graph() {
        kv_store
            .write(
                NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
                NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
                NETWORK_GRAPH_PERSISTENCE_KEY,
                network_graph.encode(),
            )
            .await?;
    }
    Ok(())
}
1332
1333fn check_and_reset_sleeper<
1334 SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1335>(
1336 fut: &mut SleepFuture, mut new_sleeper: impl FnMut() -> SleepFuture,
1337) -> Option<bool> {
1338 let mut waker = dummy_waker();
1339 let mut ctx = task::Context::from_waker(&mut waker);
1340 match core::pin::Pin::new(&mut *fut).poll(&mut ctx) {
1341 task::Poll::Ready(exit) => {
1342 *fut = new_sleeper();
1343 Some(exit)
1344 },
1345 task::Poll::Pending => None,
1346 }
1347}
1348
1349pub async fn process_events_async_with_kv_store_sync<
1352 UL: Deref,
1353 CF: Deref,
1354 T: Deref,
1355 F: Deref,
1356 G: Deref<Target = NetworkGraph<L>>,
1357 L: Deref,
1358 P: Deref,
1359 EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
1360 EventHandler: Fn(Event) -> EventHandlerFuture,
1361 ES: Deref,
1362 M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
1363 CM: Deref,
1364 OM: Deref,
1365 PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
1366 RGS: Deref<Target = RapidGossipSync<G, L>>,
1367 PM: Deref,
1368 LM: Deref,
1369 D: Deref,
1370 O: Deref,
1371 K: Deref,
1372 OS: Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>>,
1373 S: Deref<Target = SC>,
1374 SC: for<'b> WriteableScore<'b>,
1375 SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1376 Sleeper: Fn(Duration) -> SleepFuture,
1377 FetchTime: Fn() -> Option<Duration>,
1378>(
1379 kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
1380 onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
1381 liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
1382 sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
1383) -> Result<(), lightning::io::Error>
1384where
1385 UL::Target: UtxoLookup,
1386 CF::Target: chain::Filter,
1387 T::Target: BroadcasterInterface,
1388 F::Target: FeeEstimator,
1389 L::Target: Logger,
1390 P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
1391 ES::Target: EntropySource,
1392 CM::Target: AChannelManager,
1393 OM::Target: AOnionMessenger,
1394 PM::Target: APeerManager,
1395 LM::Target: ALiquidityManager,
1396 O::Target: OutputSpender,
1397 D::Target: ChangeDestinationSourceSync,
1398 K::Target: KVStoreSync,
1399{
1400 let kv_store = KVStoreSyncWrapper(kv_store);
1401 process_events_async(
1402 kv_store,
1403 event_handler,
1404 chain_monitor,
1405 channel_manager,
1406 onion_messenger,
1407 gossip_sync,
1408 peer_manager,
1409 liquidity_manager,
1410 sweeper.as_ref().map(|os| os.sweeper_async()),
1411 logger,
1412 scorer,
1413 sleeper,
1414 mobile_interruptable_platform,
1415 fetch_time,
1416 )
1417 .await
1418}
1419
1420#[cfg(feature = "std")]
1421impl BackgroundProcessor {
	/// Spawns a thread that keeps the given objects serviced and persisted, and returns a
	/// `BackgroundProcessor` holding the thread's stop flag and join handle.
	///
	/// Each iteration of the thread's loop:
	/// * passes pending events of `channel_manager`, `chain_monitor` and, if present,
	///   `onion_messenger` to a wrapper around `event_handler`. The wrapper first applies the
	///   event to the network graph (if `gossip_sync` has one) and to `scorer`, persisting the
	///   scorer whenever `update_scorer` returns `true`,
	/// * calls `peer_manager.process_events()`,
	/// * calls `process_pending_htlc_forwards()` once the current batch delay has elapsed,
	/// * exits if the stop flag is set; otherwise waits on the objects' update futures for at
	///   most 100ms (or the current batch delay, if HTLC processing is needed and the delay is
	///   shorter) and checks the stop flag again,
	/// * persists the channel manager when `get_and_clear_needs_persistence()` returns `true`; a
	///   failed write ends the thread with that error,
	/// * persists the liquidity manager, if any (failures are only logged),
	/// * runs the periodic work guarded by `FRESHNESS_TIMER`, the network-prune timers,
	///   `SCORER_PERSIST_TIMER`, `SWEEPER_TIMER`, `ONION_MESSAGE_HANDLER_TIMER`, `PING_TIMER` and
	///   `REBROADCAST_TIMER`. Network graph and scorer write failures here are only logged.
	///
	/// After the loop exits, the channel manager, the scorer (if any) and the network graph (if
	/// any) are written one final time; an error from any of these writes becomes the thread's
	/// result, which `join`/`stop` return.
	///
	/// # Panics
	///
	/// The spawned thread panics if the system clock reports a time before the Unix epoch.
	pub fn start<
		'a,
		UL: 'static + Deref,
		CF: 'static + Deref,
		T: 'static + Deref,
		F: 'static + Deref + Send,
		G: 'static + Deref<Target = NetworkGraph<L>>,
		L: 'static + Deref + Send,
		P: 'static + Deref,
		EH: 'static + EventHandler + Send,
		ES: 'static + Deref + Send,
		M: 'static
			+ Deref<
				Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
			>
			+ Send
			+ Sync,
		CM: 'static + Deref + Send,
		OM: 'static + Deref + Send,
		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		PM: 'static + Deref + Send,
		LM: 'static + Deref + Send,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: for<'b> WriteableScore<'b>,
		D: 'static + Deref,
		O: 'static + Deref,
		K: 'static + Deref + Send,
		OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
	>(
		kv_store: K, event_handler: EH, chain_monitor: M, channel_manager: CM,
		onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
		liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
	) -> Self
	where
		UL::Target: 'static + UtxoLookup,
		CF::Target: 'static + chain::Filter,
		T::Target: 'static + BroadcasterInterface,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
		ES::Target: 'static + EntropySource,
		CM::Target: AChannelManager,
		OM::Target: AOnionMessenger,
		PM::Target: APeerManager,
		LM::Target: ALiquidityManagerSync,
		D::Target: ChangeDestinationSourceSync,
		O::Target: 'static + OutputSpender,
		K::Target: 'static + KVStoreSync,
	{
		// One handle on the flag moves into the thread, the other stays with the returned value.
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = Arc::clone(&stop_thread);
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Wraps the user's handler: update the network graph and scorer from the event
			// before handing it on.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				if let Some(ref scorer) = scorer {
					use std::time::SystemTime;
					let duration_since_epoch = SystemTime::now()
						.duration_since(SystemTime::UNIX_EPOCH)
						.expect("Time should be sometime after 1970");
					// Only persist when the event actually changed the scorer.
					if update_scorer(scorer, &event, duration_since_epoch) {
						log_trace!(logger, "Persisting scorer after update");
						if let Err(e) = kv_store.write(
							SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
							SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
							SCORER_PERSISTENCE_KEY,
							scorer.encode(),
						) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
						}
					}
				}
				event_handler.handle_event(event)
			};
			let mut batch_delay = BatchDelay::new();

			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
			channel_manager.get_cm().timer_tick_occurred();
			log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
			chain_monitor.rebroadcast_pending_claims();

			// Timestamps of the last run of each periodic task.
			let mut last_freshness_call = Instant::now();
			let mut last_onion_message_handler_call = Instant::now();
			let mut last_ping_call = Instant::now();
			let mut last_prune_call = Instant::now();
			let mut last_scorer_persist_call = Instant::now();
			let mut last_rebroadcast_call = Instant::now();
			let mut last_sweeper_call = Instant::now();
			let mut have_pruned = false;
			let mut have_decayed_scorer = false;

			let mut cur_batch_delay = batch_delay.get();
			let mut last_forwards_processing_call = Instant::now();

			loop {
				channel_manager.get_cm().process_pending_events(&event_handler);
				chain_monitor.process_pending_events(&event_handler);
				if let Some(om) = &onion_messenger {
					om.get_om().process_pending_events(&event_handler)
				};

				peer_manager.as_ref().process_events();
				// Forward HTLCs in batches: only once the current delay has passed, then pick
				// the next delay.
				if last_forwards_processing_call.elapsed() > cur_batch_delay {
					channel_manager.get_cm().process_pending_htlc_forwards();
					cur_batch_delay = batch_delay.next();
					last_forwards_processing_call = Instant::now();
				}
				if stop_thread.load(Ordering::Acquire) {
					log_trace!(logger, "Terminating background processor.");
					break;
				}
				// Build a sleeper over the update futures of every object that is present.
				let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
					(Some(om), Some(lm)) => Sleeper::from_four_futures(
						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
						&chain_monitor.get_update_future(),
						&om.get_om().get_update_future(),
						&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
					),
					(Some(om), None) => Sleeper::from_three_futures(
						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
						&chain_monitor.get_update_future(),
						&om.get_om().get_update_future(),
					),
					(None, Some(lm)) => Sleeper::from_three_futures(
						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
						&chain_monitor.get_update_future(),
						&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
					),
					(None, None) => Sleeper::from_two_futures(
						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
						&chain_monitor.get_update_future(),
					),
				};
				// Note: this `batch_delay` is a `Duration` shadowing the outer `BatchDelay` for
				// the rest of the iteration.
				let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() {
					batch_delay.get()
				} else {
					Duration::MAX
				};
				// Never wait longer than 100ms, so the stop flag and timers are checked regularly.
				let fastest_timeout = batch_delay.min(Duration::from_millis(100));
				sleeper.wait_timeout(fastest_timeout);
				if stop_thread.load(Ordering::Acquire) {
					log_trace!(logger, "Terminating background processor.");
					break;
				}
				if last_freshness_call.elapsed() > FRESHNESS_TIMER {
					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
					channel_manager.get_cm().timer_tick_occurred();
					last_freshness_call = Instant::now();
				}
				if channel_manager.get_cm().get_and_clear_needs_persistence() {
					log_trace!(logger, "Persisting ChannelManager...");
					// Unlike the other writes in this loop, a failure here ends the thread.
					(kv_store.write(
						CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_KEY,
						channel_manager.get_cm().encode(),
					))?;
					log_trace!(logger, "Done persisting ChannelManager.");
				}

				if let Some(liquidity_manager) = liquidity_manager.as_ref() {
					log_trace!(logger, "Persisting LiquidityManager...");
					let _ = liquidity_manager.get_lm().persist().map_err(|e| {
						log_error!(logger, "Persisting LiquidityManager failed: {}", e);
					});
				}

				// A separate timer applies until the first prune has actually happened.
				let prune_timer =
					if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
				let prune_timer_elapsed = last_prune_call.elapsed() > prune_timer;
				// With rapid gossip sync, keep attempting a prune on every iteration until one
				// has succeeded.
				let should_prune = match gossip_sync {
					GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
					_ => prune_timer_elapsed,
				};
				if should_prune {
					// `have_pruned` is only set when a prunable graph was available.
					if let Some(network_graph) = gossip_sync.prunable_network_graph() {
						let duration_since_epoch = std::time::SystemTime::now()
							.duration_since(std::time::SystemTime::UNIX_EPOCH)
							.expect("Time should be sometime after 1970");

						log_trace!(logger, "Pruning and persisting network graph.");
						network_graph.remove_stale_channels_and_tracking_with_time(
							duration_since_epoch.as_secs(),
						);
						if let Err(e) = kv_store.write(
							NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
							NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
							NETWORK_GRAPH_PERSISTENCE_KEY,
							network_graph.encode(),
						) {
							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e);
						}
						have_pruned = true;
					}
					// The timer is reset even when no prunable graph was available.
					last_prune_call = Instant::now();
				}
				// Runs once, on the first iteration that gets this far.
				if !have_decayed_scorer {
					if let Some(ref scorer) = scorer {
						let duration_since_epoch = std::time::SystemTime::now()
							.duration_since(std::time::SystemTime::UNIX_EPOCH)
							.expect("Time should be sometime after 1970");
						log_trace!(logger, "Calling time_passed on scorer at startup");
						scorer.write_lock().time_passed(duration_since_epoch);
					}
					have_decayed_scorer = true;
				}
				if last_scorer_persist_call.elapsed() > SCORER_PERSIST_TIMER {
					if let Some(ref scorer) = scorer {
						let duration_since_epoch = std::time::SystemTime::now()
							.duration_since(std::time::SystemTime::UNIX_EPOCH)
							.expect("Time should be sometime after 1970");
						log_trace!(logger, "Calling time_passed and persisting scorer");
						scorer.write_lock().time_passed(duration_since_epoch);
						if let Err(e) = kv_store.write(
							SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
							SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
							SCORER_PERSISTENCE_KEY,
							scorer.encode(),
						) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
						}
					}
					last_scorer_persist_call = Instant::now();
				}
				if last_sweeper_call.elapsed() > SWEEPER_TIMER {
					log_trace!(logger, "Regenerating sweeper spends if necessary");
					if let Some(ref sweeper) = sweeper {
						// The result is deliberately discarded.
						let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
					}
					last_sweeper_call = Instant::now();
				}
				if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER {
					if let Some(om) = &onion_messenger {
						log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
						om.get_om().timer_tick_occurred();
					}
					last_onion_message_handler_call = Instant::now();
				}
				if last_ping_call.elapsed() > PING_TIMER {
					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
					peer_manager.as_ref().timer_tick_occurred();
					last_ping_call = Instant::now();
				}
				if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER {
					log_trace!(logger, "Rebroadcasting monitor's pending claims");
					chain_monitor.rebroadcast_pending_claims();
					last_rebroadcast_call = Instant::now();
				}
			}

			// Final persistence pass after the loop has been asked to stop. Any failure here is
			// returned as the thread's result.
			kv_store.write(
				CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
				CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
				CHANNEL_MANAGER_PERSISTENCE_KEY,
				channel_manager.get_cm().encode(),
			)?;
			if let Some(ref scorer) = scorer {
				kv_store.write(
					SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
					SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
					SCORER_PERSISTENCE_KEY,
					scorer.encode(),
				)?;
			}
			if let Some(network_graph) = gossip_sync.network_graph() {
				kv_store.write(
					NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
					NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
					NETWORK_GRAPH_PERSISTENCE_KEY,
					network_graph.encode(),
				)?;
			}
			Ok(())
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}
1763
	/// Waits for the background thread to finish and returns its result.
	///
	/// The stop flag is not set here, so this blocks until the thread exits on its own.
	///
	/// # Panics
	///
	/// Panics if the thread handle is already gone, or if the background thread itself panicked.
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}
1777
	/// Sets the stop flag, waits for the background thread to finish and returns its result.
	///
	/// # Panics
	///
	/// Panics if the thread handle is already gone, or if the background thread itself panicked.
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}
1791
	/// Raises the stop flag (with `Release` ordering, matching the thread's `Acquire` loads) and
	/// then joins the background thread.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}
1796
1797 fn join_thread(&mut self) -> Result<(), std::io::Error> {
1798 match self.thread_handle.take() {
1799 Some(handle) => handle.join().unwrap(),
1800 None => Ok(()),
1801 }
1802 }
1803}
1804
/// Dropping a `BackgroundProcessor` stops and joins its thread.
///
/// If the thread was already joined through `join`/`stop` this is a no-op. Otherwise the drop
/// panics when the thread returned an error or panicked.
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		self.stop_and_join_thread().unwrap();
	}
}
1811
1812#[cfg(all(feature = "std", test))]
1813mod tests {
1814 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
1815 use bitcoin::constants::{genesis_block, ChainHash};
1816 use bitcoin::hashes::Hash;
1817 use bitcoin::locktime::absolute::LockTime;
1818 use bitcoin::network::Network;
1819 use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
1820 use bitcoin::transaction::Version;
1821 use bitcoin::transaction::{Transaction, TxOut};
1822 use bitcoin::{Amount, ScriptBuf, Txid};
1823 use core::sync::atomic::{AtomicBool, Ordering};
1824 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
1825 use lightning::chain::transaction::OutPoint;
1826 use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1827 use lightning::events::{Event, PathFailure, ReplayEvent};
1828 use lightning::ln::channelmanager;
1829 use lightning::ln::channelmanager::{
1830 ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
1831 };
1832 use lightning::ln::functional_test_utils::*;
1833 use lightning::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, Init, MessageSendEvent};
1834 use lightning::ln::peer_handler::{
1835 IgnoringMessageHandler, MessageHandler, PeerManager, SocketDescriptor,
1836 };
1837 use lightning::ln::types::ChannelId;
1838 use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
1839 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
1840 use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
1841 use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
1842 use lightning::sign::{ChangeDestinationSourceSync, InMemorySigner, KeysManager, NodeSigner};
1843 use lightning::types::features::{ChannelFeatures, NodeFeatures};
1844 use lightning::types::payment::PaymentHash;
1845 use lightning::util::config::UserConfig;
1846 use lightning::util::persist::{
1847 KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
1848 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1849 CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
1850 NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1851 SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1852 SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1853 };
1854 use lightning::util::ser::Writeable;
1855 use lightning::util::sweep::{
1856 OutputSpendStatus, OutputSweeper, OutputSweeperSync, PRUNE_DELAY_BLOCKS,
1857 };
1858 use lightning::util::test_utils;
1859 use lightning::{get_event, get_event_msg};
1860 use lightning_liquidity::utils::time::DefaultTimeProvider;
1861 use lightning_liquidity::{ALiquidityManagerSync, LiquidityManager, LiquidityManagerSync};
1862 use lightning_persister::fs_store::FilesystemStore;
1863 use lightning_rapid_gossip_sync::RapidGossipSync;
1864 use std::collections::VecDeque;
1865 use std::path::PathBuf;
1866 use std::sync::mpsc::SyncSender;
1867 use std::sync::Arc;
1868 use std::time::Duration;
1869 use std::{env, fs};
1870
	/// Five times `FRESHNESS_TIMER`, computed at millisecond precision.
	const EVENT_DEADLINE: Duration =
		Duration::from_millis(5 * (FRESHNESS_TIMER.as_millis() as u64));
1873
	/// Stateless `SocketDescriptor` stub used to instantiate the test `PeerManager`.
	#[derive(Clone, Hash, PartialEq, Eq)]
	struct TestDescriptor {}
	impl SocketDescriptor for TestDescriptor {
		/// Ignores the data and always returns 0.
		fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize {
			0
		}

		/// Does nothing.
		fn disconnect_socket(&mut self) {}
	}
1883
	// Lock type wrapped around the test scorer: `MultiThreadedLockableScore` when built with
	// `c_bindings`, a plain `std::sync::Mutex` otherwise.
	#[cfg(c_bindings)]
	type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
	#[cfg(not(c_bindings))]
	type LockingWrapper<T> = std::sync::Mutex<T>;
1888
	/// Concrete `ChannelManager` instantiation used by the test nodes: the test `ChainMonitor`,
	/// `test_utils` broadcaster/fee estimator/logger, `KeysManager` for all key-related
	/// parameters, and `DefaultRouter`/`DefaultMessageRouter` over a shared `NetworkGraph` (the
	/// router scoring through a locked `TestScorer`).
	type ChannelManager = channelmanager::ChannelManager<
		Arc<ChainMonitor>,
		Arc<test_utils::TestBroadcaster>,
		Arc<KeysManager>,
		Arc<KeysManager>,
		Arc<KeysManager>,
		Arc<test_utils::TestFeeEstimator>,
		Arc<
			DefaultRouter<
				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
				Arc<test_utils::TestLogger>,
				Arc<KeysManager>,
				Arc<LockingWrapper<TestScorer>>,
				(),
				TestScorer,
			>,
		>,
		Arc<
			DefaultMessageRouter<
				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
				Arc<test_utils::TestLogger>,
				Arc<KeysManager>,
			>,
		>,
		Arc<test_utils::TestLogger>,
	>;
1915
	/// Concrete `ChainMonitor` instantiation used by the test nodes, signing with
	/// `InMemorySigner` and persisting through the test `Persister`.
	type ChainMonitor = chainmonitor::ChainMonitor<
		InMemorySigner,
		Arc<test_utils::TestChainSource>,
		Arc<test_utils::TestBroadcaster>,
		Arc<test_utils::TestFeeEstimator>,
		Arc<test_utils::TestLogger>,
		Arc<Persister>,
		Arc<KeysManager>,
	>;
1925
	/// Shared `P2PGossipSync` over the test network graph, chain source and logger.
	type PGS = Arc<
		P2PGossipSync<
			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
			Arc<test_utils::TestChainSource>,
			Arc<test_utils::TestLogger>,
		>,
	>;
	/// Shared `RapidGossipSync` over the test network graph and logger.
	type RGS = Arc<
		RapidGossipSync<
			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
			Arc<test_utils::TestLogger>,
		>,
	>;
1939
	/// Concrete `OnionMessenger` instantiation used by the test nodes. The test `ChannelManager`
	/// fills two of the handler slots; the remaining handlers are `IgnoringMessageHandler`.
	type OM = OnionMessenger<
		Arc<KeysManager>,
		Arc<KeysManager>,
		Arc<test_utils::TestLogger>,
		Arc<ChannelManager>,
		Arc<
			DefaultMessageRouter<
				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
				Arc<test_utils::TestLogger>,
				Arc<KeysManager>,
			>,
		>,
		IgnoringMessageHandler,
		Arc<ChannelManager>,
		IgnoringMessageHandler,
		IgnoringMessageHandler,
	>;
1957
	/// Concrete `LiquidityManagerSync` instantiation used by the test nodes, persisting through
	/// the test `Persister` and using `DefaultTimeProvider`.
	type LM = LiquidityManagerSync<
		Arc<KeysManager>,
		Arc<KeysManager>,
		Arc<ChannelManager>,
		Arc<dyn Filter + Sync + Send>,
		Arc<Persister>,
		DefaultTimeProvider,
		Arc<test_utils::TestBroadcaster>,
	>;
1967
	/// All the objects belonging to one test node, bundled so that tests can hand them to the
	/// background processor. Its `Drop` impl deletes the `kv_store` data directory.
	struct Node {
		node: Arc<ChannelManager>,
		messenger: Arc<OM>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<
			PeerManager<
				TestDescriptor,
				Arc<test_utils::TestChannelMessageHandler>,
				Arc<test_utils::TestRoutingMessageHandler>,
				Arc<OM>,
				Arc<test_utils::TestLogger>,
				IgnoringMessageHandler,
				Arc<KeysManager>,
				IgnoringMessageHandler,
			>,
		>,
		liquidity_manager: Arc<LM>,
		chain_monitor: Arc<ChainMonitor>,
		kv_store: Arc<Persister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<LockingWrapper<TestScorer>>,
		sweeper: Arc<
			OutputSweeperSync<
				Arc<test_utils::TestBroadcaster>,
				Arc<TestWallet>,
				Arc<test_utils::TestFeeEstimator>,
				Arc<test_utils::TestChainSource>,
				Arc<Persister>,
				Arc<test_utils::TestLogger>,
				Arc<KeysManager>,
			>,
		>,
	}
2005
2006 impl Node {
2007 fn p2p_gossip_sync(
2008 &self,
2009 ) -> GossipSync<
2010 PGS,
2011 RGS,
2012 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2013 Arc<test_utils::TestChainSource>,
2014 Arc<test_utils::TestLogger>,
2015 > {
2016 GossipSync::P2P(Arc::clone(&self.p2p_gossip_sync))
2017 }
2018
2019 fn rapid_gossip_sync(
2020 &self,
2021 ) -> GossipSync<
2022 PGS,
2023 RGS,
2024 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2025 Arc<test_utils::TestChainSource>,
2026 Arc<test_utils::TestLogger>,
2027 > {
2028 GossipSync::Rapid(Arc::clone(&self.rapid_gossip_sync))
2029 }
2030
2031 fn no_gossip_sync(
2032 &self,
2033 ) -> GossipSync<
2034 PGS,
2035 RGS,
2036 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2037 Arc<test_utils::TestChainSource>,
2038 Arc<test_utils::TestLogger>,
2039 > {
2040 GossipSync::None
2041 }
2042 }
2043
2044 impl Drop for Node {
2045 fn drop(&mut self) {
2046 let data_dir = self.kv_store.get_data_dir();
2047 match fs::remove_dir_all(data_dir.clone()) {
2048 Err(e) => {
2049 println!("Failed to remove test store directory {}: {}", data_dir.display(), e)
2050 },
2051 _ => {},
2052 }
2053 }
2054 }
2055
	/// `KVStoreSync` test double backed by a `FilesystemStore`.
	///
	/// Each `*_error` field, when set, makes writes to the corresponding persistence key fail
	/// with that error. `graph_persistence_notifier`, when set, is signalled on every network
	/// graph write.
	struct Persister {
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		graph_persistence_notifier: Option<SyncSender<()>>,
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		kv_store: FilesystemStore,
	}
2063
2064 impl Persister {
2065 fn new(data_dir: PathBuf) -> Self {
2066 let kv_store = FilesystemStore::new(data_dir);
2067 Self {
2068 graph_error: None,
2069 graph_persistence_notifier: None,
2070 manager_error: None,
2071 scorer_error: None,
2072 kv_store,
2073 }
2074 }
2075
2076 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2077 Self { graph_error: Some((error, message)), ..self }
2078 }
2079
2080 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
2081 Self { graph_persistence_notifier: Some(sender), ..self }
2082 }
2083
2084 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2085 Self { manager_error: Some((error, message)), ..self }
2086 }
2087
2088 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2089 Self { scorer_error: Some((error, message)), ..self }
2090 }
2091
2092 pub fn get_data_dir(&self) -> PathBuf {
2093 self.kv_store.get_data_dir()
2094 }
2095 }
2096
2097 impl KVStoreSync for Persister {
2098 fn read(
2099 &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
2100 ) -> lightning::io::Result<Vec<u8>> {
2101 self.kv_store.read(primary_namespace, secondary_namespace, key)
2102 }
2103
2104 fn write(
2105 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
2106 ) -> lightning::io::Result<()> {
2107 if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
2108 && secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
2109 && key == CHANNEL_MANAGER_PERSISTENCE_KEY
2110 {
2111 if let Some((error, message)) = self.manager_error {
2112 return Err(std::io::Error::new(error, message).into());
2113 }
2114 }
2115
2116 if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE
2117 && secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE
2118 && key == NETWORK_GRAPH_PERSISTENCE_KEY
2119 {
2120 if let Some(sender) = &self.graph_persistence_notifier {
2121 match sender.send(()) {
2122 Ok(()) => {},
2123 Err(std::sync::mpsc::SendError(())) => {
2124 println!("Persister failed to notify as receiver went away.")
2125 },
2126 }
2127 };
2128
2129 if let Some((error, message)) = self.graph_error {
2130 return Err(std::io::Error::new(error, message).into());
2131 }
2132 }
2133
2134 if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE
2135 && secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE
2136 && key == SCORER_PERSISTENCE_KEY
2137 {
2138 if let Some((error, message)) = self.scorer_error {
2139 return Err(std::io::Error::new(error, message).into());
2140 }
2141 }
2142
2143 self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
2144 }
2145
2146 fn remove(
2147 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
2148 ) -> lightning::io::Result<()> {
2149 self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
2150 }
2151
2152 fn list(
2153 &self, primary_namespace: &str, secondary_namespace: &str,
2154 ) -> lightning::io::Result<Vec<String>> {
2155 self.kv_store.list(primary_namespace, secondary_namespace)
2156 }
2157 }
2158
	/// Scorer test double that checks score updates against a queue of expectations.
	///
	/// While `event_expectations` is `None` (no expectation was ever registered) every update is
	/// accepted silently.
	struct TestScorer {
		event_expectations: Option<VecDeque<TestResult>>,
	}
2162
	/// One expected scorer update, matched against the `ScoreUpdate` method that `TestScorer`
	/// receives next.
	#[derive(Debug)]
	enum TestResult {
		PaymentFailure { path: Path, short_channel_id: u64 },
		PaymentSuccess { path: Path },
		ProbeFailure { path: Path },
		ProbeSuccess { path: Path },
	}
2170
2171 impl TestScorer {
2172 fn new() -> Self {
2173 Self { event_expectations: None }
2174 }
2175
2176 fn expect(&mut self, expectation: TestResult) {
2177 self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
2178 }
2179 }
2180
	impl lightning::util::ser::Writeable for TestScorer {
		/// Writes nothing and always succeeds.
		fn write<W: lightning::util::ser::Writer>(
			&self, _: &mut W,
		) -> Result<(), lightning::io::Error> {
			Ok(())
		}
	}
2188
	impl ScoreLookUp for TestScorer {
		type ScoreParams = ();
		/// Not implemented for this test double; calling it panics via `unimplemented!`.
		fn channel_penalty_msat(
			&self, _candidate: &CandidateRouteHop, _usage: ChannelUsage,
			_score_params: &Self::ScoreParams,
		) -> u64 {
			unimplemented!();
		}
	}
2198
2199 impl ScoreUpdate for TestScorer {
2200 fn payment_path_failed(
2201 &mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration,
2202 ) {
2203 if let Some(expectations) = &mut self.event_expectations {
2204 match expectations.pop_front().unwrap() {
2205 TestResult::PaymentFailure { path, short_channel_id } => {
2206 assert_eq!(actual_path, &path);
2207 assert_eq!(actual_short_channel_id, short_channel_id);
2208 },
2209 TestResult::PaymentSuccess { path } => {
2210 panic!("Unexpected successful payment path: {:?}", path)
2211 },
2212 TestResult::ProbeFailure { path } => {
2213 panic!("Unexpected probe failure: {:?}", path)
2214 },
2215 TestResult::ProbeSuccess { path } => {
2216 panic!("Unexpected probe success: {:?}", path)
2217 },
2218 }
2219 }
2220 }
2221
2222 fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
2223 if let Some(expectations) = &mut self.event_expectations {
2224 match expectations.pop_front().unwrap() {
2225 TestResult::PaymentFailure { path, .. } => {
2226 panic!("Unexpected payment path failure: {:?}", path)
2227 },
2228 TestResult::PaymentSuccess { path } => {
2229 assert_eq!(actual_path, &path);
2230 },
2231 TestResult::ProbeFailure { path } => {
2232 panic!("Unexpected probe failure: {:?}", path)
2233 },
2234 TestResult::ProbeSuccess { path } => {
2235 panic!("Unexpected probe success: {:?}", path)
2236 },
2237 }
2238 }
2239 }
2240
2241 fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
2242 if let Some(expectations) = &mut self.event_expectations {
2243 match expectations.pop_front().unwrap() {
2244 TestResult::PaymentFailure { path, .. } => {
2245 panic!("Unexpected payment path failure: {:?}", path)
2246 },
2247 TestResult::PaymentSuccess { path } => {
2248 panic!("Unexpected payment path success: {:?}", path)
2249 },
2250 TestResult::ProbeFailure { path } => {
2251 assert_eq!(actual_path, &path);
2252 },
2253 TestResult::ProbeSuccess { path } => {
2254 panic!("Unexpected probe success: {:?}", path)
2255 },
2256 }
2257 }
2258 }
2259 fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
2260 if let Some(expectations) = &mut self.event_expectations {
2261 match expectations.pop_front().unwrap() {
2262 TestResult::PaymentFailure { path, .. } => {
2263 panic!("Unexpected payment path failure: {:?}", path)
2264 },
2265 TestResult::PaymentSuccess { path } => {
2266 panic!("Unexpected payment path success: {:?}", path)
2267 },
2268 TestResult::ProbeFailure { path } => {
2269 panic!("Unexpected probe failure: {:?}", path)
2270 },
2271 TestResult::ProbeSuccess { path } => {
2272 assert_eq!(actual_path, &path);
2273 },
2274 }
2275 }
2276 }
2277 fn time_passed(&mut self, _: Duration) {}
2278 }
2279
	// Marker impl, only compiled when building with `c_bindings`.
	#[cfg(c_bindings)]
	impl lightning::routing::scoring::Score for TestScorer {}
2282
2283 impl Drop for TestScorer {
2284 fn drop(&mut self) {
2285 if std::thread::panicking() {
2286 return;
2287 }
2288
2289 if let Some(event_expectations) = &self.event_expectations {
2290 if !event_expectations.is_empty() {
2291 panic!("Unsatisfied event expectations: {:?}", event_expectations);
2292 }
2293 }
2294 }
2295 }
2296
	/// Change-destination stub used by the test output sweeper.
	struct TestWallet {}

	impl ChangeDestinationSourceSync for TestWallet {
		/// Always succeeds with an empty script.
		fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
			Ok(ScriptBuf::new())
		}
	}
2304
/// Joins `filename` onto `filepath` using the platform's path rules and returns
/// the result as a `String`.
///
/// Panics if the joined path is not valid UTF-8.
fn get_full_filepath(filepath: String, filename: String) -> String {
    let full_path: PathBuf = PathBuf::from(filepath).join(filename);
    String::from(full_path.to_str().unwrap())
}
2310
/// Builds `num_nodes` test nodes and connects every pair of them as peers.
///
/// `persist_dir` is joined onto the system temp directory; node `i` persists through
/// a `Persister` rooted at `"{that path}_persister_{i}"`.
///
/// Returns the lossily stringified temp-dir path (the prefix shared by all persister
/// directories) together with the constructed nodes, in index order.
fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
    let persist_temp_path = env::temp_dir().join(persist_dir);
    let persist_dir = persist_temp_path.to_string_lossy().to_string();
    let network = Network::Bitcoin;
    let mut nodes = Vec::new();
    for i in 0..num_nodes {
        let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
        let fee_estimator = Arc::new(test_utils::TestFeeEstimator::new(253));
        let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
        let genesis_block = genesis_block(network);
        let network_graph = Arc::new(NetworkGraph::new(network, Arc::clone(&logger)));
        let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
        // Key material is derived from the node index and the genesis block time only.
        let now = Duration::from_secs(genesis_block.header.time as u64);
        let seed = [i as u8; 32];
        let keys_manager =
            Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
        // The router and message router capture the `keys_manager` created above.
        let router = Arc::new(DefaultRouter::new(
            Arc::clone(&network_graph),
            Arc::clone(&logger),
            Arc::clone(&keys_manager),
            Arc::clone(&scorer),
            Default::default(),
        ));
        let msg_router = Arc::new(DefaultMessageRouter::new(
            Arc::clone(&network_graph),
            Arc::clone(&keys_manager),
        ));
        let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
        let kv_store =
            Arc::new(Persister::new(format!("{}_persister_{}", &persist_dir, i).into()));
        // NOTE(review): `now` and `keys_manager` are rebuilt here from the same inputs and
        // shadow the earlier bindings, so everything below uses a second `KeysManager`
        // instance distinct from the one held by the routers. Confirm whether that is
        // intended before merging the two.
        let now = Duration::from_secs(genesis_block.header.time as u64);
        let keys_manager =
            Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
        let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
            Some(Arc::clone(&chain_source)),
            Arc::clone(&tx_broadcaster),
            Arc::clone(&logger),
            Arc::clone(&fee_estimator),
            Arc::clone(&kv_store),
            Arc::clone(&keys_manager),
            keys_manager.get_peer_storage_key(),
        ));
        let best_block = BestBlock::from_network(network);
        let params = ChainParameters { network, best_block };
        // The same `keys_manager` is passed for all three key-related arguments.
        let manager = Arc::new(ChannelManager::new(
            Arc::clone(&fee_estimator),
            Arc::clone(&chain_monitor),
            Arc::clone(&tx_broadcaster),
            Arc::clone(&router),
            Arc::clone(&msg_router),
            Arc::clone(&logger),
            Arc::clone(&keys_manager),
            Arc::clone(&keys_manager),
            Arc::clone(&keys_manager),
            UserConfig::default(),
            params,
            genesis_block.header.time,
        ));
        // The channel manager is passed twice as a handler; the remaining handler slots
        // are filled with `IgnoringMessageHandler`.
        let messenger = Arc::new(OnionMessenger::new(
            Arc::clone(&keys_manager),
            Arc::clone(&keys_manager),
            Arc::clone(&logger),
            Arc::clone(&manager),
            Arc::clone(&msg_router),
            IgnoringMessageHandler {},
            Arc::clone(&manager),
            IgnoringMessageHandler {},
            IgnoringMessageHandler {},
        ));
        let wallet = Arc::new(TestWallet {});
        // The sweeper is given no chain source (`None`) and persists through `kv_store`.
        let sweeper = Arc::new(OutputSweeperSync::new(
            best_block,
            Arc::clone(&tx_broadcaster),
            Arc::clone(&fee_estimator),
            None::<Arc<test_utils::TestChainSource>>,
            Arc::clone(&keys_manager),
            wallet,
            Arc::clone(&kv_store),
            Arc::clone(&logger),
        ));
        // Both gossip sync flavours are built over the same network graph.
        let p2p_gossip_sync = Arc::new(P2PGossipSync::new(
            Arc::clone(&network_graph),
            Some(Arc::clone(&chain_source)),
            Arc::clone(&logger),
        ));
        let rapid_gossip_sync =
            Arc::new(RapidGossipSync::new(Arc::clone(&network_graph), Arc::clone(&logger)));
        // NOTE(review): the test channel handler is built with the Testnet genesis hash
        // while the rest of the node uses `Network::Bitcoin` — confirm this is deliberate.
        let msg_handler = MessageHandler {
            chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(
                ChainHash::using_genesis_block(Network::Testnet),
            )),
            route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
            onion_message_handler: Arc::clone(&messenger),
            custom_message_handler: IgnoringMessageHandler {},
            send_only_message_handler: IgnoringMessageHandler {},
        };
        let peer_manager = Arc::new(PeerManager::new(
            msg_handler,
            0,
            &seed,
            Arc::clone(&logger),
            Arc::clone(&keys_manager),
        ));
        // All optional arguments of the liquidity manager are left as `None`.
        let liquidity_manager = Arc::new(
            LiquidityManagerSync::new(
                Arc::clone(&keys_manager),
                Arc::clone(&keys_manager),
                Arc::clone(&manager),
                None,
                None,
                Arc::clone(&kv_store),
                Arc::clone(&tx_broadcaster),
                None,
                None,
            )
            .unwrap(),
        );
        let node = Node {
            node: manager,
            p2p_gossip_sync,
            rapid_gossip_sync,
            peer_manager,
            liquidity_manager,
            chain_monitor,
            kv_store,
            tx_broadcaster,
            network_graph,
            logger,
            best_block,
            scorer,
            sweeper,
            messenger,
        };
        nodes.push(node);
    }

    // Connect each unordered pair (i, j) with i < j in both directions. The final
    // boolean argument is `true` on node i's side and `false` on node j's side.
    for i in 0..num_nodes {
        for j in (i + 1)..num_nodes {
            let init_i = Init {
                features: nodes[j].node.init_features(),
                networks: None,
                remote_network_address: None,
            };
            nodes[i]
                .node
                .peer_connected(nodes[j].node.get_our_node_id(), &init_i, true)
                .unwrap();
            let init_j = Init {
                features: nodes[i].node.init_features(),
                networks: None,
                remote_network_address: None,
            };
            nodes[j]
                .node
                .peer_connected(nodes[i].node.get_our_node_id(), &init_j, false)
                .unwrap();
        }
    }

    (persist_dir, nodes)
}
2472
// Drives the channel-funding handshake between `$node_a` (initiator) and `$node_b`
// by hand, up to both sides having emitted `Event::ChannelPending`.
//
// Evaluates to the funding transaction that was handed to `$node_a`.
macro_rules! open_channel {
    ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
        let initiator_id = $node_a.node.get_our_node_id();
        let acceptor_id = $node_b.node.get_our_node_id();

        // open_channel / accept_channel exchange.
        begin_open_channel!($node_a, $node_b, $channel_value);

        // The initiator must now hold exactly one event: the funding request.
        let pending_events = $node_a.node.get_and_clear_pending_events();
        assert_eq!(pending_events.len(), 1);
        let (temp_chan_id, funding_tx) =
            handle_funding_generation_ready!(pending_events[0], $channel_value);
        $node_a
            .node
            .funding_transaction_generated(temp_chan_id, acceptor_id, funding_tx.clone())
            .unwrap();

        // funding_created: initiator -> acceptor.
        let funding_created =
            get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, acceptor_id);
        $node_b.node.handle_funding_created(initiator_id, &funding_created);
        get_event!($node_b, Event::ChannelPending);

        // funding_signed: acceptor -> initiator.
        let funding_signed =
            get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, initiator_id);
        $node_a.node.handle_funding_signed(acceptor_id, &funding_signed);
        get_event!($node_a, Event::ChannelPending);

        funding_tx
    }};
}
2505
// Starts a channel open from `$node_a` to `$node_b` and relays the resulting
// open_channel / accept_channel messages between them.
//
// The channel is created with the literal arguments `100` and `42` following
// `$channel_value`; `handle_funding_generation_ready!` later asserts a
// `user_channel_id` of 42.
macro_rules! begin_open_channel {
    ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
        let opener_id = $node_a.node.get_our_node_id();
        let acceptor_id = $node_b.node.get_our_node_id();

        $node_a.node.create_channel(acceptor_id, $channel_value, 100, 42, None, None).unwrap();

        let open_msg = get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, acceptor_id);
        $node_b.node.handle_open_channel(opener_id, &open_msg);

        let accept_msg = get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, opener_id);
        $node_a.node.handle_accept_channel(acceptor_id, &accept_msg);
    }};
}
2526
// Validates an `Event::FundingGenerationReady` and builds a funding transaction for it.
//
// `$event` is matched directly; `output_script` is bound by `ref` and cloned, so the
// event expression is not moved out of. Asserts that the event's channel value
// equals `$channel_value` and that its `user_channel_id` is 42, and panics with
// "Unexpected event" for any other event variant.
//
// Evaluates to `(temporary_channel_id, tx)`.
macro_rules! handle_funding_generation_ready {
    ($event: expr, $channel_value: expr) => {{
        match $event {
            Event::FundingGenerationReady {
                temporary_channel_id,
                channel_value_satoshis,
                ref output_script,
                user_channel_id,
                ..
            } => {
                assert_eq!(channel_value_satoshis, $channel_value);
                assert_eq!(user_channel_id, 42);

                // The transaction has no inputs and a single output paying the full
                // channel value to the script requested by the event.
                let tx = Transaction {
                    version: Version::ONE,
                    lock_time: LockTime::ZERO,
                    input: Vec::new(),
                    output: vec![TxOut {
                        value: Amount::from_sat(channel_value_satoshis),
                        script_pubkey: output_script.clone(),
                    }],
                };
                (temporary_channel_id, tx)
            },
            _ => panic!("Unexpected event"),
        }
    }};
}
2555
2556 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
2557 for i in 1..=depth {
2558 let prev_blockhash = node.best_block.block_hash;
2559 let height = node.best_block.height + 1;
2560 let header = create_dummy_header(prev_blockhash, height);
2561 let txdata = vec![(0, tx)];
2562 node.best_block = BestBlock::new(header.block_hash(), height);
2563 match i {
2564 1 => {
2565 node.node.transactions_confirmed(&header, &txdata, height);
2566 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
2567 node.sweeper.transactions_confirmed(&header, &txdata, height);
2568 },
2569 x if x == depth => {
2570 let block = (genesis_block(Network::Bitcoin), height);
2574 node.tx_broadcaster.blocks.lock().unwrap().push(block);
2575 node.node.best_block_updated(&header, height);
2576 node.chain_monitor.best_block_updated(&header, height);
2577 node.sweeper.best_block_updated(&header, height);
2578 },
2579 _ => {},
2580 }
2581 }
2582 }
2583
2584 fn advance_chain(node: &mut Node, num_blocks: u32) {
2585 for i in 1..=num_blocks {
2586 let prev_blockhash = node.best_block.block_hash;
2587 let height = node.best_block.height + 1;
2588 let header = create_dummy_header(prev_blockhash, height);
2589 node.best_block = BestBlock::new(header.block_hash(), height);
2590 if i == num_blocks {
2591 let block = (genesis_block(Network::Bitcoin), height);
2595 node.tx_broadcaster.blocks.lock().unwrap().push(block);
2596 node.node.best_block_updated(&header, height);
2597 node.chain_monitor.best_block_updated(&header, height);
2598 node.sweeper.best_block_updated(&header, height);
2599 }
2600 }
2601 }
2602
2603 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
2604 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
2605 }
2606
/// Checks that the background processor writes the channel manager, network graph
/// and scorer to disk, and re-writes the manager after a force-close.
///
/// Each check spins until the bytes on disk equal a fresh serialization of the
/// in-memory object, so a missing write shows up as a hang rather than a failure.
#[test]
fn test_background_processor() {
    let (persist_dir, nodes) = create_nodes(2, "test_background_processor");

    // Opening a channel gives the manager something to persist.
    let tx = open_channel!(nodes[0], nodes[1], 100000);

    let data_dir = nodes[0].kv_store.get_data_dir();
    let persister = Arc::new(Persister::new(data_dir));
    let event_handler = |_: _| Ok(());
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        Arc::clone(&nodes[0].chain_monitor),
        Arc::clone(&nodes[0].node),
        Some(Arc::clone(&nodes[0].messenger)),
        nodes[0].p2p_gossip_sync(),
        Arc::clone(&nodes[0].peer_manager),
        Some(Arc::clone(&nodes[0].liquidity_manager)),
        Some(Arc::clone(&nodes[0].sweeper)),
        Arc::clone(&nodes[0].logger),
        Some(Arc::clone(&nodes[0].scorer)),
    );

    // Spin until the file at `$filepath` holds exactly what `$node` serializes to.
    // Serialization errors are fatal; read errors and mismatches are retried.
    macro_rules! check_persisted_data {
        ($node: expr, $filepath: expr) => {
            let mut expected_bytes = Vec::new();
            loop {
                expected_bytes.clear();
                if let Err(e) = $node.write(&mut expected_bytes) {
                    panic!("Unexpected error: {}", e);
                }
                match std::fs::read($filepath) {
                    Ok(bytes) if bytes == expected_bytes => break,
                    _ => continue,
                }
            }
        };
    }

    // Initial manager persistence.
    let manager_path =
        get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
    check_persisted_data!(nodes[0].node, manager_path.clone());

    // Spin until the manager's event-or-persist flag reads false.
    while nodes[0].node.get_event_or_persist_condvar_value() {}

    // Force-close the channel, which must lead to the manager being written again.
    let error_message = "Channel force-closed";
    let funding_outpoint = OutPoint { txid: tx.compute_txid(), index: 0 };
    nodes[0]
        .node
        .force_close_broadcasting_latest_txn(
            &ChannelId::v1_from_funding_outpoint(funding_outpoint),
            &nodes[1].node.get_our_node_id(),
            error_message.to_string(),
        )
        .unwrap();

    check_persisted_data!(nodes[0].node, manager_path.clone());
    while nodes[0].node.get_event_or_persist_condvar_value() {}

    // Network graph persistence.
    let graph_path =
        get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
    check_persisted_data!(nodes[0].network_graph, graph_path.clone());

    // Scorer persistence.
    let scorer_path =
        get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
    check_persisted_data!(nodes[0].scorer, scorer_path.clone());

    if !std::thread::panicking() {
        bg_processor.stop().unwrap();
    }
}
2706
/// Checks that a running background processor logs each of its four periodic timer
/// actions at least once.
///
/// Spins on the test logger until all four log lines have been recorded.
#[test]
fn test_timer_tick_called() {
    let (_, nodes) = create_nodes(1, "test_timer_tick_called");
    let data_dir = nodes[0].kv_store.get_data_dir();
    let persister = Arc::new(Persister::new(data_dir));
    let event_handler = |_: _| Ok(());
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        Arc::clone(&nodes[0].chain_monitor),
        Arc::clone(&nodes[0].node),
        Some(Arc::clone(&nodes[0].messenger)),
        nodes[0].no_gossip_sync(),
        Arc::clone(&nodes[0].peer_manager),
        Some(Arc::clone(&nodes[0].liquidity_manager)),
        Some(Arc::clone(&nodes[0].sweeper)),
        Arc::clone(&nodes[0].logger),
        Some(Arc::clone(&nodes[0].scorer)),
    );

    let desired_logs = [
        "Calling ChannelManager's timer_tick_occurred",
        "Calling PeerManager's timer_tick_occurred",
        "Rebroadcasting monitor's pending claims",
        "Calling OnionMessageHandler's timer_tick_occurred",
    ];
    loop {
        // The guard is re-acquired on every pass and released at the end of it.
        let log_entries = nodes[0].logger.lines.lock().unwrap();
        let all_logged = desired_logs.iter().all(|line| {
            log_entries.get(&("lightning_background_processor", (*line).to_string())).is_some()
        });
        if all_logged {
            break;
        }
    }

    if !std::thread::panicking() {
        bg_processor.stop().unwrap();
    }
}
2750
/// Checks that a channel-manager persistence failure surfaces from
/// `BackgroundProcessor::join` with the injected error kind and message.
#[test]
fn test_channel_manager_persist_error() {
    let (_, nodes) = create_nodes(2, "test_persist_error");
    open_channel!(nodes[0], nodes[1], 100000);

    // Persister configured to fail manager writes with `Other` / "test".
    let data_dir = nodes[0].kv_store.get_data_dir();
    let failing_persister = Persister::new(data_dir)
        .with_manager_error(std::io::ErrorKind::Other, "test");
    let persister = Arc::new(failing_persister);
    let event_handler = |_: _| Ok(());
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        Arc::clone(&nodes[0].chain_monitor),
        Arc::clone(&nodes[0].node),
        Some(Arc::clone(&nodes[0].messenger)),
        nodes[0].no_gossip_sync(),
        Arc::clone(&nodes[0].peer_manager),
        Some(Arc::clone(&nodes[0].liquidity_manager)),
        Some(Arc::clone(&nodes[0].sweeper)),
        Arc::clone(&nodes[0].logger),
        Some(Arc::clone(&nodes[0].scorer)),
    );

    let err = match bg_processor.join() {
        Ok(_) => panic!("Expected error persisting manager"),
        Err(e) => e,
    };
    assert_eq!(err.kind(), std::io::ErrorKind::Other);
    assert_eq!(err.get_ref().unwrap().to_string(), "test");
}
2783
2784 #[tokio::test]
2785 async fn test_channel_manager_persist_error_async() {
2786 let (_, nodes) = create_nodes(2, "test_persist_error_sync");
2788 open_channel!(nodes[0], nodes[1], 100000);
2789
2790 let data_dir = nodes[0].kv_store.get_data_dir();
2791 let kv_store_sync = Arc::new(
2792 Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
2793 );
2794 let kv_store = KVStoreSyncWrapper(kv_store_sync);
2795
2796 let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
2798 &*(nodes[0].liquidity_manager.get_lm_async()
2799 as *const LiquidityManager<_, _, _, _, _, _, _>)
2800 as &'static LiquidityManager<_, _, _, _, _, _, _>
2801 };
2802 let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
2803 &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
2804 as &'static OutputSweeper<_, _, _, _, _, _, _>
2805 };
2806
2807 let bp_future = super::process_events_async(
2808 kv_store,
2809 |_: _| async { Ok(()) },
2810 Arc::clone(&nodes[0].chain_monitor),
2811 Arc::clone(&nodes[0].node),
2812 Some(Arc::clone(&nodes[0].messenger)),
2813 nodes[0].rapid_gossip_sync(),
2814 Arc::clone(&nodes[0].peer_manager),
2815 Some(lm_async),
2816 Some(sweeper_async),
2817 Arc::clone(&nodes[0].logger),
2818 Some(Arc::clone(&nodes[0].scorer)),
2819 move |dur: Duration| {
2820 Box::pin(async move {
2821 tokio::time::sleep(dur).await;
2822 false })
2824 },
2825 false,
2826 || Some(Duration::ZERO),
2827 );
2828 match bp_future.await {
2829 Ok(_) => panic!("Expected error persisting manager"),
2830 Err(e) => {
2831 assert_eq!(e.kind(), lightning::io::ErrorKind::Other);
2832 assert_eq!(e.get_ref().unwrap().to_string(), "test");
2833 },
2834 }
2835 }
2836
/// Checks that a network-graph persistence failure is reported by
/// `BackgroundProcessor::stop` with the injected error kind and message.
#[test]
fn test_network_graph_persist_error() {
    let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");

    // Persister configured to fail graph writes with `Other` / "test".
    let data_dir = nodes[0].kv_store.get_data_dir();
    let failing_persister =
        Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test");
    let persister = Arc::new(failing_persister);
    let event_handler = |_: _| Ok(());
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        Arc::clone(&nodes[0].chain_monitor),
        Arc::clone(&nodes[0].node),
        Some(Arc::clone(&nodes[0].messenger)),
        nodes[0].p2p_gossip_sync(),
        Arc::clone(&nodes[0].peer_manager),
        Some(Arc::clone(&nodes[0].liquidity_manager)),
        Some(Arc::clone(&nodes[0].sweeper)),
        Arc::clone(&nodes[0].logger),
        Some(Arc::clone(&nodes[0].scorer)),
    );

    let err = match bg_processor.stop() {
        Ok(_) => panic!("Expected error persisting network graph"),
        Err(e) => e,
    };
    assert_eq!(err.kind(), std::io::ErrorKind::Other);
    assert_eq!(err.get_ref().unwrap().to_string(), "test");
}
2867
/// Checks that a scorer persistence failure is reported by
/// `BackgroundProcessor::stop` with the injected error kind and message.
#[test]
fn test_scorer_persist_error() {
    let (_, nodes) = create_nodes(2, "test_persist_scorer_error");

    // Persister configured to fail scorer writes with `Other` / "test".
    let data_dir = nodes[0].kv_store.get_data_dir();
    let failing_persister =
        Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test");
    let persister = Arc::new(failing_persister);
    let event_handler = |_: _| Ok(());
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        Arc::clone(&nodes[0].chain_monitor),
        Arc::clone(&nodes[0].node),
        Some(Arc::clone(&nodes[0].messenger)),
        nodes[0].no_gossip_sync(),
        Arc::clone(&nodes[0].peer_manager),
        Some(Arc::clone(&nodes[0].liquidity_manager)),
        Some(Arc::clone(&nodes[0].sweeper)),
        Arc::clone(&nodes[0].logger),
        Some(Arc::clone(&nodes[0].scorer)),
    );

    let err = match bg_processor.stop() {
        Ok(_) => panic!("Expected error persisting scorer"),
        Err(e) => e,
    };
    assert_eq!(err.kind(), std::io::ErrorKind::Other);
    assert_eq!(err.get_ref().unwrap().to_string(), "test");
}
2898
/// End-to-end check that events are delivered to the handler given to the background
/// processor, in two phases:
///
/// 1. Channel open: `FundingGenerationReady` and `ChannelPending` must reach the
///    handler within `EVENT_DEADLINE`.
/// 2. Force close: `SpendableOutputs` must reach a second handler, after which the
///    output sweeper is followed through delayed broadcast, per-block rebroadcast,
///    confirmation, an unrelated unconfirmation, and final pruning.
#[test]
fn test_background_event_handling() {
    let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
    let node_0_id = nodes[0].node.get_our_node_id();
    let node_1_id = nodes[1].node.get_our_node_id();

    let channel_value = 100000;
    let data_dir = nodes[0].kv_store.get_data_dir();
    let persister = Arc::new(Persister::new(data_dir.clone()));

    // Phase 1 handler: forwards the funding request (already turned into a funding
    // transaction) and the channel-pending notification over bounded channels.
    // `ChannelReady` is tolerated; anything else fails the test.
    let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
    let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
    let event_handler = move |event: Event| {
        match event {
            Event::FundingGenerationReady { .. } => funding_generation_send
                .send(handle_funding_generation_ready!(event, channel_value))
                .unwrap(),
            Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
            Event::ChannelReady { .. } => {},
            _ => panic!("Unexpected event: {:?}", event),
        }
        Ok(())
    };

    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        Arc::clone(&nodes[0].chain_monitor),
        Arc::clone(&nodes[0].node),
        Some(Arc::clone(&nodes[0].messenger)),
        nodes[0].no_gossip_sync(),
        Arc::clone(&nodes[0].peer_manager),
        Some(Arc::clone(&nodes[0].liquidity_manager)),
        Some(Arc::clone(&nodes[0].sweeper)),
        Arc::clone(&nodes[0].logger),
        Some(Arc::clone(&nodes[0].scorer)),
    );

    // Open a channel; the funding request must arrive through the handler above.
    begin_open_channel!(nodes[0], nodes[1], channel_value);
    let (temporary_channel_id, funding_tx) = funding_generation_recv
        .recv_timeout(EVENT_DEADLINE)
        .expect("FundingGenerationReady not handled within deadline");
    nodes[0]
        .node
        .funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
        .unwrap();
    // Relay funding_created / funding_signed by hand. Node 1's ChannelPending is
    // polled directly; node 0's must arrive through the handler.
    let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
    nodes[1].node.handle_funding_created(node_0_id, &msg_0);
    get_event!(nodes[1], Event::ChannelPending);
    let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
    nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
    channel_pending_recv
        .recv_timeout(EVENT_DEADLINE)
        .expect("ChannelPending not handled within deadline");

    // Confirm the funding transaction on both nodes and exchange channel_ready.
    confirm_transaction(&mut nodes[0], &funding_tx);
    let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, node_1_id);
    confirm_transaction(&mut nodes[1], &funding_tx);
    let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, node_0_id);
    nodes[0].node.handle_channel_ready(node_1_id, &bs_funding);
    let _as_channel_update =
        get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_1_id);
    nodes[1].node.handle_channel_ready(node_0_id, &as_funding);
    let _bs_channel_update =
        get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_0_id);
    // Node 0 must have broadcast exactly one transaction so far: the funding tx.
    let broadcast_funding =
        nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
    assert_eq!(broadcast_funding.compute_txid(), funding_tx.compute_txid());
    assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());

    if !std::thread::panicking() {
        bg_processor.stop().unwrap();
    }

    // Phase 2 handler: forwards `SpendableOutputs`, tolerates `ChannelReady` and
    // `ChannelClosed`, and fails the test on anything else.
    let (sender, receiver) = std::sync::mpsc::sync_channel(1);
    let event_handler = move |event: Event| {
        match event {
            Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
            Event::ChannelReady { .. } => {},
            Event::ChannelClosed { .. } => {},
            _ => panic!("Unexpected event: {:?}", event),
        }
        Ok(())
    };
    let persister = Arc::new(Persister::new(data_dir));
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        Arc::clone(&nodes[0].chain_monitor),
        Arc::clone(&nodes[0].node),
        Some(Arc::clone(&nodes[0].messenger)),
        nodes[0].no_gossip_sync(),
        Arc::clone(&nodes[0].peer_manager),
        Some(Arc::clone(&nodes[0].liquidity_manager)),
        Some(Arc::clone(&nodes[0].sweeper)),
        Arc::clone(&nodes[0].logger),
        Some(Arc::clone(&nodes[0].scorer)),
    );

    // Force-close the channel and confirm the broadcast commitment transaction to a
    // depth of `BREAKDOWN_TIMEOUT`.
    let error_message = "Channel force-closed";
    nodes[0]
        .node
        .force_close_broadcasting_latest_txn(
            &nodes[0].node.list_channels()[0].channel_id,
            &node_1_id,
            error_message.to_string(),
        )
        .unwrap();
    let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
    confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

    // The resulting SpendableOutputs event is handed to the sweeper with a delay
    // height of 153.
    let event =
        receiver.recv_timeout(EVENT_DEADLINE).expect("Events not handled within deadline");
    match event {
        Event::SpendableOutputs { outputs, channel_id } => {
            nodes[0]
                .sweeper
                .track_spendable_outputs(outputs, channel_id, false, Some(153))
                .unwrap();
        },
        _ => panic!("Unexpected event: {:?}", event),
    }

    // If anything was broadcast already, it must not spend the tracked output, and the
    // output must still be waiting for its first broadcast with the 153 delay intact.
    assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
    let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
    if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() {
        assert!(!tracked_output.is_spent_in(&sweep_tx_0));
        match tracked_output.status {
            OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
                assert_eq!(delayed_until_height, Some(153));
            },
            _ => panic!("Unexpected status"),
        }
    }

    // NOTE(review): presumably these three blocks carry the chain to the delay height
    // of 153 — the starting height is not visible here, confirm.
    advance_chain(&mut nodes[0], 3);

    // Polls the broadcaster every 10ms until a transaction can be popped. There is no
    // timeout: a missing broadcast hangs the test.
    let tx_broadcaster = Arc::clone(&nodes[0].tx_broadcaster);
    let wait_for_sweep_tx = || -> Transaction {
        loop {
            let sweep_tx = tx_broadcaster.txn_broadcasted.lock().unwrap().pop();
            if let Some(sweep_tx) = sweep_tx {
                return sweep_tx;
            }

            std::thread::sleep(Duration::from_millis(10));
        }
    };

    // An initial sweep transaction is broadcast and recorded as the latest spend.
    assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
    let sweep_tx_0 = wait_for_sweep_tx();
    let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
    match tracked_output.status {
        OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
            assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
        },
        _ => panic!("Unexpected status"),
    }

    // Each further block yields a new, different sweep transaction.
    advance_chain(&mut nodes[0], 1);
    assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
    let sweep_tx_1 = wait_for_sweep_tx();
    let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
    match tracked_output.status {
        OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
            assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
        },
        _ => panic!("Unexpected status"),
    }
    assert_ne!(sweep_tx_0, sweep_tx_1);

    advance_chain(&mut nodes[0], 1);
    assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
    let sweep_tx_2 = wait_for_sweep_tx();
    let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
    match tracked_output.status {
        OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
            assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
        },
        _ => panic!("Unexpected status"),
    }
    assert_ne!(sweep_tx_0, sweep_tx_2);
    assert_ne!(sweep_tx_1, sweep_tx_2);

    // After confirming the latest sweep to depth 5 the output is still tracked, now
    // awaiting threshold confirmations.
    confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5);
    assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
    let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
    match tracked_output.status {
        OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
            assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
        },
        _ => panic!("Unexpected status"),
    }

    // Unconfirming an all-zero txid (not one of the sweep transactions above) must
    // leave the tracked output's status unchanged.
    let unconf_txid = Txid::from_slice(&[0; 32]).unwrap();
    nodes[0].sweeper.transaction_unconfirmed(&unconf_txid);

    assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
    let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
    match tracked_output.status {
        OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
            assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
        },
        _ => panic!("Unexpected status"),
    }

    // Confirming the first sweep to a depth of `PRUNE_DELAY_BLOCKS` must end with no
    // outputs being tracked any more.
    confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, PRUNE_DELAY_BLOCKS);
    assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0);

    if !std::thread::panicking() {
        bg_processor.stop().unwrap();
    }
}
3126
/// Checks that an event whose handler returns `ReplayEvent` is delivered again.
///
/// The handler rejects exactly one delivery (the first) and accepts all later ones;
/// the rejected event and the next accepted event must compare equal.
#[test]
fn test_event_handling_failures_are_replayed() {
    let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
    let channel_value = 100000;
    let data_dir = nodes[0].kv_store.get_data_dir();
    let persister = Arc::new(Persister::new(data_dir.clone()));

    let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
    let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
    let should_fail_event_handling = Arc::new(AtomicBool::new(true));
    let event_handler = move |event: Event| {
        // Atomically flips the flag from true to false; only the call that performs
        // the flip observes `Ok(true)` and takes the failure path.
        let fail_this_delivery = matches!(
            should_fail_event_handling.compare_exchange(
                true,
                false,
                Ordering::Acquire,
                Ordering::Relaxed,
            ),
            Ok(true)
        );

        if fail_this_delivery {
            first_event_send.send(event).unwrap();
            Err(ReplayEvent())
        } else {
            second_event_send.send(event).unwrap();
            Ok(())
        }
    };

    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        Arc::clone(&nodes[0].chain_monitor),
        Arc::clone(&nodes[0].node),
        Some(Arc::clone(&nodes[0].messenger)),
        nodes[0].no_gossip_sync(),
        Arc::clone(&nodes[0].peer_manager),
        Some(Arc::clone(&nodes[0].liquidity_manager)),
        Some(Arc::clone(&nodes[0].sweeper)),
        Arc::clone(&nodes[0].logger),
        Some(Arc::clone(&nodes[0].scorer)),
    );

    // Starting a channel open produces an event for the handler to reject and then
    // receive again.
    begin_open_channel!(nodes[0], nodes[1], channel_value);
    let rejected_event = first_event_recv.recv_timeout(EVENT_DEADLINE).unwrap();
    let replayed_event = second_event_recv.recv_timeout(EVENT_DEADLINE).unwrap();
    assert_eq!(rejected_event, replayed_event);

    if !std::thread::panicking() {
        bg_processor.stop().unwrap();
    }
}
3176
/// Checks that a running background processor reaches its scorer-persistence step,
/// detected through the corresponding log line.
#[test]
fn test_scorer_persistence() {
    let (_, nodes) = create_nodes(2, "test_scorer_persistence");
    let data_dir = nodes[0].kv_store.get_data_dir();
    let persister = Arc::new(Persister::new(data_dir));
    let event_handler = |_: _| Ok(());
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        Arc::clone(&nodes[0].chain_monitor),
        Arc::clone(&nodes[0].node),
        Some(Arc::clone(&nodes[0].messenger)),
        nodes[0].no_gossip_sync(),
        Arc::clone(&nodes[0].peer_manager),
        Some(Arc::clone(&nodes[0].liquidity_manager)),
        Some(Arc::clone(&nodes[0].sweeper)),
        Arc::clone(&nodes[0].logger),
        Some(Arc::clone(&nodes[0].scorer)),
    );

    // Spin until the log line has been recorded; the logger lock is taken afresh
    // for every check.
    let expected_entry = (
        "lightning_background_processor",
        "Calling time_passed and persisting scorer".to_string(),
    );
    while nodes[0].logger.lines.lock().unwrap().get(&expected_entry).is_none() {}

    if !std::thread::panicking() {
        bg_processor.stop().unwrap();
    }
}
3209
// Shared body for the tests checking that the network graph is not pruned before
// rapid gossip sync has completed.
//
// `$sleep` is evaluated once per pass of the wait loop; `$receive` must evaluate to
// something with `.expect(..)` that signals the graph persistence being waited for.
macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
    ($nodes: expr, $receive: expr, $sleep: expr) => {
        // Seed the graph with one partially announced channel (short channel id 42).
        $nodes[0]
            .network_graph
            .add_channel_from_partial_announcement(
                42,
                None,
                53,
                ChannelFeatures::empty(),
                $nodes[0].node.get_our_node_id().into(),
                $nodes[1].node.get_our_node_id().into(),
            )
            .expect("Failed to update channel from partial announcement");
        let graph_before_sync = $nodes[0].network_graph.to_string();
        assert!(graph_before_sync.contains("42: features: 0000, node_one:"));
        assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);

        // Wait until the processor has logged its ChannelManager timer tick more than
        // once, i.e. its loop has gone around at least twice.
        loop {
            $sleep;
            let log_entries = $nodes[0].logger.lines.lock().unwrap();
            let tick_log = "Calling ChannelManager's timer_tick_occurred".to_string();
            let tick_count =
                *log_entries.get(&("lightning_background_processor", tick_log)).unwrap_or(&0);
            if tick_count > 1 {
                break;
            }
        }

        // Serialized rapid-gossip-sync payload applied below.
        let rgs_snapshot = vec![
            76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99,
            247, 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227,
            98, 218, 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61,
            250, 251, 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6,
            67, 2, 36, 125, 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47,
            115, 172, 63, 136, 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158,
            1, 242, 121, 152, 106, 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95,
            65, 3, 83, 185, 58, 138, 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136,
            149, 185, 226, 156, 137, 175, 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219,
            175, 168, 77, 4, 143, 38, 128, 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2,
            27, 0, 0, 0, 1, 0, 0, 255, 2, 68, 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0,
            0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255,
            8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6,
            11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
        ];
        $nodes[0]
            .rapid_gossip_sync
            .update_network_graph_no_std(&rgs_snapshot[..], Some(1642291930))
            .unwrap();

        // Right after the sync the graph is expected to hold two channels.
        assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);

        $receive.expect("Network graph not pruned within deadline");

        // Once the awaited signal has arrived, every channel must be gone.
        assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
    };
}
3270
/// Runs the shared graph-pruning scenario against a threaded `BackgroundProcessor` that is
/// started with the rapid gossip sync of node 0.
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
	let (_, nodes) =
		create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");

	// Capacity-1 channel: the sending half is handed to the persister through
	// `with_graph_persistence_notifier`, the receiving half is what the shared macro waits on.
	let (graph_notifier_tx, graph_notifier_rx) = std::sync::mpsc::sync_channel(1);
	let persister = Arc::new(
		Persister::new(nodes[0].kv_store.get_data_dir())
			.with_graph_persistence_notifier(graph_notifier_tx),
	);

	// Every event is accepted and ignored.
	let ignore_events = |_: _| Ok(());
	let background_processor = BackgroundProcessor::start(
		persister,
		ignore_events,
		Arc::clone(&nodes[0].chain_monitor),
		Arc::clone(&nodes[0].node),
		Some(Arc::clone(&nodes[0].messenger)),
		nodes[0].rapid_gossip_sync(),
		Arc::clone(&nodes[0].peer_manager),
		Some(Arc::clone(&nodes[0].liquidity_manager)),
		Some(Arc::clone(&nodes[0].sweeper)),
		Arc::clone(&nodes[0].logger),
		Some(Arc::clone(&nodes[0].scorer)),
	);

	// Wait at most five times the first-prune timer for the notifier to fire; poll the logger
	// with a 1ms blocking sleep between looks.
	do_test_not_pruning_network_graph_until_graph_sync_completion!(
		nodes,
		graph_notifier_rx.recv_timeout(super::FIRST_NETWORK_PRUNE_TIMER * 5),
		std::thread::sleep(Duration::from_millis(1))
	);

	background_processor.stop().unwrap();
}
3303
3304 #[tokio::test]
3305 async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
3306 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
3307
3308 let (_, nodes) =
3309 create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
3310 let data_dir = nodes[0].kv_store.get_data_dir();
3311 let kv_store_sync =
3312 Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
3313 let kv_store = KVStoreSyncWrapper(kv_store_sync);
3314
3315 let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
3317 &*(nodes[0].liquidity_manager.get_lm_async()
3318 as *const LiquidityManager<_, _, _, _, _, _, _>)
3319 as &'static LiquidityManager<_, _, _, _, _, _, _>
3320 };
3321 let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3322 &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3323 as &'static OutputSweeper<_, _, _, _, _, _, _>
3324 };
3325
3326 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
3327 let bp_future = super::process_events_async(
3328 kv_store,
3329 |_: _| async { Ok(()) },
3330 Arc::clone(&nodes[0].chain_monitor),
3331 Arc::clone(&nodes[0].node),
3332 Some(Arc::clone(&nodes[0].messenger)),
3333 nodes[0].rapid_gossip_sync(),
3334 Arc::clone(&nodes[0].peer_manager),
3335 Some(lm_async),
3336 Some(sweeper_async),
3337 Arc::clone(&nodes[0].logger),
3338 Some(Arc::clone(&nodes[0].scorer)),
3339 move |dur: Duration| {
3340 let mut exit_receiver = exit_receiver.clone();
3341 Box::pin(async move {
3342 tokio::select! {
3343 _ = tokio::time::sleep(dur) => false,
3344 _ = exit_receiver.changed() => true,
3345 }
3346 })
3347 },
3348 false,
3349 || Some(Duration::from_secs(1696300000)),
3350 );
3351
3352 let t1 = tokio::spawn(bp_future);
3353 let t2 = tokio::spawn(async move {
3354 do_test_not_pruning_network_graph_until_graph_sync_completion!(
3355 nodes,
3356 {
3357 let mut i = 0;
3358 loop {
3359 tokio::time::sleep(super::FIRST_NETWORK_PRUNE_TIMER).await;
3360 if let Ok(()) = receiver.try_recv() {
3361 break Ok::<(), ()>(());
3362 }
3363 assert!(i < 5);
3364 i += 1;
3365 }
3366 },
3367 tokio::time::sleep(Duration::from_millis(1)).await
3368 );
3369 exit_sender.send(()).unwrap();
3370 });
3371 let (r1, r2) = tokio::join!(t1, t2);
3372 r1.unwrap().unwrap();
3373 r2.unwrap()
3374 }
3375
/// Shared body of the sync and async payment-path scoring tests.
///
/// For each of five path events it first queues the expected `TestResult` on the scorer of
/// `$nodes[0]`, then pushes the event onto that node, and finally checks that the event handed
/// back by `$receive` is of the pushed kind.
///
/// * `$nodes`: indexable collection of test nodes; only `$nodes[0]` is used.
/// * `$receive`: expression evaluated once per event; its value must have an `expect` method
///   yielding the handled `Event`.
macro_rules! do_test_payment_path_scoring {
	($nodes: expr, $receive: expr) => {
		let scid = 4242;
		let secp = Secp256k1::new();
		let hop_secret = SecretKey::from_slice(&[42; 32]).unwrap();
		let hop_node_id = PublicKey::from_secret_key(&secp, &hop_secret);

		// Single-hop path over `scid`; every event below carries a copy of it.
		let only_hop = RouteHop {
			pubkey: hop_node_id,
			node_features: NodeFeatures::empty(),
			short_channel_id: scid,
			channel_features: ChannelFeatures::empty(),
			fee_msat: 0,
			cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
			maybe_announced_channel: true,
		};
		let path = Path { hops: vec![only_hop], blinded_tail: None };

		// 1) Non-permanent failure attributed to `scid`: expected scorer result is
		//    `PaymentFailure` on that channel.
		$nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure {
			path: path.clone(),
			short_channel_id: scid,
		});
		$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
			payment_id: None,
			payment_hash: PaymentHash([42; 32]),
			payment_failed_permanently: false,
			failure: PathFailure::OnPath { network_update: None },
			path: path.clone(),
			short_channel_id: Some(scid),
			error_code: None,
			error_data: None,
			hold_times: Vec::new(),
		});
		let handled = $receive.expect("PaymentPathFailed not handled within deadline");
		assert!(matches!(handled, Event::PaymentPathFailed { .. }), "Unexpected event");

		// 2) Permanent failure without a failing channel: expected scorer result is
		//    `ProbeSuccess`.
		$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
		$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
			payment_id: None,
			payment_hash: PaymentHash([42; 32]),
			payment_failed_permanently: true,
			failure: PathFailure::OnPath { network_update: None },
			path: path.clone(),
			short_channel_id: None,
			error_code: None,
			error_data: None,
			hold_times: Vec::new(),
		});
		let handled = $receive.expect("PaymentPathFailed not handled within deadline");
		assert!(matches!(handled, Event::PaymentPathFailed { .. }), "Unexpected event");

		// 3) Successful payment path: expected scorer result is `PaymentSuccess`.
		$nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
		$nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
			payment_id: PaymentId([42; 32]),
			payment_hash: None,
			path: path.clone(),
			hold_times: Vec::new(),
		});
		let handled = $receive.expect("PaymentPathSuccessful not handled within deadline");
		assert!(matches!(handled, Event::PaymentPathSuccessful { .. }), "Unexpected event");

		// 4) Successful probe: expected scorer result is `ProbeSuccess`.
		$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
		$nodes[0].node.push_pending_event(Event::ProbeSuccessful {
			payment_id: PaymentId([42; 32]),
			payment_hash: PaymentHash([42; 32]),
			path: path.clone(),
		});
		let handled = $receive.expect("ProbeSuccessful not handled within deadline");
		assert!(matches!(handled, Event::ProbeSuccessful { .. }), "Unexpected event");

		// 5) Failed probe on `scid`: expected scorer result is `ProbeFailure`. This is the last
		//    use of `path`, so it is moved into the event.
		$nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
		$nodes[0].node.push_pending_event(Event::ProbeFailed {
			payment_id: PaymentId([42; 32]),
			payment_hash: PaymentHash([42; 32]),
			path,
			short_channel_id: Some(scid),
		});
		let handled = $receive.expect("ProbeFailure not handled within deadline");
		assert!(matches!(handled, Event::ProbeFailed { .. }), "Unexpected event");
	}
}
3475
/// Runs the shared payment-path scoring scenario against a threaded `BackgroundProcessor` and
/// then checks that the scorer-persistence line was logged once per scored event.
#[test]
fn test_payment_path_scoring() {
	let (_, nodes) = create_nodes(1, "test_payment_path_scoring");

	// The handler forwards the four path-related event kinds to the test body and treats
	// anything else as a test failure.
	let (handled_tx, handled_rx) = std::sync::mpsc::sync_channel(1);
	let forward_path_events = move |event: Event| {
		match event {
			Event::PaymentPathFailed { .. }
			| Event::PaymentPathSuccessful { .. }
			| Event::ProbeSuccessful { .. }
			| Event::ProbeFailed { .. } => handled_tx.send(event).unwrap(),
			_ => panic!("Unexpected event: {:?}", event),
		}
		Ok(())
	};

	let persister = Arc::new(Persister::new(nodes[0].kv_store.get_data_dir()));
	let bg_processor = BackgroundProcessor::start(
		persister,
		forward_path_events,
		Arc::clone(&nodes[0].chain_monitor),
		Arc::clone(&nodes[0].node),
		Some(Arc::clone(&nodes[0].messenger)),
		nodes[0].no_gossip_sync(),
		Arc::clone(&nodes[0].peer_manager),
		Some(Arc::clone(&nodes[0].liquidity_manager)),
		Some(Arc::clone(&nodes[0].sweeper)),
		Arc::clone(&nodes[0].logger),
		Some(Arc::clone(&nodes[0].scorer)),
	);

	do_test_payment_path_scoring!(nodes, handled_rx.recv_timeout(EVENT_DEADLINE));

	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}

	// Five events were pushed by the macro, so five persistence log lines are expected.
	let persist_log_key =
		("lightning_background_processor", "Persisting scorer after update".to_string());
	let log_entries = nodes[0].logger.lines.lock().unwrap();
	let persist_count = log_entries.get(&persist_log_key).copied().unwrap();
	assert_eq!(persist_count, 5);
}
3517
3518 #[tokio::test]
3519 async fn test_payment_path_scoring_async() {
3520 let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
3521 let event_handler = move |event: Event| {
3522 let sender_ref = sender.clone();
3523 async move {
3524 match event {
3525 Event::PaymentPathFailed { .. } => sender_ref.send(event).await.unwrap(),
3526 Event::PaymentPathSuccessful { .. } => sender_ref.send(event).await.unwrap(),
3527 Event::ProbeSuccessful { .. } => sender_ref.send(event).await.unwrap(),
3528 Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
3529 _ => panic!("Unexpected event: {:?}", event),
3530 }
3531 Ok(())
3532 }
3533 };
3534
3535 let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
3536 let data_dir = nodes[0].kv_store.get_data_dir();
3537 let kv_store_sync = Arc::new(Persister::new(data_dir));
3538 let kv_store = KVStoreSyncWrapper(kv_store_sync);
3539
3540 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
3541
3542 let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
3544 &*(nodes[0].liquidity_manager.get_lm_async()
3545 as *const LiquidityManager<_, _, _, _, _, _, _>)
3546 as &'static LiquidityManager<_, _, _, _, _, _, _>
3547 };
3548 let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3549 &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3550 as &'static OutputSweeper<_, _, _, _, _, _, _>
3551 };
3552
3553 let bp_future = super::process_events_async(
3554 kv_store,
3555 event_handler,
3556 Arc::clone(&nodes[0].chain_monitor),
3557 Arc::clone(&nodes[0].node),
3558 Some(Arc::clone(&nodes[0].messenger)),
3559 nodes[0].no_gossip_sync(),
3560 Arc::clone(&nodes[0].peer_manager),
3561 Some(lm_async),
3562 Some(sweeper_async),
3563 Arc::clone(&nodes[0].logger),
3564 Some(Arc::clone(&nodes[0].scorer)),
3565 move |dur: Duration| {
3566 let mut exit_receiver = exit_receiver.clone();
3567 Box::pin(async move {
3568 tokio::select! {
3569 _ = tokio::time::sleep(dur) => false,
3570 _ = exit_receiver.changed() => true,
3571 }
3572 })
3573 },
3574 false,
3575 || Some(Duration::ZERO),
3576 );
3577 let t1 = tokio::spawn(bp_future);
3578 let t2 = tokio::spawn(async move {
3579 do_test_payment_path_scoring!(nodes, receiver.recv().await);
3580 exit_sender.send(()).unwrap();
3581
3582 let log_entries = nodes[0].logger.lines.lock().unwrap();
3583 let expected_log = "Persisting scorer after update".to_string();
3584 assert_eq!(
3585 *log_entries.get(&("lightning_background_processor", expected_log)).unwrap(),
3586 5
3587 );
3588 });
3589
3590 let (r1, r2) = tokio::join!(t1, t2);
3591 r1.unwrap().unwrap();
3592 r2.unwrap()
3593 }
3594
3595 #[tokio::test]
3596 #[cfg(not(c_bindings))]
3597 async fn test_no_consts() {
3598 let (_, nodes) = create_nodes(1, "test_no_consts");
3600 let bg_processor = BackgroundProcessor::start(
3601 Arc::clone(&nodes[0].kv_store),
3602 move |_: Event| Ok(()),
3603 Arc::clone(&nodes[0].chain_monitor),
3604 Arc::clone(&nodes[0].node),
3605 crate::NO_ONION_MESSENGER,
3606 nodes[0].no_gossip_sync(),
3607 Arc::clone(&nodes[0].peer_manager),
3608 crate::NO_LIQUIDITY_MANAGER_SYNC,
3609 Some(Arc::clone(&nodes[0].sweeper)),
3610 Arc::clone(&nodes[0].logger),
3611 Some(Arc::clone(&nodes[0].scorer)),
3612 );
3613
3614 if !std::thread::panicking() {
3615 bg_processor.stop().unwrap();
3616 }
3617
3618 let kv_store = KVStoreSyncWrapper(Arc::clone(&nodes[0].kv_store));
3619 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
3620 let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3621 &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3622 as &'static OutputSweeper<_, _, _, _, _, _, _>
3623 };
3624 let bp_future = super::process_events_async(
3625 kv_store,
3626 move |_: Event| async move { Ok(()) },
3627 Arc::clone(&nodes[0].chain_monitor),
3628 Arc::clone(&nodes[0].node),
3629 crate::NO_ONION_MESSENGER,
3630 nodes[0].no_gossip_sync(),
3631 Arc::clone(&nodes[0].peer_manager),
3632 crate::NO_LIQUIDITY_MANAGER,
3633 Some(sweeper_async),
3634 Arc::clone(&nodes[0].logger),
3635 Some(Arc::clone(&nodes[0].scorer)),
3636 move |dur: Duration| {
3637 let mut exit_receiver = exit_receiver.clone();
3638 Box::pin(async move {
3639 tokio::select! {
3640 _ = tokio::time::sleep(dur) => false,
3641 _ = exit_receiver.changed() => true,
3642 }
3643 })
3644 },
3645 false,
3646 || Some(Duration::ZERO),
3647 );
3648 let t1 = tokio::spawn(bp_future);
3649 exit_sender.send(()).unwrap();
3650 t1.await.unwrap().unwrap();
3651 }
3652}