nightshade-api 0.45.0

Procedural high level API for the nightshade game engine
Documentation
//! Multiplayer pubsub over the hearsay broker. One peer [`host`]s a session, the
//! rest [`join`] it by address, and they exchange messages on named topics.
//!
//! Hearsay is async, the game loop is not, so a [`Network`] owns a background
//! tokio runtime and bridges it to the frame with channels: [`publish`] hands a
//! message off without blocking, and [`drain_messages`] collects whatever has
//! arrived since the last frame. Hold the [`Network`] for the session's life;
//! dropping it tears the connection and runtime down.

use nightshade::prelude::tracing;
use serde::Serialize;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};

enum Outbound {
    Publish { topic: String, payload: String },
    PublishBytes { topic: String, bytes: Vec<u8> },
    Subscribe(Vec<String>),
    Unsubscribe(Vec<String>),
}

enum Inbound {
    Connected { client_id: String },
    Disconnected,
    Message(NetMessage),
}

/// A message received from a subscribed topic. `payload` is the text form sent
/// by [`publish`] or [`publish_json`], `bytes` is set instead for
/// [`publish_bytes`].
#[derive(Debug, Clone)]
pub struct NetMessage {
    pub topic: String,
    pub payload: String,
    pub bytes: Option<Vec<u8>>,
}

/// A live connection to a hearsay session. Create one with [`host`] or [`join`].
pub struct Network {
    outbound: UnboundedSender<Outbound>,
    inbound: UnboundedReceiver<Inbound>,
    connected: bool,
    client_id: String,
}

enum Role {
    Host {
        broker_address: String,
        websocket_address: Option<String>,
    },
    Join {
        broker_address: String,
    },
}

/// Hosts a session: starts a broker bound to `address`, also listening for
/// browser peers on the next port, and connects this peer to it.
pub fn host(address: &str) -> Network {
    let websocket_address = next_port(address);
    start(Role::Host {
        broker_address: address.to_string(),
        websocket_address,
    })
}

/// Joins a session hosted elsewhere at `address` (`"host:port"`).
pub fn join(address: &str) -> Network {
    start(Role::Join {
        broker_address: address.to_string(),
    })
}

/// Subscribes to topics so their messages arrive through [`drain_messages`].
pub fn subscribe(network: &Network, topics: &[&str]) {
    let topics = topics.iter().map(|topic| topic.to_string()).collect();
    let _ = network.outbound.send(Outbound::Subscribe(topics));
}

/// Stops receiving the given topics.
pub fn unsubscribe(network: &Network, topics: &[&str]) {
    let topics = topics.iter().map(|topic| topic.to_string()).collect();
    let _ = network.outbound.send(Outbound::Unsubscribe(topics));
}

/// Publishes any serializable value to a topic as JSON.
pub fn publish<T: Serialize>(network: &Network, topic: &str, payload: &T) {
    match serde_json::to_string(payload) {
        Ok(message) => publish_json(network, topic, &message),
        Err(error) => tracing::error!("Failed to serialize network payload: {error}"),
    }
}

/// Publishes a JSON string to a topic.
pub fn publish_json(network: &Network, topic: &str, json: &str) {
    let _ = network.outbound.send(Outbound::Publish {
        topic: topic.to_string(),
        payload: json.to_string(),
    });
}

/// Publishes raw bytes to a topic.
pub fn publish_bytes(network: &Network, topic: &str, bytes: &[u8]) {
    let _ = network.outbound.send(Outbound::PublishBytes {
        topic: topic.to_string(),
        bytes: bytes.to_vec(),
    });
}

/// Collects every message that arrived since the last call, and folds in
/// connection state so [`is_connected`] stays current. Call once per frame.
pub fn drain_network(network: &mut Network) -> Vec<NetMessage> {
    let mut messages = Vec::new();
    while let Ok(event) = network.inbound.try_recv() {
        match event {
            Inbound::Connected { client_id } => {
                network.connected = true;
                network.client_id = client_id;
            }
            Inbound::Disconnected => network.connected = false,
            Inbound::Message(message) => messages.push(message),
        }
    }
    messages
}

/// Whether the peer is currently connected to the session.
pub fn is_connected(network: &Network) -> bool {
    network.connected
}

/// This peer's hearsay client id, assigned on connect.
pub fn client_id(network: &Network) -> &str {
    &network.client_id
}

fn next_port(address: &str) -> Option<String> {
    let (host, port) = address.rsplit_once(':')?;
    let port: u16 = port.parse().ok()?;
    Some(format!("{host}:{}", port.checked_add(1)?))
}

fn start(role: Role) -> Network {
    let (outbound_sender, outbound_receiver) = unbounded_channel();
    let (inbound_sender, inbound_receiver) = unbounded_channel();
    std::thread::spawn(move || run_runtime(role, outbound_receiver, inbound_sender));
    Network {
        outbound: outbound_sender,
        inbound: inbound_receiver,
        connected: false,
        client_id: String::new(),
    }
}

fn run_runtime(
    role: Role,
    outbound: UnboundedReceiver<Outbound>,
    inbound: UnboundedSender<Inbound>,
) {
    let runtime = match tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(error) => {
            tracing::error!("Failed to start network runtime: {error}");
            return;
        }
    };
    runtime.block_on(runtime_loop(role, outbound, inbound));
}

async fn runtime_loop(
    role: Role,
    mut outbound: UnboundedReceiver<Outbound>,
    inbound: UnboundedSender<Inbound>,
) {
    let broker_address = match &role {
        Role::Host {
            broker_address,
            websocket_address,
        } => match hearsay::start_broker(broker_address).await {
            Ok(broker) => {
                if let Some(websocket_address) = websocket_address
                    && let Err(error) =
                        hearsay::start_websocket_listener(&broker, websocket_address).await
                {
                    tracing::warn!("Failed to start websocket listener: {error}");
                }
                std::mem::forget(broker);
                broker_address.clone()
            }
            Err(error) => {
                tracing::error!("Failed to host session: {error}");
                return;
            }
        },
        Role::Join { broker_address } => broker_address.clone(),
    };

    let mut client = hearsay::create_client("nightshade", hearsay::ClientSettings::default());
    if let Err(error) = hearsay::connect(&mut client, &broker_address).await {
        tracing::error!("Failed to connect to session: {error}");
        return;
    }
    let client_id = hearsay::client_id(&client).await;
    let _ = inbound.send(Inbound::Connected { client_id });

    let mut connected = true;
    loop {
        if connected {
            tokio::select! {
                command = outbound.recv() => match command {
                    Some(command) => apply(command, &mut client).await,
                    None => break,
                },
                message = hearsay::next_message(&mut client) => match message {
                    Some(message) => {
                        let _ = inbound.send(Inbound::Message(NetMessage {
                            topic: message.topic,
                            payload: message.payload,
                            bytes: message.bytes,
                        }));
                    }
                    None => {
                        connected = false;
                        let _ = inbound.send(Inbound::Disconnected);
                    }
                },
            }
        } else {
            tokio::select! {
                command = outbound.recv() => match command {
                    Some(command) => apply(command, &mut client).await,
                    None => break,
                },
                _ = tokio::time::sleep(std::time::Duration::from_millis(500)) => {
                    if hearsay::is_connected(&client).await {
                        connected = true;
                        let client_id = hearsay::client_id(&client).await;
                        let _ = inbound.send(Inbound::Connected { client_id });
                    }
                }
            }
        }
    }
}

async fn apply(command: Outbound, client: &mut hearsay::Client) {
    match command {
        Outbound::Publish { topic, payload } => {
            let _ = hearsay::publish_json(client, &topic, &payload, hearsay::Route::Global).await;
        }
        Outbound::PublishBytes { topic, bytes } => {
            let _ = hearsay::publish_bytes(client, &topic, &bytes, hearsay::Route::Global).await;
        }
        Outbound::Subscribe(topics) => {
            let topics: Vec<&str> = topics.iter().map(String::as_str).collect();
            let _ = hearsay::subscribe(client, &topics).await;
        }
        Outbound::Unsubscribe(topics) => {
            let topics: Vec<&str> = topics.iter().map(String::as_str).collect();
            let _ = hearsay::unsubscribe(client, &topics).await;
        }
    }
}