1use std::{
2 collections::{HashMap, HashSet},
3 net::{IpAddr, SocketAddr},
4 time::Duration,
5};
6
7use bitcoin::{
8 consensus::Decodable,
9 io::Read,
10 key::rand,
11 p2p::{address::AddrV2, message::CommandString, Magic},
12 Wtxid,
13};
14use socks::create_socks5;
15use tokio::{net::TcpStream, time::Instant};
16
17use error::PeerError;
18
19use crate::channel_messages::TimeSensitiveId;
20
21pub(crate) mod dns;
22pub(crate) mod error;
23pub(crate) mod outbound_messages;
24pub(crate) mod parsers;
25pub(crate) mod peer;
26pub(crate) mod peer_map;
27pub(crate) mod reader;
28pub(crate) mod socks;
29
/// Peer-to-peer protocol version number.
pub const PROTOCOL_VERSION: u32 = 70_016;
/// Version string for this crate.
pub const KYOTO_VERSION: &str = "0.15.0";
/// Version string for the `bitcoin` dependency.
// NOTE(review): presumably kept in sync with the dependency manifest by hand — confirm.
pub const RUST_BITCOIN_VERSION: &str = "0.32.7";

/// Age after which `LastBlockMonitor::stale` reports the last block as stale.
const THIRTY_MINS: Duration = Duration::from_secs(30 * 60);
/// Default value of `PeerTimeoutConfig::response_timeout`.
const MESSAGE_TIMEOUT_SECS: Duration = Duration::from_secs(5);
/// Default value of `PeerTimeoutConfig::max_connection_time`.
const TWO_HOUR: Duration = Duration::from_secs(2 * 60 * 60);
/// Default value of `PeerTimeoutConfig::handshake_timeout`.
const TCP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(2);
/// Quiet period after which `PingState::send_ping` produces a new ping nonce.
const SEND_PING: Duration = Duration::from_secs(2 * 60);

/// Number of advertised addresses above which `AddrGossipState::over_limit` is true.
const ADDR_HARD_LIMIT: usize = 10_000;
44
/// Numeric identifier for a peer, wrapping a `u32`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct PeerId(pub(crate) u32);

impl PeerId {
    /// Advances the identifier by one, wrapping back to zero after `u32::MAX`.
    pub(crate) fn increment(&mut self) {
        let PeerId(current) = *self;
        *self = PeerId(current.wrapping_add(1));
    }
}

impl From<u32> for PeerId {
    /// Wraps a raw `u32` as a [`PeerId`].
    fn from(value: u32) -> Self {
        Self(value)
    }
}

impl std::fmt::Display for PeerId {
    /// Renders the identifier as `Peer <n>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(id) = self;
        write!(f, "Peer {}", id)
    }
}
65
66#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
68pub struct PeerTimeoutConfig {
69 pub(crate) response_timeout: Duration,
71 pub(crate) max_connection_time: Duration,
73 pub(crate) handshake_timeout: Duration,
75}
76
77impl PeerTimeoutConfig {
78 pub fn new(
80 response_timeout: Duration,
81 max_connection_time: Duration,
82 handshake_timeout: Duration,
83 ) -> Self {
84 Self {
85 response_timeout,
86 max_connection_time,
87 handshake_timeout,
88 }
89 }
90}
91
92impl Default for PeerTimeoutConfig {
93 fn default() -> Self {
94 Self {
95 response_timeout: MESSAGE_TIMEOUT_SECS,
96 max_connection_time: TWO_HOUR,
97 handshake_timeout: TCP_CONNECTION_TIMEOUT,
98 }
99 }
100}
101
102pub(crate) struct LastBlockMonitor {
103 last_block: Option<Instant>,
104}
105
106impl LastBlockMonitor {
107 pub(crate) fn new() -> Self {
108 Self { last_block: None }
109 }
110
111 pub(crate) fn reset(&mut self) {
112 self.last_block = Some(Instant::now())
113 }
114
115 pub(crate) fn stale(&self) -> bool {
116 if let Some(time) = self.last_block {
117 return time.elapsed() > THIRTY_MINS;
118 }
119 false
120 }
121}
122
123#[derive(Debug, Clone, Copy, Default)]
124pub(crate) enum ConnectionType {
125 #[default]
126 ClearNet,
127 Socks5Proxy(SocketAddr),
128}
129
130impl ConnectionType {
131 pub(crate) fn can_connect(&self, addr: &AddrV2) -> bool {
132 match &self {
133 Self::ClearNet => matches!(addr, AddrV2::Ipv4(_) | AddrV2::Ipv6(_)),
134 Self::Socks5Proxy(_) => matches!(addr, AddrV2::Ipv4(_) | AddrV2::Ipv6(_)),
135 }
136 }
137
138 pub(crate) async fn connect(
139 &self,
140 addr: AddrV2,
141 port: u16,
142 handshake_timeout: Duration,
143 ) -> Result<TcpStream, PeerError> {
144 let socket_addr = match addr {
145 AddrV2::Ipv4(ip) => IpAddr::V4(ip),
146 AddrV2::Ipv6(ip) => IpAddr::V6(ip),
147 _ => return Err(PeerError::UnreachableSocketAddr),
148 };
149 match &self {
150 Self::ClearNet => {
151 let timeout = tokio::time::timeout(
152 handshake_timeout,
153 TcpStream::connect((socket_addr, port)),
154 )
155 .await
156 .map_err(|_| PeerError::ConnectionFailed)?;
157 let tcp_stream = timeout.map_err(|_| PeerError::ConnectionFailed)?;
158 Ok(tcp_stream)
159 }
160 Self::Socks5Proxy(proxy) => {
161 let socks5_timeout = tokio::time::timeout(
162 handshake_timeout,
163 create_socks5(*proxy, socket_addr, port),
164 )
165 .await
166 .map_err(|_| PeerError::ConnectionFailed)?;
167 let tcp_stream = socks5_timeout.map_err(PeerError::Socks5)?;
168 Ok(tcp_stream)
169 }
170 }
171 }
172}
173
174#[derive(Debug, Clone)]
175struct MessageState {
176 general_timeout: Duration,
177 version_handshake: VersionHandshakeState,
178 verack: VerackState,
179 addr_state: AddrGossipState,
180 sent_txs: HashSet<Wtxid>,
181 timed_message_state: HashMap<TimeSensitiveId, Instant>,
182 ping_state: PingState,
183}
184
185impl MessageState {
186 fn new(general_timeout: Duration) -> Self {
187 Self {
188 general_timeout,
189 version_handshake: Default::default(),
190 verack: Default::default(),
191 addr_state: Default::default(),
192 sent_txs: Default::default(),
193 timed_message_state: Default::default(),
194 ping_state: PingState::default(),
195 }
196 }
197
198 fn start_version_handshake(&mut self) {
199 self.version_handshake = self.version_handshake.start();
200 }
201
202 fn finish_version_handshake(&mut self) {
203 self.version_handshake = self.version_handshake.finish();
204 }
205
206 fn sent_tx(&mut self, wtxid: Wtxid) {
207 self.sent_txs.insert(wtxid);
208 }
209
210 fn unknown_rejection(&mut self, wtxid: Wtxid) -> bool {
211 !self.sent_txs.remove(&wtxid)
212 }
213
214 fn unresponsive(&self) -> bool {
215 self.timed_message_state
216 .values()
217 .any(|time| time.elapsed() > self.general_timeout)
218 || self.version_handshake.is_unresponsive(self.general_timeout)
219 }
220}
221
222#[derive(Debug, Clone, Copy, Default)]
223enum VersionHandshakeState {
224 #[default]
225 NotStarted,
226 Started {
227 at: tokio::time::Instant,
228 },
229 Completed,
230}
231
232impl VersionHandshakeState {
233 fn start(self) -> Self {
234 Self::Started {
235 at: tokio::time::Instant::now(),
236 }
237 }
238
239 fn finish(self) -> Self {
240 Self::Completed
241 }
242
243 fn is_complete(&self) -> bool {
244 matches!(self, Self::Completed)
245 }
246
247 fn is_unresponsive(&self, timeout: Duration) -> bool {
248 match self {
249 Self::Started { at } => at.elapsed() > timeout,
250 _ => false,
251 }
252 }
253}
254
/// Records which `verack` messages have been exchanged with a peer.
#[derive(Debug, Clone, Copy, Default)]
struct VerackState {
    /// Set by `got_ack`.
    got_ack: bool,
    /// Set by `sent_ack`.
    sent_ack: bool,
}

impl VerackState {
    /// Records that an acknowledgement was received. Idempotent.
    fn got_ack(&mut self) {
        *self = Self {
            got_ack: true,
            ..*self
        };
    }

    /// Records that an acknowledgement was sent. Idempotent.
    fn sent_ack(&mut self) {
        *self = Self {
            sent_ack: true,
            ..*self
        };
    }

    /// Returns `true` once both `got_ack` and `sent_ack` have been called.
    fn both_acks(&self) -> bool {
        let Self { got_ack, sent_ack } = *self;
        got_ack && sent_ack
    }
}
274
275#[derive(Debug, Clone, Copy, Default)]
276struct AddrGossipState {
277 num_advertised: usize,
278 gossip_stage: AddrGossipStages,
279}
280
281impl AddrGossipState {
282 fn received(&mut self, num_addrs: usize) {
283 self.num_advertised += num_addrs;
284 }
285
286 fn first_gossip(&mut self) {
287 self.gossip_stage = AddrGossipStages::RandomGossip;
288 }
289
290 fn over_limit(&self) -> bool {
291 self.num_advertised > ADDR_HARD_LIMIT
292 }
293}
294
295#[derive(Debug, Clone, Copy, Default)]
300enum AddrGossipStages {
301 #[default]
302 NotReceived,
303 RandomGossip,
304}
305
306#[derive(Debug, Clone, Copy)]
307enum PingState {
308 WaitingFor { nonce: u64 },
309 LastMessageReceied { then: Instant },
310}
311
312impl PingState {
313 fn send_ping(&mut self) -> Option<u64> {
314 match self {
315 Self::WaitingFor { nonce: _ } => None,
316 Self::LastMessageReceied { then } => {
317 if then.elapsed() > SEND_PING {
318 let nonce = rand::random();
319 *self = Self::WaitingFor { nonce };
320 Some(nonce)
321 } else {
322 None
323 }
324 }
325 }
326 }
327
328 fn check_pong(&mut self, pong: u64) -> bool {
329 match self {
330 Self::WaitingFor { nonce } => {
331 if pong.eq(&*nonce) {
332 *self = Self::LastMessageReceied {
333 then: Instant::now(),
334 };
335 true
336 } else {
337 false
338 }
339 }
340 Self::LastMessageReceied { then: _ } => false,
341 }
342 }
343
344 fn update_last_message(&mut self) {
345 match self {
346 Self::WaitingFor { nonce: _ } => (),
347 Self::LastMessageReceied { then: _ } => {
348 *self = Self::LastMessageReceied {
349 then: Instant::now(),
350 }
351 }
352 }
353 }
354}
355
356impl Default for PingState {
357 fn default() -> Self {
358 Self::LastMessageReceied {
359 then: Instant::now(),
360 }
361 }
362}
363
364pub(crate) struct V1Header {
365 magic: Magic,
366 _command: CommandString,
367 length: u32,
368 _checksum: u32,
369}
370
371impl Decodable for V1Header {
372 fn consensus_decode<R: Read + ?Sized>(
373 reader: &mut R,
374 ) -> Result<Self, bitcoin::consensus::encode::Error> {
375 let magic = Magic::consensus_decode(reader)?;
376 let _command = CommandString::consensus_decode(reader)?;
377 let length = u32::consensus_decode(reader)?;
378 let _checksum = u32::consensus_decode(reader)?;
379 Ok(Self {
380 magic,
381 _command,
382 length,
383 _checksum,
384 })
385 }
386}
387
#[cfg(test)]
mod tests {
    use std::{net::Ipv4Addr, time::Duration};

    use bitcoin::{consensus::deserialize, p2p::address::AddrV2, Transaction};

    use crate::{
        network::{AddrGossipStages, LastBlockMonitor, MessageState, PingState},
        prelude::Netgroup,
    };

    /// The netgroup of an IPv4 address is its first two octets.
    #[test]
    fn test_sixteen() {
        let peer = AddrV2::Ipv4(Ipv4Addr::new(95, 217, 198, 121));
        assert_eq!("95.217".to_string(), peer.netgroup());
    }

    /// Only a started, unfinished handshake can make the peer unresponsive.
    #[tokio::test(start_paused = true)]
    async fn test_version_message_state() {
        let timeout = Duration::from_secs(1);
        let mut message_state = MessageState::new(timeout);
        // Nothing has been started, so the passage of time alone is harmless.
        assert!(!message_state.unresponsive());
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert!(!message_state.unresponsive());
        // A started handshake that outlives the timeout is unresponsive.
        message_state.start_version_handshake();
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert!(message_state.unresponsive());
        // A finished handshake never times out.
        let mut message_state = MessageState::new(timeout);
        message_state.start_version_handshake();
        message_state.finish_version_handshake();
        tokio::time::sleep(Duration::from_secs(2)).await;
        assert!(!message_state.unresponsive());
        assert!(message_state.version_handshake.is_complete());
    }

    /// Both acknowledgements are needed before `both_acks` is true.
    #[test]
    fn test_verack_state() {
        let timeout = Duration::from_secs(1);
        let mut message_state = MessageState::new(timeout);
        // `VersionHandshakeState::start` returns the new state by value, so the
        // handshake must be started through `MessageState`, which stores it.
        message_state.start_version_handshake();
        message_state.verack.got_ack();
        assert!(!message_state.verack.both_acks());
        message_state.verack.sent_ack();
        assert!(message_state.verack.both_acks());
    }

    /// A rejection is known once for a sent transaction and unknown afterwards.
    #[test]
    fn test_tx_reject_state() {
        let transaction: Transaction = deserialize(&hex::decode("0200000000010158e87a21b56daf0c23be8e7070456c336f7cbaa5c8757924f545887bb2abdd7501000000171600145f275f436b09a8cc9a2eb2a2f528485c68a56323feffffff02d8231f1b0100000017a914aed962d6654f9a2b36608eb9d64d2b260db4f1118700c2eb0b0000000017a914b7f5faf40e3d40a5a459b1db3535f2b72fa921e88702483045022100a22edcc6e5bc511af4cc4ae0de0fcd75c7e04d8c1c3a8aa9d820ed4b967384ec02200642963597b9b1bc22c75e9f3e117284a962188bf5e8a74c895089046a20ad770121035509a48eb623e10aace8bfd0212fdb8a8e5af3c94b0b133b95e114cab89e4f7965000000").unwrap()).unwrap();
        let wtxid = transaction.compute_wtxid();
        let mut message_state = MessageState::new(Duration::from_secs(2));
        message_state.sent_tx(wtxid);
        assert!(!message_state.unknown_rejection(wtxid));
        // The first lookup consumed the record.
        assert!(message_state.unknown_rejection(wtxid));
    }

    /// The gossip stage advances and the hard limit trips above 10,000 addresses.
    #[test]
    fn test_addr_gossip_state() {
        let mut message_state = MessageState::new(Duration::from_secs(2));
        assert!(matches!(
            message_state.addr_state.gossip_stage,
            AddrGossipStages::NotReceived
        ));
        message_state.addr_state.received(100);
        message_state.addr_state.first_gossip();
        assert!(matches!(
            message_state.addr_state.gossip_stage,
            AddrGossipStages::RandomGossip
        ));
        assert!(!message_state.addr_state.over_limit());
        message_state.addr_state.received(10_000);
        assert!(message_state.addr_state.over_limit());
    }

    /// Exercises the ping state machine against the two-minute ping interval.
    #[tokio::test(start_paused = true)]
    async fn test_ping_state() {
        let mut ping_state = PingState::default();
        // No ping until more than two minutes of silence have passed.
        assert!(ping_state.send_ping().is_none());
        tokio::time::sleep(Duration::from_secs(60)).await;
        assert!(ping_state.send_ping().is_none());
        tokio::time::sleep(Duration::from_secs(70)).await;
        assert!(ping_state.send_ping().is_some());
        // Only one ping may be outstanding at a time.
        assert!(ping_state.send_ping().is_none());
        let mut ping_state = PingState::default();
        // A matching pong is accepted exactly once and restarts the interval.
        tokio::time::sleep(Duration::from_secs(60 * 3)).await;
        let ping = ping_state.send_ping().unwrap();
        tokio::time::sleep(Duration::from_secs(60 * 3)).await;
        assert!(ping_state.check_pong(ping));
        assert!(!ping_state.check_pong(ping));
        assert!(ping_state.send_ping().is_none());
        tokio::time::sleep(Duration::from_secs(60 * 3)).await;
        assert!(ping_state.send_ping().is_some());
        let mut ping_state = PingState::default();
        // Other traffic while waiting does not cancel the outstanding ping.
        tokio::time::sleep(Duration::from_secs(60 * 3)).await;
        let ping = ping_state.send_ping().unwrap();
        ping_state.update_last_message();
        assert!(ping_state.check_pong(ping));
        let mut ping_state = PingState::default();
        // Receiving a message restarts the interval, so no ping is due yet.
        assert!(ping_state.send_ping().is_none());
        tokio::time::sleep(Duration::from_secs(60)).await;
        assert!(ping_state.send_ping().is_none());
        ping_state.update_last_message();
        tokio::time::sleep(Duration::from_secs(70)).await;
        assert!(ping_state.send_ping().is_none());
    }

    /// The monitor turns stale only after a block was seen more than thirty minutes ago.
    #[tokio::test(start_paused = true)]
    async fn test_block_detected_stale() {
        let mut last_block = LastBlockMonitor::new();
        // Never stale before the first block has been recorded.
        tokio::time::sleep(Duration::from_secs(60 * 40)).await;
        assert!(!last_block.stale());
        last_block.reset();
        // Twenty minutes after the block: still fresh.
        tokio::time::sleep(Duration::from_secs(60 * 20)).await;
        assert!(!last_block.stale());
        // Forty minutes after the block: stale.
        tokio::time::sleep(Duration::from_secs(60 * 20)).await;
        assert!(last_block.stale());
        last_block.reset();
        assert!(!last_block.stale());
    }
}