snarkos_node_sync/
ping.rs1use crate::locators::BlockLocators;
17use snarkos_node_router::Router;
18use snarkvm::prelude::Network;
19
20#[cfg(feature = "locktick")]
21use locktick::parking_lot::Mutex;
22#[cfg(not(feature = "locktick"))]
23use parking_lot::Mutex;
24use std::{
25 collections::BTreeMap,
26 net::SocketAddr,
27 sync::Arc,
28 time::{Duration, Instant},
29};
30use tokio::{sync::Notify, time::timeout};
31
32struct PingInner<N: Network> {
40 next_ping: BTreeMap<Instant, SocketAddr>,
42 block_locators: Option<BlockLocators<N>>,
45}
46
47pub struct Ping<N: Network> {
49 router: Router<N>,
50 inner: Arc<Mutex<PingInner<N>>>,
51 notify: Arc<Notify>,
52}
53
54impl<N: Network> PingInner<N> {
55 fn new(block_locators: Option<BlockLocators<N>>) -> Self {
56 Self { block_locators, next_ping: Default::default() }
57 }
58}
59
60impl<N: Network> Ping<N> {
61 const MAX_PING_INTERVAL: Duration = Duration::from_secs(20);
63
64 pub fn new(router: Router<N>, block_locators: BlockLocators<N>) -> Self {
71 let notify = Arc::new(Notify::default());
72 let inner = Arc::new(Mutex::new(PingInner::new(Some(block_locators))));
73
74 {
75 let inner = inner.clone();
76 let router = router.clone();
77 let notify = notify.clone();
78
79 tokio::spawn(async move {
80 Self::ping_task(&inner, &router, ¬ify).await;
81 });
82 }
83
84 Self { inner, router, notify }
85 }
86
87 pub fn new_nosync(router: Router<N>) -> Self {
90 let notify = Arc::new(Notify::default());
91 let inner = Arc::new(Mutex::new(PingInner::new(None)));
92
93 {
94 let inner = inner.clone();
95 let router = router.clone();
96 let notify = notify.clone();
97
98 tokio::spawn(async move {
99 Self::ping_task(&inner, &router, ¬ify).await;
100 });
101 }
102
103 Self { inner, router, notify }
104 }
105
106 pub fn on_pong_received(&self, peer_ip: SocketAddr) {
108 let now = Instant::now();
109 let mut inner = self.inner.lock();
110
111 inner.next_ping.insert(now + Self::MAX_PING_INTERVAL, peer_ip);
112
113 }
115
116 pub fn on_peer_connected(&self, peer_ip: SocketAddr) {
118 let locators = self.inner.lock().block_locators.clone();
120 if !self.router.send_ping(peer_ip, locators) {
121 warn!("Peer {peer_ip} connected and immediately disconnected?");
122 }
123 }
124
125 pub fn update_block_locators(&self, locators: BlockLocators<N>) {
127 self.inner.lock().block_locators = Some(locators);
128
129 self.notify.notify_one();
131 }
132
133 async fn ping_task(inner: &Mutex<PingInner<N>>, router: &Router<N>, notify: &Notify) {
135 let mut new_block = false;
136
137 loop {
138 let sleep_time = {
140 let mut inner = inner.lock();
141 let now = Instant::now();
142
143 if new_block {
145 Self::ping_all_peers(&mut inner, router);
146 new_block = false;
147 } else {
148 Self::ping_expired_peers(now, &mut inner, router);
149 }
150
151 if let Some((time, _)) = inner.next_ping.first_key_value() {
153 time.saturating_duration_since(now)
154 } else {
155 Self::MAX_PING_INTERVAL
156 }
157 };
158
159 if timeout(sleep_time, notify.notified()).await.is_ok() {
161 new_block = true;
163 }
164 }
165 }
166
167 fn ping_expired_peers(now: Instant, inner: &mut PingInner<N>, router: &Router<N>) {
169 loop {
170 let peer_ip = {
172 let Some((time, peer_ip)) = inner.next_ping.first_key_value() else {
173 return;
174 };
175
176 if *time > now {
177 return;
178 }
179
180 *peer_ip
181 };
182
183 let locators = inner.block_locators.clone();
185 let success = router.send_ping(peer_ip, locators.clone());
186 inner.next_ping.pop_first();
187
188 if !success {
189 trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
190 }
191 }
192 }
193
194 fn ping_all_peers(inner: &mut PingInner<N>, router: &Router<N>) {
196 let peers: Vec<SocketAddr> = inner.next_ping.values().copied().collect();
197 inner.next_ping.clear();
198
199 for peer_ip in peers {
200 let locators = inner.block_locators.clone();
201 let success = router.send_ping(peer_ip, locators);
202
203 if !success {
204 trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
205 }
206 }
207 }
208}