// peerlink 0.10.0
//
// Low-level P2P Network Client
// Documentation
use std::net::SocketAddr;

use peerlink::{Command, Config, Event};

// This example consists of a client and server. The server waits for inbound connections from
// clients and replies to client pings with pongs.
//
// Start the server with `cargo run --example pingpong server` and then start any number of
// clients with `cargo run --example pingpong client`.

// First we define a set of messages describing our protocol.
/// The two messages that make up the ping/pong protocol.
///
/// Both variants carry a `u64`, so the type is cheap to copy and can be compared directly,
/// which keeps it easy to assert on in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Message {
    /// A ping carrying a caller-chosen value.
    Ping(u64),
    /// A pong carrying a value; the server in this example echoes the value of the ping.
    Pong(u64),
}

// Then we define the codec for the messages we plan on sending and receiving. The encoding is
// entirely arbitrary.
//
// Wire format: a 4-byte ASCII tag (`ping` or `pong`) followed by the value as 8 little-endian
// bytes, for a fixed total of 12 bytes per message.
impl peerlink::Message for Message {
    const MAX_SIZE: usize = 12;

    // Writes the tag followed by the value into `dest` and returns the number of bytes encoded.
    //
    // `write_all` is used instead of `write` because a single `write` call is allowed to accept
    // only part of the buffer, which would silently produce a truncated message. The signature
    // offers no way to report an I/O error, so a failing writer results in a panic.
    fn encode(&self, dest: &mut impl std::io::Write) -> usize {
        let (tag, value): (&[u8; 4], u64) = match *self {
            Message::Ping(value) => (b"ping", value),
            Message::Pong(value) => (b"pong", value),
        };
        let payload = value.to_le_bytes();

        dest.write_all(tag).expect("failed to write the message tag");
        dest.write_all(&payload)
            .expect("failed to write the message value");

        tag.len() + payload.len()
    }

    // Decodes one message from the front of `buffer`.
    //
    // Returns the message together with the number of bytes consumed, `NotEnoughData` when fewer
    // than 12 bytes are available, or `MalformedMessage` when the tag is neither `ping` nor
    // `pong`.
    fn decode(buffer: &[u8]) -> Result<(Self, usize), peerlink::DecodeError> {
        const TAG_LEN: usize = 4;
        const MSG_LEN: usize = 12;

        if buffer.len() < MSG_LEN {
            return Err(peerlink::DecodeError::NotEnoughData);
        }

        let (tag, payload) = buffer[..MSG_LEN].split_at(TAG_LEN);
        let value = u64::from_le_bytes(
            payload
                .try_into()
                .expect("payload is exactly 8 bytes after the length check"),
        );

        match tag {
            b"ping" => Ok((Message::Ping(value), MSG_LEN)),
            b"pong" => Ok((Message::Pong(value), MSG_LEN)),
            _ => Err(peerlink::DecodeError::MalformedMessage),
        }
    }
}

// The server consists of a reactor listening to events in a loop and replying to pings with pongs.
//
// Listens on 127.0.0.1:8080 and runs until an error occurs; it never returns `Ok`. An error is
// returned when the reactor cannot be started, when receiving the next event fails, or when a
// pong cannot be handed to the reactor.
fn server() -> std::io::Result<()> {
    // Define the listen address for inbound peers to connect to.
    let bind_addr = "127.0.0.1:8080"
        .parse()
        .expect("the hard-coded listen address must parse");
    println!("Server: starting to listen on address {}", bind_addr);

    // Create the reactor and get its handle.
    let handle = peerlink::run(Config {
        bind_addr: vec![bind_addr],
        ..Default::default()
    })?;

    // Start processing events. A failure to receive is propagated to the caller, the same way
    // the client does it, instead of panicking.
    loop {
        match handle.receive_blocking()? {
            Event::ConnectedFrom { peer, addr, .. } => {
                println!("Inbound peer connect: peer_id={} ip={}", peer, addr);
            }

            Event::Disconnected { peer, reason } => {
                println!(
                    "Inbound peer disconnect: peer_id={}, reason={:?}",
                    peer, reason
                );
            }

            Event::Message {
                peer,
                message,
                size,
            } => {
                // Every message of this protocol is exactly 12 bytes on the wire.
                assert_eq!(size, 12);

                match message {
                    Message::Ping(p) => {
                        println!("Incoming ping: peer={}, value={}", peer, p);
                        // Echo the value back to the sender.
                        handle.send(Command::Message(peer, Message::Pong(p)))?;
                    }
                    Message::Pong(p) => {
                        println!("Incoming pong: peer={}, value={}", peer, p);
                    }
                }
            }

            // Any other event is ignored.
            _ => {}
        }
    }
}

// The client consists of a reactor connecting to a server and periodically sending out pings until
// the server disappears.
//
// Connects to 127.0.0.1:8080, then repeats: wait five seconds, send a ping with an increasing
// counter, block for the next event. Returns `Ok(())` once a disconnect event arrives and panics
// on any event it does not expect.
fn client() -> std::io::Result<()> {
    // Pause taken before each ping is sent.
    const PING_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);

    // Create the reactor and get its handle.
    let handle = peerlink::run(Config::default())?;

    // Ask the reactor to connect to our server.
    let server_addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
    handle.send(Command::connect(server_addr))?;

    // The first event must be the successful outcome of that connection attempt.
    let first_event = handle.receive_blocking()?;
    let peer_id = match first_event {
        Event::ConnectedTo {
            target,
            result: Ok(id),
        } if target == server_addr.into() => {
            println!("Connected to server at {}", target);
            id
        }
        other => panic!("Unexpected event: {:?}", other),
    };

    let mut counter = 0;

    loop {
        std::thread::sleep(PING_INTERVAL);

        handle.send(Command::Message(peer_id, Message::Ping(counter)))?;
        println!("Sending a ping: value={}", counter);

        let reply = handle.receive_blocking()?;
        match reply {
            Event::Message {
                message: Message::Pong(value),
                ..
            } => {
                println!("Received a pong: value={}", value);
            }

            Event::Disconnected { .. } => {
                println!("The server has disconnected, exiting.");
                return Ok(());
            }

            other => panic!("Unexpected event: {:?}", other),
        }

        // Only reached after a pong, so the counter advances once per completed round trip.
        counter += 1;
    }
}

// Entry point: initialises logging and runs the role named by the first command-line argument.
//
// `client` and `server` run the respective role and return its result. Any other (or missing)
// argument prints a usage message to stderr and terminates the process with exit status 2, so
// that a usage error is not reported as success to the calling shell.
fn main() -> std::io::Result<()> {
    env_logger::init();

    let role = std::env::args().nth(1);

    match role.as_deref() {
        Some("client") => client(),
        Some("server") => server(),
        _ => {
            eprintln!("The first arg must be either 'client' or 'server'");
            std::process::exit(2);
        }
    }
}