snarkos_node_router/routing.rs
1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{Heartbeat, Inbound, Outbound};
17use snarkos_node_tcp::{
18 P2P,
19 protocols::{Disconnect, Handshake, OnConnect, Writing},
20};
21use snarkvm::prelude::Network;
22
23use std::time::{Duration, Instant};
24
25#[async_trait]
26pub trait Routing<N: Network>:
27 P2P + Disconnect + OnConnect + Handshake + Inbound<N> + Outbound<N> + Heartbeat<N>
28{
29 /// Initialize the routing.
30 async fn initialize_routing(&self) {
31 // Enable the TCP protocols.
32 self.enable_handshake().await;
33 self.enable_reading().await;
34 self.router().enable_writing().await;
35 self.enable_disconnect().await;
36 self.enable_on_connect().await;
37 // Enable the TCP listener. Note: This must be called after the above protocols.
38 self.enable_listener().await;
39 // Initialize the heartbeat.
40 self.initialize_heartbeat();
41 }
42
43 // Start listening for inbound connections.
44 async fn enable_listener(&self) {
45 let listen_addr = self.tcp().enable_listener().await.expect("Failed to enable the TCP listener");
46 debug!("Listening for peer connections at address {listen_addr:?}");
47 }
48
49 /// Spawns the heartbeat background task for this instance of `Routing`.
50 fn initialize_heartbeat(&self) {
51 let self_clone = self.clone();
52 self.router().spawn(async move {
53 // Sleep for `HEARTBEAT_IN_SECS` seconds.
54 let min_heartbeat_interval = Duration::from_secs(Self::HEARTBEAT_IN_SECS);
55 let mut last_update = Instant::now();
56
57 loop {
58 // Process a heartbeat in the router.
59 self_clone.heartbeat().await;
60
61 // Figure out how long the heartbeat took
62 let now = Instant::now();
63 let elapsed = now.saturating_duration_since(last_update);
64 last_update = now;
65
66 // (Potentially) sleep to avoid invoking heartbeat too frequently.
67 let sleep_time = min_heartbeat_interval.saturating_sub(elapsed);
68 if !sleep_time.is_zero() {
69 tokio::time::sleep(sleep_time).await;
70 }
71 }
72 });
73 }
74}