pub async fn process_events_async<'a, UL: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, ES: 'static + Deref + Send + Sync, NS: 'static + Deref + Send + Sync, SP: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, R: 'static + Deref + Send + Sync, G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, Descriptor: 'static + SocketDescriptor + Send + Sync, CMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, OMH: 'static + Deref + Send + Sync, EventHandlerFuture: Future<Output = ()>, EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync, PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync, RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send, UMH: 'static + Deref + Send + Sync, PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync, S: 'static + Deref<Target = SC> + Send + Sync, SC: for<'b> WriteableScore<'b>, SleepFuture: Future<Output = bool> + Unpin, Sleeper: Fn(Duration) -> SleepFuture>(
persister: PS,
event_handler: EventHandler,
chain_monitor: M,
channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, UL, L>,
peer_manager: PM,
logger: L,
scorer: Option<S>,
sleeper: Sleeper,
mobile_interruptable_platform: bool
) -> Result<(), Error>where
UL::Target: 'static + UtxoLookup,
CF::Target: 'static + Filter,
CW::Target: 'static + Watch<<SP::Target as SignerProvider>::Signer>,
T::Target: 'static + BroadcasterInterface,
ES::Target: 'static + EntropySource,
NS::Target: 'static + NodeSigner,
SP::Target: 'static + SignerProvider,
F::Target: 'static + FeeEstimator,
R::Target: 'static + Router,
L::Target: 'static + Logger,
P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
CMH::Target: 'static + ChannelMessageHandler,
OMH::Target: 'static + OnionMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
futures
only.Expand description
Processes background events in a future.
sleeper
should return a future which completes in the given amount of time and returns a
boolean indicating whether the background processing should exit. Once sleeper
returns a
future which outputs true
, the loop will exit and this function’s future will complete.
The sleeper
future is free to return early after it has triggered the exit condition.
See BackgroundProcessor::start
for information on which actions this handles.
Requires the futures
feature. Note that while this method is available without the std
feature, doing so will skip calling NetworkGraph::remove_stale_channels_and_tracking
,
you should call NetworkGraph::remove_stale_channels_and_tracking_with_time
regularly
manually instead.
The mobile_interruptable_platform
flag should be set if we’re currently running on a
mobile device, where we may need to check for interruption of the application regularly. If you
are unsure, you should set the flag, as the performance impact of it is minimal unless there
are hundreds or thousands of simultaneous process calls running.
For example, in order to process background events in a Tokio task, you
could setup process_events_async
like this:
let background_persister = Arc::clone(&my_persister);
let background_event_handler = Arc::clone(&my_event_handler);
let background_chain_mon = Arc::clone(&my_chain_monitor);
let background_chan_man = Arc::clone(&my_channel_manager);
let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
let background_peer_man = Arc::clone(&my_peer_manager);
let background_logger = Arc::clone(&my_logger);
let background_scorer = Arc::clone(&my_scorer);
// Setup the sleeper.
let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
let sleeper = move |d| {
let mut receiver = stop_receiver.clone();
Box::pin(async move {
tokio::select!{
_ = tokio::time::sleep(d) => false,
_ = receiver.changed() => true,
}
})
};
let mobile_interruptable_platform = false;
let handle = tokio::spawn(async move {
process_events_async(
background_persister,
|e| background_event_handler.handle_event(e),
background_chain_mon,
background_chan_man,
background_gossip_sync,
background_peer_man,
background_logger,
Some(background_scorer),
sleeper,
mobile_interruptable_platform,
)
.await
.expect("Failed to process events");
});
// Stop the background processing.
stop_sender.send(()).unwrap();
handle.await.unwrap();