use std::{convert::TryFrom, fmt};
use log::info;
use koibumi_core::{
io::{ReadFromExact, SizedReadFromExact},
message::{self, Services},
packet::{CommandKind, Packet},
time::Time,
};
use crate::{
connection::Connection,
connection_loop::{Error, Result},
constant::MAX_TIME_OFFSET,
};
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum HandleError {
InvalidState,
InvalidProtocolVersion,
InvalidTimestamp,
NoCommonStream,
InvalidServices,
ConnectedToMyself,
ErrorReceived,
}
impl fmt::Display for HandleError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl std::error::Error for HandleError {}
async fn handle_version(conn: &mut Connection, message: message::Version) -> Result<()> {
let now = Time::now();
if message.version() < conn.ctx().config().core().protocol_version() {
conn.write_error(2, "Your is using an old protocol. Closing connection.")
.await?;
return Err(Error::from(HandleError::InvalidProtocolVersion));
}
match now.checked_add(MAX_TIME_OFFSET) {
Some(target) => {
if message.timestamp() > target {
conn.write_error(
2,
"Your time is too far in the future compared to mine. Closing connection.",
)
.await?;
return Err(Error::from(HandleError::InvalidTimestamp));
}
}
None => {
return Err(Error::from(HandleError::InvalidTimestamp));
}
}
match message.timestamp().checked_add(MAX_TIME_OFFSET) {
Some(target) => {
if now > target {
conn.write_error(
2,
"Your time is too far in the past compared to mine. Closing connection.",
)
.await?;
return Err(Error::from(HandleError::InvalidTimestamp));
}
}
None => {
return Err(Error::from(HandleError::InvalidTimestamp));
}
}
let streams = conn.common_stream_numbers(message.stream_numbers());
if streams.as_ref().is_empty() {
conn.write_error(
2,
"We don't have shared stream interests. Closing connection.",
)
.await?;
return Err(Error::from(HandleError::NoCommonStream));
}
if !message.services().contains(Services::NETWORK) {
return Err(Error::from(HandleError::InvalidServices));
}
if !conn.ctx().config().connect_to_myself() && message.nonce() == conn.ctx().node_nonce() {
conn.write_error(2, "I'm connected to myself. Closing connection.")
.await?;
return Err(Error::from(HandleError::ConnectedToMyself));
}
info!("User agent: {}", message.user_agent());
conn.write_verack().await?;
conn.write_version_if_not_sent().await?;
conn.set_peer_version(Some(message));
if conn.verack_received() {
conn.set_state_established().await?;
}
Ok(())
}
async fn handle_verack(conn: &mut Connection, _message: message::Verack) -> Result<()> {
if conn.version_received() {
conn.set_state_established().await?;
}
Ok(())
}
async fn handle_addr(conn: &mut Connection, message: message::Addr) -> Result<()> {
conn.add_addrs(message.as_ref()).await;
Ok(())
}
async fn handle_inv(conn: &mut Connection, message: message::Inv) -> Result<()> {
conn.add_inv_hashes(message.as_ref().to_vec()).await;
Ok(())
}
async fn handle_getdata(conn: &mut Connection, message: message::Getdata) -> Result<()> {
conn.send_objects(message.as_ref().to_vec()).await;
Ok(())
}
async fn handle_object(conn: &mut Connection, message: message::Object) -> Result<()> {
conn.add_object(message).await;
Ok(())
}
async fn handle_error(_conn: &mut Connection, message: message::Error) -> Result<()> {
info!(
"Error received: (fatal: {}) {}",
message.fatal(),
message.error_text()
);
Err(Error::from(HandleError::ErrorReceived))
}
async fn handle_ping(conn: &mut Connection, _message: message::Ping) -> Result<()> {
conn.write_pong().await?;
Ok(())
}
async fn handle_pong(_conn: &mut Connection, _message: message::Pong) -> Result<()> {
Ok(())
}
pub async fn dispatch_message(conn: &mut Connection, packet: Packet) -> Result<()> {
let header = packet.header();
let payload = packet.payload();
match CommandKind::try_from(header.command()) {
Ok(command) => match command {
CommandKind::Error => {
let message = message::Error::read_from_exact(&payload)?;
handle_error(conn, message).await?;
}
CommandKind::Getdata => {
if !conn.established() {
return Err(Error::from(HandleError::InvalidState));
}
let message = message::Getdata::read_from_exact(&payload)?;
handle_getdata(conn, message).await?;
}
CommandKind::Inv => {
if !conn.established() {
return Err(Error::from(HandleError::InvalidState));
}
let message = message::Inv::read_from_exact(&payload)?;
handle_inv(conn, message).await?;
}
CommandKind::Dinv => {
if !conn.established() {
return Err(Error::from(HandleError::InvalidState));
}
}
CommandKind::Object => {
if !conn.established() {
return Err(Error::from(HandleError::InvalidState));
}
let message = message::Object::sized_read_from_exact(&payload)?;
handle_object(conn, message).await?;
}
CommandKind::Addr => {
if !conn.established() {
return Err(Error::from(HandleError::InvalidState));
}
let message = message::Addr::read_from_exact(&payload)?;
handle_addr(conn, message).await?;
}
CommandKind::Portcheck => {
}
CommandKind::Ping => {
if !conn.established() {
return Err(Error::from(HandleError::InvalidState));
}
let message = message::Ping::sized_read_from_exact(&payload)?;
handle_ping(conn, message).await?;
}
CommandKind::Pong => {
if !conn.established() {
return Err(Error::from(HandleError::InvalidState));
}
let message = message::Pong::sized_read_from_exact(&payload)?;
handle_pong(conn, message).await?;
}
CommandKind::Verack => {
if conn.established() {
return Err(Error::from(HandleError::InvalidState));
}
if !conn.version_sent() {
return Err(Error::from(HandleError::InvalidState));
}
if conn.verack_received() {
return Err(Error::from(HandleError::InvalidState));
}
let message = message::Verack::read_from_exact(&payload)?;
handle_verack(conn, message).await?;
conn.set_verack_received();
}
CommandKind::Version => {
if conn.established() {
return Err(Error::from(HandleError::InvalidState));
}
if conn.version_received() {
return Err(Error::from(HandleError::InvalidState));
}
let message = message::Version::read_from_exact(&payload)?;
handle_version(conn, message).await?;
conn.set_version_received();
}
},
Err(err) => info!("Unknown command: {}", err),
}
Ok(())
}