snarkos_node_sync/
ping.rs1use crate::locators::BlockLocators;
17use snarkos_node_router::Router;
18use snarkvm::prelude::Network;
19
20#[cfg(feature = "locktick")]
21use locktick::parking_lot::Mutex;
22#[cfg(not(feature = "locktick"))]
23use parking_lot::Mutex;
24use std::{
25 collections::BTreeMap,
26 net::SocketAddr,
27 sync::Arc,
28 time::{Duration, Instant},
29};
30use tokio::{sync::Notify, time::timeout};
31
32struct PingInner<N: Network> {
40 next_ping: BTreeMap<Instant, SocketAddr>,
42 block_locators: Option<BlockLocators<N>>,
45}
46
47pub struct Ping<N: Network> {
49 router: Router<N>,
50 inner: Arc<Mutex<PingInner<N>>>,
51 notify: Arc<Notify>,
52}
53
54impl<N: Network> PingInner<N> {
55 fn new(block_locators: Option<BlockLocators<N>>) -> Self {
56 Self { block_locators, next_ping: Default::default() }
57 }
58}
59
60impl<N: Network> Ping<N> {
61 const MAX_PING_INTERVAL: Duration = Duration::from_secs(20);
63
64 pub fn new(router: Router<N>, block_locators: BlockLocators<N>) -> Self {
71 let notify = Arc::new(Notify::default());
72 let inner = Arc::new(Mutex::new(PingInner::new(Some(block_locators))));
73
74 {
75 let inner = inner.clone();
76 let router_ = router.clone();
77 let notify = notify.clone();
78
79 router.spawn(async move {
80 Self::ping_task(&inner, &router_, ¬ify).await;
81 });
82 }
83
84 Self { inner, router, notify }
85 }
86
87 pub fn new_nosync(router: Router<N>) -> Self {
90 let notify = Arc::new(Notify::default());
91 let inner = Arc::new(Mutex::new(PingInner::new(None)));
92
93 {
94 let inner = inner.clone();
95 let router_ = router.clone();
96 let notify = notify.clone();
97
98 router.spawn(async move {
99 Self::ping_task(&inner, &router_, ¬ify).await;
100 });
101 }
102
103 Self { inner, router, notify }
104 }
105
106 pub fn on_pong_received(&self, peer_ip: SocketAddr) {
108 let now = Instant::now();
109 let mut inner = self.inner.lock();
110
111 inner.next_ping.insert(now + Self::MAX_PING_INTERVAL, peer_ip);
112
113 }
115
116 pub fn on_peer_connected(&self, peer_ip: SocketAddr) {
118 let locators = self.inner.lock().block_locators.clone();
120 if !self.router.send_ping(peer_ip, locators) {
121 warn!("Peer {peer_ip} connected and immediately disconnected?");
122 }
123 }
124
125 pub fn update_block_locators(&self, locators: BlockLocators<N>) {
127 self.inner.lock().block_locators = Some(locators);
128
129 self.notify.notify_one();
131 }
132
133 async fn ping_task(inner: &Mutex<PingInner<N>>, router: &Router<N>, notify: &Notify) {
135 let mut new_block = false;
136
137 loop {
138 if router.ledger().is_stopped() {
139 break;
140 }
141
142 let sleep_time = {
144 let mut inner = inner.lock();
145 let now = Instant::now();
146
147 if new_block {
149 Self::ping_all_peers(&mut inner, router);
150 new_block = false;
151 } else {
152 Self::ping_expired_peers(now, &mut inner, router);
153 }
154
155 if let Some((time, _)) = inner.next_ping.first_key_value() {
157 time.saturating_duration_since(now)
158 } else {
159 Self::MAX_PING_INTERVAL
160 }
161 };
162
163 if timeout(sleep_time, notify.notified()).await.is_ok() {
165 new_block = true;
167 }
168 }
169 }
170
171 fn ping_expired_peers(now: Instant, inner: &mut PingInner<N>, router: &Router<N>) {
173 loop {
174 let peer_ip = {
176 let Some((time, peer_ip)) = inner.next_ping.first_key_value() else {
177 return;
178 };
179
180 if *time > now {
181 return;
182 }
183
184 *peer_ip
185 };
186
187 let locators = inner.block_locators.clone();
189 let success = router.send_ping(peer_ip, locators.clone());
190 inner.next_ping.pop_first();
191
192 if !success {
193 trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
194 }
195 }
196 }
197
198 fn ping_all_peers(inner: &mut PingInner<N>, router: &Router<N>) {
200 let peers: Vec<SocketAddr> = inner.next_ping.values().copied().collect();
201 inner.next_ping.clear();
202
203 for peer_ip in peers {
204 let locators = inner.block_locators.clone();
205 let success = router.send_ping(peer_ip, locators);
206
207 if !success {
208 trace!("Failed to send ping to peer {peer_ip}. Disconnected.");
209 }
210 }
211 }
212}