koibumi_node_sync/
message_handler.rs1use std::{convert::TryFrom, fmt};
2
3use log::info;
4
5use koibumi_core::{
6 io::{ReadFromExact, SizedReadFromExact},
7 message::{self, Services},
8 packet::{CommandKind, Packet},
9 time::Time,
10};
11
12use crate::{
13 connection::Connection,
14 connection_loop::{Error, Result},
15 constant::MAX_TIME_OFFSET,
16};
17
/// Reasons why handling of an incoming message was aborted.
///
/// Every variant is produced somewhere in this module and wrapped into the
/// connection loop's `Error` before being returned to the caller.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HandleError {
    /// A message arrived that is not acceptable in the current handshake state.
    InvalidState,
    /// The peer announced a protocol version older than the configured one.
    InvalidProtocolVersion,
    /// The peer's timestamp is outside the accepted window, or computing the
    /// window overflowed.
    InvalidTimestamp,
    /// The peer and this node share no stream number.
    NoCommonStream,
    /// The peer does not advertise the `NETWORK` service.
    InvalidServices,
    /// The peer's nonce equals this node's nonce.
    ConnectedToMyself,
    /// The peer sent an `error` message.
    ErrorReceived,
}

impl fmt::Display for HandleError {
    /// Renders the error using its `Debug` representation (the variant name).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{:?}", self))
    }
}

/// Lets `HandleError` be used wherever a standard error type is expected.
impl std::error::Error for HandleError {}
37
38fn handle_version(conn: &mut Connection, message: message::Version) -> Result<()> {
39 let now = Time::now();
40
41 if message.version() < conn.ctx().config().core().protocol_version() {
42 conn.write_error(2, "Your is using an old protocol. Closing connection.")?;
43 return Err(Error::from(HandleError::InvalidProtocolVersion));
44 }
45
46 match now.checked_add(MAX_TIME_OFFSET) {
47 Some(target) => {
48 if message.timestamp() > target {
49 conn.write_error(
50 2,
51 "Your time is too far in the future compared to mine. Closing connection.",
52 )?;
53 return Err(Error::from(HandleError::InvalidTimestamp));
54 }
55 }
56 None => {
57 return Err(Error::from(HandleError::InvalidTimestamp));
58 }
59 }
60 match message.timestamp().checked_add(MAX_TIME_OFFSET) {
61 Some(target) => {
62 if now > target {
63 conn.write_error(
64 2,
65 "Your time is too far in the past compared to mine. Closing connection.",
66 )?;
67 return Err(Error::from(HandleError::InvalidTimestamp));
68 }
69 }
70 None => {
71 return Err(Error::from(HandleError::InvalidTimestamp));
72 }
73 }
74
75 let streams = conn.common_stream_numbers(message.stream_numbers());
76 if streams.as_ref().is_empty() {
77 conn.write_error(
78 2,
79 "We don't have shared stream interests. Closing connection.",
80 )?;
81 return Err(Error::from(HandleError::NoCommonStream));
82 }
83
84 if !message.services().contains(Services::NETWORK) {
85 return Err(Error::from(HandleError::InvalidServices));
86 }
87
88 if !conn.ctx().config().connect_to_myself() && message.nonce() == conn.ctx().node_nonce() {
91 conn.write_error(2, "I'm connected to myself. Closing connection.")?;
92 return Err(Error::from(HandleError::ConnectedToMyself));
93 }
94
95 info!("User agent: {}", message.user_agent());
96
97 conn.write_verack()?;
98
99 conn.write_version_if_not_sent()?;
100
101 conn.set_peer_version(Some(message));
102 if conn.verack_received() {
103 conn.set_state_established()?;
104 }
105
106 Ok(())
107}
108
109fn handle_verack(conn: &mut Connection, _message: message::Verack) -> Result<()> {
110 if conn.version_received() {
111 conn.set_state_established()?;
112 }
113 Ok(())
114}
115
116fn handle_addr(conn: &mut Connection, message: message::Addr) -> Result<()> {
117 conn.add_addrs(message.as_ref());
118 Ok(())
119}
120
121fn handle_inv(conn: &mut Connection, message: message::Inv) -> Result<()> {
122 conn.add_inv_hashes(message.as_ref().to_vec());
124 Ok(())
125}
126
127fn handle_getdata(conn: &mut Connection, message: message::Getdata) -> Result<()> {
128 conn.send_objects(message.as_ref().to_vec());
130 Ok(())
131}
132
133fn handle_object(conn: &mut Connection, message: message::Object) -> Result<()> {
134 conn.add_object(message);
135 Ok(())
136}
137
138fn handle_error(_conn: &mut Connection, message: message::Error) -> Result<()> {
139 info!(
140 "Error received: (fatal: {}) {}",
141 message.fatal(),
142 message.error_text()
143 );
144 Err(Error::from(HandleError::ErrorReceived))
145}
146
147fn handle_ping(conn: &mut Connection, _message: message::Ping) -> Result<()> {
148 conn.write_pong()?;
149 Ok(())
150}
151
152fn handle_pong(_conn: &mut Connection, _message: message::Pong) -> Result<()> {
153 Ok(())
155}
156
157pub fn dispatch_message(conn: &mut Connection, packet: Packet) -> Result<()> {
158 let header = packet.header();
159 let payload = packet.payload();
160 match CommandKind::try_from(header.command()) {
161 Ok(command) => match command {
162 CommandKind::Error => {
163 let message = message::Error::read_from_exact(&payload)?;
164 handle_error(conn, message)?;
165 }
166 CommandKind::Getdata => {
167 if !conn.established() {
168 return Err(Error::from(HandleError::InvalidState));
169 }
170 let message = message::Getdata::read_from_exact(&payload)?;
171 handle_getdata(conn, message)?;
172 }
173 CommandKind::Inv => {
174 if !conn.established() {
175 return Err(Error::from(HandleError::InvalidState));
176 }
177 let message = message::Inv::read_from_exact(&payload)?;
178 handle_inv(conn, message)?;
179 }
180 CommandKind::Dinv => {
181 if !conn.established() {
182 return Err(Error::from(HandleError::InvalidState));
183 }
184 }
188 CommandKind::Object => {
189 if !conn.established() {
190 return Err(Error::from(HandleError::InvalidState));
191 }
192 let message = message::Object::sized_read_from_exact(&payload)?;
193 handle_object(conn, message)?;
194 }
195 CommandKind::Addr => {
196 if !conn.established() {
197 return Err(Error::from(HandleError::InvalidState));
198 }
199 let message = message::Addr::read_from_exact(&payload)?;
200 handle_addr(conn, message)?;
201 }
202 CommandKind::Portcheck => {
203 }
210 CommandKind::Ping => {
211 if !conn.established() {
212 return Err(Error::from(HandleError::InvalidState));
213 }
214 let message = message::Ping::sized_read_from_exact(&payload)?;
215 handle_ping(conn, message)?;
216 }
217 CommandKind::Pong => {
218 if !conn.established() {
219 return Err(Error::from(HandleError::InvalidState));
220 }
221 let message = message::Pong::sized_read_from_exact(&payload)?;
222 handle_pong(conn, message)?;
223 }
224 CommandKind::Verack => {
225 if conn.established() {
226 return Err(Error::from(HandleError::InvalidState));
227 }
228 if !conn.version_sent() {
229 return Err(Error::from(HandleError::InvalidState));
230 }
231 if conn.verack_received() {
232 return Err(Error::from(HandleError::InvalidState));
233 }
234 let message = message::Verack::read_from_exact(&payload)?;
235 handle_verack(conn, message)?;
236 conn.set_verack_received();
237 }
238 CommandKind::Version => {
239 if conn.established() {
240 return Err(Error::from(HandleError::InvalidState));
241 }
242 if conn.version_received() {
243 return Err(Error::from(HandleError::InvalidState));
244 }
245 let message = message::Version::read_from_exact(&payload)?;
246 handle_version(conn, message)?;
247 conn.set_version_received();
248 }
249 },
250 Err(err) => info!("Unknown command: {}", err),
251 }
252 Ok(())
253}