Skip to main content

snarkos_node_sync/
ping.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::locators::BlockLocators;
17use snarkos_node_router::Router;
18use snarkvm::prelude::Network;
19
20#[cfg(feature = "locktick")]
21use locktick::parking_lot::Mutex;
22#[cfg(not(feature = "locktick"))]
23use parking_lot::Mutex;
24use std::{
25    collections::BTreeMap,
26    net::SocketAddr,
27    sync::Arc,
28    time::{Duration, Instant},
29};
30use tokio::{sync::Notify, time::timeout};
31
32/// Internal state of the ping logic
33///
34/// Essentially, ping keeps an ordered map `next_ping` of time(rs) to peer IPs.
35/// When a new peer connects or a Pong message is received, an entry in next ping is created
36/// for when a peer should next be pinged.
37///
38/// TODO (kaimast): maybe keep track of the last ping too, to not trigger spam detection?
39struct PingInner<N: Network> {
40    /// The next time we should ping a peer.
41    next_ping: BTreeMap<Instant, SocketAddr>,
42    /// The most recent block locators.
43    /// (or None if this node does not offer block sync)
44    block_locators: Option<BlockLocators<N>>,
45}
46
47/// Manages sending Ping messages to all connected peers.
48pub struct Ping<N: Network> {
49    router: Router<N>,
50    inner: Arc<Mutex<PingInner<N>>>,
51    notify: Arc<Notify>,
52}
53
54impl<N: Network> PingInner<N> {
55    fn new(block_locators: Option<BlockLocators<N>>) -> Self {
56        Self { block_locators, next_ping: Default::default() }
57    }
58}
59
60impl<N: Network> Ping<N> {
61    /// The duration in seconds to wait between sending ping requests to a peer.
62    const MAX_PING_INTERVAL: Duration = Duration::from_secs(20);
63
64    /// Create a new instance of the ping logic.
65    /// There should only be one per node.
66    ///
67    /// # Usage
68    /// Initialize this with the most up-to-date block locators and call
69    /// update_block_locators, whenever a new block is received/created.
70    pub fn new(router: Router<N>, block_locators: BlockLocators<N>) -> Self {
71        let notify = Arc::new(Notify::default());
72        let inner = Arc::new(Mutex::new(PingInner::new(Some(block_locators))));
73
74        {
75            let inner = inner.clone();
76            let router_ = router.clone();
77            let notify = notify.clone();
78
79            router.spawn(async move {
80                Self::ping_task(&inner, &router_, &notify).await;
81            });
82        }
83
84        Self { inner, router, notify }
85    }
86
87    /// Same as [`Self::new`] but for nodes that peers cannot sync from
88    /// such as provers.
89    pub fn new_nosync(router: Router<N>) -> Self {
90        let notify = Arc::new(Notify::default());
91        let inner = Arc::new(Mutex::new(PingInner::new(None)));
92
93        {
94            let inner = inner.clone();
95            let router_ = router.clone();
96            let notify = notify.clone();
97
98            router.spawn(async move {
99                Self::ping_task(&inner, &router_, &notify).await;
100            });
101        }
102
103        Self { inner, router, notify }
104    }
105
106    /// Notify the ping logic that we received a Pong response.
107    pub fn on_pong_received(&self, peer_ip: SocketAddr) {
108        let now = Instant::now();
109        let mut inner = self.inner.lock();
110
111        inner.next_ping.insert(now + Self::MAX_PING_INTERVAL, peer_ip);
112
113        // self.notify.notify() is not needed as ping_task wakes up every MAX_PING_INTERVAL
114    }
115
116    /// Notify the ping logic that a new peer connected.
117    pub fn on_peer_connected(&self, peer_ip: SocketAddr) {
118        // Send the first ping.
119        let locators = self.inner.lock().block_locators.clone();
120        if !self.router.send_ping(peer_ip, locators) {
121            warn!("Peer {peer_ip} connected and immediately disconnected?");
122        }
123    }
124
125    /// Notify the ping logic that new blocks were created or synced.
126    pub fn update_block_locators(&self, locators: BlockLocators<N>) {
127        self.inner.lock().block_locators = Some(locators);
128
129        // wake up the ping task
130        self.notify.notify_one();
131    }
132
133    /// Background task that periodically sends out new ping messages.
134    async fn ping_task(inner: &Mutex<PingInner<N>>, router: &Router<N>, notify: &Notify) {
135        let mut new_block = false;
136
137        loop {
138            if router.ledger().is_stopped() {
139                break;
140            }
141
142            // Do not hold the lock while waiting.
143            let sleep_time = {
144                let mut inner = inner.lock();
145                let now = Instant::now();
146
147                // Ping peers.
148                if new_block {
149                    Self::ping_all_peers(&mut inner, router);
150                    new_block = false;
151                } else {
152                    Self::ping_expired_peers(now, &mut inner, router);
153                }
154
155                // Figure out how long to sleep.
156                if let Some((time, _)) = inner.next_ping.first_key_value() {
157                    time.saturating_duration_since(now)
158                } else {
159                    Self::MAX_PING_INTERVAL
160                }
161            };
162
163            // wait to be woke up, either by timer or notify
164            if timeout(sleep_time, notify.notified()).await.is_ok() {
165                // If the timer is not expired, it means we got woken up by a new block.
166                new_block = true;
167            }
168        }
169    }
170
171    /// Ping all peers that have an expired timer.
172    fn ping_expired_peers(now: Instant, inner: &mut PingInner<N>, router: &Router<N>) {
173        loop {
174            // Find next peer to contact.
175            let peer_ip = {
176                let Some((time, peer_ip)) = inner.next_ping.first_key_value() else {
177                    return;
178                };
179
180                if *time > now {
181                    return;
182                }
183
184                *peer_ip
185            };
186
187            // Send new ping
188            let locators = inner.block_locators.clone();
189            let success = router.send_ping(peer_ip, locators.clone());
190            inner.next_ping.pop_first();
191
192            if !success {
193                trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
194            }
195        }
196    }
197
198    /// Ping all known peers.
199    fn ping_all_peers(inner: &mut PingInner<N>, router: &Router<N>) {
200        let peers: Vec<SocketAddr> = inner.next_ping.values().copied().collect();
201        inner.next_ping.clear();
202
203        for peer_ip in peers {
204            let locators = inner.block_locators.clone();
205            let success = router.send_ping(peer_ip, locators);
206
207            if !success {
208                trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
209            }
210        }
211    }
212}