snarkos_node_router/
routing.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{Heartbeat, Inbound, Outbound};
17use snarkos_node_tcp::{
18    P2P,
19    protocols::{Disconnect, Handshake, OnConnect, Writing},
20};
21use snarkvm::prelude::Network;
22
23use std::time::{Duration, Instant};
24
25#[async_trait]
26pub trait Routing<N: Network>:
27    P2P + Disconnect + OnConnect + Handshake + Inbound<N> + Outbound<N> + Heartbeat<N>
28{
29    /// Initialize the routing.
30    async fn initialize_routing(&self) {
31        // Enable the TCP protocols.
32        self.enable_handshake().await;
33        self.enable_reading().await;
34        self.router().enable_writing().await;
35        self.enable_disconnect().await;
36        self.enable_on_connect().await;
37        // Enable the TCP listener. Note: This must be called after the above protocols.
38        self.enable_listener().await;
39        // Initialize the heartbeat.
40        self.initialize_heartbeat();
41    }
42
43    // Start listening for inbound connections.
44    async fn enable_listener(&self) {
45        let listen_addr = self.tcp().enable_listener().await.expect("Failed to enable the TCP listener");
46        debug!("Listening for peer connections at address {listen_addr:?}");
47    }
48
49    /// Spawns the heartbeat background task for this instance of `Routing`.
50    fn initialize_heartbeat(&self) {
51        let self_clone = self.clone();
52        self.router().spawn(async move {
53            // Sleep for `HEARTBEAT_IN_SECS` seconds.
54            let min_heartbeat_interval = Duration::from_secs(Self::HEARTBEAT_IN_SECS);
55            let mut last_update = Instant::now();
56
57            loop {
58                // Process a heartbeat in the router.
59                self_clone.heartbeat().await;
60
61                // Figure out how long the heartbeat took
62                let now = Instant::now();
63                let elapsed = now.saturating_duration_since(last_update);
64                last_update = now;
65
66                // (Potentially) sleep to avoid invoking heartbeat too frequently.
67                let sleep_time = min_heartbeat_interval.saturating_sub(elapsed);
68                if !sleep_time.is_zero() {
69                    tokio::time::sleep(sleep_time).await;
70                }
71            }
72        });
73    }
74}