koibumi-node 0.0.8

A Bitmessage node implementation as a library for Koibumi, an experimental Bitmessage client
Documentation
use std::{convert::TryFrom, fmt};

use log::info;

use koibumi_core::{
    io::{ReadFromExact, SizedReadFromExact},
    message::{self, Services},
    packet::{CommandKind, Packet},
    time::Time,
};

use crate::{
    connection::Connection,
    connection_loop::{Error, Result},
    constant::MAX_TIME_OFFSET,
};

/// Reasons for which handling a received message fails.
///
/// Every variant is returned as an error from the message handlers in this
/// module, wrapped into the connection loop's `Error` type.
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum HandleError {
    /// A message arrived in a connection state in which it is not accepted
    /// (for example a data message before the handshake has completed, or a
    /// repeated handshake message).
    InvalidState,
    /// The peer announced a protocol version lower than the configured one.
    InvalidProtocolVersion,
    /// The peer's timestamp differs from the local time by more than the
    /// maximum allowed offset, or an allowed bound could not be computed.
    InvalidTimestamp,
    /// The peer and this node have no stream number in common.
    NoCommonStream,
    /// The peer does not advertise the `NETWORK` service.
    InvalidServices,
    /// The peer's nonce equals this node's own nonce.
    ConnectedToMyself,
    /// The peer sent an `error` message.
    ErrorReceived,
}

impl fmt::Display for HandleError {
    /// Writes a short human-readable description of the failure.
    ///
    /// `Debug` remains available for the bare variant name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = match self {
            Self::InvalidState => "message not allowed in the current connection state",
            Self::InvalidProtocolVersion => "peer uses an unsupported old protocol version",
            Self::InvalidTimestamp => "peer timestamp is too far from local time",
            Self::NoCommonStream => "no stream in common with peer",
            Self::InvalidServices => "peer does not offer the required services",
            Self::ConnectedToMyself => "connected to myself",
            Self::ErrorReceived => "error message received from peer",
        };
        f.write_str(text)
    }
}

impl std::error::Error for HandleError {}

async fn handle_version(conn: &mut Connection, message: message::Version) -> Result<()> {
    let now = Time::now();

    if message.version() < conn.ctx().config().core().protocol_version() {
        conn.write_error(2, "Your is using an old protocol. Closing connection.")
            .await?;
        return Err(Error::from(HandleError::InvalidProtocolVersion));
    }

    match now.checked_add(MAX_TIME_OFFSET) {
        Some(target) => {
            if message.timestamp() > target {
                conn.write_error(
                    2,
                    "Your time is too far in the future compared to mine. Closing connection.",
                )
                .await?;
                return Err(Error::from(HandleError::InvalidTimestamp));
            }
        }
        None => {
            return Err(Error::from(HandleError::InvalidTimestamp));
        }
    }
    match message.timestamp().checked_add(MAX_TIME_OFFSET) {
        Some(target) => {
            if now > target {
                conn.write_error(
                    2,
                    "Your time is too far in the past compared to mine. Closing connection.",
                )
                .await?;
                return Err(Error::from(HandleError::InvalidTimestamp));
            }
        }
        None => {
            return Err(Error::from(HandleError::InvalidTimestamp));
        }
    }

    let streams = conn.common_stream_numbers(message.stream_numbers());
    if streams.as_ref().is_empty() {
        conn.write_error(
            2,
            "We don't have shared stream interests. Closing connection.",
        )
        .await?;
        return Err(Error::from(HandleError::NoCommonStream));
    }

    if !message.services().contains(Services::NETWORK) {
        return Err(Error::from(HandleError::InvalidServices));
    }

    // TODO check if server full when inbound

    if !conn.ctx().config().connect_to_myself() && message.nonce() == conn.ctx().node_nonce() {
        conn.write_error(2, "I'm connected to myself. Closing connection.")
            .await?;
        return Err(Error::from(HandleError::ConnectedToMyself));
    }

    info!("User agent: {}", message.user_agent());

    conn.write_verack().await?;

    conn.write_version_if_not_sent().await?;

    conn.set_peer_version(Some(message));
    if conn.verack_received() {
        conn.set_state_established().await?;
    }

    Ok(())
}

/// Handles the peer's `verack` message.
///
/// Marks the connection established when the peer's version has already
/// been received; otherwise does nothing.
async fn handle_verack(conn: &mut Connection, _message: message::Verack) -> Result<()> {
    if !conn.version_received() {
        // The handshake is still waiting for the peer's version.
        return Ok(());
    }
    conn.set_state_established().await?;
    Ok(())
}

/// Handles an `addr` message by passing the addresses it carries to the
/// connection via `add_addrs`.
///
/// Always returns `Ok(())`.
async fn handle_addr(conn: &mut Connection, message: message::Addr) -> Result<()> {
    conn.add_addrs(message.as_ref()).await;
    Ok(())
}

/// Handles an `inv` message by handing a copy of its entries to the
/// connection via `add_inv_hashes`.
///
/// Always returns `Ok(())`.
async fn handle_inv(conn: &mut Connection, message: message::Inv) -> Result<()> {
    // TODO consume inv message into Vec
    let hashes = message.as_ref().to_vec();
    conn.add_inv_hashes(hashes).await;
    Ok(())
}

/// Handles a `getdata` message by handing a copy of its entries to the
/// connection via `send_objects`.
///
/// Always returns `Ok(())`.
async fn handle_getdata(conn: &mut Connection, message: message::Getdata) -> Result<()> {
    // TODO consume getdata message into Vec
    let requested = message.as_ref().to_vec();
    conn.send_objects(requested).await;
    Ok(())
}

/// Handles an `object` message by moving it into the connection via
/// `add_object`.
///
/// Always returns `Ok(())`.
async fn handle_object(conn: &mut Connection, message: message::Object) -> Result<()> {
    conn.add_object(message).await;
    Ok(())
}

async fn handle_error(_conn: &mut Connection, message: message::Error) -> Result<()> {
    info!(
        "Error received: (fatal: {}) {}",
        message.fatal(),
        message.error_text()
    );
    Err(Error::from(HandleError::ErrorReceived))
}

/// Handles a `ping` message by writing a pong to the connection.
///
/// # Errors
///
/// Propagates any error returned by `write_pong`.
async fn handle_ping(conn: &mut Connection, _message: message::Ping) -> Result<()> {
    conn.write_pong().await?;
    Ok(())
}

/// Handles a `pong` message.
///
/// Currently a no-op that always returns `Ok(())`; the pong is not matched
/// against any previously sent ping.
async fn handle_pong(_conn: &mut Connection, _message: message::Pong) -> Result<()> {
    // TODO check if ping was initiated
    Ok(())
}

pub async fn dispatch_message(conn: &mut Connection, packet: Packet) -> Result<()> {
    let header = packet.header();
    let payload = packet.payload();
    match CommandKind::try_from(header.command()) {
        Ok(command) => match command {
            CommandKind::Error => {
                let message = message::Error::read_from_exact(&payload)?;
                handle_error(conn, message).await?;
            }
            CommandKind::Getdata => {
                if !conn.established() {
                    return Err(Error::from(HandleError::InvalidState));
                }
                let message = message::Getdata::read_from_exact(&payload)?;
                handle_getdata(conn, message).await?;
            }
            CommandKind::Inv => {
                if !conn.established() {
                    return Err(Error::from(HandleError::InvalidState));
                }
                let message = message::Inv::read_from_exact(&payload)?;
                handle_inv(conn, message).await?;
            }
            CommandKind::Dinv => {
                if !conn.established() {
                    return Err(Error::from(HandleError::InvalidState));
                }
                // TODO dinv unimplemented
                //let message = DinvMessage::read_from_exact(&payload)?;
                //handle_dinv(conn, &message).await?;
            }
            CommandKind::Object => {
                if !conn.established() {
                    return Err(Error::from(HandleError::InvalidState));
                }
                let message = message::Object::sized_read_from_exact(&payload)?;
                handle_object(conn, message).await?;
            }
            CommandKind::Addr => {
                if !conn.established() {
                    return Err(Error::from(HandleError::InvalidState));
                }
                let message = message::Addr::read_from_exact(&payload)?;
                handle_addr(conn, message).await?;
            }
            CommandKind::Portcheck => {
                // TODO portcheck unimplemented
                //if !conn.established() {
                //    return Err(Error::from(HandleError::InvalidState));
                //}
                //let message = PortcheckMessage::read_from_exact(&payload)?;
                //handle_addr(conn, &message).await?;
            }
            CommandKind::Ping => {
                if !conn.established() {
                    return Err(Error::from(HandleError::InvalidState));
                }
                let message = message::Ping::sized_read_from_exact(&payload)?;
                handle_ping(conn, message).await?;
            }
            CommandKind::Pong => {
                if !conn.established() {
                    return Err(Error::from(HandleError::InvalidState));
                }
                let message = message::Pong::sized_read_from_exact(&payload)?;
                handle_pong(conn, message).await?;
            }
            CommandKind::Verack => {
                if conn.established() {
                    return Err(Error::from(HandleError::InvalidState));
                }
                if !conn.version_sent() {
                    return Err(Error::from(HandleError::InvalidState));
                }
                if conn.verack_received() {
                    return Err(Error::from(HandleError::InvalidState));
                }
                let message = message::Verack::read_from_exact(&payload)?;
                handle_verack(conn, message).await?;
                conn.set_verack_received();
            }
            CommandKind::Version => {
                if conn.established() {
                    return Err(Error::from(HandleError::InvalidState));
                }
                if conn.version_received() {
                    return Err(Error::from(HandleError::InvalidState));
                }
                let message = message::Version::read_from_exact(&payload)?;
                handle_version(conn, message).await?;
                conn.set_version_received();
            }
        },
        Err(err) => info!("Unknown command: {}", err),
    }
    Ok(())
}