pea2pea 0.29.0

A small library allowing simple and quick creation of custom P2P nodes and networks.
Documentation
mod common;

use tokio::time::sleep;
use tracing::*;
use tracing_subscriber::filter::LevelFilter;

use pea2pea::{
    protocols::{Handshake, Reading, Writing},
    Config, Connection, ConnectionSide, Node, Pea2Pea,
};

use std::{io, net::SocketAddr, time::Duration};

#[derive(Clone)]
struct JoJoNode(Node);

impl Pea2Pea for JoJoNode {
    fn node(&self) -> &Node {
        &self.0
    }
}

#[async_trait::async_trait]
impl Handshake for JoJoNode {
    async fn perform_handshake(&self, conn: Connection) -> io::Result<Connection> {
        // some handshakes are useful, others are menacing ゴゴゴゴ
        match !conn.side {
            ConnectionSide::Initiator => {
                info!(parent: self.node().span(), "Dio!");
                sleep(Duration::from_secs(4)).await;
                info!(parent: self.node().span(), "I can't beat the shit out of you without getting closer.");
                sleep(Duration::from_secs(3)).await;
            }
            ConnectionSide::Responder => {
                sleep(Duration::from_secs(1)).await;
                warn!(parent: self.node().span(), "Oh, you're approaching me? Instead of running away, you're coming right to me?");
                sleep(Duration::from_secs(6)).await;
                warn!(parent: self.node().span(), "Oh ho! Then come as close as you like.");
            }
        }

        Ok(conn)
    }
}

#[derive(Debug, PartialEq, Clone, Copy)]
enum BattleCry {
    Ora = 0,
    Muda,
}

impl From<u8> for BattleCry {
    fn from(byte: u8) -> Self {
        match byte {
            0 => BattleCry::Ora,
            1 => BattleCry::Muda,
            _ => unreachable!(),
        }
    }
}

#[async_trait::async_trait]
impl Reading for JoJoNode {
    type Message = BattleCry;

    fn read_message<R: io::Read>(
        &self,
        _source: SocketAddr,
        reader: &mut R,
    ) -> io::Result<Option<Self::Message>> {
        let mut arr = [0u8; 1];
        reader.read_exact(&mut arr)?;
        let battle_cry = BattleCry::from(arr[0]);

        Ok(Some(battle_cry))
    }

    async fn process_message(
        &self,
        source: SocketAddr,
        battle_cry: Self::Message,
    ) -> io::Result<()> {
        let reply = match battle_cry {
            BattleCry::Ora => BattleCry::Muda,
            BattleCry::Muda => BattleCry::Ora,
        };

        self.send_direct_message(source, reply)
    }
}

impl Writing for JoJoNode {
    type Message = BattleCry;

    fn write_message<W: io::Write>(
        &self,
        _: SocketAddr,
        payload: &Self::Message,
        writer: &mut W,
    ) -> io::Result<()> {
        writer.write_all(&[*payload as u8])?;

        if *payload == BattleCry::Ora {
            info!(parent: self.node().span(), "{:?}!", payload);
        } else {
            warn!(parent: self.node().span(), "{:?}!", payload);
        };

        Ok(())
    }
}

#[tokio::main]
async fn main() {
    common::start_logger(LevelFilter::INFO);

    let config = Config {
        name: Some("Jotaro".into()),
        max_handshake_time_ms: 10_000,
        ..Default::default()
    };
    let jotaro = JoJoNode(Node::new(Some(config)).await.unwrap());

    let config = Config {
        name: Some("Dio".into()),
        max_handshake_time_ms: 10_000,
        ..Default::default()
    };
    let dio = JoJoNode(Node::new(Some(config)).await.unwrap());
    let dio_addr = dio.node().listening_addr().unwrap();

    for node in &[&jotaro, &dio] {
        node.enable_handshake();
        node.enable_reading();
        node.enable_writing();
    }

    jotaro.node().connect(dio_addr).await.unwrap();

    sleep(Duration::from_secs(3)).await;

    jotaro
        .send_direct_message(dio_addr, BattleCry::Ora)
        .unwrap();

    sleep(Duration::from_secs(3)).await;
}