snarkos_node_sync/
ping.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::locators::BlockLocators;
17use snarkos_node_router::Router;
18use snarkvm::prelude::Network;
19
20#[cfg(feature = "locktick")]
21use locktick::parking_lot::Mutex;
22#[cfg(not(feature = "locktick"))]
23use parking_lot::Mutex;
24use std::{
25    collections::BTreeMap,
26    net::SocketAddr,
27    sync::Arc,
28    time::{Duration, Instant},
29};
30use tokio::{sync::Notify, time::timeout};
31
32/// Internal state of the ping logic
33///
34/// Essentially, ping keeps an ordered map `next_ping` of time(rs) to peer IPs.
35/// When a new peer connects or a Pong message is received, an entry in next ping is created
36/// for when a peer should next be pinged.
37///
38/// TODO (kaimast): maybe keep track of the last ping too, to not trigger spam detection?
39struct PingInner<N: Network> {
40    /// The next time we should ping a peer.
41    next_ping: BTreeMap<Instant, SocketAddr>,
42    /// The most recent block locators.
43    /// (or None if this node does not offer block sync)
44    block_locators: Option<BlockLocators<N>>,
45}
46
47/// Manages sending Ping messages to all connected peers.
48pub struct Ping<N: Network> {
49    router: Router<N>,
50    inner: Arc<Mutex<PingInner<N>>>,
51    notify: Arc<Notify>,
52}
53
54impl<N: Network> PingInner<N> {
55    fn new(block_locators: Option<BlockLocators<N>>) -> Self {
56        Self { block_locators, next_ping: Default::default() }
57    }
58}
59
60impl<N: Network> Ping<N> {
61    /// The duration in seconds to wait between sending ping requests to a peer.
62    const MAX_PING_INTERVAL: Duration = Duration::from_secs(20);
63
64    /// Create a new instance of the ping logic.
65    /// There should only be one per node.
66    ///
67    /// # Usage
68    /// Initialize this with the most up-to-date block locators and call
69    /// update_block_locators, whenever a new block is received/created.
70    pub fn new(router: Router<N>, block_locators: BlockLocators<N>) -> Self {
71        let notify = Arc::new(Notify::default());
72        let inner = Arc::new(Mutex::new(PingInner::new(Some(block_locators))));
73
74        {
75            let inner = inner.clone();
76            let router = router.clone();
77            let notify = notify.clone();
78
79            tokio::spawn(async move {
80                Self::ping_task(&inner, &router, &notify).await;
81            });
82        }
83
84        Self { inner, router, notify }
85    }
86
87    /// Same as [`Self::new`] but for nodes that peers cannot sync from
88    /// such as provers.
89    pub fn new_nosync(router: Router<N>) -> Self {
90        let notify = Arc::new(Notify::default());
91        let inner = Arc::new(Mutex::new(PingInner::new(None)));
92
93        {
94            let inner = inner.clone();
95            let router = router.clone();
96            let notify = notify.clone();
97
98            tokio::spawn(async move {
99                Self::ping_task(&inner, &router, &notify).await;
100            });
101        }
102
103        Self { inner, router, notify }
104    }
105
106    /// Notify the ping logic that we received a Pong response.
107    pub fn on_pong_received(&self, peer_ip: SocketAddr) {
108        let now = Instant::now();
109        let mut inner = self.inner.lock();
110
111        inner.next_ping.insert(now + Self::MAX_PING_INTERVAL, peer_ip);
112
113        // self.notify.notify() is not needed as ping_task wakes up every MAX_PING_INTERVAL
114    }
115
116    /// Notify the ping logic that a new peer connected.
117    pub fn on_peer_connected(&self, peer_ip: SocketAddr) {
118        // Send the first ping.
119        let locators = self.inner.lock().block_locators.clone();
120        if !self.router.send_ping(peer_ip, locators) {
121            warn!("Peer {peer_ip} connected and immediately disconnected?");
122        }
123    }
124
125    /// Notify the ping logic that new blocks were created or synced.
126    pub fn update_block_locators(&self, locators: BlockLocators<N>) {
127        self.inner.lock().block_locators = Some(locators);
128
129        // wake up the ping task
130        self.notify.notify_one();
131    }
132
133    /// Background task that periodically sends out new ping messages.
134    async fn ping_task(inner: &Mutex<PingInner<N>>, router: &Router<N>, notify: &Notify) {
135        let mut new_block = false;
136
137        loop {
138            // Do not hold the lock while waiting.
139            let sleep_time = {
140                let mut inner = inner.lock();
141                let now = Instant::now();
142
143                // Ping peers.
144                if new_block {
145                    Self::ping_all_peers(&mut inner, router);
146                    new_block = false;
147                } else {
148                    Self::ping_expired_peers(now, &mut inner, router);
149                }
150
151                // Figure out how long to sleep.
152                if let Some((time, _)) = inner.next_ping.first_key_value() {
153                    time.saturating_duration_since(now)
154                } else {
155                    Self::MAX_PING_INTERVAL
156                }
157            };
158
159            // wait to be woke up, either by timer or notify
160            if timeout(sleep_time, notify.notified()).await.is_ok() {
161                // If the timer is not expired, it means we got woken up by a new block.
162                new_block = true;
163            }
164        }
165    }
166
167    /// Ping all peers that have an expired timer.
168    fn ping_expired_peers(now: Instant, inner: &mut PingInner<N>, router: &Router<N>) {
169        loop {
170            // Find next peer to contact.
171            let peer_ip = {
172                let Some((time, peer_ip)) = inner.next_ping.first_key_value() else {
173                    return;
174                };
175
176                if *time > now {
177                    return;
178                }
179
180                *peer_ip
181            };
182
183            // Send new ping
184            let locators = inner.block_locators.clone();
185            let success = router.send_ping(peer_ip, locators.clone());
186            inner.next_ping.pop_first();
187
188            if !success {
189                trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
190            }
191        }
192    }
193
194    /// Ping all known peers.
195    fn ping_all_peers(inner: &mut PingInner<N>, router: &Router<N>) {
196        let peers: Vec<SocketAddr> = inner.next_ping.values().copied().collect();
197        inner.next_ping.clear();
198
199        for peer_ip in peers {
200            let locators = inner.block_locators.clone();
201            let success = router.send_ping(peer_ip, locators);
202
203            if !success {
204                trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
205            }
206        }
207    }
208}