harlequinn 0.1.1

A real-time networking library primarily aimed at games.
Documentation
mod connection;
mod handshake;

pub use connection::ConnectionCommand;

use std::net::SocketAddr;

use {
    bytes::Bytes,
    crossbeam_channel::Sender,
    futures::StreamExt,
    quinn::{ClientConfigBuilder, Connection, Endpoint, EndpointDriver, Incoming},
    tokio::sync::{
        mpsc::{Receiver as TokioReceiver, Sender as TokioSender},
        oneshot::Sender as OneshotSender,
    },
};

use crate::{worker::connection::handle_connecting, Certificate, PeerId};

/// Commands consumed by `network_worker` from its command channel.
#[derive(Debug)]
pub enum WorkerCommand {
    /// Start an outgoing connection attempt and spawn a handler task for it.
    Connect {
        /// Remote address handed to the endpoint's `connect_with` call.
        server_addr: SocketAddr,
        /// Server name handed to the endpoint's `connect_with` call.
        server_name: String,
        /// Certificate added as a certificate authority to the client config
        /// built for this connection attempt.
        certificate: Certificate,
        /// Peer ID passed on to the connection handler (as `Some(..)`) for this connection.
        assigned_peer_id: PeerId,
    },
    /// Close the endpoint and end the worker's command loop.
    Stop,
}

/// Events reported by the worker tasks over the `Sender<WorkerEvent>` they are given.
///
/// Only `Stopped` is emitted in this file; the remaining variants are presumably sent by the
/// handlers in the `connection` and `handshake` modules — confirm there.
#[derive(Debug)]
pub enum WorkerEvent {
    /// A connection initiated by this endpoint has failed.
    ConnectionFailed {
        /// NOTE(review): presumably the peer ID assigned in `WorkerCommand::Connect` — confirm.
        peer_id: PeerId,
    },
    /// The start of a new connection, negotiate a peer ID with the synchronous thread.
    ConnectionStarted {
        /// The `quinn` connection handle for the new connection.
        connection: Connection,
        /// The command channel through which the worker receives connection commands.
        connection_sender: TokioSender<ConnectionCommand>,
        /// Previously established peer ID if applicable.
        server_peer_id: Option<PeerId>,
        /// One-shot channel carrying a `PeerId`.
        /// NOTE(review): presumably how the receiver replies with the negotiated ID — confirm.
        peer_id_sender: OneshotSender<PeerId>,
    },
    /// The connection handshake has succeeded, get the caller to confirm it.
    ConnectionRequested {
        /// Peer ID of the connection awaiting confirmation.
        peer_id: PeerId,
        /// NOTE(review): presumably the remote address of the requesting peer — confirm.
        socket_addr: SocketAddr,
        /// One-shot channel carrying a `bool`.
        /// NOTE(review): presumably `true` accepts the connection — confirm against the handler.
        confirm_sender: OneshotSender<bool>,
    },
    /// NOTE(review): presumably emitted when a connection to a peer has ended — confirm.
    Disconnected {
        peer_id: PeerId,
        /// Optional textual reason; `None` when no reason is available.
        reason: Option<String>,
    },
    /// NOTE(review): by its name, a datagram received from `peer_id` — confirm in `connection`.
    ReceivedDatagram {
        peer_id: PeerId,
        /// Raw payload bytes.
        bytes: Bytes,
    },
    /// NOTE(review): by its name, a message received from `peer_id` — confirm in `connection`.
    ReceivedMessage {
        peer_id: PeerId,
        /// Raw payload bytes.
        bytes: Bytes,
    },
    /// Sent by `driver_worker` once the endpoint driver future has completed.
    Stopped,
}

/// Drives the endpoint until its driver future completes, then reports `WorkerEvent::Stopped`.
///
/// * `driver` - the endpoint driver future to run to completion.
/// * `sender` - channel on which the `Stopped` event is reported.
///
/// # Panics
///
/// Panics if the event cannot be sent, or (after the event has been sent) if the driver
/// finished with an error.
pub async fn driver_worker(driver: EndpointDriver, sender: Sender<WorkerEvent>) {
    let outcome = driver.await;

    // Announce the shutdown first so the receiver hears about it even when the
    // driver ended with an error.
    sender.send(WorkerEvent::Stopped).unwrap();

    match outcome {
        Ok(_) => {}
        // TODO: Better error handling
        Err(io_error) => panic!("IO Error: {}", io_error),
    }
}

pub async fn network_worker(
    endpoint: Endpoint,
    incoming: Incoming,
    sender: Sender<WorkerEvent>,
    mut receiver: TokioReceiver<WorkerCommand>,
    protocol_checksum: u32,
    is_listening: bool,
) {
    // If we've got an open endpoint we're listening on, spawn the listen task
    if is_listening {
        tokio::spawn(listen_incoming(incoming, sender.clone(), protocol_checksum));
    }

    // Listen for commands
    while let Some(command) = receiver.recv().await {
        match command {
            WorkerCommand::Connect {
                server_addr,
                server_name,
                certificate,
                assigned_peer_id,
            } => {
                // Create the client config
                let mut config_builder = ClientConfigBuilder::default();
                config_builder
                    .add_certificate_authority(certificate)
                    .unwrap();
                let client_config = config_builder.build();

                // Start the connection attempt
                let connecting = endpoint
                    .connect_with(client_config, &server_addr, &server_name)
                    .unwrap();

                // Spawn a handler for this connection
                tokio::spawn(handle_connecting(
                    connecting,
                    sender.clone(),
                    Some(assigned_peer_id),
                    protocol_checksum,
                ));
            }
            WorkerCommand::Stop => {
                // The driver worker will send an event back when it's stopped
                endpoint.close(0u32.into(), b"Endpoint was dropped");
                return;
            }
        }
    }
}

/// Accepts incoming connection attempts until the `incoming` stream ends.
///
/// Each attempt is handed to its own spawned `handle_connecting` task together with a clone
/// of `sender`, no pre-assigned peer ID (`None`), and `protocol_checksum`.
async fn listen_incoming(
    mut incoming: Incoming,
    sender: Sender<WorkerEvent>,
    protocol_checksum: u32,
) {
    loop {
        let attempt = match incoming.next().await {
            Some(attempt) => attempt,
            // The stream is exhausted: nothing more will arrive.
            None => break,
        };

        // Every connection gets its own handler task so that a slow handshake
        // does not hold up accepting the next one.
        let handler = handle_connecting(attempt, sender.clone(), None, protocol_checksum);
        tokio::spawn(handler);
    }
}