kyoto/network/
mod.rs

1use std::{
2    collections::{HashMap, HashSet},
3    net::{IpAddr, SocketAddr},
4    time::Duration,
5};
6
7use bitcoin::{
8    consensus::Decodable,
9    io::Read,
10    key::rand,
11    p2p::{address::AddrV2, message::CommandString, Magic},
12    Wtxid,
13};
14use socks::create_socks5;
15use tokio::{net::TcpStream, time::Instant};
16
17use error::PeerError;
18
19use crate::channel_messages::TimeSensitiveId;
20
21pub(crate) mod dns;
22pub(crate) mod error;
23pub(crate) mod outbound_messages;
24pub(crate) mod parsers;
25pub(crate) mod peer;
26pub(crate) mod peer_map;
27pub(crate) mod reader;
28pub(crate) mod socks;
29
30pub const PROTOCOL_VERSION: u32 = 70016;
31pub const KYOTO_VERSION: &str = "0.15.0";
32pub const RUST_BITCOIN_VERSION: &str = "0.32.7";
33
34const THIRTY_MINS: Duration = Duration::from_secs(60 * 30);
35const MESSAGE_TIMEOUT_SECS: Duration = Duration::from_secs(5);
36//                                            sec  min  hour
37const TWO_HOUR: Duration = Duration::from_secs(60 * 60 * 2);
38const TCP_CONNECTION_TIMEOUT: Duration = Duration::from_secs(2);
39// Ping the peer if we have not exchanged messages for two minutes
40const SEND_PING: Duration = Duration::from_secs(60 * 2);
41
42// A peer cannot send 10,000 ADDRs in one connection.
43const ADDR_HARD_LIMIT: usize = 10_000;
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
46pub(crate) struct PeerId(pub(crate) u32);
47
48impl PeerId {
49    pub(crate) fn increment(&mut self) {
50        self.0 = self.0.wrapping_add(1)
51    }
52}
53
54impl From<u32> for PeerId {
55    fn from(value: u32) -> Self {
56        PeerId(value)
57    }
58}
59
60impl std::fmt::Display for PeerId {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        write!(f, "Peer {}", self.0)
63    }
64}
65
66/// Configuration for peer connection timeouts
67#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
68pub struct PeerTimeoutConfig {
69    /// How long to wait for a peer to respond to a request
70    pub(crate) response_timeout: Duration,
71    /// Maximum time to maintain a connection with a peer
72    pub(crate) max_connection_time: Duration,
73    /// How much time does the peer have to make the initial TCP handshake
74    pub(crate) handshake_timeout: Duration,
75}
76
77impl PeerTimeoutConfig {
78    /// Create a new peer timeout configuration
79    pub fn new(
80        response_timeout: Duration,
81        max_connection_time: Duration,
82        handshake_timeout: Duration,
83    ) -> Self {
84        Self {
85            response_timeout,
86            max_connection_time,
87            handshake_timeout,
88        }
89    }
90}
91
92impl Default for PeerTimeoutConfig {
93    fn default() -> Self {
94        Self {
95            response_timeout: MESSAGE_TIMEOUT_SECS,
96            max_connection_time: TWO_HOUR,
97            handshake_timeout: TCP_CONNECTION_TIMEOUT,
98        }
99    }
100}
101
102pub(crate) struct LastBlockMonitor {
103    last_block: Option<Instant>,
104}
105
106impl LastBlockMonitor {
107    pub(crate) fn new() -> Self {
108        Self { last_block: None }
109    }
110
111    pub(crate) fn reset(&mut self) {
112        self.last_block = Some(Instant::now())
113    }
114
115    pub(crate) fn stale(&self) -> bool {
116        if let Some(time) = self.last_block {
117            return time.elapsed() > THIRTY_MINS;
118        }
119        false
120    }
121}
122
123#[derive(Debug, Clone, Copy, Default)]
124pub(crate) enum ConnectionType {
125    #[default]
126    ClearNet,
127    Socks5Proxy(SocketAddr),
128}
129
130impl ConnectionType {
131    pub(crate) fn can_connect(&self, addr: &AddrV2) -> bool {
132        match &self {
133            Self::ClearNet => matches!(addr, AddrV2::Ipv4(_) | AddrV2::Ipv6(_)),
134            Self::Socks5Proxy(_) => matches!(addr, AddrV2::Ipv4(_) | AddrV2::Ipv6(_)),
135        }
136    }
137
138    pub(crate) async fn connect(
139        &self,
140        addr: AddrV2,
141        port: u16,
142        handshake_timeout: Duration,
143    ) -> Result<TcpStream, PeerError> {
144        let socket_addr = match addr {
145            AddrV2::Ipv4(ip) => IpAddr::V4(ip),
146            AddrV2::Ipv6(ip) => IpAddr::V6(ip),
147            _ => return Err(PeerError::UnreachableSocketAddr),
148        };
149        match &self {
150            Self::ClearNet => {
151                let timeout = tokio::time::timeout(
152                    handshake_timeout,
153                    TcpStream::connect((socket_addr, port)),
154                )
155                .await
156                .map_err(|_| PeerError::ConnectionFailed)?;
157                let tcp_stream = timeout.map_err(|_| PeerError::ConnectionFailed)?;
158                Ok(tcp_stream)
159            }
160            Self::Socks5Proxy(proxy) => {
161                let socks5_timeout = tokio::time::timeout(
162                    handshake_timeout,
163                    create_socks5(*proxy, socket_addr, port),
164                )
165                .await
166                .map_err(|_| PeerError::ConnectionFailed)?;
167                let tcp_stream = socks5_timeout.map_err(PeerError::Socks5)?;
168                Ok(tcp_stream)
169            }
170        }
171    }
172}
173
174#[derive(Debug, Clone)]
175struct MessageState {
176    general_timeout: Duration,
177    version_handshake: VersionHandshakeState,
178    verack: VerackState,
179    addr_state: AddrGossipState,
180    sent_txs: HashSet<Wtxid>,
181    timed_message_state: HashMap<TimeSensitiveId, Instant>,
182    ping_state: PingState,
183}
184
185impl MessageState {
186    fn new(general_timeout: Duration) -> Self {
187        Self {
188            general_timeout,
189            version_handshake: Default::default(),
190            verack: Default::default(),
191            addr_state: Default::default(),
192            sent_txs: Default::default(),
193            timed_message_state: Default::default(),
194            ping_state: PingState::default(),
195        }
196    }
197
198    fn start_version_handshake(&mut self) {
199        self.version_handshake = self.version_handshake.start();
200    }
201
202    fn finish_version_handshake(&mut self) {
203        self.version_handshake = self.version_handshake.finish();
204    }
205
206    fn sent_tx(&mut self, wtxid: Wtxid) {
207        self.sent_txs.insert(wtxid);
208    }
209
210    fn unknown_rejection(&mut self, wtxid: Wtxid) -> bool {
211        !self.sent_txs.remove(&wtxid)
212    }
213
214    fn unresponsive(&self) -> bool {
215        self.timed_message_state
216            .values()
217            .any(|time| time.elapsed() > self.general_timeout)
218            || self.version_handshake.is_unresponsive(self.general_timeout)
219    }
220}
221
222#[derive(Debug, Clone, Copy, Default)]
223enum VersionHandshakeState {
224    #[default]
225    NotStarted,
226    Started {
227        at: tokio::time::Instant,
228    },
229    Completed,
230}
231
232impl VersionHandshakeState {
233    fn start(self) -> Self {
234        Self::Started {
235            at: tokio::time::Instant::now(),
236        }
237    }
238
239    fn finish(self) -> Self {
240        Self::Completed
241    }
242
243    fn is_complete(&self) -> bool {
244        matches!(self, Self::Completed)
245    }
246
247    fn is_unresponsive(&self, timeout: Duration) -> bool {
248        match self {
249            Self::Started { at } => at.elapsed() > timeout,
250            _ => false,
251        }
252    }
253}
254
255#[derive(Debug, Clone, Copy, Default)]
256struct VerackState {
257    got_ack: bool,
258    sent_ack: bool,
259}
260
261impl VerackState {
262    fn got_ack(&mut self) {
263        self.got_ack = true
264    }
265
266    fn sent_ack(&mut self) {
267        self.sent_ack = true
268    }
269
270    fn both_acks(&self) -> bool {
271        self.got_ack && self.sent_ack
272    }
273}
274
275#[derive(Debug, Clone, Copy, Default)]
276struct AddrGossipState {
277    num_advertised: usize,
278    gossip_stage: AddrGossipStages,
279}
280
281impl AddrGossipState {
282    fn received(&mut self, num_addrs: usize) {
283        self.num_advertised += num_addrs;
284    }
285
286    fn first_gossip(&mut self) {
287        self.gossip_stage = AddrGossipStages::RandomGossip;
288    }
289
290    fn over_limit(&self) -> bool {
291        self.num_advertised > ADDR_HARD_LIMIT
292    }
293}
294
295// Network address gossip occurs in multiple stages. First, we will send a `getaddr` message to
296// inform the peer that we want to know about nodes they are aware of. Oftentimes this will result
297// in a message containing 250-300 potential peers. Thereafter, the remote node will randomly send
298// 1-5 potential peers throughout the duration of the connection.
299#[derive(Debug, Clone, Copy, Default)]
300enum AddrGossipStages {
301    #[default]
302    NotReceived,
303    RandomGossip,
304}
305
306#[derive(Debug, Clone, Copy)]
307enum PingState {
308    WaitingFor { nonce: u64 },
309    LastMessageReceied { then: Instant },
310}
311
312impl PingState {
313    fn send_ping(&mut self) -> Option<u64> {
314        match self {
315            Self::WaitingFor { nonce: _ } => None,
316            Self::LastMessageReceied { then } => {
317                if then.elapsed() > SEND_PING {
318                    let nonce = rand::random();
319                    *self = Self::WaitingFor { nonce };
320                    Some(nonce)
321                } else {
322                    None
323                }
324            }
325        }
326    }
327
328    fn check_pong(&mut self, pong: u64) -> bool {
329        match self {
330            Self::WaitingFor { nonce } => {
331                if pong.eq(&*nonce) {
332                    *self = Self::LastMessageReceied {
333                        then: Instant::now(),
334                    };
335                    true
336                } else {
337                    false
338                }
339            }
340            Self::LastMessageReceied { then: _ } => false,
341        }
342    }
343
344    fn update_last_message(&mut self) {
345        match self {
346            Self::WaitingFor { nonce: _ } => (),
347            Self::LastMessageReceied { then: _ } => {
348                *self = Self::LastMessageReceied {
349                    then: Instant::now(),
350                }
351            }
352        }
353    }
354}
355
356impl Default for PingState {
357    fn default() -> Self {
358        Self::LastMessageReceied {
359            then: Instant::now(),
360        }
361    }
362}
363
364pub(crate) struct V1Header {
365    magic: Magic,
366    _command: CommandString,
367    length: u32,
368    _checksum: u32,
369}
370
371impl Decodable for V1Header {
372    fn consensus_decode<R: Read + ?Sized>(
373        reader: &mut R,
374    ) -> Result<Self, bitcoin::consensus::encode::Error> {
375        let magic = Magic::consensus_decode(reader)?;
376        let _command = CommandString::consensus_decode(reader)?;
377        let length = u32::consensus_decode(reader)?;
378        let _checksum = u32::consensus_decode(reader)?;
379        Ok(Self {
380            magic,
381            _command,
382            length,
383            _checksum,
384        })
385    }
386}
387
388#[cfg(test)]
389mod tests {
390    use std::{net::Ipv4Addr, time::Duration};
391
392    use bitcoin::{consensus::deserialize, p2p::address::AddrV2, Transaction};
393
394    use crate::{
395        network::{AddrGossipStages, LastBlockMonitor, MessageState, PingState},
396        prelude::Netgroup,
397    };
398
399    #[test]
400    fn test_sixteen() {
401        let peer = AddrV2::Ipv4(Ipv4Addr::new(95, 217, 198, 121));
402        assert_eq!("95.217".to_string(), peer.netgroup());
403    }
404
405    #[tokio::test(start_paused = true)]
406    async fn test_version_message_state() {
407        let timeout = Duration::from_secs(1);
408        let mut message_state = MessageState::new(timeout);
409        assert!(!message_state.unresponsive());
410        tokio::time::sleep(Duration::from_secs(2)).await;
411        assert!(!message_state.unresponsive());
412        message_state.start_version_handshake();
413        tokio::time::sleep(Duration::from_secs(2)).await;
414        assert!(message_state.unresponsive());
415        let mut message_state = MessageState::new(timeout);
416        message_state.start_version_handshake();
417        message_state.finish_version_handshake();
418        tokio::time::sleep(Duration::from_secs(2)).await;
419        assert!(!message_state.unresponsive());
420        assert!(message_state.version_handshake.is_complete());
421    }
422
423    #[test]
424    fn test_verack_state() {
425        let timeout = Duration::from_secs(1);
426        let mut messsage_state = MessageState::new(timeout);
427        messsage_state.version_handshake.start();
428        messsage_state.verack.got_ack();
429        assert!(!messsage_state.verack.both_acks());
430        messsage_state.verack.sent_ack();
431        assert!(messsage_state.verack.both_acks());
432    }
433
434    #[test]
435    fn test_tx_reject_state() {
436        let transaction: Transaction = deserialize(&hex::decode("0200000000010158e87a21b56daf0c23be8e7070456c336f7cbaa5c8757924f545887bb2abdd7501000000171600145f275f436b09a8cc9a2eb2a2f528485c68a56323feffffff02d8231f1b0100000017a914aed962d6654f9a2b36608eb9d64d2b260db4f1118700c2eb0b0000000017a914b7f5faf40e3d40a5a459b1db3535f2b72fa921e88702483045022100a22edcc6e5bc511af4cc4ae0de0fcd75c7e04d8c1c3a8aa9d820ed4b967384ec02200642963597b9b1bc22c75e9f3e117284a962188bf5e8a74c895089046a20ad770121035509a48eb623e10aace8bfd0212fdb8a8e5af3c94b0b133b95e114cab89e4f7965000000").unwrap()).unwrap();
437        let wtxid = transaction.compute_wtxid();
438        let mut message_state = MessageState::new(Duration::from_secs(2));
439        message_state.sent_tx(wtxid);
440        assert!(!message_state.unknown_rejection(wtxid));
441        assert!(message_state.unknown_rejection(wtxid));
442    }
443
444    #[test]
445    fn test_addr_gossip_state() {
446        let mut message_state = MessageState::new(Duration::from_secs(2));
447        assert!(matches!(
448            message_state.addr_state.gossip_stage,
449            AddrGossipStages::NotReceived
450        ));
451        message_state.addr_state.received(100);
452        message_state.addr_state.first_gossip();
453        assert!(matches!(
454            message_state.addr_state.gossip_stage,
455            AddrGossipStages::RandomGossip
456        ));
457        assert!(!message_state.addr_state.over_limit());
458        message_state.addr_state.received(10_000);
459        assert!(message_state.addr_state.over_limit());
460    }
461
462    #[tokio::test(start_paused = true)]
463    async fn test_ping_state() {
464        // Detect we need a ping
465        let mut ping_state = PingState::default();
466        assert!(ping_state.send_ping().is_none());
467        tokio::time::sleep(Duration::from_secs(60)).await;
468        assert!(ping_state.send_ping().is_none());
469        tokio::time::sleep(Duration::from_secs(70)).await;
470        assert!(ping_state.send_ping().is_some());
471        // Do not spam
472        assert!(ping_state.send_ping().is_none());
473        // We match pings and update the state correctly
474        let mut ping_state = PingState::default();
475        tokio::time::sleep(Duration::from_secs(60 * 3)).await;
476        let ping = ping_state.send_ping().unwrap();
477        tokio::time::sleep(Duration::from_secs(60 * 3)).await;
478        assert!(ping_state.check_pong(ping));
479        assert!(!ping_state.check_pong(ping));
480        assert!(ping_state.send_ping().is_none());
481        tokio::time::sleep(Duration::from_secs(60 * 3)).await;
482        assert!(ping_state.send_ping().is_some());
483        // Receiving a message without a `Pong` does not update the state
484        let mut ping_state = PingState::default();
485        tokio::time::sleep(Duration::from_secs(60 * 3)).await;
486        let ping = ping_state.send_ping().unwrap();
487        ping_state.update_last_message();
488        assert!(ping_state.check_pong(ping));
489        // Time updates properly
490        let mut ping_state = PingState::default();
491        assert!(ping_state.send_ping().is_none());
492        tokio::time::sleep(Duration::from_secs(60)).await;
493        assert!(ping_state.send_ping().is_none());
494        ping_state.update_last_message();
495        tokio::time::sleep(Duration::from_secs(70)).await;
496        assert!(ping_state.send_ping().is_none());
497    }
498
499    #[tokio::test(start_paused = true)]
500    async fn test_block_detected_stale() {
501        let mut last_block = LastBlockMonitor::new();
502        tokio::time::sleep(Duration::from_secs(60 * 40)).await;
503        // No blocks received yet.
504        assert!(!last_block.stale());
505        last_block.reset();
506        tokio::time::sleep(Duration::from_secs(60 * 20)).await;
507        // Has not been thirty minutes
508        assert!(!last_block.stale());
509        // Should get a block by now
510        tokio::time::sleep(Duration::from_secs(60 * 20)).await;
511        assert!(last_block.stale());
512        last_block.reset();
513        assert!(!last_block.stale());
514    }
515}