koibumi_node_sync/
message_handler.rs

1use std::{convert::TryFrom, fmt};
2
3use log::info;
4
5use koibumi_core::{
6    io::{ReadFromExact, SizedReadFromExact},
7    message::{self, Services},
8    packet::{CommandKind, Packet},
9    time::Time,
10};
11
12use crate::{
13    connection::Connection,
14    connection_loop::{Error, Result},
15    constant::MAX_TIME_OFFSET,
16};
17
18#[derive(Clone, PartialEq, Eq, Debug)]
19pub enum HandleError {
20    InvalidState,
21    InvalidProtocolVersion,
22    InvalidTimestamp,
23    NoCommonStream,
24    InvalidServices,
25    ConnectedToMyself,
26    ErrorReceived,
27}
28
29impl fmt::Display for HandleError {
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        // TODO
32        write!(f, "{:?}", self)
33    }
34}
35
36impl std::error::Error for HandleError {}
37
38fn handle_version(conn: &mut Connection, message: message::Version) -> Result<()> {
39    let now = Time::now();
40
41    if message.version() < conn.ctx().config().core().protocol_version() {
42        conn.write_error(2, "Your is using an old protocol. Closing connection.")?;
43        return Err(Error::from(HandleError::InvalidProtocolVersion));
44    }
45
46    match now.checked_add(MAX_TIME_OFFSET) {
47        Some(target) => {
48            if message.timestamp() > target {
49                conn.write_error(
50                    2,
51                    "Your time is too far in the future compared to mine. Closing connection.",
52                )?;
53                return Err(Error::from(HandleError::InvalidTimestamp));
54            }
55        }
56        None => {
57            return Err(Error::from(HandleError::InvalidTimestamp));
58        }
59    }
60    match message.timestamp().checked_add(MAX_TIME_OFFSET) {
61        Some(target) => {
62            if now > target {
63                conn.write_error(
64                    2,
65                    "Your time is too far in the past compared to mine. Closing connection.",
66                )?;
67                return Err(Error::from(HandleError::InvalidTimestamp));
68            }
69        }
70        None => {
71            return Err(Error::from(HandleError::InvalidTimestamp));
72        }
73    }
74
75    let streams = conn.common_stream_numbers(message.stream_numbers());
76    if streams.as_ref().is_empty() {
77        conn.write_error(
78            2,
79            "We don't have shared stream interests. Closing connection.",
80        )?;
81        return Err(Error::from(HandleError::NoCommonStream));
82    }
83
84    if !message.services().contains(Services::NETWORK) {
85        return Err(Error::from(HandleError::InvalidServices));
86    }
87
88    // TODO check if server full when inbound
89
90    if !conn.ctx().config().connect_to_myself() && message.nonce() == conn.ctx().node_nonce() {
91        conn.write_error(2, "I'm connected to myself. Closing connection.")?;
92        return Err(Error::from(HandleError::ConnectedToMyself));
93    }
94
95    info!("User agent: {}", message.user_agent());
96
97    conn.write_verack()?;
98
99    conn.write_version_if_not_sent()?;
100
101    conn.set_peer_version(Some(message));
102    if conn.verack_received() {
103        conn.set_state_established()?;
104    }
105
106    Ok(())
107}
108
109fn handle_verack(conn: &mut Connection, _message: message::Verack) -> Result<()> {
110    if conn.version_received() {
111        conn.set_state_established()?;
112    }
113    Ok(())
114}
115
116fn handle_addr(conn: &mut Connection, message: message::Addr) -> Result<()> {
117    conn.add_addrs(message.as_ref());
118    Ok(())
119}
120
121fn handle_inv(conn: &mut Connection, message: message::Inv) -> Result<()> {
122    // TODO consume inv message into Vec
123    conn.add_inv_hashes(message.as_ref().to_vec());
124    Ok(())
125}
126
127fn handle_getdata(conn: &mut Connection, message: message::Getdata) -> Result<()> {
128    // TODO consume getdata message into Vec
129    conn.send_objects(message.as_ref().to_vec());
130    Ok(())
131}
132
133fn handle_object(conn: &mut Connection, message: message::Object) -> Result<()> {
134    conn.add_object(message);
135    Ok(())
136}
137
138fn handle_error(_conn: &mut Connection, message: message::Error) -> Result<()> {
139    info!(
140        "Error received: (fatal: {}) {}",
141        message.fatal(),
142        message.error_text()
143    );
144    Err(Error::from(HandleError::ErrorReceived))
145}
146
147fn handle_ping(conn: &mut Connection, _message: message::Ping) -> Result<()> {
148    conn.write_pong()?;
149    Ok(())
150}
151
152fn handle_pong(_conn: &mut Connection, _message: message::Pong) -> Result<()> {
153    // TODO check if ping was initiated
154    Ok(())
155}
156
157pub fn dispatch_message(conn: &mut Connection, packet: Packet) -> Result<()> {
158    let header = packet.header();
159    let payload = packet.payload();
160    match CommandKind::try_from(header.command()) {
161        Ok(command) => match command {
162            CommandKind::Error => {
163                let message = message::Error::read_from_exact(&payload)?;
164                handle_error(conn, message)?;
165            }
166            CommandKind::Getdata => {
167                if !conn.established() {
168                    return Err(Error::from(HandleError::InvalidState));
169                }
170                let message = message::Getdata::read_from_exact(&payload)?;
171                handle_getdata(conn, message)?;
172            }
173            CommandKind::Inv => {
174                if !conn.established() {
175                    return Err(Error::from(HandleError::InvalidState));
176                }
177                let message = message::Inv::read_from_exact(&payload)?;
178                handle_inv(conn, message)?;
179            }
180            CommandKind::Dinv => {
181                if !conn.established() {
182                    return Err(Error::from(HandleError::InvalidState));
183                }
184                // TODO dinv unimplemented
185                //let message = DinvMessage::read_from_exact(&payload)?;
186                //handle_dinv(conn, &message)?;
187            }
188            CommandKind::Object => {
189                if !conn.established() {
190                    return Err(Error::from(HandleError::InvalidState));
191                }
192                let message = message::Object::sized_read_from_exact(&payload)?;
193                handle_object(conn, message)?;
194            }
195            CommandKind::Addr => {
196                if !conn.established() {
197                    return Err(Error::from(HandleError::InvalidState));
198                }
199                let message = message::Addr::read_from_exact(&payload)?;
200                handle_addr(conn, message)?;
201            }
202            CommandKind::Portcheck => {
203                // TODO portcheck unimplemented
204                //if !conn.established() {
205                //    return Err(Error::from(HandleError::InvalidState));
206                //}
207                //let message = PortcheckMessage::read_from_exact(&payload)?;
208                //handle_addr(conn, &message)?;
209            }
210            CommandKind::Ping => {
211                if !conn.established() {
212                    return Err(Error::from(HandleError::InvalidState));
213                }
214                let message = message::Ping::sized_read_from_exact(&payload)?;
215                handle_ping(conn, message)?;
216            }
217            CommandKind::Pong => {
218                if !conn.established() {
219                    return Err(Error::from(HandleError::InvalidState));
220                }
221                let message = message::Pong::sized_read_from_exact(&payload)?;
222                handle_pong(conn, message)?;
223            }
224            CommandKind::Verack => {
225                if conn.established() {
226                    return Err(Error::from(HandleError::InvalidState));
227                }
228                if !conn.version_sent() {
229                    return Err(Error::from(HandleError::InvalidState));
230                }
231                if conn.verack_received() {
232                    return Err(Error::from(HandleError::InvalidState));
233                }
234                let message = message::Verack::read_from_exact(&payload)?;
235                handle_verack(conn, message)?;
236                conn.set_verack_received();
237            }
238            CommandKind::Version => {
239                if conn.established() {
240                    return Err(Error::from(HandleError::InvalidState));
241                }
242                if conn.version_received() {
243                    return Err(Error::from(HandleError::InvalidState));
244                }
245                let message = message::Version::read_from_exact(&payload)?;
246                handle_version(conn, message)?;
247                conn.set_version_received();
248            }
249        },
250        Err(err) => info!("Unknown command: {}", err),
251    }
252    Ok(())
253}