1#![cfg_attr(feature = "std", doc = "See docs for [`BackgroundProcessor`] for more details.")]
13#![deny(rustdoc::broken_intra_doc_links)]
14#![deny(rustdoc::private_intra_doc_links)]
15#![deny(missing_docs)]
16#![cfg_attr(docsrs, feature(doc_cfg))]
17#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
18
19#[cfg(any(test, feature = "std"))]
20extern crate core;
21
22#[cfg(not(feature = "std"))]
23extern crate alloc;
24
25#[macro_use]
26extern crate lightning;
27extern crate lightning_rapid_gossip_sync;
28
29mod fwd_batch;
30
31use fwd_batch::BatchDelay;
32
33use lightning::chain;
34use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
35use lightning::chain::chainmonitor::{ChainMonitor, Persist};
36#[cfg(feature = "std")]
37use lightning::events::EventHandler;
38#[cfg(feature = "std")]
39use lightning::events::EventsProvider;
40use lightning::events::ReplayEvent;
41use lightning::events::{Event, PathFailure};
42use lightning::util::ser::Writeable;
43
44use lightning::ln::channelmanager::AChannelManager;
45use lightning::ln::msgs::OnionMessageHandler;
46use lightning::ln::peer_handler::APeerManager;
47use lightning::onion_message::messenger::AOnionMessenger;
48use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
49use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
50use lightning::routing::utxo::UtxoLookup;
51use lightning::sign::{
52 ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
53};
54use lightning::util::logger::Logger;
55use lightning::util::persist::{
56 KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
57 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
58 NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
59 NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY,
60 SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
61};
62use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
63#[cfg(feature = "std")]
64use lightning::util::wakers::Sleeper;
65use lightning_rapid_gossip_sync::RapidGossipSync;
66
67use lightning_liquidity::ALiquidityManager;
68#[cfg(feature = "std")]
69use lightning_liquidity::ALiquidityManagerSync;
70
71use core::ops::Deref;
72use core::time::Duration;
73
74#[cfg(feature = "std")]
75use core::sync::atomic::{AtomicBool, Ordering};
76#[cfg(feature = "std")]
77use std::sync::Arc;
78#[cfg(feature = "std")]
79use std::thread::{self, JoinHandle};
80#[cfg(feature = "std")]
81use std::time::Instant;
82
83#[cfg(not(feature = "std"))]
84use alloc::boxed::Box;
85#[cfg(all(not(c_bindings), not(feature = "std")))]
86use alloc::sync::Arc;
87
/// Handle to a background processing thread (only available with the `std` feature).
///
/// As the `must_use` message states, the processor stops as soon as this value is dropped, so it
/// should be kept alive until shutdown.
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
    // Flag shared (via `Arc`) with the background thread.
    // NOTE(review): presumably set to `true` to ask the thread to exit; the code that reads and
    // writes it is outside this view - confirm.
    stop_thread: Arc<AtomicBool>,
    // Join handle of the background thread, whose result is `Result<(), std::io::Error>`.
    // It is an `Option`, presumably so the handle can be taken out when joining - confirm.
    thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
122
// Interval between calls to the `ChannelManager`'s `timer_tick_occurred` (1 second under test).
#[cfg(not(test))]
const FRESHNESS_TIMER: Duration = Duration::from_secs(60);
#[cfg(test)]
const FRESHNESS_TIMER: Duration = Duration::from_secs(1);

// Interval between calls to the `PeerManager`'s `timer_tick_occurred`.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: Duration = Duration::from_secs(10);
// Builds with `debug_assertions` enabled use a longer ping interval.
// NOTE(review): presumably because unoptimised builds are slower - confirm.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const PING_TIMER: Duration = Duration::from_secs(1);

// Interval between calls to the onion messenger's `timer_tick_occurred`.
#[cfg(not(test))]
const ONION_MESSAGE_HANDLER_TIMER: Duration = Duration::from_secs(10);
#[cfg(test)]
const ONION_MESSAGE_HANDLER_TIMER: Duration = Duration::from_secs(1);

// Interval (one hour) used for the network-graph prune timer once a prunable graph is available.
const NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(60 * 60);

// Interval between periodic scorer `time_passed` updates followed by scorer persistence.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: Duration = Duration::from_secs(60 * 5);
#[cfg(test)]
const SCORER_PERSIST_TIMER: Duration = Duration::from_secs(1);

// Delay of the initial network-graph prune timer, also used while no prunable graph is available.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(60);
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: Duration = Duration::from_secs(1);

// Interval between rebroadcasts of the chain monitor's pending claims.
#[cfg(not(test))]
const REBROADCAST_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const REBROADCAST_TIMER: Duration = Duration::from_secs(1);

// Interval between attempts to regenerate and broadcast sweeper spends.
#[cfg(not(test))]
const SWEEPER_TIMER: Duration = Duration::from_secs(30);
#[cfg(test)]
const SWEEPER_TIMER: Duration = Duration::from_secs(1);
165
/// Returns the smaller of two [`Duration`]s.
///
/// This is a `const fn` so it can be used when initialising constants. When both durations are
/// equal, `b` is returned (which is indistinguishable from returning `a`).
const fn min_duration(a: Duration, b: Duration) -> Duration {
    // `a` is only chosen when it is strictly shorter than `b`.
    if b.as_nanos() <= a.as_nanos() {
        b
    } else {
        a
    }
}
174const FASTEST_TIMER: Duration = min_duration(
175 min_duration(FRESHNESS_TIMER, PING_TIMER),
176 min_duration(SCORER_PERSIST_TIMER, min_duration(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)),
177);
178
/// Selects which gossip synchronisation mechanism, if any, is in use.
///
/// The network graph exposed by the selected variant is what this file updates on payment
/// failures, prunes, and persists.
pub enum GossipSync<
    P: Deref<Target = P2PGossipSync<G, U, L>>,
    R: Deref<Target = RapidGossipSync<G, L>>,
    G: Deref<Target = NetworkGraph<L>>,
    U: Deref,
    L: Deref,
> where
    U::Target: UtxoLookup,
    L::Target: Logger,
{
    /// Gossip sync backed by a [`P2PGossipSync`].
    P2P(P),
    /// Gossip sync backed by a [`RapidGossipSync`].
    Rapid(R),
    /// No gossip sync is in use, so no network graph is available.
    None,
}
197
198impl<
199 P: Deref<Target = P2PGossipSync<G, U, L>>,
200 R: Deref<Target = RapidGossipSync<G, L>>,
201 G: Deref<Target = NetworkGraph<L>>,
202 U: Deref,
203 L: Deref,
204 > GossipSync<P, R, G, U, L>
205where
206 U::Target: UtxoLookup,
207 L::Target: Logger,
208{
209 fn network_graph(&self) -> Option<&G> {
210 match self {
211 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
212 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
213 GossipSync::None => None,
214 }
215 }
216
217 fn prunable_network_graph(&self) -> Option<&G> {
218 match self {
219 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
220 GossipSync::Rapid(gossip_sync) => {
221 if gossip_sync.is_initial_sync_complete() {
222 Some(gossip_sync.network_graph())
223 } else {
224 None
225 }
226 },
227 GossipSync::None => None,
228 }
229 }
230}
231
232impl<
234 P: Deref<Target = P2PGossipSync<G, U, L>>,
235 G: Deref<Target = NetworkGraph<L>>,
236 U: Deref,
237 L: Deref,
238 > GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
239where
240 U::Target: UtxoLookup,
241 L::Target: Logger,
242{
243 pub fn p2p(gossip_sync: P) -> Self {
245 GossipSync::P2P(gossip_sync)
246 }
247}
248
249impl<
251 'a,
252 R: Deref<Target = RapidGossipSync<G, L>>,
253 G: Deref<Target = NetworkGraph<L>>,
254 L: Deref,
255 >
256 GossipSync<
257 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
258 R,
259 G,
260 &'a (dyn UtxoLookup + Send + Sync),
261 L,
262 > where
263 L::Target: Logger,
264{
265 pub fn rapid(gossip_sync: R) -> Self {
267 GossipSync::Rapid(gossip_sync)
268 }
269}
270
271impl<'a, L: Deref>
273 GossipSync<
274 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
275 &RapidGossipSync<&'a NetworkGraph<L>, L>,
276 &'a NetworkGraph<L>,
277 &'a (dyn UtxoLookup + Send + Sync),
278 L,
279 > where
280 L::Target: Logger,
281{
282 pub fn none() -> Self {
284 GossipSync::None
285 }
286}
287
288fn handle_network_graph_update<L: Deref>(network_graph: &NetworkGraph<L>, event: &Event)
289where
290 L::Target: Logger,
291{
292 if let Event::PaymentPathFailed {
293 failure: PathFailure::OnPath { network_update: Some(ref upd) },
294 ..
295 } = event
296 {
297 network_graph.handle_network_update(upd);
298 }
299}
300
301fn update_scorer<'a, S: Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
304 scorer: &'a S, event: &Event, duration_since_epoch: Duration,
305) -> bool {
306 match event {
307 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
308 let mut score = scorer.write_lock();
309 score.payment_path_failed(path, *scid, duration_since_epoch);
310 },
311 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
312 let mut score = scorer.write_lock();
315 score.probe_successful(path, duration_since_epoch);
316 },
317 Event::PaymentPathSuccessful { path, .. } => {
318 let mut score = scorer.write_lock();
319 score.payment_path_successful(path, duration_since_epoch);
320 },
321 Event::ProbeSuccessful { path, .. } => {
322 let mut score = scorer.write_lock();
323 score.probe_successful(path, duration_since_epoch);
324 },
325 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
326 let mut score = scorer.write_lock();
327 score.probe_failed(path, *scid, duration_since_epoch);
328 },
329 _ => return false,
330 }
331 true
332}
333
// Lock type wrapped around the scorer in `DynRouter`: an `RwLock` when `std` is available.
#[cfg(all(not(c_bindings), feature = "std"))]
type ScorerWrapper<T> = std::sync::RwLock<T>;

// Without `std`, a `RefCell` is used as the scorer wrapper instead.
#[cfg(all(not(c_bindings), not(feature = "std")))]
type ScorerWrapper<T> = core::cell::RefCell<T>;

// A `DefaultRouter` whose generic parameters are all `'static` references to trait objects (or to
// the concrete probabilistic scorer). Used below to spell out `DynChannelManager`.
#[cfg(not(c_bindings))]
type DynRouter = lightning::routing::router::DefaultRouter<
    &'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
    &'static (dyn Logger + Send + Sync),
    &'static (dyn EntropySource + Send + Sync),
    &'static ScorerWrapper<
        lightning::routing::scoring::ProbabilisticScorer<
            &'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
            &'static (dyn Logger + Send + Sync),
        >,
    >,
    lightning::routing::scoring::ProbabilisticScoringFeeParameters,
    lightning::routing::scoring::ProbabilisticScorer<
        &'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
        &'static (dyn Logger + Send + Sync),
    >,
>;

// A `DefaultMessageRouter` over `'static` trait-object references. Used by `DynChannelManager`
// and by the `NO_ONION_MESSENGER` constant.
#[cfg(not(c_bindings))]
type DynMessageRouter = lightning::onion_message::messenger::DefaultMessageRouter<
    &'static NetworkGraph<&'static (dyn Logger + Send + Sync)>,
    &'static (dyn Logger + Send + Sync),
    &'static (dyn EntropySource + Send + Sync),
>;

// A `SignerProvider` trait object using `InMemorySigner` as its ECDSA signer.
#[cfg(all(not(c_bindings), not(taproot)))]
type DynSignerProvider = dyn lightning::sign::SignerProvider<EcdsaSigner = lightning::sign::InMemorySigner>
    + Send
    + Sync;

// Same as above, but with the `taproot` cfg the taproot signer type is fixed as well.
#[cfg(all(not(c_bindings), taproot))]
type DynSignerProvider = (dyn lightning::sign::SignerProvider<
    EcdsaSigner = lightning::sign::InMemorySigner,
    TaprootSigner = lightning::sign::InMemorySigner,
> + Send
     + Sync);

// A `ChannelManager` with every generic parameter fixed to a `'static` reference. It is used in
// the `NO_*` constants below to name their associated types.
#[cfg(not(c_bindings))]
type DynChannelManager = lightning::ln::channelmanager::ChannelManager<
    &'static (dyn chain::Watch<lightning::sign::InMemorySigner> + Send + Sync),
    &'static (dyn BroadcasterInterface + Send + Sync),
    &'static (dyn EntropySource + Send + Sync),
    &'static (dyn lightning::sign::NodeSigner + Send + Sync),
    &'static DynSignerProvider,
    &'static (dyn FeeEstimator + Send + Sync),
    &'static DynRouter,
    &'static DynMessageRouter,
    &'static (dyn Logger + Send + Sync),
>;
389
/// A `None` value of a fully spelled-out `Option<Arc<dyn AOnionMessenger<..>>>` type.
///
/// NOTE(review): presumably meant to be passed as the `onion_messenger` argument when no onion
/// messenger is used, so that callers need not name the generic type themselves - confirm.
#[cfg(not(c_bindings))]
pub const NO_ONION_MESSENGER: Option<
    Arc<
        dyn AOnionMessenger<
                EntropySource = dyn EntropySource + Send + Sync,
                ES = &(dyn EntropySource + Send + Sync),
                NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
                NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
                Logger = dyn Logger + Send + Sync,
                L = &'static (dyn Logger + Send + Sync),
                NodeIdLookUp = DynChannelManager,
                NL = &'static DynChannelManager,
                MessageRouter = DynMessageRouter,
                MR = &'static DynMessageRouter,
                OffersMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
                OMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
                AsyncPaymentsMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
                APH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
                DNSResolverMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
                DRH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
                CustomOnionMessageHandler = lightning::ln::peer_handler::IgnoringMessageHandler,
                CMH = &'static lightning::ln::peer_handler::IgnoringMessageHandler,
            > + Send
            + Sync,
    >,
> = None;
418
/// A `None` value of a fully spelled-out `Option<Arc<dyn ALiquidityManager<..>>>` type.
///
/// NOTE(review): presumably meant to be passed as the `liquidity_manager` argument when no
/// liquidity manager is used, so that callers need not name the generic type themselves - confirm.
#[cfg(not(c_bindings))]
pub const NO_LIQUIDITY_MANAGER: Option<
    Arc<
        dyn ALiquidityManager<
                EntropySource = dyn EntropySource + Send + Sync,
                ES = &(dyn EntropySource + Send + Sync),
                NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
                NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
                AChannelManager = DynChannelManager,
                CM = &DynChannelManager,
                Filter = dyn chain::Filter + Send + Sync,
                C = &(dyn chain::Filter + Send + Sync),
                KVStore = dyn lightning::util::persist::KVStore + Send + Sync,
                K = &(dyn lightning::util::persist::KVStore + Send + Sync),
                TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
                TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
                BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
                                           + Send
                                           + Sync,
                T = &(dyn BroadcasterInterface + Send + Sync),
            > + Send
            + Sync,
    >,
> = None;
445
/// A `None` value of a fully spelled-out `Option<Arc<dyn ALiquidityManagerSync<..>>>` type.
///
/// This is the counterpart of the `ALiquidityManager` constant for the `KVStoreSync`-based
/// liquidity manager and is only available with the `std` feature.
///
/// NOTE(review): presumably meant for callers of the synchronous background processor that do not
/// use a liquidity manager - confirm.
#[cfg(all(not(c_bindings), feature = "std"))]
pub const NO_LIQUIDITY_MANAGER_SYNC: Option<
    Arc<
        dyn ALiquidityManagerSync<
                EntropySource = dyn EntropySource + Send + Sync,
                ES = &(dyn EntropySource + Send + Sync),
                NodeSigner = dyn lightning::sign::NodeSigner + Send + Sync,
                NS = &(dyn lightning::sign::NodeSigner + Send + Sync),
                AChannelManager = DynChannelManager,
                CM = &DynChannelManager,
                Filter = dyn chain::Filter + Send + Sync,
                C = &(dyn chain::Filter + Send + Sync),
                KVStoreSync = dyn lightning::util::persist::KVStoreSync + Send + Sync,
                KS = &(dyn lightning::util::persist::KVStoreSync + Send + Sync),
                TimeProvider = dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync,
                TP = &(dyn lightning_liquidity::utils::time::TimeProvider + Send + Sync),
                BroadcasterInterface = dyn lightning::chain::chaininterface::BroadcasterInterface
                                           + Send
                                           + Sync,
                T = &(dyn BroadcasterInterface + Send + Sync),
            > + Send
            + Sync,
    >,
> = None;
472
// Minimal future combinators used by the async background processor, built on `core` only.
pub(crate) mod futures_util {
    use core::future::Future;
    use core::marker::Unpin;
    use core::mem;
    use core::pin::Pin;
    use core::ptr;
    use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

    // Races five futures against each other. Branches are polled in field order (`a` through `e`)
    // and the first one found ready decides the output.
    pub(crate) struct Selector<A, B, C, D, E>
    where
        A: Future<Output = ()> + Unpin,
        B: Future<Output = ()> + Unpin,
        C: Future<Output = ()> + Unpin,
        D: Future<Output = ()> + Unpin,
        E: Future<Output = bool> + Unpin,
    {
        pub a: A,
        pub b: B,
        pub c: C,
        pub d: D,
        pub e: E,
    }

    // Identifies which branch of a `Selector` completed; `E` carries that branch's `bool` output.
    pub(crate) enum SelectorOutput {
        A,
        B,
        C,
        D,
        E(bool),
    }

    impl<A, B, C, D, E> Future for Selector<A, B, C, D, E>
    where
        A: Future<Output = ()> + Unpin,
        B: Future<Output = ()> + Unpin,
        C: Future<Output = ()> + Unpin,
        D: Future<Output = ()> + Unpin,
        E: Future<Output = bool> + Unpin,
    {
        type Output = SelectorOutput;

        fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<SelectorOutput> {
            // Later branches are not polled once an earlier one is ready.
            if let Poll::Ready(()) = Pin::new(&mut self.a).poll(ctx) {
                return Poll::Ready(SelectorOutput::A);
            }
            if let Poll::Ready(()) = Pin::new(&mut self.b).poll(ctx) {
                return Poll::Ready(SelectorOutput::B);
            }
            if let Poll::Ready(()) = Pin::new(&mut self.c).poll(ctx) {
                return Poll::Ready(SelectorOutput::C);
            }
            if let Poll::Ready(()) = Pin::new(&mut self.d).poll(ctx) {
                return Poll::Ready(SelectorOutput::D);
            }
            // The last branch decides the overall result: ready with its flag, or pending.
            Pin::new(&mut self.e).poll(ctx).map(SelectorOutput::E)
        }
    }

    // Wraps an optional future so it can be used as a `Selector` branch. Without an inner future
    // it never completes; once the inner future completes it is dropped, so subsequent polls
    // report pending.
    pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
        pub optional_future: Option<F>,
    }

    impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
            let outcome = match self.optional_future.as_mut() {
                None => return Poll::Pending,
                Some(inner) => Pin::new(inner).poll(ctx),
            };
            if outcome.is_ready() {
                // Never poll a completed future again.
                self.optional_future = None;
            }
            outcome
        }
    }

    // Every vtable entry of the dummy waker is a no-op; cloning yields another dummy raw waker.
    const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
        dummy_waker_clone,
        dummy_waker_action,
        dummy_waker_action,
        dummy_waker_action,
    );

    fn dummy_waker_action(_: *const ()) {}

    fn dummy_waker_clone(_: *const ()) -> RawWaker {
        RawWaker::new(ptr::null(), &DUMMY_WAKER_VTABLE)
    }

    // Builds a `Waker` that does nothing when woken, cloned or dropped. It allows polling a future
    // by hand without an executor.
    pub(crate) fn dummy_waker() -> Waker {
        // SAFETY: none of the vtable functions dereferences the (null) data pointer, and all of
        // them are trivially thread-safe, so the `RawWaker` contract is upheld.
        unsafe { Waker::from_raw(dummy_waker_clone(ptr::null())) }
    }

    // State of a single `Joiner` slot: either still waiting on an (optionally set) future, or
    // holding its final result.
    enum JoinerResult<ERR, F: Future<Output = Result<(), ERR>> + Unpin> {
        Pending(Option<F>),
        Ready(Result<(), ERR>),
    }

    // Drives one slot and reports whether it now holds a final result. A slot that never received
    // a future resolves to `Ok(())`; a slot that is already resolved is not polled again.
    fn advance<ERR, F: Future<Output = Result<(), ERR>> + Unpin>(
        slot: &mut JoinerResult<ERR, F>, ctx: &mut Context<'_>,
    ) -> bool {
        let outcome = match &mut *slot {
            JoinerResult::Ready(_) => return true,
            JoinerResult::Pending(None) => Ok(()),
            JoinerResult::Pending(Some(fut)) => match Pin::new(fut).poll(ctx) {
                Poll::Ready(res) => res,
                Poll::Pending => return false,
            },
        };
        *slot = JoinerResult::Ready(outcome);
        true
    }

    // Moves the final result out of a slot, leaving `Ok(())` behind. A slot which is still
    // pending yields `Ok(())`.
    fn harvest<ERR, F: Future<Output = Result<(), ERR>> + Unpin>(
        slot: &mut JoinerResult<ERR, F>,
    ) -> Result<(), ERR> {
        match slot {
            JoinerResult::Ready(res) => mem::replace(res, Ok(())),
            JoinerResult::Pending(_) => Ok(()),
        }
    }

    // Awaits up to five fallible futures and completes once all of them have completed, yielding
    // their results in slot order. Slots which were never set yield `Ok(())`.
    pub(crate) struct Joiner<
        ERR,
        A: Future<Output = Result<(), ERR>> + Unpin,
        B: Future<Output = Result<(), ERR>> + Unpin,
        C: Future<Output = Result<(), ERR>> + Unpin,
        D: Future<Output = Result<(), ERR>> + Unpin,
        E: Future<Output = Result<(), ERR>> + Unpin,
    > {
        a: JoinerResult<ERR, A>,
        b: JoinerResult<ERR, B>,
        c: JoinerResult<ERR, C>,
        d: JoinerResult<ERR, D>,
        e: JoinerResult<ERR, E>,
    }

    impl<
            ERR,
            A: Future<Output = Result<(), ERR>> + Unpin,
            B: Future<Output = Result<(), ERR>> + Unpin,
            C: Future<Output = Result<(), ERR>> + Unpin,
            D: Future<Output = Result<(), ERR>> + Unpin,
            E: Future<Output = Result<(), ERR>> + Unpin,
        > Joiner<ERR, A, B, C, D, E>
    {
        // Creates a joiner with all five slots empty.
        pub(crate) fn new() -> Self {
            Joiner {
                a: JoinerResult::Pending(None),
                b: JoinerResult::Pending(None),
                c: JoinerResult::Pending(None),
                d: JoinerResult::Pending(None),
                e: JoinerResult::Pending(None),
            }
        }

        // Stores the future to await in slot `a`.
        pub(crate) fn set_a(&mut self, fut: A) {
            self.a = JoinerResult::Pending(Some(fut));
        }
        // Stores an already-known result in slot `a`, for a future that completed before joining.
        pub(crate) fn set_a_res(&mut self, res: Result<(), ERR>) {
            self.a = JoinerResult::Ready(res);
        }
        // Stores the future to await in slot `b`.
        pub(crate) fn set_b(&mut self, fut: B) {
            self.b = JoinerResult::Pending(Some(fut));
        }
        // Stores the future to await in slot `c`.
        pub(crate) fn set_c(&mut self, fut: C) {
            self.c = JoinerResult::Pending(Some(fut));
        }
        // Stores the future to await in slot `d`.
        pub(crate) fn set_d(&mut self, fut: D) {
            self.d = JoinerResult::Pending(Some(fut));
        }
        // Stores the future to await in slot `e`.
        pub(crate) fn set_e(&mut self, fut: E) {
            self.e = JoinerResult::Pending(Some(fut));
        }
    }

    impl<
            ERR,
            A: Future<Output = Result<(), ERR>> + Unpin,
            B: Future<Output = Result<(), ERR>> + Unpin,
            C: Future<Output = Result<(), ERR>> + Unpin,
            D: Future<Output = Result<(), ERR>> + Unpin,
            E: Future<Output = Result<(), ERR>> + Unpin,
        > Future for Joiner<ERR, A, B, C, D, E>
    where
        Joiner<ERR, A, B, C, D, E>: Unpin,
    {
        type Output = [Result<(), ERR>; 5];

        fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
            let this = &mut *self;
            // Every unresolved slot is driven on every poll, in slot order; there is deliberately
            // no short-circuiting so that all futures make progress.
            let settled = [
                advance(&mut this.a, ctx),
                advance(&mut this.b, ctx),
                advance(&mut this.c, ctx),
                advance(&mut this.d, ctx),
                advance(&mut this.e, ctx),
            ];
            if settled.contains(&false) {
                return Poll::Pending;
            }
            Poll::Ready([
                harvest(&mut this.a),
                harvest(&mut this.b),
                harvest(&mut this.c),
                harvest(&mut this.d),
                harvest(&mut this.e),
            ])
        }
    }
}
709use core::task;
710use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutput};
711
/// Runs the background processing loop as a future, until one of the `sleeper` futures resolves
/// to `true` or a persistence step fails.
///
/// On every iteration the loop processes pending events of the channel manager, chain monitor and
/// (if given) onion messenger, lets the peer manager process events, and then runs whichever
/// periodic tasks are due: channel manager / peer manager / onion messenger timer ticks, HTLC
/// forward processing, network graph pruning and persistence, scorer decay and persistence,
/// sweeper spend regeneration, liquidity manager persistence and rebroadcasting of pending claims.
///
/// Arguments, as used by the body:
/// * `kv_store` - destination of all channel manager, scorer and network graph writes.
/// * `event_handler` - invoked for every event after the network graph and scorer were updated.
/// * `onion_messenger`, `liquidity_manager`, `sweeper`, `scorer` - optional; the corresponding
///   work is skipped when `None`.
/// * `sleeper` - builds a future for the given duration which resolves to a `bool`; `true` makes
///   the loop exit.
/// * `mobile_interruptable_platform` - when `true`, the loop sleeps at most 100ms at a time and
///   disconnects all peers if such a sleep is detected to have taken more than one second.
/// * `fetch_time` - returns the duration since the epoch if available; when it returns `None`,
///   scorer updates and network graph pruning are skipped.
///
/// On exit the channel manager, the scorer (if any) and the network graph (if any) are persisted
/// one final time. Errors from channel manager or liquidity manager persistence, and from the
/// final writes, are returned; failures of the periodic scorer and network graph writes are only
/// logged.
///
// NOTE(review): the `cfg_attr(.., doc = ..)` attributes below look like the feature-dependent
// lines of a longer doc example whose plain doc-comment lines are not present here - confirm.
#[cfg_attr(
    feature = "std",
    doc = " See [`BackgroundProcessor::start`] for information on which actions this handles.\n"
)]
#[cfg_attr(
    feature = "std",
    doc = " let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());"
)]
#[cfg_attr(feature = "std", doc = "")]
#[cfg_attr(feature = "std", doc = " let mut receiver = stop_receiver.clone();")]
#[cfg_attr(feature = "std", doc = " _ = receiver.changed() => true,")]
#[cfg_attr(feature = "std", doc = " let handle = tokio::spawn(async move {")]
#[cfg_attr(
    not(feature = "std"),
    doc = " let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();"
)]
#[cfg_attr(not(feature = "std"), doc = " rt.block_on(async move {")]
#[cfg_attr(feature = "std", doc = " stop_sender.send(()).unwrap();")]
#[cfg_attr(feature = "std", doc = " handle.await.unwrap()")]
pub async fn process_events_async<
    'a,
    UL: Deref,
    CF: Deref,
    T: Deref,
    F: Deref,
    G: Deref<Target = NetworkGraph<L>>,
    L: Deref,
    P: Deref,
    EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
    EventHandler: Fn(Event) -> EventHandlerFuture,
    ES: Deref + Send,
    M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
        + Send
        + Sync,
    CM: Deref,
    OM: Deref,
    PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
    RGS: Deref<Target = RapidGossipSync<G, L>>,
    PM: Deref,
    LM: Deref,
    D: Deref,
    O: Deref,
    K: Deref,
    OS: Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
    S: Deref<Target = SC> + Send + Sync,
    SC: for<'b> WriteableScore<'b>,
    SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
    Sleeper: Fn(Duration) -> SleepFuture,
    FetchTime: Fn() -> Option<Duration>,
>(
    kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
    onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
    liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
    sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
    UL::Target: UtxoLookup,
    CF::Target: chain::Filter,
    T::Target: BroadcasterInterface,
    F::Target: FeeEstimator,
    L::Target: Logger,
    P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
    ES::Target: EntropySource,
    CM::Target: AChannelManager,
    OM::Target: AOnionMessenger,
    PM::Target: APeerManager,
    LM::Target: ALiquidityManager,
    O::Target: OutputSpender,
    D::Target: ChangeDestinationSource,
    K::Target: KVStore,
{
    // Wraps the user's event handler: every event first updates the network graph and the scorer
    // (persisting the scorer if it changed) and is only then handed to `event_handler`.
    let async_event_handler = |event| {
        let network_graph = gossip_sync.network_graph();
        let event_handler = &event_handler;
        let scorer = &scorer;
        let logger = &logger;
        let kv_store = &kv_store;
        let fetch_time = &fetch_time;
        Box::pin(async move {
            if let Some(network_graph) = network_graph {
                handle_network_graph_update(network_graph, &event)
            }
            if let Some(ref scorer) = scorer {
                // Without a current time the scorer cannot be updated, so it is left untouched.
                if let Some(duration_since_epoch) = fetch_time() {
                    if update_scorer(scorer, &event, duration_since_epoch) {
                        log_trace!(logger, "Persisting scorer after update");
                        if let Err(e) = kv_store
                            .write(
                                SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
                                SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
                                SCORER_PERSISTENCE_KEY,
                                scorer.encode(),
                            )
                            .await
                        {
                            // A failed scorer write is only logged; the event is still handled.
                            log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
                        }
                    }
                }
            }
            event_handler(event).await
        })
    };
    let mut batch_delay = BatchDelay::new();

    // Run the channel manager tick and the claim rebroadcast once right away at startup.
    log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
    channel_manager.get_cm().timer_tick_occurred();
    log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
    chain_monitor.rebroadcast_pending_claims();

    // One sleeper future per periodic task. `check_and_reset_sleeper` (defined later in this file)
    // is matched below as: `Some(false)` => the task is due, `Some(true)` => exit the loop,
    // `None` => nothing to do yet.
    let mut last_freshness_call = sleeper(FRESHNESS_TIMER);
    let mut last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
    let mut last_ping_call = sleeper(PING_TIMER);
    let mut last_prune_call = sleeper(FIRST_NETWORK_PRUNE_TIMER);
    let mut last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
    let mut last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
    let mut last_sweeper_call = sleeper(SWEEPER_TIMER);
    let mut have_pruned = false;
    let mut have_decayed_scorer = false;

    let mut last_forwards_processing_call = sleeper(batch_delay.get());

    loop {
        // Process pending events of all event sources through the wrapped handler.
        channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
        chain_monitor.process_pending_events_async(async_event_handler).await;
        if let Some(om) = &onion_messenger {
            om.get_om().process_pending_events_async(async_event_handler).await
        }

        peer_manager.as_ref().process_events();
        // Process pending HTLC forwards when the batching delay elapsed; the next delay is taken
        // from `batch_delay`.
        match check_and_reset_sleeper(&mut last_forwards_processing_call, || {
            sleeper(batch_delay.next())
        }) {
            Some(false) => {
                channel_manager.get_cm().process_pending_htlc_forwards();
            },
            Some(true) => break,
            None => {},
        }

        // On mobile platforms, start a one-second sleeper before waiting; it is checked after the
        // wait to detect that the (at most 100ms) wait took more than a second.
        let mut await_start = None;
        if mobile_interruptable_platform {
            await_start = Some(sleeper(Duration::from_secs(1)));
        }
        let om_fut = if let Some(om) = onion_messenger.as_ref() {
            let fut = om.get_om().get_update_future();
            OptionalSelector { optional_future: Some(fut) }
        } else {
            OptionalSelector { optional_future: None }
        };
        let lm_fut = if let Some(lm) = liquidity_manager.as_ref() {
            let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future();
            OptionalSelector { optional_future: Some(fut) }
        } else {
            OptionalSelector { optional_future: None }
        };
        // Upper bound for this iteration's wait: 100ms on mobile platforms, `FASTEST_TIMER`
        // otherwise, and never longer than the forward batching delay if forwards are pending.
        let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing();
        let sleep_delay = match (needs_processing, mobile_interruptable_platform) {
            (true, true) => batch_delay.get().min(Duration::from_millis(100)),
            (true, false) => batch_delay.get().min(FASTEST_TIMER),
            (false, true) => Duration::from_millis(100),
            (false, false) => FASTEST_TIMER,
        };
        // Wait until any component signals new work or the sleep completes.
        let fut = Selector {
            a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
            b: chain_monitor.get_update_future(),
            c: om_fut,
            d: lm_fut,
            e: sleeper(sleep_delay),
        };
        match fut.await {
            SelectorOutput::A | SelectorOutput::B | SelectorOutput::C | SelectorOutput::D => {},
            SelectorOutput::E(exit) => {
                // The sleeper resolving to `true` is the signal to shut down.
                if exit {
                    break;
                }
            },
        }

        // `await_slow` is true when the one-second sleeper started before the wait has already
        // completed. The replacement sleeper uses a zero duration as it is never used: a new one
        // is created on the next iteration.
        let await_slow = if mobile_interruptable_platform {
            match check_and_reset_sleeper(&mut await_start.unwrap(), || sleeper(Duration::ZERO)) {
                Some(true) => break,
                Some(false) => true,
                None => false,
            }
        } else {
            false
        };
        match check_and_reset_sleeper(&mut last_freshness_call, || sleeper(FRESHNESS_TIMER)) {
            Some(false) => {
                log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
                channel_manager.get_cm().timer_tick_occurred();
            },
            Some(true) => break,
            None => {},
        }

        // Collects this iteration's persistence futures so they can be awaited together below.
        let mut futures = Joiner::new();

        if channel_manager.get_cm().get_and_clear_needs_persistence() {
            log_trace!(logger, "Persisting ChannelManager...");

            let fut = async {
                kv_store
                    .write(
                        CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
                        CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
                        CHANNEL_MANAGER_PERSISTENCE_KEY,
                        channel_manager.get_cm().encode(),
                    )
                    .await
            };
            let mut fut = Box::pin(fut);

            // Poll the write once by hand (with a no-op waker) so that it gets started before the
            // pruning and scorer work below. If it did not finish, it is handed to the joiner.
            use core::future::Future;
            let mut waker = dummy_waker();
            let mut ctx = task::Context::from_waker(&mut waker);
            match core::pin::Pin::new(&mut fut).poll(&mut ctx) {
                task::Poll::Ready(res) => futures.set_a_res(res),
                task::Poll::Pending => futures.set_a(fut),
            }

            // NOTE(review): this is logged even if the write above is still pending - confirm
            // that is intended.
            log_trace!(logger, "Done persisting ChannelManager.");
        }

        // Interval for the *next* prune timer: hourly when a prunable graph is available now
        // (a prune will happen if the timer elapsed), the short initial interval otherwise.
        let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
            NETWORK_PRUNE_TIMER
        } else {
            FIRST_NETWORK_PRUNE_TIMER
        };
        let prune_timer_elapsed = {
            match check_and_reset_sleeper(&mut last_prune_call, || sleeper(prune_timer)) {
                Some(false) => true,
                Some(true) => break,
                None => false,
            }
        };

        // With rapid gossip sync, a first prune is attempted on every iteration until one
        // succeeded, without waiting for the timer.
        let should_prune = match gossip_sync {
            GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
            _ => prune_timer_elapsed,
        };
        if should_prune {
            // `prunable_network_graph` is `None` while a rapid sync has not completed its
            // initial sync, in which case nothing is pruned or persisted.
            if let Some(network_graph) = gossip_sync.prunable_network_graph() {
                if let Some(duration_since_epoch) = fetch_time() {
                    log_trace!(logger, "Pruning and persisting network graph.");
                    network_graph.remove_stale_channels_and_tracking_with_time(
                        duration_since_epoch.as_secs(),
                    );
                } else {
                    log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
                    log_trace!(logger, "Persisting network graph.");
                }
                let fut = async {
                    if let Err(e) = kv_store
                        .write(
                            NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
                            NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
                            NETWORK_GRAPH_PERSISTENCE_KEY,
                            network_graph.encode(),
                        )
                        .await
                    {
                        log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
                    }

                    // Failures are logged above and not propagated.
                    Ok(())
                };

                futures.set_b(Box::pin(fut));

                have_pruned = true;
            }
        }
        // Let the scorer account for elapsed time once at startup (first loop iteration).
        if !have_decayed_scorer {
            if let Some(ref scorer) = scorer {
                if let Some(duration_since_epoch) = fetch_time() {
                    log_trace!(logger, "Calling time_passed on scorer at startup");
                    scorer.write_lock().time_passed(duration_since_epoch);
                }
            }
            have_decayed_scorer = true;
        }
        match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
            sleeper(SCORER_PERSIST_TIMER)
        }) {
            Some(false) => {
                if let Some(ref scorer) = scorer {
                    if let Some(duration_since_epoch) = fetch_time() {
                        log_trace!(logger, "Calling time_passed and persisting scorer");
                        scorer.write_lock().time_passed(duration_since_epoch);
                    } else {
                        log_trace!(logger, "Persisting scorer");
                    }
                    let fut = async {
                        if let Err(e) = kv_store
                            .write(
                                SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
                                SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
                                SCORER_PERSISTENCE_KEY,
                                scorer.encode(),
                            )
                            .await
                        {
                            log_error!(
                                logger,
                                "Error: Failed to persist scorer, check your disk and permissions {}",
                                e
                            );
                        }

                        // Failures are logged above and not propagated.
                        Ok(())
                    };

                    futures.set_c(Box::pin(fut));
                }
            },
            Some(true) => break,
            None => {},
        }
        match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
            Some(false) => {
                log_trace!(logger, "Regenerating sweeper spends if necessary");
                if let Some(ref sweeper) = sweeper {
                    let fut = async {
                        // The sweeper's result is deliberately ignored.
                        let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;

                        Ok(())
                    };

                    futures.set_d(Box::pin(fut));
                }
            },
            Some(true) => break,
            None => {},
        }

        // The liquidity manager, if present, is asked to persist on every iteration; unlike the
        // scorer and network graph writes, its error is propagated.
        if let Some(liquidity_manager) = liquidity_manager.as_ref() {
            log_trace!(logger, "Persisting LiquidityManager...");
            let fut = async {
                liquidity_manager.get_lm().persist().await.map_err(|e| {
                    log_error!(logger, "Persisting LiquidityManager failed: {}", e);
                    e
                })
            };
            futures.set_e(Box::pin(fut));
        }

        // Await all persistence futures together and return the first error, if any.
        for res in futures.await {
            res?;
        }

        match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
            sleeper(ONION_MESSAGE_HANDLER_TIMER)
        }) {
            Some(false) => {
                if let Some(om) = &onion_messenger {
                    log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
                    om.get_om().timer_tick_occurred();
                }
            },
            Some(true) => break,
            None => {},
        }

        if await_slow {
            // The short wait took more than a second: disconnect all peers and restart the ping
            // timer instead of performing a regular peer manager tick.
            // NOTE(review): presumably because the process was suspended and peer connections
            // are likely stale - confirm.
            log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
            peer_manager.as_ref().disconnect_all_peers();
            last_ping_call = sleeper(PING_TIMER);
        } else {
            match check_and_reset_sleeper(&mut last_ping_call, || sleeper(PING_TIMER)) {
                Some(false) => {
                    log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
                    peer_manager.as_ref().timer_tick_occurred();
                },
                Some(true) => break,
                _ => {},
            }
        }

        match check_and_reset_sleeper(&mut last_rebroadcast_call, || sleeper(REBROADCAST_TIMER)) {
            Some(false) => {
                log_trace!(logger, "Rebroadcasting monitor's pending claims");
                chain_monitor.rebroadcast_pending_claims();
            },
            Some(true) => break,
            None => {},
        }
    }
    log_trace!(logger, "Terminating background processor.");

    // Final persistence on shutdown: channel manager first, then scorer and network graph if
    // present. Any failure here is returned to the caller.
    kv_store
        .write(
            CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
            CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
            CHANNEL_MANAGER_PERSISTENCE_KEY,
            channel_manager.get_cm().encode(),
        )
        .await?;
    if let Some(ref scorer) = scorer {
        kv_store
            .write(
                SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
                SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
                SCORER_PERSISTENCE_KEY,
                scorer.encode(),
            )
            .await?;
    }
    if let Some(network_graph) = gossip_sync.network_graph() {
        kv_store
            .write(
                NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
                NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
                NETWORK_GRAPH_PERSISTENCE_KEY,
                network_graph.encode(),
            )
            .await?;
    }
    Ok(())
}
1334
1335fn check_and_reset_sleeper<
1336 SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1337>(
1338 fut: &mut SleepFuture, mut new_sleeper: impl FnMut() -> SleepFuture,
1339) -> Option<bool> {
1340 let mut waker = dummy_waker();
1341 let mut ctx = task::Context::from_waker(&mut waker);
1342 match core::pin::Pin::new(&mut *fut).poll(&mut ctx) {
1343 task::Poll::Ready(exit) => {
1344 *fut = new_sleeper();
1345 Some(exit)
1346 },
1347 task::Poll::Pending => None,
1348 }
1349}
1350
1351pub async fn process_events_async_with_kv_store_sync<
1354 UL: Deref,
1355 CF: Deref,
1356 T: Deref,
1357 F: Deref,
1358 G: Deref<Target = NetworkGraph<L>>,
1359 L: Deref + Send + Sync,
1360 P: Deref,
1361 EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
1362 EventHandler: Fn(Event) -> EventHandlerFuture,
1363 ES: Deref + Send,
1364 M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
1365 + Send
1366 + Sync,
1367 CM: Deref + Send + Sync,
1368 OM: Deref,
1369 PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
1370 RGS: Deref<Target = RapidGossipSync<G, L>>,
1371 PM: Deref,
1372 LM: Deref,
1373 D: Deref,
1374 O: Deref,
1375 K: Deref,
1376 OS: Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>>,
1377 S: Deref<Target = SC> + Send + Sync,
1378 SC: for<'b> WriteableScore<'b>,
1379 SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
1380 Sleeper: Fn(Duration) -> SleepFuture,
1381 FetchTime: Fn() -> Option<Duration>,
1382>(
1383 kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
1384 onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
1385 liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
1386 sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
1387) -> Result<(), lightning::io::Error>
1388where
1389 UL::Target: UtxoLookup,
1390 CF::Target: chain::Filter,
1391 T::Target: BroadcasterInterface,
1392 F::Target: FeeEstimator,
1393 L::Target: Logger,
1394 P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
1395 ES::Target: EntropySource,
1396 CM::Target: AChannelManager,
1397 OM::Target: AOnionMessenger,
1398 PM::Target: APeerManager,
1399 LM::Target: ALiquidityManager,
1400 O::Target: OutputSpender,
1401 D::Target: ChangeDestinationSourceSync,
1402 K::Target: KVStoreSync,
1403{
1404 let kv_store = KVStoreSyncWrapper(kv_store);
1405 process_events_async(
1406 kv_store,
1407 event_handler,
1408 chain_monitor,
1409 channel_manager,
1410 onion_messenger,
1411 gossip_sync,
1412 peer_manager,
1413 liquidity_manager,
1414 sweeper.as_ref().map(|os| os.sweeper_async()),
1415 logger,
1416 scorer,
1417 sleeper,
1418 mobile_interruptable_platform,
1419 fetch_time,
1420 )
1421 .await
1422}
1423
1424#[cfg(feature = "std")]
1425impl BackgroundProcessor {
	/// Spawns a background thread that drives the given components until the processor is
	/// stopped, and returns a handle to it.
	///
	/// On startup the thread calls the `ChannelManager`'s `timer_tick_occurred` and the
	/// `ChainMonitor`'s `rebroadcast_pending_claims` once. It then loops, and on each iteration:
	///
	/// * processes pending events of the `ChannelManager`, the `ChainMonitor` and, if given,
	///   the onion messenger. Each event is first applied to the network graph (if any) and
	///   the scorer (if any), then passed to `event_handler`;
	/// * calls `process_events` on the peer manager and, once the current batch delay has
	///   elapsed, `process_pending_htlc_forwards` on the `ChannelManager`;
	/// * sleeps until one of the components signals an update or a timeout of at most 100ms
	///   passes;
	/// * persists the `ChannelManager` when it reports that persistence is needed, and asks
	///   the liquidity manager (if any) to persist itself;
	/// * runs the periodic jobs guarded by `FRESHNESS_TIMER`, `NETWORK_PRUNE_TIMER` /
	///   `FIRST_NETWORK_PRUNE_TIMER`, `SCORER_PERSIST_TIMER`, `SWEEPER_TIMER`,
	///   `ONION_MESSAGE_HANDLER_TIMER`, `PING_TIMER` and `REBROADCAST_TIMER`.
	///
	/// When the stop flag is observed the loop exits and the `ChannelManager`, the scorer (if
	/// any) and the network graph (if any) are written to `kv_store` one final time.
	///
	/// # Errors
	///
	/// The thread's result is `Err` if persisting the `ChannelManager` fails inside the loop,
	/// or if any of the final writes fails. Failures to persist the scorer, the network graph
	/// or the liquidity manager inside the loop are only logged.
	///
	/// # Panics
	///
	/// The background thread panics if the system clock reports a time before the Unix epoch.
	pub fn start<
		'a,
		UL: 'static + Deref,
		CF: 'static + Deref,
		T: 'static + Deref,
		F: 'static + Deref + Send,
		G: 'static + Deref<Target = NetworkGraph<L>>,
		L: 'static + Deref + Send,
		P: 'static + Deref,
		EH: 'static + EventHandler + Send,
		ES: 'static + Deref + Send,
		M: 'static
			+ Deref<
				Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
			>
			+ Send
			+ Sync,
		CM: 'static + Deref + Send,
		OM: 'static + Deref + Send,
		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		PM: 'static + Deref + Send,
		LM: 'static + Deref + Send,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: for<'b> WriteableScore<'b>,
		D: 'static + Deref,
		O: 'static + Deref,
		K: 'static + Deref + Send,
		OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
	>(
		kv_store: K, event_handler: EH, chain_monitor: M, channel_manager: CM,
		onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
		liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
	) -> Self
	where
		UL::Target: 'static + UtxoLookup,
		CF::Target: 'static + chain::Filter,
		T::Target: 'static + BroadcasterInterface,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
		ES::Target: 'static + EntropySource,
		CM::Target: AChannelManager,
		OM::Target: AOnionMessenger,
		PM::Target: APeerManager,
		LM::Target: ALiquidityManagerSync,
		D::Target: ChangeDestinationSourceSync,
		O::Target: 'static + OutputSpender,
		K::Target: 'static + KVStoreSync,
	{
		// `stop_thread` is moved into the spawned closure; the clone is kept in the returned
		// processor so the flag can be set from outside the thread.
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = Arc::clone(&stop_thread);
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Wrap the caller's handler: let the network graph and the scorer observe each
			// event first, then hand the event to the caller's handler.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				if let Some(ref scorer) = scorer {
					use std::time::SystemTime;
					let duration_since_epoch = SystemTime::now()
						.duration_since(SystemTime::UNIX_EPOCH)
						.expect("Time should be sometime after 1970");
					// Only persist when `update_scorer` reports that it changed the scorer.
					if update_scorer(scorer, &event, duration_since_epoch) {
						log_trace!(logger, "Persisting scorer after update");
						if let Err(e) = kv_store.write(
							SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
							SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
							SCORER_PERSISTENCE_KEY,
							scorer.encode(),
						) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
						}
					}
				}
				event_handler.handle_event(event)
			};
			let mut batch_delay = BatchDelay::new();

			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
			channel_manager.get_cm().timer_tick_occurred();
			log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
			chain_monitor.rebroadcast_pending_claims();

			// One timestamp per periodic job; each is reset when its job runs.
			let mut last_freshness_call = Instant::now();
			let mut last_onion_message_handler_call = Instant::now();
			let mut last_ping_call = Instant::now();
			let mut last_prune_call = Instant::now();
			let mut last_scorer_persist_call = Instant::now();
			let mut last_rebroadcast_call = Instant::now();
			let mut last_sweeper_call = Instant::now();
			let mut have_pruned = false;
			let mut have_decayed_scorer = false;

			let mut cur_batch_delay = batch_delay.get();
			let mut last_forwards_processing_call = Instant::now();

			loop {
				channel_manager.get_cm().process_pending_events(&event_handler);
				chain_monitor.process_pending_events(&event_handler);
				if let Some(om) = &onion_messenger {
					om.get_om().process_pending_events(&event_handler)
				};

				peer_manager.as_ref().process_events();
				// Forward pending HTLCs only once the current batch delay has elapsed, then
				// pick the delay for the next batch.
				if last_forwards_processing_call.elapsed() > cur_batch_delay {
					channel_manager.get_cm().process_pending_htlc_forwards();
					cur_batch_delay = batch_delay.next();
					last_forwards_processing_call = Instant::now();
				}
				if stop_thread.load(Ordering::Acquire) {
					log_trace!(logger, "Terminating background processor.");
					break;
				}
				// Build a sleeper over the update futures of every component that is present.
				let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
					(Some(om), Some(lm)) => Sleeper::from_four_futures(
						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
						&chain_monitor.get_update_future(),
						&om.get_om().get_update_future(),
						&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
					),
					(Some(om), None) => Sleeper::from_three_futures(
						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
						&chain_monitor.get_update_future(),
						&om.get_om().get_update_future(),
					),
					(None, Some(lm)) => Sleeper::from_three_futures(
						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
						&chain_monitor.get_update_future(),
						&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
					),
					(None, None) => Sleeper::from_two_futures(
						&channel_manager.get_cm().get_event_or_persistence_needed_future(),
						&chain_monitor.get_update_future(),
					),
				};
				// Note: this `batch_delay` is a `Duration` that shadows the outer `BatchDelay`
				// for the rest of this iteration only.
				let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() {
					batch_delay.get()
				} else {
					Duration::MAX
				};
				// Never sleep longer than 100ms, so the stop flag and timers are checked often.
				let fastest_timeout = batch_delay.min(Duration::from_millis(100));
				sleeper.wait_timeout(fastest_timeout);
				if stop_thread.load(Ordering::Acquire) {
					log_trace!(logger, "Terminating background processor.");
					break;
				}
				if last_freshness_call.elapsed() > FRESHNESS_TIMER {
					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
					channel_manager.get_cm().timer_tick_occurred();
					last_freshness_call = Instant::now();
				}
				// A failed ChannelManager write ends the thread with that error.
				if channel_manager.get_cm().get_and_clear_needs_persistence() {
					log_trace!(logger, "Persisting ChannelManager...");
					(kv_store.write(
						CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
						CHANNEL_MANAGER_PERSISTENCE_KEY,
						channel_manager.get_cm().encode(),
					))?;
					log_trace!(logger, "Done persisting ChannelManager.");
				}

				// Liquidity manager persistence failures are logged and otherwise ignored.
				if let Some(liquidity_manager) = liquidity_manager.as_ref() {
					log_trace!(logger, "Persisting LiquidityManager...");
					let _ = liquidity_manager.get_lm().persist().map_err(|e| {
						log_error!(logger, "Persisting LiquidityManager failed: {}", e);
					});
				}

				// Until the first successful prune the `FIRST_NETWORK_PRUNE_TIMER` interval is
				// used; afterwards the regular `NETWORK_PRUNE_TIMER` applies. With rapid gossip
				// sync a prune is attempted on every iteration until one has succeeded.
				let prune_timer =
					if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
				let prune_timer_elapsed = last_prune_call.elapsed() > prune_timer;
				let should_prune = match gossip_sync {
					GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
					_ => prune_timer_elapsed,
				};
				if should_prune {
					// `prunable_network_graph` may return `None`, in which case nothing is
					// pruned and `have_pruned` stays unchanged.
					if let Some(network_graph) = gossip_sync.prunable_network_graph() {
						let duration_since_epoch = std::time::SystemTime::now()
							.duration_since(std::time::SystemTime::UNIX_EPOCH)
							.expect("Time should be sometime after 1970");

						log_trace!(logger, "Pruning and persisting network graph.");
						network_graph.remove_stale_channels_and_tracking_with_time(
							duration_since_epoch.as_secs(),
						);
						if let Err(e) = kv_store.write(
							NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
							NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
							NETWORK_GRAPH_PERSISTENCE_KEY,
							network_graph.encode(),
						) {
							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e);
						}
						have_pruned = true;
					}
					last_prune_call = Instant::now();
				}
				// Runs once, on the first loop iteration that gets this far.
				if !have_decayed_scorer {
					if let Some(ref scorer) = scorer {
						let duration_since_epoch = std::time::SystemTime::now()
							.duration_since(std::time::SystemTime::UNIX_EPOCH)
							.expect("Time should be sometime after 1970");
						log_trace!(logger, "Calling time_passed on scorer at startup");
						scorer.write_lock().time_passed(duration_since_epoch);
					}
					have_decayed_scorer = true;
				}
				if last_scorer_persist_call.elapsed() > SCORER_PERSIST_TIMER {
					if let Some(ref scorer) = scorer {
						let duration_since_epoch = std::time::SystemTime::now()
							.duration_since(std::time::SystemTime::UNIX_EPOCH)
							.expect("Time should be sometime after 1970");
						log_trace!(logger, "Calling time_passed and persisting scorer");
						scorer.write_lock().time_passed(duration_since_epoch);
						if let Err(e) = kv_store.write(
							SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
							SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
							SCORER_PERSISTENCE_KEY,
							scorer.encode(),
						) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
						}
					}
					last_scorer_persist_call = Instant::now();
				}
				if last_sweeper_call.elapsed() > SWEEPER_TIMER {
					log_trace!(logger, "Regenerating sweeper spends if necessary");
					if let Some(ref sweeper) = sweeper {
						// The sweeper's result is deliberately discarded.
						let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
					}
					last_sweeper_call = Instant::now();
				}
				if last_onion_message_handler_call.elapsed() > ONION_MESSAGE_HANDLER_TIMER {
					if let Some(om) = &onion_messenger {
						log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
						om.get_om().timer_tick_occurred();
					}
					last_onion_message_handler_call = Instant::now();
				}
				if last_ping_call.elapsed() > PING_TIMER {
					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
					peer_manager.as_ref().timer_tick_occurred();
					last_ping_call = Instant::now();
				}
				if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER {
					log_trace!(logger, "Rebroadcasting monitor's pending claims");
					chain_monitor.rebroadcast_pending_claims();
					last_rebroadcast_call = Instant::now();
				}
			}

			// The loop has exited: write the ChannelManager, scorer and network graph one
			// final time. Any failure here becomes the thread's result.
			kv_store.write(
				CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
				CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
				CHANNEL_MANAGER_PERSISTENCE_KEY,
				channel_manager.get_cm().encode(),
			)?;
			if let Some(ref scorer) = scorer {
				kv_store.write(
					SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
					SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
					SCORER_PERSISTENCE_KEY,
					scorer.encode(),
				)?;
			}
			if let Some(network_graph) = gossip_sync.network_graph() {
				kv_store.write(
					NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
					NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
					NETWORK_GRAPH_PERSISTENCE_KEY,
					network_graph.encode(),
				)?;
			}
			Ok(())
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}
1767
1768 pub fn join(mut self) -> Result<(), std::io::Error> {
1778 assert!(self.thread_handle.is_some());
1779 self.join_thread()
1780 }
1781
1782 pub fn stop(mut self) -> Result<(), std::io::Error> {
1792 assert!(self.thread_handle.is_some());
1793 self.stop_and_join_thread()
1794 }
1795
1796 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
1797 self.stop_thread.store(true, Ordering::Release);
1798 self.join_thread()
1799 }
1800
1801 fn join_thread(&mut self) -> Result<(), std::io::Error> {
1802 match self.thread_handle.take() {
1803 Some(handle) => handle.join().unwrap(),
1804 None => Ok(()),
1805 }
1806 }
1807}
1808
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
	/// Stops and joins the background thread when the processor is dropped.
	///
	/// Panics if the thread returned an error or panicked.
	fn drop(&mut self) {
		let shutdown_result = self.stop_and_join_thread();
		shutdown_result.unwrap();
	}
}
1815
1816#[cfg(all(feature = "std", test))]
1817mod tests {
1818 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
1819 use bitcoin::constants::{genesis_block, ChainHash};
1820 use bitcoin::hashes::Hash;
1821 use bitcoin::locktime::absolute::LockTime;
1822 use bitcoin::network::Network;
1823 use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
1824 use bitcoin::transaction::Version;
1825 use bitcoin::transaction::{Transaction, TxOut};
1826 use bitcoin::{Amount, ScriptBuf, Txid};
1827 use core::sync::atomic::{AtomicBool, Ordering};
1828 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
1829 use lightning::chain::transaction::OutPoint;
1830 use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1831 use lightning::events::{Event, PathFailure, ReplayEvent};
1832 use lightning::ln::channelmanager;
1833 use lightning::ln::channelmanager::{
1834 ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
1835 };
1836 use lightning::ln::functional_test_utils::*;
1837 use lightning::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, Init, MessageSendEvent};
1838 use lightning::ln::peer_handler::{
1839 IgnoringMessageHandler, MessageHandler, PeerManager, SocketDescriptor,
1840 };
1841 use lightning::ln::types::ChannelId;
1842 use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
1843 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
1844 use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
1845 use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
1846 use lightning::sign::{ChangeDestinationSourceSync, InMemorySigner, KeysManager, NodeSigner};
1847 use lightning::types::features::{ChannelFeatures, NodeFeatures};
1848 use lightning::types::payment::PaymentHash;
1849 use lightning::util::config::UserConfig;
1850 use lightning::util::persist::{
1851 KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
1852 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1853 CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
1854 NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1855 SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1856 SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1857 };
1858 use lightning::util::ser::Writeable;
1859 use lightning::util::sweep::{
1860 OutputSpendStatus, OutputSweeper, OutputSweeperSync, PRUNE_DELAY_BLOCKS,
1861 };
1862 use lightning::util::test_utils;
1863 use lightning::{get_event, get_event_msg};
1864 use lightning_liquidity::utils::time::DefaultTimeProvider;
1865 use lightning_liquidity::{ALiquidityManagerSync, LiquidityManager, LiquidityManagerSync};
1866 use lightning_persister::fs_store::FilesystemStore;
1867 use lightning_rapid_gossip_sync::RapidGossipSync;
1868 use std::collections::VecDeque;
1869 use std::path::PathBuf;
1870 use std::sync::mpsc::SyncSender;
1871 use std::sync::Arc;
1872 use std::time::Duration;
1873 use std::{env, fs};
1874
1875 const EVENT_DEADLINE: Duration =
1876 Duration::from_millis(5 * (FRESHNESS_TIMER.as_millis() as u64));
1877
1878 #[derive(Clone, Hash, PartialEq, Eq)]
1879 struct TestDescriptor {}
1880 impl SocketDescriptor for TestDescriptor {
1881 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
1882 0
1883 }
1884
1885 fn disconnect_socket(&mut self) {}
1886 }
1887
	/// Lock type wrapped around the test scorer: `MultiThreadedLockableScore` when built with
	/// `c_bindings`, a plain `std::sync::Mutex` otherwise.
	#[cfg(c_bindings)]
	type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
	#[cfg(not(c_bindings))]
	type LockingWrapper<T> = std::sync::Mutex<T>;
1892
	/// Concrete `ChannelManager` instantiation used by the tests, built from `test_utils`
	/// components, `KeysManager`, a `DefaultRouter` scored by [`TestScorer`] and a
	/// `DefaultMessageRouter`.
	type ChannelManager = channelmanager::ChannelManager<
		Arc<ChainMonitor>,
		Arc<test_utils::TestBroadcaster>,
		Arc<KeysManager>,
		Arc<KeysManager>,
		Arc<KeysManager>,
		Arc<test_utils::TestFeeEstimator>,
		Arc<
			DefaultRouter<
				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
				Arc<test_utils::TestLogger>,
				Arc<KeysManager>,
				Arc<LockingWrapper<TestScorer>>,
				(),
				TestScorer,
			>,
		>,
		Arc<
			DefaultMessageRouter<
				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
				Arc<test_utils::TestLogger>,
				Arc<KeysManager>,
			>,
		>,
		Arc<test_utils::TestLogger>,
	>;
1919
	/// Concrete `ChainMonitor` instantiation used by the tests; it persists through the test
	/// [`Persister`] and signs with `InMemorySigner`.
	type ChainMonitor = chainmonitor::ChainMonitor<
		InMemorySigner,
		Arc<test_utils::TestChainSource>,
		Arc<test_utils::TestBroadcaster>,
		Arc<test_utils::TestFeeEstimator>,
		Arc<test_utils::TestLogger>,
		Arc<Persister>,
		Arc<KeysManager>,
	>;
1929
	/// Shared `P2PGossipSync` over the test network graph, chain source and logger.
	type PGS = Arc<
		P2PGossipSync<
			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
			Arc<test_utils::TestChainSource>,
			Arc<test_utils::TestLogger>,
		>,
	>;
	/// Shared `RapidGossipSync` over the test network graph and logger.
	type RGS = Arc<
		RapidGossipSync<
			Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
			Arc<test_utils::TestLogger>,
		>,
	>;
1943
	/// Concrete `OnionMessenger` instantiation used by the tests. The test [`ChannelManager`]
	/// fills two of the handler slots and `IgnoringMessageHandler` fills the remaining ones.
	type OM = OnionMessenger<
		Arc<KeysManager>,
		Arc<KeysManager>,
		Arc<test_utils::TestLogger>,
		Arc<ChannelManager>,
		Arc<
			DefaultMessageRouter<
				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
				Arc<test_utils::TestLogger>,
				Arc<KeysManager>,
			>,
		>,
		IgnoringMessageHandler,
		Arc<ChannelManager>,
		IgnoringMessageHandler,
		IgnoringMessageHandler,
	>;
1961
	/// Concrete `LiquidityManagerSync` instantiation used by the tests, backed by the test
	/// [`Persister`] and the `DefaultTimeProvider`.
	type LM = LiquidityManagerSync<
		Arc<KeysManager>,
		Arc<KeysManager>,
		Arc<ChannelManager>,
		Arc<dyn Filter + Sync + Send>,
		Arc<Persister>,
		DefaultTimeProvider,
		Arc<test_utils::TestBroadcaster>,
	>;
1971
	/// Bundle of all per-node components that the tests hand to the background processor.
	///
	/// Dropping a `Node` removes the data directory of its `kv_store` (see the `Drop` impl).
	struct Node {
		// The node's channel manager.
		node: Arc<ChannelManager>,
		// The node's onion messenger.
		messenger: Arc<OM>,
		// Gossip sync flavours; the tests select one through the accessor methods on `Node`.
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<
			PeerManager<
				TestDescriptor,
				Arc<test_utils::TestChannelMessageHandler>,
				Arc<test_utils::TestRoutingMessageHandler>,
				Arc<OM>,
				Arc<test_utils::TestLogger>,
				IgnoringMessageHandler,
				Arc<KeysManager>,
				IgnoringMessageHandler,
			>,
		>,
		liquidity_manager: Arc<LM>,
		chain_monitor: Arc<ChainMonitor>,
		// Store wrapper that can inject persistence errors and notifications.
		kv_store: Arc<Persister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<LockingWrapper<TestScorer>>,
		sweeper: Arc<
			OutputSweeperSync<
				Arc<test_utils::TestBroadcaster>,
				Arc<TestWallet>,
				Arc<test_utils::TestFeeEstimator>,
				Arc<test_utils::TestChainSource>,
				Arc<Persister>,
				Arc<test_utils::TestLogger>,
				Arc<KeysManager>,
			>,
		>,
	}
2009
2010 impl Node {
2011 fn p2p_gossip_sync(
2012 &self,
2013 ) -> GossipSync<
2014 PGS,
2015 RGS,
2016 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2017 Arc<test_utils::TestChainSource>,
2018 Arc<test_utils::TestLogger>,
2019 > {
2020 GossipSync::P2P(Arc::clone(&self.p2p_gossip_sync))
2021 }
2022
2023 fn rapid_gossip_sync(
2024 &self,
2025 ) -> GossipSync<
2026 PGS,
2027 RGS,
2028 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2029 Arc<test_utils::TestChainSource>,
2030 Arc<test_utils::TestLogger>,
2031 > {
2032 GossipSync::Rapid(Arc::clone(&self.rapid_gossip_sync))
2033 }
2034
2035 fn no_gossip_sync(
2036 &self,
2037 ) -> GossipSync<
2038 PGS,
2039 RGS,
2040 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
2041 Arc<test_utils::TestChainSource>,
2042 Arc<test_utils::TestLogger>,
2043 > {
2044 GossipSync::None
2045 }
2046 }
2047
2048 impl Drop for Node {
2049 fn drop(&mut self) {
2050 let data_dir = self.kv_store.get_data_dir();
2051 match fs::remove_dir_all(data_dir.clone()) {
2052 Err(e) => {
2053 println!("Failed to remove test store directory {}: {}", data_dir.display(), e)
2054 },
2055 _ => {},
2056 }
2057 }
2058 }
2059
	/// Test store that wraps a `FilesystemStore` and can be configured to fail writes of the
	/// channel manager, network graph or scorer, and to signal network graph writes.
	struct Persister {
		// Error to return for network graph writes, if set.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// Channel notified on every network graph write, if set.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// Error to return for channel manager writes, if set.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// Error to return for scorer writes, if set.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		// Backing store that receives every operation that is not failed on purpose.
		kv_store: FilesystemStore,
	}
2067
2068 impl Persister {
2069 fn new(data_dir: PathBuf) -> Self {
2070 let kv_store = FilesystemStore::new(data_dir);
2071 Self {
2072 graph_error: None,
2073 graph_persistence_notifier: None,
2074 manager_error: None,
2075 scorer_error: None,
2076 kv_store,
2077 }
2078 }
2079
2080 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2081 Self { graph_error: Some((error, message)), ..self }
2082 }
2083
2084 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
2085 Self { graph_persistence_notifier: Some(sender), ..self }
2086 }
2087
2088 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2089 Self { manager_error: Some((error, message)), ..self }
2090 }
2091
2092 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
2093 Self { scorer_error: Some((error, message)), ..self }
2094 }
2095
2096 pub fn get_data_dir(&self) -> PathBuf {
2097 self.kv_store.get_data_dir()
2098 }
2099 }
2100
2101 impl KVStoreSync for Persister {
2102 fn read(
2103 &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
2104 ) -> lightning::io::Result<Vec<u8>> {
2105 self.kv_store.read(primary_namespace, secondary_namespace, key)
2106 }
2107
2108 fn write(
2109 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
2110 ) -> lightning::io::Result<()> {
2111 if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
2112 && secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
2113 && key == CHANNEL_MANAGER_PERSISTENCE_KEY
2114 {
2115 if let Some((error, message)) = self.manager_error {
2116 return Err(std::io::Error::new(error, message).into());
2117 }
2118 }
2119
2120 if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE
2121 && secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE
2122 && key == NETWORK_GRAPH_PERSISTENCE_KEY
2123 {
2124 if let Some(sender) = &self.graph_persistence_notifier {
2125 match sender.send(()) {
2126 Ok(()) => {},
2127 Err(std::sync::mpsc::SendError(())) => {
2128 println!("Persister failed to notify as receiver went away.")
2129 },
2130 }
2131 };
2132
2133 if let Some((error, message)) = self.graph_error {
2134 return Err(std::io::Error::new(error, message).into());
2135 }
2136 }
2137
2138 if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE
2139 && secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE
2140 && key == SCORER_PERSISTENCE_KEY
2141 {
2142 if let Some((error, message)) = self.scorer_error {
2143 return Err(std::io::Error::new(error, message).into());
2144 }
2145 }
2146
2147 self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
2148 }
2149
2150 fn remove(
2151 &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
2152 ) -> lightning::io::Result<()> {
2153 self.kv_store.remove(primary_namespace, secondary_namespace, key)
2154 }
2155
2156 fn list(
2157 &self, primary_namespace: &str, secondary_namespace: &str,
2158 ) -> lightning::io::Result<Vec<String>> {
2159 self.kv_store.list(primary_namespace, secondary_namespace)
2160 }
2161 }
2162
	/// Scorer stub that checks incoming score updates against a queue of expected results.
	struct TestScorer {
		// `None` until the first expectation is registered; while `None`, score updates are
		// accepted without any check.
		event_expectations: Option<VecDeque<TestResult>>,
	}
2166
	/// A score update that a test expects [`TestScorer`] to receive.
	#[derive(Debug)]
	enum TestResult {
		// Expect `payment_path_failed` for this path and short channel id.
		PaymentFailure { path: Path, short_channel_id: u64 },
		// Expect `payment_path_successful` for this path.
		PaymentSuccess { path: Path },
		// Expect `probe_failed` for this path.
		ProbeFailure { path: Path },
		// Expect `probe_successful` for this path.
		ProbeSuccess { path: Path },
	}
2174
2175 impl TestScorer {
2176 fn new() -> Self {
2177 Self { event_expectations: None }
2178 }
2179
2180 fn expect(&mut self, expectation: TestResult) {
2181 self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
2182 }
2183 }
2184
2185 impl lightning::util::ser::Writeable for TestScorer {
2186 fn write<W: lightning::util::ser::Writer>(
2187 &self, _: &mut W,
2188 ) -> Result<(), lightning::io::Error> {
2189 Ok(())
2190 }
2191 }
2192
2193 impl ScoreLookUp for TestScorer {
2194 type ScoreParams = ();
2195 fn channel_penalty_msat(
2196 &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage,
2197 _score_params: &Self::ScoreParams,
2198 ) -> u64 {
2199 unimplemented!();
2200 }
2201 }
2202
2203 impl ScoreUpdate for TestScorer {
2204 fn payment_path_failed(
2205 &mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration,
2206 ) {
2207 if let Some(expectations) = &mut self.event_expectations {
2208 match expectations.pop_front().unwrap() {
2209 TestResult::PaymentFailure { path, short_channel_id } => {
2210 assert_eq!(actual_path, &path);
2211 assert_eq!(actual_short_channel_id, short_channel_id);
2212 },
2213 TestResult::PaymentSuccess { path } => {
2214 panic!("Unexpected successful payment path: {:?}", path)
2215 },
2216 TestResult::ProbeFailure { path } => {
2217 panic!("Unexpected probe failure: {:?}", path)
2218 },
2219 TestResult::ProbeSuccess { path } => {
2220 panic!("Unexpected probe success: {:?}", path)
2221 },
2222 }
2223 }
2224 }
2225
2226 fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
2227 if let Some(expectations) = &mut self.event_expectations {
2228 match expectations.pop_front().unwrap() {
2229 TestResult::PaymentFailure { path, .. } => {
2230 panic!("Unexpected payment path failure: {:?}", path)
2231 },
2232 TestResult::PaymentSuccess { path } => {
2233 assert_eq!(actual_path, &path);
2234 },
2235 TestResult::ProbeFailure { path } => {
2236 panic!("Unexpected probe failure: {:?}", path)
2237 },
2238 TestResult::ProbeSuccess { path } => {
2239 panic!("Unexpected probe success: {:?}", path)
2240 },
2241 }
2242 }
2243 }
2244
2245 fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
2246 if let Some(expectations) = &mut self.event_expectations {
2247 match expectations.pop_front().unwrap() {
2248 TestResult::PaymentFailure { path, .. } => {
2249 panic!("Unexpected payment path failure: {:?}", path)
2250 },
2251 TestResult::PaymentSuccess { path } => {
2252 panic!("Unexpected payment path success: {:?}", path)
2253 },
2254 TestResult::ProbeFailure { path } => {
2255 assert_eq!(actual_path, &path);
2256 },
2257 TestResult::ProbeSuccess { path } => {
2258 panic!("Unexpected probe success: {:?}", path)
2259 },
2260 }
2261 }
2262 }
2263 fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
2264 if let Some(expectations) = &mut self.event_expectations {
2265 match expectations.pop_front().unwrap() {
2266 TestResult::PaymentFailure { path, .. } => {
2267 panic!("Unexpected payment path failure: {:?}", path)
2268 },
2269 TestResult::PaymentSuccess { path } => {
2270 panic!("Unexpected payment path success: {:?}", path)
2271 },
2272 TestResult::ProbeFailure { path } => {
2273 panic!("Unexpected probe failure: {:?}", path)
2274 },
2275 TestResult::ProbeSuccess { path } => {
2276 assert_eq!(actual_path, &path);
2277 },
2278 }
2279 }
2280 }
2281 fn time_passed(&mut self, _: Duration) {}
2282 }
2283
	// Under `c_bindings`, `TestScorer` additionally implements the `Score` trait; the impl
	// body is empty.
	#[cfg(c_bindings)]
	impl lightning::routing::scoring::Score for TestScorer {}
2286
2287 impl Drop for TestScorer {
2288 fn drop(&mut self) {
2289 if std::thread::panicking() {
2290 return;
2291 }
2292
2293 if let Some(event_expectations) = &self.event_expectations {
2294 if !event_expectations.is_empty() {
2295 panic!("Unsatisfied event expectations: {:?}", event_expectations);
2296 }
2297 }
2298 }
2299 }
2300
// Field-less wallet stand-in; the tests hand it to the output sweeper as its change-destination
// source.
struct TestWallet {}
2302
2303 impl ChangeDestinationSourceSync for TestWallet {
2304 fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
2305 Ok(ScriptBuf::new())
2306 }
2307 }
2308
/// Joins `filename` onto `filepath` and returns the result as an owned `String`.
///
/// Panics if the joined path is not valid UTF-8.
fn get_full_filepath(filepath: String, filename: String) -> String {
	let joined = PathBuf::from(filepath).join(filename);
	joined.to_str().unwrap().to_owned()
}
2314
// Builds `num_nodes` fully wired test nodes and connects every pair of them as peers.
//
// Returns the persistence path prefix (`persist_dir` joined onto the system temp directory)
// together with the nodes. Node `i` persists through a `Persister` rooted at
// "<prefix>_persister_<i>".
fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
	// From here on `persist_dir` is the temp-dir-qualified prefix, shadowing the argument.
	let persist_temp_path = env::temp_dir().join(persist_dir);
	let persist_dir = persist_temp_path.to_string_lossy().to_string();
	let network = Network::Bitcoin;
	let mut nodes = Vec::new();
	for i in 0..num_nodes {
		let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
		let fee_estimator = Arc::new(test_utils::TestFeeEstimator::new(253));
		let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
		let genesis_block = genesis_block(network);
		let network_graph = Arc::new(NetworkGraph::new(network, Arc::clone(&logger)));
		let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
		// Keys are derived deterministically: the seed is the node index repeated and the
		// timestamp is the genesis block time.
		let now = Duration::from_secs(genesis_block.header.time as u64);
		let seed = [i as u8; 32];
		let keys_manager =
			Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
		let router = Arc::new(DefaultRouter::new(
			Arc::clone(&network_graph),
			Arc::clone(&logger),
			Arc::clone(&keys_manager),
			Arc::clone(&scorer),
			Default::default(),
		));
		let msg_router = Arc::new(DefaultMessageRouter::new(
			Arc::clone(&network_graph),
			Arc::clone(&keys_manager),
		));
		let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
		let kv_store =
			Arc::new(Persister::new(format!("{}_persister_{}", &persist_dir, i).into()));
		// NOTE(review): a second `KeysManager` is built here from the same seed and timestamp
		// and shadows the one above, so the routers hold a different instance than everything
		// created below. This looks redundant — confirm before merging the two.
		let now = Duration::from_secs(genesis_block.header.time as u64);
		let keys_manager =
			Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
		let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
			Some(Arc::clone(&chain_source)),
			Arc::clone(&tx_broadcaster),
			Arc::clone(&logger),
			Arc::clone(&fee_estimator),
			Arc::clone(&kv_store),
			Arc::clone(&keys_manager),
			keys_manager.get_peer_storage_key(),
		));
		let best_block = BestBlock::from_network(network);
		let params = ChainParameters { network, best_block };
		// The same keys manager is passed for each of the three key-related arguments.
		let manager = Arc::new(ChannelManager::new(
			Arc::clone(&fee_estimator),
			Arc::clone(&chain_monitor),
			Arc::clone(&tx_broadcaster),
			Arc::clone(&router),
			Arc::clone(&msg_router),
			Arc::clone(&logger),
			Arc::clone(&keys_manager),
			Arc::clone(&keys_manager),
			Arc::clone(&keys_manager),
			UserConfig::default(),
			params,
			genesis_block.header.time,
		));
		// The channel manager is passed as two of the messenger's handlers; the remaining
		// handler slots are ignoring handlers.
		let messenger = Arc::new(OnionMessenger::new(
			Arc::clone(&keys_manager),
			Arc::clone(&keys_manager),
			Arc::clone(&logger),
			Arc::clone(&manager),
			Arc::clone(&msg_router),
			IgnoringMessageHandler {},
			Arc::clone(&manager),
			IgnoringMessageHandler {},
			IgnoringMessageHandler {},
		));
		let wallet = Arc::new(TestWallet {});
		// The sweeper is created without a chain source (`None`) and shares the node's
		// kv store.
		let sweeper = Arc::new(OutputSweeperSync::new(
			best_block,
			Arc::clone(&tx_broadcaster),
			Arc::clone(&fee_estimator),
			None::<Arc<test_utils::TestChainSource>>,
			Arc::clone(&keys_manager),
			wallet,
			Arc::clone(&kv_store),
			Arc::clone(&logger),
		));
		// Both gossip sync flavours are created over the same network graph.
		let p2p_gossip_sync = Arc::new(P2PGossipSync::new(
			Arc::clone(&network_graph),
			Some(Arc::clone(&chain_source)),
			Arc::clone(&logger),
		));
		let rapid_gossip_sync =
			Arc::new(RapidGossipSync::new(Arc::clone(&network_graph), Arc::clone(&logger)));
		// NOTE(review): the test channel handler is keyed to the Testnet genesis hash while
		// everything else in this function uses `Network::Bitcoin` — confirm this is intended.
		let msg_handler = MessageHandler {
			chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(
				ChainHash::using_genesis_block(Network::Testnet),
			)),
			route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
			onion_message_handler: Arc::clone(&messenger),
			custom_message_handler: IgnoringMessageHandler {},
			send_only_message_handler: IgnoringMessageHandler {},
		};
		let peer_manager = Arc::new(PeerManager::new(
			msg_handler,
			0,
			&seed,
			Arc::clone(&logger),
			Arc::clone(&keys_manager),
		));
		// All optional arguments are left as `None`; construction is expected to succeed.
		let liquidity_manager = Arc::new(
			LiquidityManagerSync::new(
				Arc::clone(&keys_manager),
				Arc::clone(&keys_manager),
				Arc::clone(&manager),
				None,
				None,
				Arc::clone(&kv_store),
				Arc::clone(&tx_broadcaster),
				None,
				None,
			)
			.unwrap(),
		);
		let node = Node {
			node: manager,
			p2p_gossip_sync,
			rapid_gossip_sync,
			peer_manager,
			liquidity_manager,
			chain_monitor,
			kv_store,
			tx_broadcaster,
			network_graph,
			logger,
			best_block,
			scorer,
			sweeper,
			messenger,
		};
		nodes.push(node);
	}

	// Connect every unordered pair (i, j) with i < j. Each side is told about the other using
	// the other's init features; the final boolean is `true` on node i's side and `false` on
	// node j's side.
	for i in 0..num_nodes {
		for j in (i + 1)..num_nodes {
			let init_i = Init {
				features: nodes[j].node.init_features(),
				networks: None,
				remote_network_address: None,
			};
			nodes[i]
				.node
				.peer_connected(nodes[j].node.get_our_node_id(), &init_i, true)
				.unwrap();
			let init_j = Init {
				features: nodes[i].node.init_features(),
				networks: None,
				remote_network_address: None,
			};
			nodes[j]
				.node
				.peer_connected(nodes[i].node.get_our_node_id(), &init_j, false)
				.unwrap();
		}
	}

	(persist_dir, nodes)
}
2476
// Drives the channel-open handshake from `$node_a` to `$node_b` up to the point where both
// sides have emitted `Event::ChannelPending`, and evaluates to the funding transaction.
//
// The macro drains `$node_a`'s pending events itself and expects exactly one of them, so it
// presumably must not race with another event consumer — confirm at call sites.
macro_rules! open_channel {
	($node_a: expr, $node_b: expr, $channel_value: expr) => {{
		// open_channel / accept_channel exchange.
		begin_open_channel!($node_a, $node_b, $channel_value);
		let events = $node_a.node.get_and_clear_pending_events();
		assert_eq!(events.len(), 1);
		// Build the funding transaction from the single FundingGenerationReady event.
		let (temporary_channel_id, tx) =
			handle_funding_generation_ready!(events[0], $channel_value);
		$node_a
			.node
			.funding_transaction_generated(
				temporary_channel_id,
				$node_b.node.get_our_node_id(),
				tx.clone(),
			)
			.unwrap();
		// funding_created: a -> b.
		let msg_a = get_event_msg!(
			$node_a,
			MessageSendEvent::SendFundingCreated,
			$node_b.node.get_our_node_id()
		);
		$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
		get_event!($node_b, Event::ChannelPending);
		// funding_signed: b -> a.
		let msg_b = get_event_msg!(
			$node_b,
			MessageSendEvent::SendFundingSigned,
			$node_a.node.get_our_node_id()
		);
		$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
		get_event!($node_a, Event::ChannelPending);
		tx
	}};
}
2509
// Starts a channel open from `$node_a` to `$node_b` and relays the open_channel and
// accept_channel messages between them. Funding is left to the caller.
//
// The channel is created with the literal arguments 100 and 42; 42 is the user channel id that
// `handle_funding_generation_ready!` asserts on.
macro_rules! begin_open_channel {
	($node_a: expr, $node_b: expr, $channel_value: expr) => {{
		$node_a
			.node
			.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None)
			.unwrap();
		// open_channel: a -> b.
		let msg_a = get_event_msg!(
			$node_a,
			MessageSendEvent::SendOpenChannel,
			$node_b.node.get_our_node_id()
		);
		$node_b.node.handle_open_channel($node_a.node.get_our_node_id(), &msg_a);
		// accept_channel: b -> a.
		let msg_b = get_event_msg!(
			$node_b,
			MessageSendEvent::SendAcceptChannel,
			$node_a.node.get_our_node_id()
		);
		$node_a.node.handle_accept_channel($node_b.node.get_our_node_id(), &msg_b);
	}};
}
2530
// Turns an `Event::FundingGenerationReady` into `(temporary_channel_id, funding_tx)`.
//
// Asserts that the event carries `$channel_value` and user channel id 42, then builds a
// transaction with no inputs and a single output paying the channel value to the event's
// output script. Any other event panics.
macro_rules! handle_funding_generation_ready {
	($event: expr, $channel_value: expr) => {{
		match $event {
			Event::FundingGenerationReady {
				temporary_channel_id,
				channel_value_satoshis,
				ref output_script,
				user_channel_id,
				..
			} => {
				assert_eq!(channel_value_satoshis, $channel_value);
				assert_eq!(user_channel_id, 42);

				// Input-less transaction whose only output funds the channel.
				let tx = Transaction {
					version: Version::ONE,
					lock_time: LockTime::ZERO,
					input: Vec::new(),
					output: vec![TxOut {
						value: Amount::from_sat(channel_value_satoshis),
						script_pubkey: output_script.clone(),
					}],
				};
				(temporary_channel_id, tx)
			},
			_ => panic!("Unexpected event"),
		}
	}};
}
2559
2560 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
2561 for i in 1..=depth {
2562 let prev_blockhash = node.best_block.block_hash;
2563 let height = node.best_block.height + 1;
2564 let header = create_dummy_header(prev_blockhash, height);
2565 let txdata = vec![(0, tx)];
2566 node.best_block = BestBlock::new(header.block_hash(), height);
2567 match i {
2568 1 => {
2569 node.node.transactions_confirmed(&header, &txdata, height);
2570 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
2571 node.sweeper.transactions_confirmed(&header, &txdata, height);
2572 },
2573 x if x == depth => {
2574 let block = (genesis_block(Network::Bitcoin), height);
2578 node.tx_broadcaster.blocks.lock().unwrap().push(block);
2579 node.node.best_block_updated(&header, height);
2580 node.chain_monitor.best_block_updated(&header, height);
2581 node.sweeper.best_block_updated(&header, height);
2582 },
2583 _ => {},
2584 }
2585 }
2586 }
2587
2588 fn advance_chain(node: &mut Node, num_blocks: u32) {
2589 for i in 1..=num_blocks {
2590 let prev_blockhash = node.best_block.block_hash;
2591 let height = node.best_block.height + 1;
2592 let header = create_dummy_header(prev_blockhash, height);
2593 node.best_block = BestBlock::new(header.block_hash(), height);
2594 if i == num_blocks {
2595 let block = (genesis_block(Network::Bitcoin), height);
2599 node.tx_broadcaster.blocks.lock().unwrap().push(block);
2600 node.node.best_block_updated(&header, height);
2601 node.chain_monitor.best_block_updated(&header, height);
2602 node.sweeper.best_block_updated(&header, height);
2603 }
2604 }
2605 }
2606
// Confirms `tx` on `node` to a depth of `ANTI_REORG_DELAY` blocks.
fn confirm_transaction(node: &mut Node, tx: &Transaction) {
	confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
}
2610
#[test]
fn test_background_processor() {
	// Checks that, with a BackgroundProcessor running, node 0's channel manager, network graph
	// and scorer all end up on disk with exactly the bytes they currently serialize to.
	let (persist_dir, nodes) = create_nodes(2, "test_background_processor");

	// Open a channel first so the manager has channel state to persist. `open_channel!` drains
	// pending events itself, so it runs before the processor is started (presumably to avoid
	// racing it for those events).
	let tx = open_channel!(nodes[0], nodes[1], 100000);

	// Start the processor with a fresh persister rooted at node 0's data directory.
	let data_dir = nodes[0].kv_store.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let event_handler = |_: _| Ok(());
	let bg_processor = BackgroundProcessor::start(
		persister,
		event_handler,
		Arc::clone(&nodes[0].chain_monitor),
		Arc::clone(&nodes[0].node),
		Some(Arc::clone(&nodes[0].messenger)),
		nodes[0].p2p_gossip_sync(),
		Arc::clone(&nodes[0].peer_manager),
		Some(Arc::clone(&nodes[0].liquidity_manager)),
		Some(Arc::clone(&nodes[0].sweeper)),
		Arc::clone(&nodes[0].logger),
		Some(Arc::clone(&nodes[0].scorer)),
	);

	// Spins until the file at `$filepath` holds exactly what `$node` currently serializes to.
	// A missing/unreadable file or a mismatch just retries; there is no timeout, so a
	// persistence failure shows up as a hang. A serialization error panics.
	macro_rules! check_persisted_data {
		($node: expr, $filepath: expr) => {
			let mut expected_bytes = Vec::new();
			loop {
				expected_bytes.clear();
				match $node.write(&mut expected_bytes) {
					Ok(()) => match std::fs::read($filepath) {
						Ok(bytes) => {
							if bytes == expected_bytes {
								break;
							} else {
								continue;
							}
						},
						Err(_) => continue,
					},
					Err(e) => panic!("Unexpected error: {}", e),
				}
			}
		};
	}

	// The initial channel manager state must reach disk.
	let filepath =
		get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
	check_persisted_data!(nodes[0].node, filepath.clone());

	// Spin until the manager's event-or-persist flag reads false (presumably meaning nothing
	// further is pending — confirm against `get_event_or_persist_condvar_value`).
	loop {
		if !nodes[0].node.get_event_or_persist_condvar_value() {
			break;
		}
	}

	// Force-close the channel funded by `tx` (output index 0) to change the manager's state.
	let error_message = "Channel force-closed";
	nodes[0]
		.node
		.force_close_broadcasting_latest_txn(
			&ChannelId::v1_from_funding_outpoint(OutPoint {
				txid: tx.compute_txid(),
				index: 0,
			}),
			&nodes[1].node.get_our_node_id(),
			error_message.to_string(),
		)
		.unwrap();

	// The post-force-close manager state must reach disk too.
	check_persisted_data!(nodes[0].node, filepath.clone());
	loop {
		if !nodes[0].node.get_event_or_persist_condvar_value() {
			break;
		}
	}

	// The network graph must be persisted.
	let filepath =
		get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
	check_persisted_data!(nodes[0].network_graph, filepath.clone());

	// The scorer must be persisted.
	let filepath =
		get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
	check_persisted_data!(nodes[0].scorer, filepath.clone());

	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}
}
2710
#[test]
fn test_timer_tick_called() {
	// Starts a processor for a single node and spins until each of the four periodic-task log
	// lines below has been recorded by the node's test logger.
	let (_, nodes) = create_nodes(1, "test_timer_tick_called");
	let node = &nodes[0];
	let persister = Arc::new(Persister::new(node.kv_store.get_data_dir()));
	let ignore_events = |_: _| Ok(());
	let bg_processor = BackgroundProcessor::start(
		persister,
		ignore_events,
		Arc::clone(&node.chain_monitor),
		Arc::clone(&node.node),
		Some(Arc::clone(&node.messenger)),
		node.no_gossip_sync(),
		Arc::clone(&node.peer_manager),
		Some(Arc::clone(&node.liquidity_manager)),
		Some(Arc::clone(&node.sweeper)),
		Arc::clone(&node.logger),
		Some(Arc::clone(&node.scorer)),
	);

	// Lookup keys into the logger's line map, built once up front.
	let wanted_logs = [
		(
			"lightning_background_processor",
			"Calling ChannelManager's timer_tick_occurred".to_string(),
		),
		("lightning_background_processor", "Calling PeerManager's timer_tick_occurred".to_string()),
		("lightning_background_processor", "Rebroadcasting monitor's pending claims".to_string()),
		(
			"lightning_background_processor",
			"Calling OnionMessageHandler's timer_tick_occurred".to_string(),
		),
	];
	loop {
		// The lock is re-taken on every pass and released at the end of the iteration.
		let log_entries = node.logger.lines.lock().unwrap();
		if wanted_logs.iter().all(|key| log_entries.contains_key(key)) {
			break;
		}
	}

	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}
}
2754
#[test]
fn test_channel_manager_persist_error() {
	// A persister configured to fail manager writes must make the processor's `join` return
	// that same error.
	let (_, nodes) = create_nodes(2, "test_persist_error");
	open_channel!(nodes[0], nodes[1], 100000);

	let node = &nodes[0];
	let failing_persister = Arc::new(
		Persister::new(node.kv_store.get_data_dir())
			.with_manager_error(std::io::ErrorKind::Other, "test"),
	);
	let ignore_events = |_: _| Ok(());
	let bg_processor = BackgroundProcessor::start(
		failing_persister,
		ignore_events,
		Arc::clone(&node.chain_monitor),
		Arc::clone(&node.node),
		Some(Arc::clone(&node.messenger)),
		node.no_gossip_sync(),
		Arc::clone(&node.peer_manager),
		Some(Arc::clone(&node.liquidity_manager)),
		Some(Arc::clone(&node.sweeper)),
		Arc::clone(&node.logger),
		Some(Arc::clone(&node.scorer)),
	);

	let err = match bg_processor.join() {
		Err(err) => err,
		Ok(_) => panic!("Expected error persisting manager"),
	};
	assert_eq!(err.kind(), std::io::ErrorKind::Other);
	assert_eq!(err.get_ref().unwrap().to_string(), "test");
}
2787
#[tokio::test]
async fn test_channel_manager_persist_error_async() {
	// Async counterpart of the manager-persist-error test: a kv store configured to fail
	// manager writes must make the `process_events_async` future resolve to that error.
	let (_, nodes) = create_nodes(2, "test_persist_error_sync");
	open_channel!(nodes[0], nodes[1], 100000);

	// Wrap the failing synchronous persister so it can be used as the async kv store.
	let data_dir = nodes[0].kv_store.get_data_dir();
	let kv_store_sync = Arc::new(
		Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
	);
	let kv_store = KVStoreSyncWrapper(kv_store_sync);

	// Extend the borrows of the async liquidity manager and sweeper to `'static` by going
	// through raw pointers.
	// SAFETY (review): both referents are owned by `nodes`, which is declared before
	// `bp_future` and is still alive while the future is awaited below. This is only sound if
	// `process_events_async` does not keep the references past the returned future — TODO
	// confirm.
	let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
		&*(nodes[0].liquidity_manager.get_lm_async()
			as *const LiquidityManager<_, _, _, _, _, _, _>)
			as &'static LiquidityManager<_, _, _, _, _, _, _>
	};
	let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
		&*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
			as &'static OutputSweeper<_, _, _, _, _, _, _>
	};

	let bp_future = super::process_events_async(
		kv_store,
		|_: _| async { Ok(()) },
		Arc::clone(&nodes[0].chain_monitor),
		Arc::clone(&nodes[0].node),
		Some(Arc::clone(&nodes[0].messenger)),
		nodes[0].rapid_gossip_sync(),
		Arc::clone(&nodes[0].peer_manager),
		Some(lm_async),
		Some(sweeper_async),
		Arc::clone(&nodes[0].logger),
		Some(Arc::clone(&nodes[0].scorer)),
		// Sleeper: waits for the requested duration on the tokio timer and always resolves
		// to `false` (presumably meaning "do not exit" — confirm against the
		// `process_events_async` docs).
		move |dur: Duration| {
			Box::pin(async move {
				tokio::time::sleep(dur).await;
				false })
		},
		false,
		|| Some(Duration::ZERO),
	);
	// The future is expected to terminate on its own with the injected persistence error.
	match bp_future.await {
		Ok(_) => panic!("Expected error persisting manager"),
		Err(e) => {
			assert_eq!(e.kind(), lightning::io::ErrorKind::Other);
			assert_eq!(e.get_ref().unwrap().to_string(), "test");
		},
	}
}
2840
#[test]
fn test_network_graph_persist_error() {
	// A persister configured to fail network graph writes must make `stop` return that same
	// error.
	let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
	let node = &nodes[0];
	let failing_persister = Arc::new(
		Persister::new(node.kv_store.get_data_dir())
			.with_graph_error(std::io::ErrorKind::Other, "test"),
	);
	let ignore_events = |_: _| Ok(());
	let bg_processor = BackgroundProcessor::start(
		failing_persister,
		ignore_events,
		Arc::clone(&node.chain_monitor),
		Arc::clone(&node.node),
		Some(Arc::clone(&node.messenger)),
		node.p2p_gossip_sync(),
		Arc::clone(&node.peer_manager),
		Some(Arc::clone(&node.liquidity_manager)),
		Some(Arc::clone(&node.sweeper)),
		Arc::clone(&node.logger),
		Some(Arc::clone(&node.scorer)),
	);

	let err = match bg_processor.stop() {
		Err(err) => err,
		Ok(_) => panic!("Expected error persisting network graph"),
	};
	assert_eq!(err.kind(), std::io::ErrorKind::Other);
	assert_eq!(err.get_ref().unwrap().to_string(), "test");
}
2871
#[test]
fn test_scorer_persist_error() {
	// A persister configured to fail scorer writes must make `stop` return that same error.
	let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
	let node = &nodes[0];
	let failing_persister = Arc::new(
		Persister::new(node.kv_store.get_data_dir())
			.with_scorer_error(std::io::ErrorKind::Other, "test"),
	);
	let ignore_events = |_: _| Ok(());
	let bg_processor = BackgroundProcessor::start(
		failing_persister,
		ignore_events,
		Arc::clone(&node.chain_monitor),
		Arc::clone(&node.node),
		Some(Arc::clone(&node.messenger)),
		node.no_gossip_sync(),
		Arc::clone(&node.peer_manager),
		Some(Arc::clone(&node.liquidity_manager)),
		Some(Arc::clone(&node.sweeper)),
		Arc::clone(&node.logger),
		Some(Arc::clone(&node.scorer)),
	);

	let err = match bg_processor.stop() {
		Err(err) => err,
		Ok(_) => panic!("Expected error persisting scorer"),
	};
	assert_eq!(err.kind(), std::io::ErrorKind::Other);
	assert_eq!(err.get_ref().unwrap().to_string(), "test");
}
2902
#[test]
fn test_background_event_handling() {
	// Two-phase test. Phase one checks that channel-open events are delivered to the handler
	// run by the BackgroundProcessor. Phase two force-closes the channel and follows the
	// resulting spendable output through the sweeper until it is no longer tracked.
	let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
	let node_0_id = nodes[0].node.get_our_node_id();
	let node_1_id = nodes[1].node.get_our_node_id();

	let channel_value = 100000;
	let data_dir = nodes[0].kv_store.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir.clone()));

	// Phase one handler: forwards the funding details and a ChannelPending signal to the test
	// thread over bounded channels, tolerates ChannelReady, and panics on anything else.
	let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
	let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
	let event_handler = move |event: Event| {
		match event {
			Event::FundingGenerationReady { .. } => funding_generation_send
				.send(handle_funding_generation_ready!(event, channel_value))
				.unwrap(),
			Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
			Event::ChannelReady { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}
		Ok(())
	};

	let bg_processor = BackgroundProcessor::start(
		persister,
		event_handler,
		Arc::clone(&nodes[0].chain_monitor),
		Arc::clone(&nodes[0].node),
		Some(Arc::clone(&nodes[0].messenger)),
		nodes[0].no_gossip_sync(),
		Arc::clone(&nodes[0].peer_manager),
		Some(Arc::clone(&nodes[0].liquidity_manager)),
		Some(Arc::clone(&nodes[0].sweeper)),
		Arc::clone(&nodes[0].logger),
		Some(Arc::clone(&nodes[0].scorer)),
	);

	// Begin opening a channel and wait for the handler to report FundingGenerationReady.
	begin_open_channel!(nodes[0], nodes[1], channel_value);
	let (temporary_channel_id, funding_tx) = funding_generation_recv
		.recv_timeout(EVENT_DEADLINE)
		.expect("FundingGenerationReady not handled within deadline");
	nodes[0]
		.node
		.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
		.unwrap();
	// Relay funding_created / funding_signed by hand. Node 1 has no processor, so its
	// ChannelPending event is fetched directly; node 0's must arrive through the handler.
	let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
	nodes[1].node.handle_funding_created(node_0_id, &msg_0);
	get_event!(nodes[1], Event::ChannelPending);
	let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
	nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
	channel_pending_recv
		.recv_timeout(EVENT_DEADLINE)
		.expect("ChannelPending not handled within deadline");

	// Confirm the funding transaction on both nodes and exchange channel_ready.
	confirm_transaction(&mut nodes[0], &funding_tx);
	let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, node_1_id);
	confirm_transaction(&mut nodes[1], &funding_tx);
	let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, node_0_id);
	nodes[0].node.handle_channel_ready(node_1_id, &bs_funding);
	let _as_channel_update =
		get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_1_id);
	nodes[1].node.handle_channel_ready(node_0_id, &as_funding);
	let _bs_channel_update =
		get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_0_id);
	// Node 0 must have broadcast exactly one transaction so far: the funding transaction.
	let broadcast_funding =
		nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
	assert_eq!(broadcast_funding.compute_txid(), funding_tx.compute_txid());
	assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());

	// Stop the first processor before starting the second phase.
	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}

	// Phase two handler: forwards SpendableOutputs to the test thread, tolerates ChannelReady
	// and ChannelClosed, and panics on anything else.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	let event_handler = move |event: Event| {
		match event {
			Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
			Event::ChannelReady { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}
		Ok(())
	};
	let persister = Arc::new(Persister::new(data_dir));
	let bg_processor = BackgroundProcessor::start(
		persister,
		event_handler,
		Arc::clone(&nodes[0].chain_monitor),
		Arc::clone(&nodes[0].node),
		Some(Arc::clone(&nodes[0].messenger)),
		nodes[0].no_gossip_sync(),
		Arc::clone(&nodes[0].peer_manager),
		Some(Arc::clone(&nodes[0].liquidity_manager)),
		Some(Arc::clone(&nodes[0].sweeper)),
		Arc::clone(&nodes[0].logger),
		Some(Arc::clone(&nodes[0].scorer)),
	);

	// Force-close the channel, then confirm the broadcast commitment transaction to a depth of
	// BREAKDOWN_TIMEOUT blocks, after which a SpendableOutputs event is expected.
	let error_message = "Channel force-closed";
	nodes[0]
		.node
		.force_close_broadcasting_latest_txn(
			&nodes[0].node.list_channels()[0].channel_id,
			&node_1_id,
			error_message.to_string(),
		)
		.unwrap();
	let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
	confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

	// Hand the reported outputs to the sweeper, delaying the first broadcast until height 153.
	let event =
		receiver.recv_timeout(EVENT_DEADLINE).expect("Events not handled within deadline");
	match event {
		Event::SpendableOutputs { outputs, channel_id } => {
			nodes[0]
				.sweeper
				.track_spendable_outputs(outputs, channel_id, false, Some(153))
				.unwrap();
		},
		_ => panic!("Unexpected event: {:?}", event),
	}

	// Before the delay height is reached, anything already broadcast must not spend the
	// tracked output, and the output must still be waiting for its initial broadcast at 153.
	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
	let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
	if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() {
		assert!(!tracked_output.is_spent_in(&sweep_tx_0));
		match tracked_output.status {
			OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
				assert_eq!(delayed_until_height, Some(153));
			},
			_ => panic!("Unexpected status"),
		}
	}

	advance_chain(&mut nodes[0], 3);

	// Polls the test broadcaster every 10ms until a transaction shows up, then returns it.
	// There is no timeout: a missing sweep shows up as a hang.
	let tx_broadcaster = Arc::clone(&nodes[0].tx_broadcaster);
	let wait_for_sweep_tx = || -> Transaction {
		loop {
			let sweep_tx = tx_broadcaster.txn_broadcasted.lock().unwrap().pop();
			if let Some(sweep_tx) = sweep_tx {
				return sweep_tx;
			}

			std::thread::sleep(Duration::from_millis(10));
		}
	};

	// An initial sweeping transaction must now be broadcast and recorded as the latest spend.
	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
	let sweep_tx_0 = wait_for_sweep_tx();
	let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
	match tracked_output.status {
		OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
			assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
		},
		_ => panic!("Unexpected status"),
	}

	// Each further block must yield a new, distinct sweeping transaction.
	advance_chain(&mut nodes[0], 1);
	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
	let sweep_tx_1 = wait_for_sweep_tx();
	let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
	match tracked_output.status {
		OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
			assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
		},
		_ => panic!("Unexpected status"),
	}
	assert_ne!(sweep_tx_0, sweep_tx_1);

	advance_chain(&mut nodes[0], 1);
	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
	let sweep_tx_2 = wait_for_sweep_tx();
	let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
	match tracked_output.status {
		OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
			assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
		},
		_ => panic!("Unexpected status"),
	}
	assert_ne!(sweep_tx_0, sweep_tx_2);
	assert_ne!(sweep_tx_1, sweep_tx_2);

	// After 5 confirmations of the latest sweep the output must still be tracked, now waiting
	// for threshold confirmations.
	confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5);
	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
	let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
	match tracked_output.status {
		OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
			assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
		},
		_ => panic!("Unexpected status"),
	}

	// Unconfirming an unrelated (all-zero) txid must leave the tracked output's status alone.
	let unconf_txid = Txid::from_slice(&[0; 32]).unwrap();
	nodes[0].sweeper.transaction_unconfirmed(&unconf_txid);

	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
	let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
	match tracked_output.status {
		OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
			assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
		},
		_ => panic!("Unexpected status"),
	}

	// Once one of the sweeps reaches PRUNE_DELAY_BLOCKS confirmations the output must no
	// longer be tracked.
	confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, PRUNE_DELAY_BLOCKS);
	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0);

	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}
}
3130
#[test]
fn test_event_handling_failures_are_replayed() {
	// The handler rejects the first event it sees with `ReplayEvent` and accepts everything
	// afterwards; the rejected event must be delivered again unchanged.
	let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
	let channel_value = 100000;
	let persister = Arc::new(Persister::new(nodes[0].kv_store.get_data_dir()));

	let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
	let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
	let should_fail_event_handling = Arc::new(AtomicBool::new(true));
	let replaying_handler = move |event: Event| {
		// The flag flips from true to false exactly once, so only the first call fails.
		let is_first_attempt = should_fail_event_handling
			.compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
			.is_ok();
		if is_first_attempt {
			first_event_send.send(event).unwrap();
			Err(ReplayEvent())
		} else {
			second_event_send.send(event).unwrap();
			Ok(())
		}
	};

	let bg_processor = BackgroundProcessor::start(
		persister,
		replaying_handler,
		Arc::clone(&nodes[0].chain_monitor),
		Arc::clone(&nodes[0].node),
		Some(Arc::clone(&nodes[0].messenger)),
		nodes[0].no_gossip_sync(),
		Arc::clone(&nodes[0].peer_manager),
		Some(Arc::clone(&nodes[0].liquidity_manager)),
		Some(Arc::clone(&nodes[0].sweeper)),
		Arc::clone(&nodes[0].logger),
		Some(Arc::clone(&nodes[0].scorer)),
	);

	// Starting a channel open produces the event that gets rejected and then replayed.
	begin_open_channel!(nodes[0], nodes[1], channel_value);
	let rejected_event = first_event_recv.recv_timeout(EVENT_DEADLINE).unwrap();
	let replayed_event = second_event_recv.recv_timeout(EVENT_DEADLINE).unwrap();
	assert_eq!(rejected_event, replayed_event);

	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}
}
3180
#[test]
fn test_scorer_persistence() {
	// Starts a processor and spins until the scorer-persistence log line has been recorded by
	// node 0's test logger.
	let (_, nodes) = create_nodes(2, "test_scorer_persistence");
	let node = &nodes[0];
	let persister = Arc::new(Persister::new(node.kv_store.get_data_dir()));
	let ignore_events = |_: _| Ok(());
	let bg_processor = BackgroundProcessor::start(
		persister,
		ignore_events,
		Arc::clone(&node.chain_monitor),
		Arc::clone(&node.node),
		Some(Arc::clone(&node.messenger)),
		node.no_gossip_sync(),
		Arc::clone(&node.peer_manager),
		Some(Arc::clone(&node.liquidity_manager)),
		Some(Arc::clone(&node.sweeper)),
		Arc::clone(&node.logger),
		Some(Arc::clone(&node.scorer)),
	);

	// Lookup key into the logger's line map, built once up front.
	let wanted_log = (
		"lightning_background_processor",
		"Calling time_passed and persisting scorer".to_string(),
	);
	loop {
		// The lock is re-taken on every pass and released at the end of the iteration.
		let log_entries = node.logger.lines.lock().unwrap();
		if log_entries.contains_key(&wanted_log) {
			break;
		}
	}

	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}
}
3213
// Shared body for the "graph is not pruned until graph sync completes" tests.
//
// `$nodes` is the node list, `$receive` an expression yielding a `Result`/`Option` that
// resolves once the network graph has been persisted (it is `.expect`ed), and `$sleep` an
// expression evaluated at the top of every polling iteration.
macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
	($nodes: expr, $receive: expr, $sleep: expr) => {
		// Seed node 0's graph with a single partially announced channel (short channel id 42).
		let features = ChannelFeatures::empty();
		$nodes[0]
			.network_graph
			.add_channel_from_partial_announcement(
				42,
				None,
				53,
				features,
				$nodes[0].node.get_our_node_id().into(),
				$nodes[1].node.get_our_node_id().into(),
			)
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = $nodes[0].network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);

		// Wait until the ChannelManager timer-tick line has been logged more than once, i.e.
		// the processor loop has gone around at least twice.
		loop {
			$sleep;
			let log_entries = $nodes[0].logger.lines.lock().unwrap();
			let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
			if *log_entries.get(&("lightning_background_processor", loop_counter)).unwrap_or(&0)
				> 1
			{
				break;
			}
		}

		// Pre-serialized rapid gossip sync payload, applied with a fixed current time.
		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99,
			247, 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227,
			98, 218, 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61,
			250, 251, 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6,
			67, 2, 36, 125, 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47,
			115, 172, 63, 136, 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158,
			1, 242, 121, 152, 106, 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95,
			65, 3, 83, 185, 58, 138, 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136,
			149, 185, 226, 156, 137, 175, 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219,
			175, 168, 77, 4, 143, 38, 128, 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2,
			27, 0, 0, 0, 1, 0, 0, 255, 2, 68, 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0,
			0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255,
			8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6,
			11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		$nodes[0]
			.rapid_gossip_sync
			.update_network_graph_no_std(&initialization_input[..], Some(1642291930))
			.unwrap();

		// Right after the sync the graph holds two channels: nothing has been pruned yet.
		assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);

		// Block until the caller-supplied receive expression reports a graph persistence.
		$receive.expect("Network graph not pruned within deadline");

		// By then all channels are expected to have been pruned from the graph.
		assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
	};
}
3274
3275 #[test]
3276 fn test_not_pruning_network_graph_until_graph_sync_completion() {
3277 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
3278
3279 let (_, nodes) =
3280 create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
3281 let data_dir = nodes[0].kv_store.get_data_dir();
3282 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
3283
3284 let event_handler = |_: _| Ok(());
3285 let background_processor = BackgroundProcessor::start(
3286 persister,
3287 event_handler,
3288 Arc::clone(&nodes[0].chain_monitor),
3289 Arc::clone(&nodes[0].node),
3290 Some(Arc::clone(&nodes[0].messenger)),
3291 nodes[0].rapid_gossip_sync(),
3292 Arc::clone(&nodes[0].peer_manager),
3293 Some(Arc::clone(&nodes[0].liquidity_manager)),
3294 Some(Arc::clone(&nodes[0].sweeper)),
3295 Arc::clone(&nodes[0].logger),
3296 Some(Arc::clone(&nodes[0].scorer)),
3297 );
3298
3299 do_test_not_pruning_network_graph_until_graph_sync_completion!(
3300 nodes,
3301 receiver.recv_timeout(super::FIRST_NETWORK_PRUNE_TIMER * 5),
3302 std::thread::sleep(Duration::from_millis(1))
3303 );
3304
3305 background_processor.stop().unwrap();
3306 }
3307
3308 #[tokio::test]
3309 async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
3310 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
3311
3312 let (_, nodes) =
3313 create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
3314 let data_dir = nodes[0].kv_store.get_data_dir();
3315 let kv_store_sync =
3316 Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
3317 let kv_store = KVStoreSyncWrapper(kv_store_sync);
3318
3319 let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
3321 &*(nodes[0].liquidity_manager.get_lm_async()
3322 as *const LiquidityManager<_, _, _, _, _, _, _>)
3323 as &'static LiquidityManager<_, _, _, _, _, _, _>
3324 };
3325 let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3326 &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3327 as &'static OutputSweeper<_, _, _, _, _, _, _>
3328 };
3329
3330 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
3331 let bp_future = super::process_events_async(
3332 kv_store,
3333 |_: _| async { Ok(()) },
3334 Arc::clone(&nodes[0].chain_monitor),
3335 Arc::clone(&nodes[0].node),
3336 Some(Arc::clone(&nodes[0].messenger)),
3337 nodes[0].rapid_gossip_sync(),
3338 Arc::clone(&nodes[0].peer_manager),
3339 Some(lm_async),
3340 Some(sweeper_async),
3341 Arc::clone(&nodes[0].logger),
3342 Some(Arc::clone(&nodes[0].scorer)),
3343 move |dur: Duration| {
3344 let mut exit_receiver = exit_receiver.clone();
3345 Box::pin(async move {
3346 tokio::select! {
3347 _ = tokio::time::sleep(dur) => false,
3348 _ = exit_receiver.changed() => true,
3349 }
3350 })
3351 },
3352 false,
3353 || Some(Duration::from_secs(1696300000)),
3354 );
3355
3356 let t1 = tokio::spawn(bp_future);
3357 let t2 = tokio::spawn(async move {
3358 do_test_not_pruning_network_graph_until_graph_sync_completion!(
3359 nodes,
3360 {
3361 let mut i = 0;
3362 loop {
3363 tokio::time::sleep(super::FIRST_NETWORK_PRUNE_TIMER).await;
3364 if let Ok(()) = receiver.try_recv() {
3365 break Ok::<(), ()>(());
3366 }
3367 assert!(i < 5);
3368 i += 1;
3369 }
3370 },
3371 tokio::time::sleep(Duration::from_millis(1)).await
3372 );
3373 exit_sender.send(()).unwrap();
3374 });
3375 let (r1, r2) = tokio::join!(t1, t2);
3376 r1.unwrap().unwrap();
3377 r2.unwrap()
3378 }
3379
3380 macro_rules! do_test_payment_path_scoring {
3381 ($nodes: expr, $receive: expr) => {
3382 let scored_scid = 4242;
3388 let secp_ctx = Secp256k1::new();
3389 let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
3390 let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
3391
3392 let path = Path { hops: vec![RouteHop {
3393 pubkey: node_1_id,
3394 node_features: NodeFeatures::empty(),
3395 short_channel_id: scored_scid,
3396 channel_features: ChannelFeatures::empty(),
3397 fee_msat: 0,
3398 cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
3399 maybe_announced_channel: true,
3400 }], blinded_tail: None };
3401
3402 $nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
3403 $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
3404 payment_id: None,
3405 payment_hash: PaymentHash([42; 32]),
3406 payment_failed_permanently: false,
3407 failure: PathFailure::OnPath { network_update: None },
3408 path: path.clone(),
3409 short_channel_id: Some(scored_scid),
3410 error_code: None,
3411 error_data: None,
3412 hold_times: Vec::new(),
3413 });
3414 let event = $receive.expect("PaymentPathFailed not handled within deadline");
3415 match event {
3416 Event::PaymentPathFailed { .. } => {},
3417 _ => panic!("Unexpected event"),
3418 }
3419
3420 $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
3423 $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
3424 payment_id: None,
3425 payment_hash: PaymentHash([42; 32]),
3426 payment_failed_permanently: true,
3427 failure: PathFailure::OnPath { network_update: None },
3428 path: path.clone(),
3429 short_channel_id: None,
3430 error_code: None,
3431 error_data: None,
3432 hold_times: Vec::new(),
3433 });
3434 let event = $receive.expect("PaymentPathFailed not handled within deadline");
3435 match event {
3436 Event::PaymentPathFailed { .. } => {},
3437 _ => panic!("Unexpected event"),
3438 }
3439
3440 $nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
3441 $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
3442 payment_id: PaymentId([42; 32]),
3443 payment_hash: None,
3444 path: path.clone(),
3445 hold_times: Vec::new(),
3446 });
3447 let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
3448 match event {
3449 Event::PaymentPathSuccessful { .. } => {},
3450 _ => panic!("Unexpected event"),
3451 }
3452
3453 $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
3454 $nodes[0].node.push_pending_event(Event::ProbeSuccessful {
3455 payment_id: PaymentId([42; 32]),
3456 payment_hash: PaymentHash([42; 32]),
3457 path: path.clone(),
3458 });
3459 let event = $receive.expect("ProbeSuccessful not handled within deadline");
3460 match event {
3461 Event::ProbeSuccessful { .. } => {},
3462 _ => panic!("Unexpected event"),
3463 }
3464
3465 $nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
3466 $nodes[0].node.push_pending_event(Event::ProbeFailed {
3467 payment_id: PaymentId([42; 32]),
3468 payment_hash: PaymentHash([42; 32]),
3469 path,
3470 short_channel_id: Some(scored_scid),
3471 });
3472 let event = $receive.expect("ProbeFailure not handled within deadline");
3473 match event {
3474 Event::ProbeFailed { .. } => {},
3475 _ => panic!("Unexpected event"),
3476 }
3477 }
3478 }
3479
3480 #[test]
3481 fn test_payment_path_scoring() {
3482 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
3483 let event_handler = move |event: Event| {
3484 match event {
3485 Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
3486 Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
3487 Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
3488 Event::ProbeFailed { .. } => sender.send(event).unwrap(),
3489 _ => panic!("Unexpected event: {:?}", event),
3490 }
3491 Ok(())
3492 };
3493
3494 let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
3495 let data_dir = nodes[0].kv_store.get_data_dir();
3496 let persister = Arc::new(Persister::new(data_dir));
3497 let bg_processor = BackgroundProcessor::start(
3498 persister,
3499 event_handler,
3500 Arc::clone(&nodes[0].chain_monitor),
3501 Arc::clone(&nodes[0].node),
3502 Some(Arc::clone(&nodes[0].messenger)),
3503 nodes[0].no_gossip_sync(),
3504 Arc::clone(&nodes[0].peer_manager),
3505 Some(Arc::clone(&nodes[0].liquidity_manager)),
3506 Some(Arc::clone(&nodes[0].sweeper)),
3507 Arc::clone(&nodes[0].logger),
3508 Some(Arc::clone(&nodes[0].scorer)),
3509 );
3510
3511 do_test_payment_path_scoring!(nodes, receiver.recv_timeout(EVENT_DEADLINE));
3512
3513 if !std::thread::panicking() {
3514 bg_processor.stop().unwrap();
3515 }
3516
3517 let log_entries = nodes[0].logger.lines.lock().unwrap();
3518 let expected_log = "Persisting scorer after update".to_string();
3519 assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
3520 }
3521
3522 #[tokio::test]
3523 async fn test_payment_path_scoring_async() {
3524 let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
3525 let event_handler = move |event: Event| {
3526 let sender_ref = sender.clone();
3527 async move {
3528 match event {
3529 Event::PaymentPathFailed { .. } => sender_ref.send(event).await.unwrap(),
3530 Event::PaymentPathSuccessful { .. } => sender_ref.send(event).await.unwrap(),
3531 Event::ProbeSuccessful { .. } => sender_ref.send(event).await.unwrap(),
3532 Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
3533 _ => panic!("Unexpected event: {:?}", event),
3534 }
3535 Ok(())
3536 }
3537 };
3538
3539 let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
3540 let data_dir = nodes[0].kv_store.get_data_dir();
3541 let kv_store_sync = Arc::new(Persister::new(data_dir));
3542 let kv_store = KVStoreSyncWrapper(kv_store_sync);
3543
3544 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
3545
3546 let lm_async: &'static LiquidityManager<_, _, _, _, _, _, _> = unsafe {
3548 &*(nodes[0].liquidity_manager.get_lm_async()
3549 as *const LiquidityManager<_, _, _, _, _, _, _>)
3550 as &'static LiquidityManager<_, _, _, _, _, _, _>
3551 };
3552 let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3553 &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3554 as &'static OutputSweeper<_, _, _, _, _, _, _>
3555 };
3556
3557 let bp_future = super::process_events_async(
3558 kv_store,
3559 event_handler,
3560 Arc::clone(&nodes[0].chain_monitor),
3561 Arc::clone(&nodes[0].node),
3562 Some(Arc::clone(&nodes[0].messenger)),
3563 nodes[0].no_gossip_sync(),
3564 Arc::clone(&nodes[0].peer_manager),
3565 Some(lm_async),
3566 Some(sweeper_async),
3567 Arc::clone(&nodes[0].logger),
3568 Some(Arc::clone(&nodes[0].scorer)),
3569 move |dur: Duration| {
3570 let mut exit_receiver = exit_receiver.clone();
3571 Box::pin(async move {
3572 tokio::select! {
3573 _ = tokio::time::sleep(dur) => false,
3574 _ = exit_receiver.changed() => true,
3575 }
3576 })
3577 },
3578 false,
3579 || Some(Duration::ZERO),
3580 );
3581 let t1 = tokio::spawn(bp_future);
3582 let t2 = tokio::spawn(async move {
3583 do_test_payment_path_scoring!(nodes, receiver.recv().await);
3584 exit_sender.send(()).unwrap();
3585
3586 let log_entries = nodes[0].logger.lines.lock().unwrap();
3587 let expected_log = "Persisting scorer after update".to_string();
3588 assert_eq!(
3589 *log_entries.get(&("lightning_background_processor", expected_log)).unwrap(),
3590 5
3591 );
3592 });
3593
3594 let (r1, r2) = tokio::join!(t1, t2);
3595 r1.unwrap().unwrap();
3596 r2.unwrap()
3597 }
3598
3599 #[tokio::test]
3600 #[cfg(not(c_bindings))]
3601 async fn test_no_consts() {
3602 let (_, nodes) = create_nodes(1, "test_no_consts");
3604 let bg_processor = BackgroundProcessor::start(
3605 Arc::clone(&nodes[0].kv_store),
3606 move |_: Event| Ok(()),
3607 Arc::clone(&nodes[0].chain_monitor),
3608 Arc::clone(&nodes[0].node),
3609 crate::NO_ONION_MESSENGER,
3610 nodes[0].no_gossip_sync(),
3611 Arc::clone(&nodes[0].peer_manager),
3612 crate::NO_LIQUIDITY_MANAGER_SYNC,
3613 Some(Arc::clone(&nodes[0].sweeper)),
3614 Arc::clone(&nodes[0].logger),
3615 Some(Arc::clone(&nodes[0].scorer)),
3616 );
3617
3618 if !std::thread::panicking() {
3619 bg_processor.stop().unwrap();
3620 }
3621
3622 let kv_store = KVStoreSyncWrapper(Arc::clone(&nodes[0].kv_store));
3623 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
3624 let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe {
3625 &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>)
3626 as &'static OutputSweeper<_, _, _, _, _, _, _>
3627 };
3628 let bp_future = super::process_events_async(
3629 kv_store,
3630 move |_: Event| async move { Ok(()) },
3631 Arc::clone(&nodes[0].chain_monitor),
3632 Arc::clone(&nodes[0].node),
3633 crate::NO_ONION_MESSENGER,
3634 nodes[0].no_gossip_sync(),
3635 Arc::clone(&nodes[0].peer_manager),
3636 crate::NO_LIQUIDITY_MANAGER,
3637 Some(sweeper_async),
3638 Arc::clone(&nodes[0].logger),
3639 Some(Arc::clone(&nodes[0].scorer)),
3640 move |dur: Duration| {
3641 let mut exit_receiver = exit_receiver.clone();
3642 Box::pin(async move {
3643 tokio::select! {
3644 _ = tokio::time::sleep(dur) => false,
3645 _ = exit_receiver.changed() => true,
3646 }
3647 })
3648 },
3649 false,
3650 || Some(Duration::ZERO),
3651 );
3652 let t1 = tokio::spawn(bp_future);
3653 exit_sender.send(()).unwrap();
3654 t1.await.unwrap().unwrap();
3655 }
3656}