1#![cfg_attr(feature = "std", doc = "See docs for [`BackgroundProcessor`] for more details.")]
4#![deny(rustdoc::broken_intra_doc_links)]
5#![deny(rustdoc::private_intra_doc_links)]
6#![deny(missing_docs)]
7#![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
8#![cfg_attr(docsrs, feature(doc_auto_cfg))]
9#![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
10
11#[cfg(any(test, feature = "std"))]
12extern crate core;
13
14#[cfg(not(feature = "std"))]
15extern crate alloc;
16
17#[macro_use]
18extern crate lightning;
19extern crate lightning_rapid_gossip_sync;
20
21use lightning::chain;
22use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
23use lightning::chain::chainmonitor::{ChainMonitor, Persist};
24#[cfg(feature = "std")]
25use lightning::events::EventHandler;
26#[cfg(feature = "std")]
27use lightning::events::EventsProvider;
28#[cfg(feature = "futures")]
29use lightning::events::ReplayEvent;
30use lightning::events::{Event, PathFailure};
31
32use lightning::ln::channelmanager::AChannelManager;
33use lightning::ln::msgs::OnionMessageHandler;
34use lightning::ln::peer_handler::APeerManager;
35use lightning::onion_message::messenger::AOnionMessenger;
36use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
37use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
38use lightning::routing::utxo::UtxoLookup;
39use lightning::util::logger::Logger;
40use lightning::util::persist::Persister;
41#[cfg(feature = "std")]
42use lightning::util::wakers::Sleeper;
43use lightning_rapid_gossip_sync::RapidGossipSync;
44
45use core::ops::Deref;
46use core::time::Duration;
47
48#[cfg(feature = "std")]
49use core::sync::atomic::{AtomicBool, Ordering};
50#[cfg(feature = "std")]
51use std::sync::Arc;
52#[cfg(feature = "std")]
53use std::thread::{self, JoinHandle};
54#[cfg(feature = "std")]
55use std::time::Instant;
56
57#[cfg(not(feature = "std"))]
58use alloc::boxed::Box;
59
/// Handle to the background thread spawned by [`BackgroundProcessor::start`].
///
/// The thread keeps running until the shared stop flag is set. The flag is set by
/// [`BackgroundProcessor::stop`] and by dropping this value; both then join the thread.
/// [`BackgroundProcessor::join`] waits for the thread without setting the flag.
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
    /// Flag shared with the background thread; the thread leaves its loop once this is `true`.
    stop_thread: Arc<AtomicBool>,
    /// Join handle of the background thread. Becomes `None` once the thread has been joined.
    thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
94
// All timers below are expressed in seconds (they are fed to `Duration::from_secs` or compared
// against `Instant::elapsed().as_secs()`). Test builds shorten most of them to one second.

// Interval between calls to the `ChannelManager`'s `timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

// Interval between calls to the `PeerManager`'s `timer_tick_occurred`.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
// Builds with debug assertions use a longer interval.
// NOTE(review): presumably because unoptimized builds are slower - confirm.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

// Interval between calls to the onion message handler's `timer_tick_occurred`.
#[cfg(not(test))]
const ONION_MESSAGE_HANDLER_TIMER: u64 = 10;
#[cfg(test)]
const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;

// Interval between network graph prunes once the first prune has happened (one hour).
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Interval between scorer `time_passed` calls followed by scorer persistence.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 60 * 5;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Delay before the first network graph prune after startup; later prunes use
// `NETWORK_PRUNE_TIMER`.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

// Interval between calls to the `ChainMonitor`'s `rebroadcast_pending_claims`.
#[cfg(not(test))]
const REBROADCAST_TIMER: u64 = 30;
#[cfg(test)]
const REBROADCAST_TIMER: u64 = 1;
132
133#[cfg(feature = "futures")]
/// Returns the smaller of `a` and `b`.
///
/// Written as a plain comparison so that it can be evaluated in `const` contexts.
const fn min_u64(a: u64, b: u64) -> u64 {
    if b <= a {
        b
    } else {
        a
    }
}
/// The shortest of the periodic timers, in seconds.
///
/// The async processing loop sleeps for at most this long between iterations (unless it runs
/// in the 100ms "mobile interruptable" mode), so every timer that is checked once per
/// iteration has to be included here. `ONION_MESSAGE_HANDLER_TIMER` is part of the minimum
/// because it is shorter than `PING_TIMER` in non-test builds with debug assertions; leaving
/// it out would delay onion message handler ticks there. `NETWORK_PRUNE_TIMER` is longer than
/// `FIRST_NETWORK_PRUNE_TIMER` in every configuration and so cannot be the minimum.
#[cfg(feature = "futures")]
const FASTEST_TIMER: u64 = min_u64(
    min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER), ONION_MESSAGE_HANDLER_TIMER),
    min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)),
);
147
/// Selects which gossip synchronization mechanism, if any, the background processor works
/// with: a [`P2PGossipSync`], a [`RapidGossipSync`], or none at all.
pub enum GossipSync<
    P: Deref<Target = P2PGossipSync<G, U, L>>,
    R: Deref<Target = RapidGossipSync<G, L>>,
    G: Deref<Target = NetworkGraph<L>>,
    U: Deref,
    L: Deref,
> where
    U::Target: UtxoLookup,
    L::Target: Logger,
{
    /// Holds a pointer-like value dereferencing to a [`P2PGossipSync`].
    P2P(P),
    /// Holds a pointer-like value dereferencing to a [`RapidGossipSync`].
    Rapid(R),
    /// No gossip sync is in use; no network graph is available through this value.
    None,
}
166
167impl<
168 P: Deref<Target = P2PGossipSync<G, U, L>>,
169 R: Deref<Target = RapidGossipSync<G, L>>,
170 G: Deref<Target = NetworkGraph<L>>,
171 U: Deref,
172 L: Deref,
173 > GossipSync<P, R, G, U, L>
174where
175 U::Target: UtxoLookup,
176 L::Target: Logger,
177{
178 fn network_graph(&self) -> Option<&G> {
179 match self {
180 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
181 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
182 GossipSync::None => None,
183 }
184 }
185
186 fn prunable_network_graph(&self) -> Option<&G> {
187 match self {
188 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
189 GossipSync::Rapid(gossip_sync) => {
190 if gossip_sync.is_initial_sync_complete() {
191 Some(gossip_sync.network_graph())
192 } else {
193 None
194 }
195 },
196 GossipSync::None => None,
197 }
198 }
199}
200
201impl<
203 P: Deref<Target = P2PGossipSync<G, U, L>>,
204 G: Deref<Target = NetworkGraph<L>>,
205 U: Deref,
206 L: Deref,
207 > GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
208where
209 U::Target: UtxoLookup,
210 L::Target: Logger,
211{
212 pub fn p2p(gossip_sync: P) -> Self {
214 GossipSync::P2P(gossip_sync)
215 }
216}
217
218impl<
220 'a,
221 R: Deref<Target = RapidGossipSync<G, L>>,
222 G: Deref<Target = NetworkGraph<L>>,
223 L: Deref,
224 >
225 GossipSync<
226 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
227 R,
228 G,
229 &'a (dyn UtxoLookup + Send + Sync),
230 L,
231 > where
232 L::Target: Logger,
233{
234 pub fn rapid(gossip_sync: R) -> Self {
236 GossipSync::Rapid(gossip_sync)
237 }
238}
239
240impl<'a, L: Deref>
242 GossipSync<
243 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
244 &RapidGossipSync<&'a NetworkGraph<L>, L>,
245 &'a NetworkGraph<L>,
246 &'a (dyn UtxoLookup + Send + Sync),
247 L,
248 > where
249 L::Target: Logger,
250{
251 pub fn none() -> Self {
253 GossipSync::None
254 }
255}
256
257fn handle_network_graph_update<L: Deref>(network_graph: &NetworkGraph<L>, event: &Event)
258where
259 L::Target: Logger,
260{
261 if let Event::PaymentPathFailed {
262 failure: PathFailure::OnPath { network_update: Some(ref upd) },
263 ..
264 } = event
265 {
266 network_graph.handle_network_update(upd);
267 }
268}
269
270fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
273 scorer: &'a S, event: &Event, duration_since_epoch: Duration,
274) -> bool {
275 match event {
276 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
277 let mut score = scorer.write_lock();
278 score.payment_path_failed(path, *scid, duration_since_epoch);
279 },
280 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
281 let mut score = scorer.write_lock();
284 score.probe_successful(path, duration_since_epoch);
285 },
286 Event::PaymentPathSuccessful { path, .. } => {
287 let mut score = scorer.write_lock();
288 score.payment_path_successful(path, duration_since_epoch);
289 },
290 Event::ProbeSuccessful { path, .. } => {
291 let mut score = scorer.write_lock();
292 score.probe_successful(path, duration_since_epoch);
293 },
294 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
295 let mut score = scorer.write_lock();
296 score.probe_failed(path, *scid, duration_since_epoch);
297 },
298 _ => return false,
299 }
300 true
301}
302
// Expands to the body of the background-processing loop. It is invoked by both the async entry
// point (`process_events_async`) and the threaded one (`BackgroundProcessor::start`), which
// differ only in how they process events, wait, and measure time.
//
// Arguments:
// * `$persister`: used for `persist_manager`, `persist_graph` and `persist_scorer`.
// * `$chain_monitor`, `$channel_manager`, `$onion_messenger`, `$peer_manager`, `$gossip_sync`,
//   `$logger`, `$scorer`: the objects the loop operates on (`$onion_messenger` and `$scorer`
//   are `Option`s).
// * `$process_chain_monitor_events`, `$process_channel_manager_events`,
//   `$process_onion_message_handler_events`: expressions evaluated at the top of every
//   iteration to process pending events.
// * `$loop_exit_check`: expression; the loop ends as soon as it evaluates to `true`.
// * `$await`: expression evaluated once per iteration to wait for new work or a timeout.
// * `$get_timer`: callable taking a number of seconds and returning a timer value.
// * `$timer_elapsed`: callable taking `&mut timer` and a number of seconds, returning whether
//   the timer has elapsed.
// * `$check_slow_await`: boolean expression; when `true` each `$await` is timed with a
//   one-second timer and peers are disconnected if that timer elapsed.
// * `$time_fetch`: callable returning `Option<Duration>`, used as the duration since the Unix
//   epoch; `None` means no wall-clock time is available.
//
// The expansion uses `?` and evaluates to `Ok(())`, so it must be placed where a `Result` whose
// error type accepts the persister's errors is returned.
macro_rules! define_run_body {
    (
        $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
        $channel_manager: ident, $process_channel_manager_events: expr,
        $onion_messenger: ident, $process_onion_message_handler_events: expr,
        $peer_manager: ident, $gossip_sync: ident,
        $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
        $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
    ) => { {
        // Run the channel manager tick and the claim rebroadcast once right away instead of
        // waiting for their timers to elapse for the first time.
        log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
        $channel_manager.get_cm().timer_tick_occurred();
        log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
        $chain_monitor.rebroadcast_pending_claims();

        let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
        let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
        let mut last_ping_call = $get_timer(PING_TIMER);
        let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
        let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
        let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
        let mut have_pruned = false;
        let mut have_decayed_scorer = false;

        loop {
            $process_channel_manager_events;
            $process_chain_monitor_events;
            $process_onion_message_handler_events;

            // Let the peer manager act on whatever the event processing above produced.
            $peer_manager.as_ref().process_events();

            // Check for shutdown before waiting so a stop request is not delayed by the wait.
            if $loop_exit_check {
                log_trace!($logger, "Terminating background processor.");
                break;
            }

            // Wait for new work or a timeout. When requested, time the wait with a one-second
            // timer so an unexpectedly long wait can be detected below.
            let mut await_start = None;
            if $check_slow_await { await_start = Some($get_timer(1)); }
            $await;
            let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };

            // The wait itself may have been ended by a shutdown request.
            if $loop_exit_check {
                log_trace!($logger, "Terminating background processor.");
                break;
            }

            // A failure to persist the channel manager aborts the whole loop via `?`.
            if $channel_manager.get_cm().get_and_clear_needs_persistence() {
                log_trace!($logger, "Persisting ChannelManager...");
                $persister.persist_manager(&$channel_manager)?;
                log_trace!($logger, "Done persisting ChannelManager.");
            }
            if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
                $channel_manager.get_cm().timer_tick_occurred();
                last_freshness_call = $get_timer(FRESHNESS_TIMER);
            }
            if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
                if let Some(om) = &$onion_messenger {
                    log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
                    om.get_om().timer_tick_occurred();
                }
                last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
            }
            if await_slow {
                // The wait took far longer than requested, so drop all peers and restart the
                // ping timer instead of ticking the peer manager.
                // NOTE(review): presumably the process was suspended and the existing
                // connections can no longer be trusted - confirm.
                log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
                $peer_manager.as_ref().disconnect_all_peers();
                last_ping_call = $get_timer(PING_TIMER);
            } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
                log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
                $peer_manager.as_ref().timer_tick_occurred();
                last_ping_call = $get_timer(PING_TIMER);
            }

            // The first prune uses the short startup timer; later prunes use the regular one.
            // With rapid gossip sync a prune is attempted on every iteration until one succeeds.
            let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
            let prune_timer_elapsed = $timer_elapsed(&mut last_prune_call, prune_timer);
            let should_prune = match $gossip_sync {
                GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
                _ => prune_timer_elapsed,
            };
            if should_prune {
                // `prunable_network_graph` yields nothing while a rapid sync has not finished
                // its initial sync, in which case nothing is pruned or persisted.
                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
                    if let Some(duration_since_epoch) = $time_fetch() {
                        log_trace!($logger, "Pruning and persisting network graph.");
                        network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
                    } else {
                        log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
                        log_trace!($logger, "Persisting network graph.");
                    }

                    // Graph persistence failures are logged but do not stop the loop.
                    if let Err(e) = $persister.persist_graph(network_graph) {
                        log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
                    }

                    have_pruned = true;
                }
                // Re-arm with whichever interval now applies (`have_pruned` may have changed).
                let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
                last_prune_call = $get_timer(prune_timer);
            }

            // On the first iteration only, tell the scorer the current time, if one is available.
            if !have_decayed_scorer {
                if let Some(ref scorer) = $scorer {
                    if let Some(duration_since_epoch) = $time_fetch() {
                        log_trace!($logger, "Calling time_passed on scorer at startup");
                        scorer.write_lock().time_passed(duration_since_epoch);
                    }
                }
                have_decayed_scorer = true;
            }

            if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
                if let Some(ref scorer) = $scorer {
                    if let Some(duration_since_epoch) = $time_fetch() {
                        log_trace!($logger, "Calling time_passed and persisting scorer");
                        scorer.write_lock().time_passed(duration_since_epoch);
                    } else {
                        log_trace!($logger, "Persisting scorer");
                    }
                    // Scorer persistence failures are logged but do not stop the loop.
                    if let Err(e) = $persister.persist_scorer(&scorer) {
                        log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
                    }
                }
                last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
            }

            if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
                log_trace!($logger, "Rebroadcasting monitor's pending claims");
                $chain_monitor.rebroadcast_pending_claims();
                last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
            }
        }

        // After leaving the loop, persist the channel manager one final time. Unlike inside
        // the loop, failures of any of the final persists below are returned to the caller.
        $persister.persist_manager(&$channel_manager)?;

        // Persist the scorer on exit.
        if let Some(ref scorer) = $scorer {
            $persister.persist_scorer(&scorer)?;
        }

        // Persist the network graph on exit (even if it is not currently prunable).
        if let Some(network_graph) = $gossip_sync.network_graph() {
            $persister.persist_graph(network_graph)?;
        }

        Ok(())
    } }
}
484
485#[cfg(feature = "futures")]
pub(crate) mod futures_util {
    //! Minimal future combinators and a no-op waker used by the async processing loop.

    use core::future::Future;
    use core::marker::Unpin;
    use core::pin::Pin;
    use core::task::{Poll, RawWaker, RawWakerVTable, Waker};

    /// Races four futures, completing as soon as any one of them is ready.
    ///
    /// The futures are polled in field order (`a`, `b`, `c`, `d`) and polling stops at the
    /// first one that is ready.
    pub(crate) struct Selector<
        A: Future<Output = ()> + Unpin,
        B: Future<Output = ()> + Unpin,
        C: Future<Output = ()> + Unpin,
        D: Future<Output = bool> + Unpin,
    > {
        pub a: A,
        pub b: B,
        pub c: C,
        pub d: D,
    }

    /// Identifies which future of a [`Selector`] completed; `D` carries that future's output.
    pub(crate) enum SelectorOutput {
        A,
        B,
        C,
        D(bool),
    }

    impl<
            A: Future<Output = ()> + Unpin,
            B: Future<Output = ()> + Unpin,
            C: Future<Output = ()> + Unpin,
            D: Future<Output = bool> + Unpin,
        > Future for Selector<A, B, C, D>
    {
        type Output = SelectorOutput;

        fn poll(
            mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>,
        ) -> Poll<SelectorOutput> {
            if Pin::new(&mut self.a).poll(ctx).is_ready() {
                return Poll::Ready(SelectorOutput::A);
            }
            if Pin::new(&mut self.b).poll(ctx).is_ready() {
                return Poll::Ready(SelectorOutput::B);
            }
            if Pin::new(&mut self.c).poll(ctx).is_ready() {
                return Poll::Ready(SelectorOutput::C);
            }
            // The last future decides the overall result: pending stays pending, and a ready
            // value is wrapped in `SelectorOutput::D`.
            Pin::new(&mut self.d).poll(ctx).map(SelectorOutput::D)
        }
    }

    /// A future that may be absent.
    ///
    /// With no inner future it is pending forever. With one, it completes when the inner
    /// future does and then discards it, so later polls are pending again.
    pub(crate) struct OptionalSelector<F: Future<Output = ()> + Unpin> {
        pub optional_future: Option<F>,
    }

    impl<F: Future<Output = ()> + Unpin> Future for OptionalSelector<F> {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
            if let Some(inner) = self.optional_future.as_mut() {
                if Pin::new(inner).poll(ctx).is_ready() {
                    // Drop the finished future so it is never polled after completion.
                    self.optional_future = None;
                    return Poll::Ready(());
                }
            }
            Poll::Pending
        }
    }

    /// Builds the raw waker behind [`dummy_waker`]: a null data pointer plus the no-op vtable.
    fn dummy_raw_waker() -> RawWaker {
        RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
    }

    /// Cloning the dummy waker simply produces another dummy waker.
    fn dummy_waker_clone(_: *const ()) -> RawWaker {
        dummy_raw_waker()
    }

    /// Shared no-op used for the wake, wake-by-ref and drop entries of the vtable.
    fn dummy_waker_action(_: *const ()) {}

    const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
        dummy_waker_clone,
        dummy_waker_action,
        dummy_waker_action,
        dummy_waker_action,
    );

    /// Returns a waker that does nothing when woken, for polling a future exactly once.
    pub(crate) fn dummy_waker() -> Waker {
        // SAFETY: none of the vtable functions dereference the (null) data pointer, and they
        // hold no state, so they can be called from any thread any number of times.
        unsafe { Waker::from_raw(dummy_raw_waker()) }
    }
}
589#[cfg(feature = "futures")]
590use core::task;
591#[cfg(feature = "futures")]
592use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
593
/// Runs the background-processing loop as a future: processes events, persists state and
/// drives the periodic timers until the sleeper asks it to exit.
///
/// Each event is first used to update the network graph (if the gossip sync provides one) and
/// the scorer (if one is given and `fetch_time` returns a time; the scorer is persisted after
/// every update), and is then handed to `event_handler`.
///
/// # Parameters
///
/// * `persister`: persists the channel manager, network graph and scorer.
/// * `event_handler`: called with every event; returns a future resolving to
///   `Result<(), ReplayEvent>`.
/// * `chain_monitor`, `channel_manager`, `onion_messenger`, `peer_manager`: the objects whose
///   events are processed and whose timer ticks are driven. `onion_messenger` is optional.
/// * `gossip_sync`: selects the network graph to update, prune and persist, if any.
/// * `logger`: receives the loop's log output.
/// * `scorer`: optional scorer to update from payment and probe events and to persist.
/// * `sleeper`: given a [`Duration`], returns a future that resolves once that time has
///   passed. Whenever a future it returned resolves to `true`, the loop exits.
/// * `mobile_interruptable_platform`: when `true`, each wait is limited to 100ms and timed;
///   if such a wait took more than a second, all peers are disconnected.
/// * `fetch_time`: returns the current duration since the Unix epoch, or `None` when no time
///   source is available (the network graph is then persisted without being pruned).
///
/// # Errors
///
/// Returns the persister's error if persisting the channel manager fails inside the loop, or
/// if any of the final persists (channel manager, scorer, network graph) fails on exit.
#[cfg(feature = "futures")]
pub async fn process_events_async<
    'a,
    UL: 'static + Deref + Send + Sync,
    CF: 'static + Deref + Send + Sync,
    T: 'static + Deref + Send + Sync,
    F: 'static + Deref + Send + Sync,
    G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
    L: 'static + Deref + Send + Sync,
    P: 'static + Deref + Send + Sync,
    EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
    EventHandler: Fn(Event) -> EventHandlerFuture,
    PS: 'static + Deref + Send,
    M: 'static
        + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
        + Send
        + Sync,
    CM: 'static + Deref + Send + Sync,
    OM: 'static + Deref + Send + Sync,
    PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
    RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
    PM: 'static + Deref + Send + Sync,
    S: 'static + Deref<Target = SC> + Send + Sync,
    SC: for<'b> WriteableScore<'b>,
    SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
    Sleeper: Fn(Duration) -> SleepFuture,
    FetchTime: Fn() -> Option<Duration>,
>(
    persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
    onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
    logger: L, scorer: Option<S>, sleeper: Sleeper, mobile_interruptable_platform: bool,
    fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
    UL::Target: 'static + UtxoLookup,
    CF::Target: 'static + chain::Filter,
    T::Target: 'static + BroadcasterInterface,
    F::Target: 'static + FeeEstimator,
    L::Target: 'static + Logger,
    P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
    PS::Target: 'static + Persister<'a, CM, L, S>,
    CM::Target: AChannelManager + Send + Sync,
    OM::Target: AOnionMessenger + Send + Sync,
    PM::Target: APeerManager + Send + Sync,
{
    // Set when a sleeper future resolves to `true`; read by the loop's exit check.
    let mut should_break = false;
    let async_event_handler = |event| {
        // Take references up front so the `async move` block below captures references
        // rather than moving the owned values into the returned future.
        let network_graph = gossip_sync.network_graph();
        let event_handler = &event_handler;
        let scorer = &scorer;
        let logger = &logger;
        let persister = &persister;
        let fetch_time = &fetch_time;
        Box::pin(async move {
            if let Some(network_graph) = network_graph {
                handle_network_graph_update(network_graph, &event)
            }
            if let Some(ref scorer) = scorer {
                // Without a time source the scorer cannot be updated from this event.
                if let Some(duration_since_epoch) = fetch_time() {
                    if update_scorer(scorer, &event, duration_since_epoch) {
                        log_trace!(logger, "Persisting scorer after update");
                        if let Err(e) = persister.persist_scorer(&*scorer) {
                            log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
                        }
                    }
                }
            }
            event_handler(event).await
        })
    };
    define_run_body!(
        persister,
        chain_monitor,
        chain_monitor.process_pending_events_async(async_event_handler).await,
        channel_manager,
        channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
        onion_messenger,
        if let Some(om) = &onion_messenger {
            om.get_om().process_pending_events_async(async_event_handler).await
        },
        peer_manager,
        gossip_sync,
        logger,
        scorer,
        should_break,
        {
            // Wait until the channel manager, the chain monitor or the onion messenger (if
            // any) signals new work, or until the sleep elapses.
            let om_fut = if let Some(om) = onion_messenger.as_ref() {
                let fut = om.get_om().get_update_future();
                OptionalSelector { optional_future: Some(fut) }
            } else {
                OptionalSelector { optional_future: None }
            };
            let fut = Selector {
                a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
                b: chain_monitor.get_update_future(),
                c: om_fut,
                d: sleeper(if mobile_interruptable_platform {
                    Duration::from_millis(100)
                } else {
                    Duration::from_secs(FASTEST_TIMER)
                }),
            };
            match fut.await {
                SelectorOutput::A | SelectorOutput::B | SelectorOutput::C => {},
                SelectorOutput::D(exit) => {
                    should_break = exit;
                },
            }
        },
        // A "timer" is simply a sleeper future for the requested number of seconds.
        |t| sleeper(Duration::from_secs(t)),
        // A timer has elapsed when its future is ready; it is polled once with a no-op waker.
        // A ready future also carries the exit request.
        |fut: &mut SleepFuture, _| {
            let mut waker = dummy_waker();
            let mut ctx = task::Context::from_waker(&mut waker);
            match core::pin::Pin::new(fut).poll(&mut ctx) {
                task::Poll::Ready(exit) => {
                    should_break = exit;
                    true
                },
                task::Poll::Pending => false,
            }
        },
        mobile_interruptable_platform,
        fetch_time,
    )
}
856
#[cfg(feature = "std")]
impl BackgroundProcessor {
    /// Spawns a thread that runs the background-processing loop and returns a handle to it.
    ///
    /// The thread processes events, persists state and drives the periodic timers until the
    /// returned handle is stopped or dropped, or until persisting the channel manager fails.
    ///
    /// Each event is first used to update the network graph (if the gossip sync provides one)
    /// and the scorer (if one is given; the scorer is persisted after every update), and is
    /// then passed to `event_handler`.
    ///
    /// # Parameters
    ///
    /// * `persister`: persists the channel manager, network graph and scorer.
    /// * `event_handler`: receives every event.
    /// * `chain_monitor`, `channel_manager`, `onion_messenger`, `peer_manager`: the objects
    ///   whose events are processed and whose timer ticks are driven. `onion_messenger` is
    ///   optional.
    /// * `gossip_sync`: selects the network graph to update, prune and persist, if any.
    /// * `logger`: receives the loop's log output.
    /// * `scorer`: optional scorer to update from payment and probe events and to persist.
    ///
    /// # Panics
    ///
    /// The spawned thread panics if the system clock reports a time before the Unix epoch.
    pub fn start<
        'a,
        UL: 'static + Deref + Send + Sync,
        CF: 'static + Deref + Send + Sync,
        T: 'static + Deref + Send + Sync,
        F: 'static + Deref + Send + Sync,
        G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
        L: 'static + Deref + Send + Sync,
        P: 'static + Deref + Send + Sync,
        EH: 'static + EventHandler + Send,
        PS: 'static + Deref + Send,
        M: 'static
            + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>>
            + Send
            + Sync,
        CM: 'static + Deref + Send + Sync,
        OM: 'static + Deref + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
        PM: 'static + Deref + Send + Sync,
        S: 'static + Deref<Target = SC> + Send + Sync,
        SC: for<'b> WriteableScore<'b>,
    >(
        persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
        onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
        logger: L, scorer: Option<S>,
    ) -> Self
    where
        UL::Target: 'static + UtxoLookup,
        CF::Target: 'static + chain::Filter,
        T::Target: 'static + BroadcasterInterface,
        F::Target: 'static + FeeEstimator,
        L::Target: 'static + Logger,
        P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
        PS::Target: 'static + Persister<'a, CM, L, S>,
        CM::Target: AChannelManager + Send + Sync,
        OM::Target: AOnionMessenger + Send + Sync,
        PM::Target: APeerManager + Send + Sync,
    {
        // One handle to the flag moves into the thread, the other stays in the returned value.
        let stop_thread = Arc::new(AtomicBool::new(false));
        let stop_thread_clone = stop_thread.clone();
        let handle = thread::spawn(move || -> Result<(), std::io::Error> {
            // Wraps the user's handler: update graph and scorer first, then delegate.
            let event_handler = |event| {
                let network_graph = gossip_sync.network_graph();
                if let Some(network_graph) = network_graph {
                    handle_network_graph_update(network_graph, &event)
                }
                if let Some(ref scorer) = scorer {
                    use std::time::SystemTime;
                    let duration_since_epoch = SystemTime::now()
                        .duration_since(SystemTime::UNIX_EPOCH)
                        .expect("Time should be sometime after 1970");
                    if update_scorer(scorer, &event, duration_since_epoch) {
                        log_trace!(logger, "Persisting scorer after update");
                        if let Err(e) = persister.persist_scorer(&scorer) {
                            log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
                        }
                    }
                }
                event_handler.handle_event(event)
            };
            define_run_body!(
                persister,
                chain_monitor,
                chain_monitor.process_pending_events(&event_handler),
                channel_manager,
                channel_manager.get_cm().process_pending_events(&event_handler),
                onion_messenger,
                if let Some(om) = &onion_messenger {
                    om.get_om().process_pending_events(&event_handler)
                },
                peer_manager,
                gossip_sync,
                logger,
                scorer,
                stop_thread.load(Ordering::Acquire),
                {
                    // Block until one of the update futures fires or 100ms have passed, so
                    // the stop flag is re-checked at least that often.
                    let sleeper = if let Some(om) = onion_messenger.as_ref() {
                        Sleeper::from_three_futures(
                            &channel_manager.get_cm().get_event_or_persistence_needed_future(),
                            &chain_monitor.get_update_future(),
                            &om.get_om().get_update_future(),
                        )
                    } else {
                        Sleeper::from_two_futures(
                            &channel_manager.get_cm().get_event_or_persistence_needed_future(),
                            &chain_monitor.get_update_future(),
                        )
                    };
                    sleeper.wait_timeout(Duration::from_millis(100));
                },
                // A "timer" is the instant it was started at.
                |_| Instant::now(),
                // Elapsed once strictly more than `dur` whole seconds have passed.
                |time: &Instant, dur| time.elapsed().as_secs() > dur,
                // The threaded loop never times its waits.
                false,
                || {
                    use std::time::SystemTime;
                    Some(
                        SystemTime::now()
                            .duration_since(SystemTime::UNIX_EPOCH)
                            .expect("Time should be sometime after 1970"),
                    )
                },
            )
        });
        Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
    }

    /// Waits for the background thread to finish without asking it to stop.
    ///
    /// Because the stop flag is not set, this only returns once the thread ends on its own,
    /// which the loop does when persisting the channel manager fails.
    ///
    /// # Errors
    ///
    /// Returns the error the background thread exited with, if any.
    ///
    /// # Panics
    ///
    /// Panics if the thread has already been joined or if the background thread panicked.
    pub fn join(mut self) -> Result<(), std::io::Error> {
        assert!(self.thread_handle.is_some());
        self.join_thread()
    }

    /// Asks the background thread to stop and waits for it to finish.
    ///
    /// Before exiting, the thread persists the channel manager, the scorer (if any) and the
    /// network graph (if any) one final time.
    ///
    /// # Errors
    ///
    /// Returns the error the background thread exited with, if any.
    ///
    /// # Panics
    ///
    /// Panics if the thread has already been joined or if the background thread panicked.
    pub fn stop(mut self) -> Result<(), std::io::Error> {
        assert!(self.thread_handle.is_some());
        self.stop_and_join_thread()
    }

    /// Sets the stop flag and joins the thread; shared by [`Self::stop`] and `Drop`.
    fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
        self.stop_thread.store(true, Ordering::Release);
        self.join_thread()
    }

    /// Joins the thread if it has not been joined yet and returns its result.
    ///
    /// Returns `Ok(())` when there is no thread left to join. Panics if the thread panicked.
    fn join_thread(&mut self) -> Result<(), std::io::Error> {
        match self.thread_handle.take() {
            Some(handle) => handle.join().unwrap(),
            None => Ok(()),
        }
    }
}
1050
/// Dropping the handle stops the background thread and waits for it to finish.
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
    /// Sets the stop flag and joins the thread if it has not been joined already.
    ///
    /// Panics if the background thread exited with an error or panicked.
    fn drop(&mut self) {
        self.stop_and_join_thread().unwrap();
    }
}
1057
1058#[cfg(all(feature = "std", test))]
1059mod tests {
1060 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
1061 use bitcoin::constants::{genesis_block, ChainHash};
1062 use bitcoin::hashes::Hash;
1063 use bitcoin::locktime::absolute::LockTime;
1064 use bitcoin::network::Network;
1065 use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
1066 use bitcoin::transaction::Version;
1067 use bitcoin::transaction::{Transaction, TxOut};
1068 use bitcoin::{Amount, ScriptBuf, Txid};
1069 use core::sync::atomic::{AtomicBool, Ordering};
1070 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
1071 use lightning::chain::transaction::OutPoint;
1072 use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1073 use lightning::events::{
1074 Event, MessageSendEvent, MessageSendEventsProvider, PathFailure, ReplayEvent,
1075 };
1076 use lightning::ln::channelmanager;
1077 use lightning::ln::channelmanager::{
1078 ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
1079 };
1080 use lightning::ln::functional_test_utils::*;
1081 use lightning::ln::msgs::{ChannelMessageHandler, Init};
1082 use lightning::ln::peer_handler::{
1083 IgnoringMessageHandler, MessageHandler, PeerManager, SocketDescriptor,
1084 };
1085 use lightning::ln::types::ChannelId;
1086 use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
1087 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
1088 use lightning::routing::router::{CandidateRouteHop, DefaultRouter, Path, RouteHop};
1089 use lightning::routing::scoring::{ChannelUsage, LockableScore, ScoreLookUp, ScoreUpdate};
1090 use lightning::sign::{ChangeDestinationSource, InMemorySigner, KeysManager};
1091 use lightning::types::features::{ChannelFeatures, NodeFeatures};
1092 use lightning::types::payment::PaymentHash;
1093 use lightning::util::config::UserConfig;
1094 use lightning::util::persist::{
1095 KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1096 CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
1097 NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1098 SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1099 SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1100 };
1101 use lightning::util::ser::Writeable;
1102 use lightning::util::sweep::{OutputSpendStatus, OutputSweeper};
1103 use lightning::util::test_utils;
1104 use lightning::{get_event, get_event_msg};
1105 use lightning_persister::fs_store::FilesystemStore;
1106 use lightning_rapid_gossip_sync::RapidGossipSync;
1107 use std::collections::VecDeque;
1108 use std::path::PathBuf;
1109 use std::sync::mpsc::SyncSender;
1110 use std::sync::Arc;
1111 use std::time::Duration;
1112 use std::{env, fs};
1113
    // Five freshness-timer periods.
    // NOTE(review): presumably the number of seconds tests wait for an expected event before
    // failing; its uses are outside this chunk - confirm.
    const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
1115
    /// Socket descriptor stub for tests: it accepts no data and disconnecting does nothing.
    #[derive(Clone, Hash, PartialEq, Eq)]
    struct TestDescriptor {}
    impl SocketDescriptor for TestDescriptor {
        /// Always reports that zero bytes were sent.
        fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
            0
        }

        /// No-op; there is no real socket to close.
        fn disconnect_socket(&mut self) {}
    }
1125
    // Lock type wrapped around the test scorer: the multi-threaded lockable score when building
    // with `c_bindings`, a plain `Mutex` otherwise.
    #[cfg(c_bindings)]
    type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
    #[cfg(not(c_bindings))]
    type LockingWrapper<T> = std::sync::Mutex<T>;

    // Concrete channel manager type used by the tests, built from the test utilities.
    type ChannelManager = channelmanager::ChannelManager<
        Arc<ChainMonitor>,
        Arc<test_utils::TestBroadcaster>,
        Arc<KeysManager>,
        Arc<KeysManager>,
        Arc<KeysManager>,
        Arc<test_utils::TestFeeEstimator>,
        Arc<
            DefaultRouter<
                Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                Arc<test_utils::TestLogger>,
                Arc<KeysManager>,
                Arc<LockingWrapper<TestScorer>>,
                (),
                TestScorer,
            >,
        >,
        Arc<
            DefaultMessageRouter<
                Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                Arc<test_utils::TestLogger>,
                Arc<KeysManager>,
            >,
        >,
        Arc<test_utils::TestLogger>,
    >;

    // Concrete chain monitor type used by the tests; monitors are persisted to a
    // `FilesystemStore`.
    type ChainMonitor = chainmonitor::ChainMonitor<
        InMemorySigner,
        Arc<test_utils::TestChainSource>,
        Arc<test_utils::TestBroadcaster>,
        Arc<test_utils::TestFeeEstimator>,
        Arc<test_utils::TestLogger>,
        Arc<FilesystemStore>,
    >;

    // Shared P2P gossip sync over the test network graph.
    type PGS = Arc<
        P2PGossipSync<
            Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
            Arc<test_utils::TestChainSource>,
            Arc<test_utils::TestLogger>,
        >,
    >;
    // Shared rapid gossip sync over the test network graph.
    type RGS = Arc<
        RapidGossipSync<
            Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
            Arc<test_utils::TestLogger>,
        >,
    >;

    // Concrete onion messenger type used by the tests.
    type OM = OnionMessenger<
        Arc<KeysManager>,
        Arc<KeysManager>,
        Arc<test_utils::TestLogger>,
        Arc<ChannelManager>,
        Arc<
            DefaultMessageRouter<
                Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                Arc<test_utils::TestLogger>,
                Arc<KeysManager>,
            >,
        >,
        IgnoringMessageHandler,
        Arc<ChannelManager>,
        IgnoringMessageHandler,
        IgnoringMessageHandler,
    >;
1198
1199 struct Node {
1200 node: Arc<ChannelManager>,
1201 messenger: Arc<OM>,
1202 p2p_gossip_sync: PGS,
1203 rapid_gossip_sync: RGS,
1204 peer_manager: Arc<
1205 PeerManager<
1206 TestDescriptor,
1207 Arc<test_utils::TestChannelMessageHandler>,
1208 Arc<test_utils::TestRoutingMessageHandler>,
1209 Arc<OM>,
1210 Arc<test_utils::TestLogger>,
1211 IgnoringMessageHandler,
1212 Arc<KeysManager>,
1213 >,
1214 >,
1215 chain_monitor: Arc<ChainMonitor>,
1216 kv_store: Arc<FilesystemStore>,
1217 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
1218 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1219 logger: Arc<test_utils::TestLogger>,
1220 best_block: BestBlock,
1221 scorer: Arc<LockingWrapper<TestScorer>>,
1222 sweeper: Arc<
1223 OutputSweeper<
1224 Arc<test_utils::TestBroadcaster>,
1225 Arc<TestWallet>,
1226 Arc<test_utils::TestFeeEstimator>,
1227 Arc<dyn Filter + Sync + Send>,
1228 Arc<FilesystemStore>,
1229 Arc<test_utils::TestLogger>,
1230 Arc<KeysManager>,
1231 >,
1232 >,
1233 }
1234
1235 impl Node {
1236 fn p2p_gossip_sync(
1237 &self,
1238 ) -> GossipSync<
1239 PGS,
1240 RGS,
1241 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1242 Arc<test_utils::TestChainSource>,
1243 Arc<test_utils::TestLogger>,
1244 > {
1245 GossipSync::P2P(self.p2p_gossip_sync.clone())
1246 }
1247
1248 fn rapid_gossip_sync(
1249 &self,
1250 ) -> GossipSync<
1251 PGS,
1252 RGS,
1253 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1254 Arc<test_utils::TestChainSource>,
1255 Arc<test_utils::TestLogger>,
1256 > {
1257 GossipSync::Rapid(self.rapid_gossip_sync.clone())
1258 }
1259
1260 fn no_gossip_sync(
1261 &self,
1262 ) -> GossipSync<
1263 PGS,
1264 RGS,
1265 Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
1266 Arc<test_utils::TestChainSource>,
1267 Arc<test_utils::TestLogger>,
1268 > {
1269 GossipSync::None
1270 }
1271 }
1272
1273 impl Drop for Node {
1274 fn drop(&mut self) {
1275 let data_dir = self.kv_store.get_data_dir();
1276 match fs::remove_dir_all(data_dir.clone()) {
1277 Err(e) => {
1278 println!("Failed to remove test store directory {}: {}", data_dir.display(), e)
1279 },
1280 _ => {},
1281 }
1282 }
1283 }
1284
/// Test key-value store that forwards to a [`FilesystemStore`] but can be configured to fail
/// writes of the channel manager, network graph, or scorer with a fixed error, and to signal a
/// channel whenever a network graph write is attempted.
struct Persister {
	// Error returned instead of writing when the network graph location is written.
	graph_error: Option<(std::io::ErrorKind, &'static str)>,
	// Notified (before `graph_error` is consulted) on every network graph write attempt.
	graph_persistence_notifier: Option<SyncSender<()>>,
	// Error returned instead of writing when the channel manager location is written.
	manager_error: Option<(std::io::ErrorKind, &'static str)>,
	// Error returned instead of writing when the scorer location is written.
	scorer_error: Option<(std::io::ErrorKind, &'static str)>,
	// Backing store that handles every call that is not intercepted.
	kv_store: FilesystemStore,
}
1292
1293 impl Persister {
1294 fn new(data_dir: PathBuf) -> Self {
1295 let kv_store = FilesystemStore::new(data_dir);
1296 Self {
1297 graph_error: None,
1298 graph_persistence_notifier: None,
1299 manager_error: None,
1300 scorer_error: None,
1301 kv_store,
1302 }
1303 }
1304
1305 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1306 Self { graph_error: Some((error, message)), ..self }
1307 }
1308
1309 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
1310 Self { graph_persistence_notifier: Some(sender), ..self }
1311 }
1312
1313 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1314 Self { manager_error: Some((error, message)), ..self }
1315 }
1316
1317 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1318 Self { scorer_error: Some((error, message)), ..self }
1319 }
1320 }
1321
1322 impl KVStore for Persister {
1323 fn read(
1324 &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
1325 ) -> lightning::io::Result<Vec<u8>> {
1326 self.kv_store.read(primary_namespace, secondary_namespace, key)
1327 }
1328
1329 fn write(
1330 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
1331 ) -> lightning::io::Result<()> {
1332 if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
1333 && secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
1334 && key == CHANNEL_MANAGER_PERSISTENCE_KEY
1335 {
1336 if let Some((error, message)) = self.manager_error {
1337 return Err(std::io::Error::new(error, message).into());
1338 }
1339 }
1340
1341 if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE
1342 && secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE
1343 && key == NETWORK_GRAPH_PERSISTENCE_KEY
1344 {
1345 if let Some(sender) = &self.graph_persistence_notifier {
1346 match sender.send(()) {
1347 Ok(()) => {},
1348 Err(std::sync::mpsc::SendError(())) => {
1349 println!("Persister failed to notify as receiver went away.")
1350 },
1351 }
1352 };
1353
1354 if let Some((error, message)) = self.graph_error {
1355 return Err(std::io::Error::new(error, message).into());
1356 }
1357 }
1358
1359 if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE
1360 && secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE
1361 && key == SCORER_PERSISTENCE_KEY
1362 {
1363 if let Some((error, message)) = self.scorer_error {
1364 return Err(std::io::Error::new(error, message).into());
1365 }
1366 }
1367
1368 self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
1369 }
1370
1371 fn remove(
1372 &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
1373 ) -> lightning::io::Result<()> {
1374 self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
1375 }
1376
1377 fn list(
1378 &self, primary_namespace: &str, secondary_namespace: &str,
1379 ) -> lightning::io::Result<Vec<String>> {
1380 self.kv_store.list(primary_namespace, secondary_namespace)
1381 }
1382 }
1383
/// Scorer stub that checks score-update callbacks against a queue of expected results.
struct TestScorer {
	// Expected callbacks in order; while `None`, the update callbacks check nothing.
	event_expectations: Option<VecDeque<TestResult>>,
}
1387
/// One expected [`TestScorer`] callback together with the arguments it should receive.
#[derive(Debug)]
enum TestResult {
	// Expect `payment_path_failed` with this path and short channel id.
	PaymentFailure { path: Path, short_channel_id: u64 },
	// Expect `payment_path_successful` with this path.
	PaymentSuccess { path: Path },
	// Expect `probe_failed` with this path.
	ProbeFailure { path: Path },
	// Expect `probe_successful` with this path.
	ProbeSuccess { path: Path },
}
1395
1396 impl TestScorer {
1397 fn new() -> Self {
1398 Self { event_expectations: None }
1399 }
1400
1401 fn expect(&mut self, expectation: TestResult) {
1402 self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
1403 }
1404 }
1405
1406 impl lightning::util::ser::Writeable for TestScorer {
1407 fn write<W: lightning::util::ser::Writer>(
1408 &self, _: &mut W,
1409 ) -> Result<(), lightning::io::Error> {
1410 Ok(())
1411 }
1412 }
1413
1414 impl ScoreLookUp for TestScorer {
1415 type ScoreParams = ();
1416 fn channel_penalty_msat(
1417 &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage,
1418 _score_params: &Self::ScoreParams,
1419 ) -> u64 {
1420 unimplemented!();
1421 }
1422 }
1423
1424 impl ScoreUpdate for TestScorer {
1425 fn payment_path_failed(
1426 &mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration,
1427 ) {
1428 if let Some(expectations) = &mut self.event_expectations {
1429 match expectations.pop_front().unwrap() {
1430 TestResult::PaymentFailure { path, short_channel_id } => {
1431 assert_eq!(actual_path, &path);
1432 assert_eq!(actual_short_channel_id, short_channel_id);
1433 },
1434 TestResult::PaymentSuccess { path } => {
1435 panic!("Unexpected successful payment path: {:?}", path)
1436 },
1437 TestResult::ProbeFailure { path } => {
1438 panic!("Unexpected probe failure: {:?}", path)
1439 },
1440 TestResult::ProbeSuccess { path } => {
1441 panic!("Unexpected probe success: {:?}", path)
1442 },
1443 }
1444 }
1445 }
1446
1447 fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
1448 if let Some(expectations) = &mut self.event_expectations {
1449 match expectations.pop_front().unwrap() {
1450 TestResult::PaymentFailure { path, .. } => {
1451 panic!("Unexpected payment path failure: {:?}", path)
1452 },
1453 TestResult::PaymentSuccess { path } => {
1454 assert_eq!(actual_path, &path);
1455 },
1456 TestResult::ProbeFailure { path } => {
1457 panic!("Unexpected probe failure: {:?}", path)
1458 },
1459 TestResult::ProbeSuccess { path } => {
1460 panic!("Unexpected probe success: {:?}", path)
1461 },
1462 }
1463 }
1464 }
1465
1466 fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
1467 if let Some(expectations) = &mut self.event_expectations {
1468 match expectations.pop_front().unwrap() {
1469 TestResult::PaymentFailure { path, .. } => {
1470 panic!("Unexpected payment path failure: {:?}", path)
1471 },
1472 TestResult::PaymentSuccess { path } => {
1473 panic!("Unexpected payment path success: {:?}", path)
1474 },
1475 TestResult::ProbeFailure { path } => {
1476 assert_eq!(actual_path, &path);
1477 },
1478 TestResult::ProbeSuccess { path } => {
1479 panic!("Unexpected probe success: {:?}", path)
1480 },
1481 }
1482 }
1483 }
1484 fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
1485 if let Some(expectations) = &mut self.event_expectations {
1486 match expectations.pop_front().unwrap() {
1487 TestResult::PaymentFailure { path, .. } => {
1488 panic!("Unexpected payment path failure: {:?}", path)
1489 },
1490 TestResult::PaymentSuccess { path } => {
1491 panic!("Unexpected payment path success: {:?}", path)
1492 },
1493 TestResult::ProbeFailure { path } => {
1494 panic!("Unexpected probe failure: {:?}", path)
1495 },
1496 TestResult::ProbeSuccess { path } => {
1497 assert_eq!(actual_path, &path);
1498 },
1499 }
1500 }
1501 }
1502 fn time_passed(&mut self, _: Duration) {}
1503 }
1504
// Only compiled when the `c_bindings` cfg is set; the impl supplies no items of its own.
#[cfg(c_bindings)]
impl lightning::routing::scoring::Score for TestScorer {}
1507
1508 impl Drop for TestScorer {
1509 fn drop(&mut self) {
1510 if std::thread::panicking() {
1511 return;
1512 }
1513
1514 if let Some(event_expectations) = &self.event_expectations {
1515 if !event_expectations.is_empty() {
1516 panic!("Unsatisfied event expectations: {:?}", event_expectations);
1517 }
1518 }
1519 }
1520 }
1521
1522 struct TestWallet {}
1523
1524 impl ChangeDestinationSource for TestWallet {
1525 fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
1526 Ok(ScriptBuf::new())
1527 }
1528 }
1529
/// Joins `filename` onto `filepath` using platform path rules and returns the result as a
/// `String`.
fn get_full_filepath(filepath: String, filename: String) -> String {
	let full_path = PathBuf::from(filepath).join(filename);
	// Both inputs are `String`s, so the joined path is valid UTF-8 and `to_str` succeeds.
	full_path.to_str().unwrap().to_string()
}
1535
1536 fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
1537 let persist_temp_path = env::temp_dir().join(persist_dir);
1538 let persist_dir = persist_temp_path.to_string_lossy().to_string();
1539 let network = Network::Bitcoin;
1540 let mut nodes = Vec::new();
1541 for i in 0..num_nodes {
1542 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
1543 let fee_estimator = Arc::new(test_utils::TestFeeEstimator::new(253));
1544 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
1545 let genesis_block = genesis_block(network);
1546 let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
1547 let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
1548 let now = Duration::from_secs(genesis_block.header.time as u64);
1549 let seed = [i as u8; 32];
1550 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1551 let router = Arc::new(DefaultRouter::new(
1552 network_graph.clone(),
1553 logger.clone(),
1554 Arc::clone(&keys_manager),
1555 scorer.clone(),
1556 Default::default(),
1557 ));
1558 let msg_router = Arc::new(DefaultMessageRouter::new(
1559 network_graph.clone(),
1560 Arc::clone(&keys_manager),
1561 ));
1562 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
1563 let kv_store =
1564 Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
1565 let now = Duration::from_secs(genesis_block.header.time as u64);
1566 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1567 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
1568 Some(chain_source.clone()),
1569 tx_broadcaster.clone(),
1570 logger.clone(),
1571 fee_estimator.clone(),
1572 kv_store.clone(),
1573 ));
1574 let best_block = BestBlock::from_network(network);
1575 let params = ChainParameters { network, best_block };
1576 let manager = Arc::new(ChannelManager::new(
1577 fee_estimator.clone(),
1578 chain_monitor.clone(),
1579 tx_broadcaster.clone(),
1580 router.clone(),
1581 msg_router.clone(),
1582 logger.clone(),
1583 keys_manager.clone(),
1584 keys_manager.clone(),
1585 keys_manager.clone(),
1586 UserConfig::default(),
1587 params,
1588 genesis_block.header.time,
1589 ));
1590 let messenger = Arc::new(OnionMessenger::new(
1591 keys_manager.clone(),
1592 keys_manager.clone(),
1593 logger.clone(),
1594 manager.clone(),
1595 msg_router.clone(),
1596 IgnoringMessageHandler {},
1597 manager.clone(),
1598 IgnoringMessageHandler {},
1599 IgnoringMessageHandler {},
1600 ));
1601 let wallet = Arc::new(TestWallet {});
1602 let sweeper = Arc::new(OutputSweeper::new(
1603 best_block,
1604 Arc::clone(&tx_broadcaster),
1605 Arc::clone(&fee_estimator),
1606 None::<Arc<dyn Filter + Sync + Send>>,
1607 Arc::clone(&keys_manager),
1608 wallet,
1609 Arc::clone(&kv_store),
1610 Arc::clone(&logger),
1611 ));
1612 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(
1613 network_graph.clone(),
1614 Some(chain_source.clone()),
1615 logger.clone(),
1616 ));
1617 let rapid_gossip_sync =
1618 Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
1619 let msg_handler = MessageHandler {
1620 chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(
1621 ChainHash::using_genesis_block(Network::Testnet),
1622 )),
1623 route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
1624 onion_message_handler: messenger.clone(),
1625 custom_message_handler: IgnoringMessageHandler {},
1626 };
1627 let peer_manager = Arc::new(PeerManager::new(
1628 msg_handler,
1629 0,
1630 &seed,
1631 logger.clone(),
1632 keys_manager.clone(),
1633 ));
1634 let node = Node {
1635 node: manager,
1636 p2p_gossip_sync,
1637 rapid_gossip_sync,
1638 peer_manager,
1639 chain_monitor,
1640 kv_store,
1641 tx_broadcaster,
1642 network_graph,
1643 logger,
1644 best_block,
1645 scorer,
1646 sweeper,
1647 messenger,
1648 };
1649 nodes.push(node);
1650 }
1651
1652 for i in 0..num_nodes {
1653 for j in (i + 1)..num_nodes {
1654 let init_i = Init {
1655 features: nodes[j].node.init_features(),
1656 networks: None,
1657 remote_network_address: None,
1658 };
1659 nodes[i]
1660 .node
1661 .peer_connected(nodes[j].node.get_our_node_id(), &init_i, true)
1662 .unwrap();
1663 let init_j = Init {
1664 features: nodes[i].node.init_features(),
1665 networks: None,
1666 remote_network_address: None,
1667 };
1668 nodes[j]
1669 .node
1670 .peer_connected(nodes[i].node.get_our_node_id(), &init_j, false)
1671 .unwrap();
1672 }
1673 }
1674
1675 (persist_dir, nodes)
1676 }
1677
/// Drives a channel open from `$node_a` to `$node_b` for `$channel_value` through the
/// funding-created / funding-signed exchange and evaluates to the funding transaction.
///
/// Expects exactly one pending event on `$node_a` after the open/accept exchange and a
/// `ChannelPending` event on each side after the funding messages are handled.
macro_rules! open_channel {
	($node_a: expr, $node_b: expr, $channel_value: expr) => {{
		// open_channel / accept_channel exchange.
		begin_open_channel!($node_a, $node_b, $channel_value);
		let events = $node_a.node.get_and_clear_pending_events();
		assert_eq!(events.len(), 1);
		// Build the funding transaction from the single pending event.
		let (temporary_channel_id, tx) =
			handle_funding_generation_ready!(events[0], $channel_value);
		$node_a
			.node
			.funding_transaction_generated(
				temporary_channel_id,
				$node_b.node.get_our_node_id(),
				tx.clone(),
			)
			.unwrap();
		// Deliver funding_created to the counterparty.
		let msg_a = get_event_msg!(
			$node_a,
			MessageSendEvent::SendFundingCreated,
			$node_b.node.get_our_node_id()
		);
		$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
		get_event!($node_b, Event::ChannelPending);
		// Deliver funding_signed back to the initiator.
		let msg_b = get_event_msg!(
			$node_b,
			MessageSendEvent::SendFundingSigned,
			$node_a.node.get_our_node_id()
		);
		$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
		get_event!($node_a, Event::ChannelPending);
		tx
	}};
}
1710
/// Starts a channel open from `$node_a` to `$node_b`: calls `create_channel` with
/// `$channel_value`, a second numeric argument of 100 and a user channel id of 42, then relays
/// the resulting open_channel and accept_channel messages between the two nodes.
macro_rules! begin_open_channel {
	($node_a: expr, $node_b: expr, $channel_value: expr) => {{
		$node_a
			.node
			.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None)
			.unwrap();
		// Relay open_channel from the initiator to the counterparty.
		let msg_a = get_event_msg!(
			$node_a,
			MessageSendEvent::SendOpenChannel,
			$node_b.node.get_our_node_id()
		);
		$node_b.node.handle_open_channel($node_a.node.get_our_node_id(), &msg_a);
		// Relay accept_channel back to the initiator.
		let msg_b = get_event_msg!(
			$node_b,
			MessageSendEvent::SendAcceptChannel,
			$node_a.node.get_our_node_id()
		);
		$node_a.node.handle_accept_channel($node_b.node.get_our_node_id(), &msg_b);
	}};
}
1731
/// Checks that `$event` is a `FundingGenerationReady` for `$channel_value` with user channel
/// id 42, and evaluates to `(temporary_channel_id, funding_tx)`.
///
/// The funding transaction has no inputs and a single output paying the channel value to the
/// event's output script. Any other event kind panics.
macro_rules! handle_funding_generation_ready {
	($event: expr, $channel_value: expr) => {{
		match $event {
			Event::FundingGenerationReady {
				temporary_channel_id,
				channel_value_satoshis,
				ref output_script,
				user_channel_id,
				..
			} => {
				assert_eq!(channel_value_satoshis, $channel_value);
				// 42 is the user channel id passed by `begin_open_channel!`.
				assert_eq!(user_channel_id, 42);

				let tx = Transaction {
					version: Version::ONE,
					lock_time: LockTime::ZERO,
					input: Vec::new(),
					output: vec![TxOut {
						value: Amount::from_sat(channel_value_satoshis),
						script_pubkey: output_script.clone(),
					}],
				};
				(temporary_channel_id, tx)
			},
			_ => panic!("Unexpected event"),
		}
	}};
}
1760
1761 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1762 for i in 1..=depth {
1763 let prev_blockhash = node.best_block.block_hash;
1764 let height = node.best_block.height + 1;
1765 let header = create_dummy_header(prev_blockhash, height);
1766 let txdata = vec![(0, tx)];
1767 node.best_block = BestBlock::new(header.block_hash(), height);
1768 match i {
1769 1 => {
1770 node.node.transactions_confirmed(&header, &txdata, height);
1771 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1772 node.sweeper.transactions_confirmed(&header, &txdata, height);
1773 },
1774 x if x == depth => {
1775 let block = (genesis_block(Network::Bitcoin), height);
1779 node.tx_broadcaster.blocks.lock().unwrap().push(block);
1780 node.node.best_block_updated(&header, height);
1781 node.chain_monitor.best_block_updated(&header, height);
1782 node.sweeper.best_block_updated(&header, height);
1783 },
1784 _ => {},
1785 }
1786 }
1787 }
1788
1789 fn advance_chain(node: &mut Node, num_blocks: u32) {
1790 for i in 1..=num_blocks {
1791 let prev_blockhash = node.best_block.block_hash;
1792 let height = node.best_block.height + 1;
1793 let header = create_dummy_header(prev_blockhash, height);
1794 node.best_block = BestBlock::new(header.block_hash(), height);
1795 if i == num_blocks {
1796 let block = (genesis_block(Network::Bitcoin), height);
1800 node.tx_broadcaster.blocks.lock().unwrap().push(block);
1801 node.node.best_block_updated(&header, height);
1802 node.chain_monitor.best_block_updated(&header, height);
1803 node.sweeper.best_block_updated(&header, height);
1804 }
1805 }
1806 }
1807
1808 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1809 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1810 }
1811
/// Checks that a running `BackgroundProcessor` writes the channel manager, network graph and
/// scorer to disk, including after the channel manager changes via a force-close.
#[test]
fn test_background_processor() {
	let (persist_dir, nodes) = create_nodes(2, "test_background_processor");

	// Open a channel so the channel manager has state to persist.
	let tx = open_channel!(nodes[0], nodes[1], 100000);

	// Start the processor for node 0, persisting into node 0's own data directory.
	let data_dir = nodes[0].kv_store.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let event_handler = |_: _| Ok(());
	let bg_processor = BackgroundProcessor::start(
		persister,
		event_handler,
		nodes[0].chain_monitor.clone(),
		nodes[0].node.clone(),
		Some(nodes[0].messenger.clone()),
		nodes[0].p2p_gossip_sync(),
		nodes[0].peer_manager.clone(),
		nodes[0].logger.clone(),
		Some(nodes[0].scorer.clone()),
	);

	// Spins until the bytes on disk at `$filepath` equal a fresh serialization of `$node`.
	// Read errors (e.g. the file not existing yet) are retried; a serialization error panics.
	macro_rules! check_persisted_data {
		($node: expr, $filepath: expr) => {
			let mut expected_bytes = Vec::new();
			loop {
				expected_bytes.clear();
				match $node.write(&mut expected_bytes) {
					Ok(()) => match std::fs::read($filepath) {
						Ok(bytes) => {
							if bytes == expected_bytes {
								break;
							} else {
								continue;
							}
						},
						Err(_) => continue,
					},
					Err(e) => panic!("Unexpected error: {}", e),
				}
			}
		};
	}

	// The initial channel manager state must reach disk.
	let filepath =
		get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
	check_persisted_data!(nodes[0].node, filepath.clone());

	// Spin until `get_event_or_persist_condvar_value` reports false.
	loop {
		if !nodes[0].node.get_event_or_persist_condvar_value() {
			break;
		}
	}

	// Force-close the channel to change the channel manager's state.
	let error_message = "Channel force-closed";
	nodes[0]
		.node
		.force_close_broadcasting_latest_txn(
			&ChannelId::v1_from_funding_outpoint(OutPoint {
				txid: tx.compute_txid(),
				index: 0,
			}),
			&nodes[1].node.get_our_node_id(),
			error_message.to_string(),
		)
		.unwrap();

	// The updated channel manager must reach disk as well.
	check_persisted_data!(nodes[0].node, filepath.clone());
	loop {
		if !nodes[0].node.get_event_or_persist_condvar_value() {
			break;
		}
	}

	// The network graph must reach disk.
	let filepath =
		get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
	check_persisted_data!(nodes[0].network_graph, filepath.clone());

	// The scorer must reach disk.
	let filepath =
		get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
	check_persisted_data!(nodes[0].scorer, filepath.clone());

	// Only stop (and unwrap the result) when the test is not already panicking.
	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}
}
1909
/// Waits until the background processor has logged all four of its periodic timer-tick
/// messages, then stops it.
#[test]
fn test_timer_tick_called() {
	let (_, nodes) = create_nodes(1, "test_timer_tick_called");
	let persister = Arc::new(Persister::new(nodes[0].kv_store.get_data_dir()));
	let event_handler = |_: _| Ok(());
	let bg_processor = BackgroundProcessor::start(
		persister,
		event_handler,
		nodes[0].chain_monitor.clone(),
		nodes[0].node.clone(),
		Some(nodes[0].messenger.clone()),
		nodes[0].no_gossip_sync(),
		nodes[0].peer_manager.clone(),
		nodes[0].logger.clone(),
		Some(nodes[0].scorer.clone()),
	);

	let desired_logs = [
		"Calling ChannelManager's timer_tick_occurred",
		"Calling PeerManager's timer_tick_occurred",
		"Rebroadcasting monitor's pending claims",
		"Calling OnionMessageHandler's timer_tick_occurred",
	];
	loop {
		// The lock is re-acquired on every pass and released at the end of it.
		let log_entries = nodes[0].logger.lines.lock().unwrap();
		let all_logged = desired_logs.iter().all(|line| {
			log_entries.contains_key(&("lightning_background_processor", line.to_string()))
		});
		if all_logged {
			break;
		}
	}

	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}
}
1951
/// Checks that an injected channel manager write error is returned from
/// `BackgroundProcessor::join`.
#[test]
fn test_channel_manager_persist_error() {
	let (_, nodes) = create_nodes(2, "test_persist_error");
	open_channel!(nodes[0], nodes[1], 100000);

	let failing_store = Persister::new(nodes[0].kv_store.get_data_dir())
		.with_manager_error(std::io::ErrorKind::Other, "test");
	let persister = Arc::new(failing_store);
	let event_handler = |_: _| Ok(());
	let bg_processor = BackgroundProcessor::start(
		persister,
		event_handler,
		nodes[0].chain_monitor.clone(),
		nodes[0].node.clone(),
		Some(nodes[0].messenger.clone()),
		nodes[0].no_gossip_sync(),
		nodes[0].peer_manager.clone(),
		nodes[0].logger.clone(),
		Some(nodes[0].scorer.clone()),
	);

	let outcome = bg_processor.join();
	if let Err(e) = outcome {
		assert_eq!(e.kind(), std::io::ErrorKind::Other);
		assert_eq!(e.get_ref().unwrap().to_string(), "test");
	} else {
		panic!("Expected error persisting manager");
	}
}
1982
/// Async counterpart of the channel manager persist-error test: the injected write error
/// must be returned from the `process_events_async` future.
#[tokio::test]
#[cfg(feature = "futures")]
async fn test_channel_manager_persist_error_async() {
	let (_, nodes) = create_nodes(2, "test_persist_error_sync");
	open_channel!(nodes[0], nodes[1], 100000);

	let failing_store = Persister::new(nodes[0].kv_store.get_data_dir())
		.with_manager_error(std::io::ErrorKind::Other, "test");
	let persister = Arc::new(failing_store);

	let bp_future = super::process_events_async(
		persister,
		|_: _| async { Ok(()) },
		nodes[0].chain_monitor.clone(),
		nodes[0].node.clone(),
		Some(nodes[0].messenger.clone()),
		nodes[0].rapid_gossip_sync(),
		nodes[0].peer_manager.clone(),
		nodes[0].logger.clone(),
		Some(nodes[0].scorer.clone()),
		// Sleeper: waits for the requested duration, then resolves to `false`.
		move |dur: Duration| {
			Box::pin(async move {
				tokio::time::sleep(dur).await;
				false
			})
		},
		false,
		|| Some(Duration::ZERO),
	);

	let outcome = bp_future.await;
	if let Err(e) = outcome {
		assert_eq!(e.kind(), lightning::io::ErrorKind::Other);
		assert_eq!(e.get_ref().unwrap().to_string(), "test");
	} else {
		panic!("Expected error persisting manager");
	}
}
2022
/// Checks that an injected network graph write error is returned from
/// `BackgroundProcessor::stop`.
#[test]
fn test_network_graph_persist_error() {
	let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");

	let failing_store = Persister::new(nodes[0].kv_store.get_data_dir())
		.with_graph_error(std::io::ErrorKind::Other, "test");
	let persister = Arc::new(failing_store);
	let event_handler = |_: _| Ok(());
	let bg_processor = BackgroundProcessor::start(
		persister,
		event_handler,
		nodes[0].chain_monitor.clone(),
		nodes[0].node.clone(),
		Some(nodes[0].messenger.clone()),
		nodes[0].p2p_gossip_sync(),
		nodes[0].peer_manager.clone(),
		nodes[0].logger.clone(),
		Some(nodes[0].scorer.clone()),
	);

	let outcome = bg_processor.stop();
	if let Err(e) = outcome {
		assert_eq!(e.kind(), std::io::ErrorKind::Other);
		assert_eq!(e.get_ref().unwrap().to_string(), "test");
	} else {
		panic!("Expected error persisting network graph");
	}
}
2051
/// Checks that an injected scorer write error is returned from `BackgroundProcessor::stop`.
#[test]
fn test_scorer_persist_error() {
	let (_, nodes) = create_nodes(2, "test_persist_scorer_error");

	let failing_store = Persister::new(nodes[0].kv_store.get_data_dir())
		.with_scorer_error(std::io::ErrorKind::Other, "test");
	let persister = Arc::new(failing_store);
	let event_handler = |_: _| Ok(());
	let bg_processor = BackgroundProcessor::start(
		persister,
		event_handler,
		nodes[0].chain_monitor.clone(),
		nodes[0].node.clone(),
		Some(nodes[0].messenger.clone()),
		nodes[0].no_gossip_sync(),
		nodes[0].peer_manager.clone(),
		nodes[0].logger.clone(),
		Some(nodes[0].scorer.clone()),
	);

	let outcome = bg_processor.stop();
	if let Err(e) = outcome {
		assert_eq!(e.kind(), std::io::ErrorKind::Other);
		assert_eq!(e.get_ref().unwrap().to_string(), "test");
	} else {
		panic!("Expected error persisting scorer");
	}
}
2080
/// Exercises event delivery through the background processor in two phases: first the
/// funding-related events of a channel open, then (with a fresh processor) the
/// `SpendableOutputs` event after a force-close, whose outputs are handed to the sweeper and
/// followed through its status transitions until they are no longer tracked.
#[test]
fn test_background_event_handling() {
	let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
	let node_0_id = nodes[0].node.get_our_node_id();
	let node_1_id = nodes[1].node.get_our_node_id();

	let channel_value = 100000;
	let data_dir = nodes[0].kv_store.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir.clone()));

	// Phase 1 handler: forwards FundingGenerationReady and ChannelPending over channels,
	// ignores ChannelReady, and panics on anything else.
	let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
	let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
	let event_handler = move |event: Event| {
		match event {
			Event::FundingGenerationReady { .. } => funding_generation_send
				.send(handle_funding_generation_ready!(event, channel_value))
				.unwrap(),
			Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
			Event::ChannelReady { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}
		Ok(())
	};

	let bg_processor = BackgroundProcessor::start(
		persister,
		event_handler,
		nodes[0].chain_monitor.clone(),
		nodes[0].node.clone(),
		Some(nodes[0].messenger.clone()),
		nodes[0].no_gossip_sync(),
		nodes[0].peer_manager.clone(),
		nodes[0].logger.clone(),
		Some(nodes[0].scorer.clone()),
	);

	// Open the channel; the funding transaction arrives via the background event handler.
	begin_open_channel!(nodes[0], nodes[1], channel_value);
	let (temporary_channel_id, funding_tx) = funding_generation_recv
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("FundingGenerationReady not handled within deadline");
	nodes[0]
		.node
		.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
		.unwrap();
	let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
	nodes[1].node.handle_funding_created(node_0_id, &msg_0);
	get_event!(nodes[1], Event::ChannelPending);
	let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
	nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
	// Node 0's ChannelPending must also come through the background handler.
	let _ = channel_pending_recv
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("ChannelPending not handled within deadline");

	// Confirm the funding transaction on both nodes and exchange channel_ready.
	confirm_transaction(&mut nodes[0], &funding_tx);
	let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, node_1_id);
	confirm_transaction(&mut nodes[1], &funding_tx);
	let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, node_0_id);
	nodes[0].node.handle_channel_ready(node_1_id, &bs_funding);
	let _as_channel_update =
		get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, node_1_id);
	nodes[1].node.handle_channel_ready(node_0_id, &as_funding);
	let _bs_channel_update =
		get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, node_0_id);
	// The only transaction node 0 has broadcast so far must be the funding transaction.
	let broadcast_funding =
		nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
	assert_eq!(broadcast_funding.compute_txid(), funding_tx.compute_txid());
	assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());

	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}

	// Phase 2 handler: forwards SpendableOutputs, ignores ChannelReady and ChannelClosed,
	// and panics on anything else.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	let event_handler = move |event: Event| {
		match event {
			Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
			Event::ChannelReady { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}
		Ok(())
	};
	let persister = Arc::new(Persister::new(data_dir));
	let bg_processor = BackgroundProcessor::start(
		persister,
		event_handler,
		nodes[0].chain_monitor.clone(),
		nodes[0].node.clone(),
		Some(nodes[0].messenger.clone()),
		nodes[0].no_gossip_sync(),
		nodes[0].peer_manager.clone(),
		nodes[0].logger.clone(),
		Some(nodes[0].scorer.clone()),
	);

	// Force-close the channel and confirm the broadcast commitment transaction.
	let error_message = "Channel force-closed";
	nodes[0]
		.node
		.force_close_broadcasting_latest_txn(
			&nodes[0].node.list_channels()[0].channel_id,
			&node_1_id,
			error_message.to_string(),
		)
		.unwrap();
	let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
	confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

	// Hand the spendable outputs to the sweeper, delayed until height 153.
	let event = receiver
		.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
		.expect("Events not handled within deadline");
	match event {
		Event::SpendableOutputs { outputs, channel_id } => {
			nodes[0]
				.sweeper
				.track_spendable_outputs(outputs, channel_id, false, Some(153))
				.unwrap();
		},
		_ => panic!("Unexpected event: {:?}", event),
	}

	// The output is tracked; if anything was broadcast already, it must not spend the
	// output, and the output must still be pending its initial broadcast.
	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
	let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
	if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() {
		assert!(!tracked_output.is_spent_in(&sweep_tx_0));
		match tracked_output.status {
			OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
				assert_eq!(delayed_until_height, Some(153));
			},
			_ => panic!("Unexpected status"),
		}
	}

	advance_chain(&mut nodes[0], 3);

	// After three more blocks a sweep has been broadcast and awaits its first confirmation.
	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
	let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
	let sweep_tx_0 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
	match tracked_output.status {
		OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
			assert_eq!(sweep_tx_0.compute_txid(), latest_spending_tx.compute_txid());
		},
		_ => panic!("Unexpected status"),
	}

	// One more block: a different sweep transaction is broadcast.
	advance_chain(&mut nodes[0], 1);
	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
	let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
	let sweep_tx_1 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
	match tracked_output.status {
		OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
			assert_eq!(sweep_tx_1.compute_txid(), latest_spending_tx.compute_txid());
		},
		_ => panic!("Unexpected status"),
	}
	assert_ne!(sweep_tx_0, sweep_tx_1);

	// And again: a third, distinct sweep transaction is broadcast.
	advance_chain(&mut nodes[0], 1);
	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
	let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
	let sweep_tx_2 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
	match tracked_output.status {
		OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
			assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
		},
		_ => panic!("Unexpected status"),
	}
	assert_ne!(sweep_tx_0, sweep_tx_2);
	assert_ne!(sweep_tx_1, sweep_tx_2);

	// Confirm the latest sweep to depth 5: the output is still tracked, now pending
	// threshold confirmations.
	confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5);
	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
	let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
	match tracked_output.status {
		OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
			assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
		},
		_ => panic!("Unexpected status"),
	}

	// Reporting an all-zero txid as unconfirmed must leave the tracked status unchanged.
	let unconf_txid = Txid::from_slice(&[0; 32]).unwrap();
	nodes[0].sweeper.transaction_unconfirmed(&unconf_txid);

	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
	let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
	match tracked_output.status {
		OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
			assert_eq!(sweep_tx_2.compute_txid(), latest_spending_tx.compute_txid());
		},
		_ => panic!("Unexpected status"),
	}

	// After ANTI_REORG_DELAY more headers the output is no longer tracked.
	confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, ANTI_REORG_DELAY);
	assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0);

	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}
}
2293
/// Checks that an event rejected with `Err(ReplayEvent())` is delivered to the handler again.
///
/// The handler rejects exactly one delivery: that event is forwarded on the first channel and
/// the handler returns `Err(ReplayEvent())`. Every later delivery is forwarded on the second
/// channel and accepted. The test passes when both channels yield equal events within
/// `EVENT_DEADLINE` seconds.
#[test]
fn test_event_handling_failures_are_replayed() {
    let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
    let channel_value = 100000;
    let data_dir = nodes[0].kv_store.get_data_dir();
    // `data_dir` is not used again below, so it is moved into the persister instead of cloned.
    let persister = Arc::new(Persister::new(data_dir));

    let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
    let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
    let should_fail_event_handling = Arc::new(AtomicBool::new(true));
    let event_handler = move |event: Event| {
        // The exchange only succeeds while the flag is still `true`, i.e. on the first call;
        // it flips the flag so later deliveries take the success path.
        let is_first_delivery = should_fail_event_handling
            .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
            .is_ok();
        if is_first_delivery {
            first_event_send.send(event).unwrap();
            return Err(ReplayEvent());
        }

        second_event_send.send(event).unwrap();
        Ok(())
    };

    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        Some(nodes[0].messenger.clone()),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    begin_open_channel!(nodes[0], nodes[1], channel_value);
    // The rejected event and the replayed one must compare equal.
    assert_eq!(
        first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)).unwrap(),
        second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)).unwrap()
    );

    if !std::thread::panicking() {
        bg_processor.stop().unwrap();
    }
}
2341
/// Waits until the background processor has logged
/// "Calling time_passed and persisting scorer", then stops it.
#[test]
fn test_scorer_persistence() {
    let (_, nodes) = create_nodes(2, "test_scorer_persistence");
    let data_dir = nodes[0].kv_store.get_data_dir();
    let persister = Arc::new(Persister::new(data_dir));
    let event_handler = |_: _| Ok(());
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        Some(nodes[0].messenger.clone()),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Build the lookup key once instead of allocating a new `String` on every poll.
    let expected_entry = (
        "lightning_background_processor",
        "Calling time_passed and persisting scorer".to_string(),
    );
    loop {
        // The guard is a temporary of this statement, so the logger lock is released before
        // sleeping and is not held while the background thread tries to log.
        let logged = nodes[0].logger.lines.lock().unwrap().contains_key(&expected_entry);
        if logged {
            break;
        }
        // Pause briefly between polls rather than spinning on the logger mutex.
        std::thread::sleep(Duration::from_millis(1));
    }

    if !std::thread::panicking() {
        bg_processor.stop().unwrap();
    }
}
2372
/// Shared body of the network-graph pruning tests.
///
/// * `$nodes` - the test nodes; only indices 0 and 1 are used.
/// * `$receive` - expression producing a value with an `expect` method, evaluated after the
///   gossip update has been applied; the macro calls `expect` on it.
/// * `$sleep` - expression evaluated at the start of every polling iteration.
macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
    ($nodes: expr, $receive: expr, $sleep: expr) => {
        // Seed node 0's graph with a single channel (short channel id 42).
        $nodes[0]
            .network_graph
            .add_channel_from_partial_announcement(
                42,
                53,
                ChannelFeatures::empty(),
                $nodes[0].node.get_our_node_id(),
                $nodes[1].node.get_our_node_id(),
            )
            .expect("Failed to update channel from partial announcement");
        let original_graph_description = $nodes[0].network_graph.to_string();
        assert!(original_graph_description.contains("42: features: 0000, node_one:"));
        assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);

        // Poll the logger until the timer-tick line has been recorded more than once.
        let tick_log_key = (
            "lightning_background_processor",
            "Calling ChannelManager's timer_tick_occurred".to_string(),
        );
        loop {
            $sleep;
            // The lock guard only lives for this statement.
            let tick_count =
                $nodes[0].logger.lines.lock().unwrap().get(&tick_log_key).copied().unwrap_or(0);
            if tick_count > 1 {
                break;
            }
        }

        // Serialized update handed to the rapid gossip sync below.
        let initialization_input = vec![
            76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99,
            247, 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227,
            98, 218, 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61,
            250, 251, 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6,
            67, 2, 36, 125, 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47,
            115, 172, 63, 136, 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158,
            1, 242, 121, 152, 106, 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95,
            65, 3, 83, 185, 58, 138, 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136,
            149, 185, 226, 156, 137, 175, 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219,
            175, 168, 77, 4, 143, 38, 128, 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2,
            27, 0, 0, 0, 1, 0, 0, 255, 2, 68, 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0,
            0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255,
            8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6,
            11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
        ];
        // NOTE(review): the second argument looks like a unix timestamp - confirm.
        $nodes[0]
            .rapid_gossip_sync
            .update_network_graph_no_std(&initialization_input[..], Some(1642291930))
            .unwrap();

        // Applying the update leaves two channels in the graph.
        assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);

        $receive.expect("Network graph not pruned within deadline");

        // Once `$receive` has produced a value the graph must hold no channels.
        assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
    };
}
2432
/// Runs the shared pruning scenario against a threaded `BackgroundProcessor` that uses rapid
/// gossip sync, waiting on the persister's graph-persistence notifier.
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
    let (_, nodes) =
        create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");

    // The persister is given the sending half; the macro below waits on the receiving half.
    let (sender, receiver) = std::sync::mpsc::sync_channel(1);
    let persister = Arc::new(
        Persister::new(nodes[0].kv_store.get_data_dir()).with_graph_persistence_notifier(sender),
    );

    let background_processor = BackgroundProcessor::start(
        persister,
        |_: _| Ok(()),
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        Some(nodes[0].messenger.clone()),
        nodes[0].rapid_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Allow five times the first-prune timer for the notification to arrive.
    let prune_deadline = Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5);
    do_test_not_pruning_network_graph_until_graph_sync_completion!(
        nodes,
        receiver.recv_timeout(prune_deadline),
        std::thread::sleep(Duration::from_millis(1))
    );

    background_processor.stop().unwrap();
}
2463
/// Runs the shared pruning scenario against `process_events_async` on tokio.
///
/// One task drives the background-processor future; a second task runs the scenario and then
/// signals the exit channel that the sleeper closure watches.
#[tokio::test]
#[cfg(feature = "futures")]
async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
    let (_, nodes) =
        create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");

    // The persister is given the sending half; the scenario task polls the receiving half.
    let (sender, receiver) = std::sync::mpsc::sync_channel(1);
    let persister = Arc::new(
        Persister::new(nodes[0].kv_store.get_data_dir()).with_graph_persistence_notifier(sender),
    );

    let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
    let bp_future = super::process_events_async(
        persister,
        |_: _| async { Ok(()) },
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        Some(nodes[0].messenger.clone()),
        nodes[0].rapid_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
        // Sleeper: resolves to `false` after `interval`, or to `true` as soon as the exit
        // channel changes.
        move |interval: Duration| {
            let mut exit_signal = exit_receiver.clone();
            Box::pin(async move {
                tokio::select! {
                    _ = tokio::time::sleep(interval) => false,
                    _ = exit_signal.changed() => true,
                }
            })
        },
        false,
        || Some(Duration::from_secs(1696300000)),
    );

    let background_task = tokio::spawn(bp_future);
    let scenario_task = tokio::spawn(async move {
        do_test_not_pruning_network_graph_until_graph_sync_completion!(
            nodes,
            {
                // Check for the notification once per prune-timer interval; the assertion
                // fails on the sixth check that finds nothing.
                let mut i = 0;
                loop {
                    tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER))
                        .await;
                    if receiver.try_recv().is_ok() {
                        break Ok::<(), ()>(());
                    }
                    assert!(i < 5);
                    i += 1;
                }
            },
            tokio::time::sleep(Duration::from_millis(1)).await
        );
        exit_sender.send(()).unwrap();
    });

    let (background_result, scenario_result) = tokio::join!(background_task, scenario_task);
    background_result.unwrap().unwrap();
    scenario_result.unwrap()
}
2522
/// Shared body of the payment-path scoring tests.
///
/// * `$nodes` - the test nodes; only index 0 is used.
/// * `$receive` - expression yielding the next event forwarded by the test's event handler;
///   it is evaluated once per step and the macro calls `expect` on the result.
///
/// Each step registers the expected scorer outcome, pushes a pending event on the node, and
/// then checks that the event coming back has the matching variant.
macro_rules! do_test_payment_path_scoring {
    ($nodes: expr, $receive: expr) => {
        let scored_scid = 4242;
        let secp_ctx = Secp256k1::new();
        let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
        let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);

        // A single-hop path over the scored channel.
        let only_hop = RouteHop {
            pubkey: node_1_id,
            node_features: NodeFeatures::empty(),
            short_channel_id: scored_scid,
            channel_features: ChannelFeatures::empty(),
            fee_msat: 0,
            cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
            maybe_announced_channel: true,
        };
        let path = Path { hops: vec![only_hop], blinded_tail: None };

        // Step 1: non-permanent failure on a known channel -> `PaymentFailure`.
        $nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure {
            path: path.clone(),
            short_channel_id: scored_scid,
        });
        $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
            payment_id: None,
            payment_hash: PaymentHash([42; 32]),
            payment_failed_permanently: false,
            failure: PathFailure::OnPath { network_update: None },
            path: path.clone(),
            short_channel_id: Some(scored_scid),
        });
        let event = $receive.expect("PaymentPathFailed not handled within deadline");
        if !matches!(event, Event::PaymentPathFailed { .. }) {
            panic!("Unexpected event");
        }

        // Step 2: permanent failure without a channel id -> `ProbeSuccess`.
        $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
        $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
            payment_id: None,
            payment_hash: PaymentHash([42; 32]),
            payment_failed_permanently: true,
            failure: PathFailure::OnPath { network_update: None },
            path: path.clone(),
            short_channel_id: None,
        });
        let event = $receive.expect("PaymentPathFailed not handled within deadline");
        if !matches!(event, Event::PaymentPathFailed { .. }) {
            panic!("Unexpected event");
        }

        // Step 3: successful payment path -> `PaymentSuccess`.
        $nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
        $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
            payment_id: PaymentId([42; 32]),
            payment_hash: None,
            path: path.clone(),
        });
        let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
        if !matches!(event, Event::PaymentPathSuccessful { .. }) {
            panic!("Unexpected event");
        }

        // Step 4: successful probe -> `ProbeSuccess`.
        $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
        $nodes[0].node.push_pending_event(Event::ProbeSuccessful {
            payment_id: PaymentId([42; 32]),
            payment_hash: PaymentHash([42; 32]),
            path: path.clone(),
        });
        let event = $receive.expect("ProbeSuccessful not handled within deadline");
        if !matches!(event, Event::ProbeSuccessful { .. }) {
            panic!("Unexpected event");
        }

        // Step 5: failed probe -> `ProbeFailure`. The path is moved into this last event.
        $nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
        $nodes[0].node.push_pending_event(Event::ProbeFailed {
            payment_id: PaymentId([42; 32]),
            payment_hash: PaymentHash([42; 32]),
            path,
            short_channel_id: Some(scored_scid),
        });
        let event = $receive.expect("ProbeFailure not handled within deadline");
        if !matches!(event, Event::ProbeFailed { .. }) {
            panic!("Unexpected event");
        }
    }
}
2615
/// Runs the shared payment-path scoring scenario against a threaded `BackgroundProcessor`
/// and checks that "Persisting scorer after update" was logged five times.
#[test]
fn test_payment_path_scoring() {
    let (sender, receiver) = std::sync::mpsc::sync_channel(1);
    // Forward the four scoring-related event kinds to the test body; anything else is a bug.
    let event_handler = move |event: Event| {
        match event {
            Event::PaymentPathFailed { .. }
            | Event::PaymentPathSuccessful { .. }
            | Event::ProbeSuccessful { .. }
            | Event::ProbeFailed { .. } => sender.send(event).unwrap(),
            _ => panic!("Unexpected event: {:?}", event),
        }
        Ok(())
    };

    let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
    let persister = Arc::new(Persister::new(nodes[0].kv_store.get_data_dir()));
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        Some(nodes[0].messenger.clone()),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    let event_deadline = Duration::from_secs(EVENT_DEADLINE);
    do_test_payment_path_scoring!(nodes, receiver.recv_timeout(event_deadline));

    if !std::thread::panicking() {
        bg_processor.stop().unwrap();
    }

    // One log line per step of the scenario.
    let persist_log_key =
        ("lightning_background_processor", "Persisting scorer after update".to_string());
    let log_entries = nodes[0].logger.lines.lock().unwrap();
    let persist_count = *log_entries.get(&persist_log_key).unwrap();
    assert_eq!(persist_count, 5);
}
2658
/// Runs the shared payment-path scoring scenario against `process_events_async` on tokio and
/// checks that "Persisting scorer after update" was logged five times.
#[tokio::test]
#[cfg(feature = "futures")]
async fn test_payment_path_scoring_async() {
    let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
    // Forward the four scoring-related event kinds to the test body; anything else is a bug.
    let event_handler = move |event: Event| {
        let sender_ref = sender.clone();
        async move {
            match event {
                Event::PaymentPathFailed { .. }
                | Event::PaymentPathSuccessful { .. }
                | Event::ProbeSuccessful { .. }
                | Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
                _ => panic!("Unexpected event: {:?}", event),
            }
            Ok(())
        }
    };

    let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
    let persister = Arc::new(Persister::new(nodes[0].kv_store.get_data_dir()));

    let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());

    let bp_future = super::process_events_async(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        Some(nodes[0].messenger.clone()),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
        // Sleeper: resolves to `false` after `interval`, or to `true` as soon as the exit
        // channel changes.
        move |interval: Duration| {
            let mut exit_signal = exit_receiver.clone();
            Box::pin(async move {
                tokio::select! {
                    _ = tokio::time::sleep(interval) => false,
                    _ = exit_signal.changed() => true,
                }
            })
        },
        false,
        || Some(Duration::ZERO),
    );

    let background_task = tokio::spawn(bp_future);
    let scenario_task = tokio::spawn(async move {
        do_test_payment_path_scoring!(nodes, receiver.recv().await);
        exit_sender.send(()).unwrap();

        // The logger lock is taken only after the last await in this task.
        let persist_log_key =
            ("lightning_background_processor", "Persisting scorer after update".to_string());
        let log_entries = nodes[0].logger.lines.lock().unwrap();
        let persist_count = *log_entries.get(&persist_log_key).unwrap();
        assert_eq!(persist_count, 5);
    });

    let (background_result, scenario_result) = tokio::join!(background_task, scenario_task);
    background_result.unwrap().unwrap();
    scenario_result.unwrap()
}
2722}