// llam 0.1.1
//
// Safe, Go-style Rust bindings for the LLAM runtime
//
// Run from the LLAM-rs repository root:
//
//     cargo run -p llam --example chat_server -- 7777
//
// Or use the local Cargo alias:
//
//     cargo chat-server 7777
//
// This file is a Cargo example target. `rustc chat_server.rs` does not know how
// to resolve the `llam` crate or link the LLAM C runtime by itself.

use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

// Port used when no port argument is given on the command line.
const DEFAULT_PORT: u16 = 7777;
// Capacity of each client's bounded outbox channel. Messages are queued with
// `try_send`, and a failed send is ignored by the callers in this file.
const OUTBOX_CAP: usize = 64;
// Size of the stack buffer each reader task passes to `read`.
const READ_BUF: usize = 2048;
// Upper bound on bytes buffered for an unfinished line; once the buffer reaches
// this size without a newline it is broadcast as-is.
const INPUT_CAP: usize = 4096;
// Maximum number of recipients a single broadcast is delivered to.
const MAX_BROADCAST_TARGETS: usize = 1024;

// One outgoing message. The bytes sit behind an `Arc` so that a broadcast can
// hand the same buffer to every recipient without copying it.
type Message = Arc<Vec<u8>>;

/// Registry record for one connected client.
///
/// Entries are cloned out of the client table during a broadcast so the table
/// lock does not have to be held while messages are queued.
#[derive(Clone)]
struct ClientEntry {
    /// Identifier taken from `ChatServer::next_id` when the client is accepted.
    id: u64,
    /// Sending half of the client's outbox channel; the receiving half is
    /// drained by that client's writer task.
    tx: llam::channel::Sender<Message>,
}

/// State shared (behind an `Arc`) by the accept loop and every reader task.
struct ChatServer {
    /// Connected clients keyed by client id.
    clients: Mutex<HashMap<u64, ClientEntry>>,
    /// Source of client ids; starts at 1 and is bumped once per accepted client.
    next_id: AtomicU64,
    /// When true, connect/join/leave notices are not printed to stdout.
    quiet: bool,
}

/// Program entry point: parses the configuration, then runs the chat server
/// inside an LLAM runtime.
///
/// A configuration error is printed to stderr and the process exits with
/// status 2. A runtime failure panics with "LLAM chat server failed".
fn main() {
    let config = match Config::parse() {
        Ok(config) => config,
        Err(error) => {
            eprintln!("{error}");
            std::process::exit(2);
        }
    };

    let outcome = llam::Runtime::builder()
        .profile(llam::Profile::IoLatency)
        .lockfree_normq(true)
        .run(move || run_chat(config));
    outcome.expect("LLAM chat server failed");
}

fn run_chat(config: Config) -> llam::Result<()> {
    let bind_addr = if config.public_bind {
        format!("0.0.0.0:{}", config.port)
    } else {
        format!("127.0.0.1:{}", config.port)
    };
    let listener = llam::net::TcpListener::bind(&bind_addr).map_err(llam::Error::from)?;
    let server = Arc::new(ChatServer {
        clients: Mutex::new(HashMap::new()),
        next_id: AtomicU64::new(1),
        quiet: config.quiet,
    });

    if !config.quiet {
        println!("LLAM chat server listening on {bind_addr}");
    }

    loop {
        let (stream, peer) = listener.accept().map_err(llam::Error::from)?;
        accept_client(Arc::clone(&server), stream, peer)?;
    }
}

fn accept_client(
    server: Arc<ChatServer>,
    stream: llam::net::TcpStream,
    peer: Option<SocketAddr>,
) -> llam::Result<()> {
    let id = server.next_id.fetch_add(1, Ordering::Relaxed);
    let peer = peer
        .map(|addr| addr.to_string())
        .unwrap_or_else(|| "unknown".to_string());
    let (tx, rx) = llam::channel::bounded::<Message>(OUTBOX_CAP)?;
    let writer_stream = stream.try_clone().map_err(llam::Error::from)?;

    {
        let mut clients = server.clients.lock().expect("client table poisoned");
        clients.insert(id, ClientEntry { id, tx: tx.clone() });
    }

    let writer = llam::try_spawn!(stack = llam::StackClass::Huge, move {
        writer_task(writer_stream, rx);
    })?;
    writer.detach()?;

    let reader_server = Arc::clone(&server);
    let reader = llam::try_spawn!(stack = llam::StackClass::Huge, move {
        reader_task(reader_server, id, stream);
    })?;
    reader.detach()?;

    if !server.quiet {
        println!("client {id} connected from {peer}");
    }
    queue_text(&tx, "Welcome to LLAM chat. Type and press enter.\n");
    broadcast_system(&server, id, "joined");
    Ok(())
}

/// Drains a client's outbox channel onto its socket.
///
/// Returns as soon as `recv_option` yields anything other than `Ok(Some(_))`
/// or a write to the socket fails.
fn writer_task(mut stream: llam::net::TcpStream, rx: llam::channel::Receiver<Message>) {
    loop {
        let message = match rx.recv_option() {
            Ok(Some(message)) => message,
            _ => return,
        };

        let delivered = stream.write_all(message.as_slice());
        if delivered.is_err() {
            return;
        }
    }
}

/// Reads from a client's socket until end-of-stream or a read error, relaying
/// what it receives to the other clients.
///
/// On exit, any buffered partial line is broadcast as-is, a "left" notice is
/// sent, and the client is removed from the table with its outbox closed.
fn reader_task(server: Arc<ChatServer>, id: u64, mut stream: llam::net::TcpStream) {
    let mut chunk = [0u8; READ_BUF];
    let mut pending = Vec::with_capacity(INPUT_CAP);

    // A zero-length read and a read error both end the session.
    while let Ok(len) = stream.read(&mut chunk) {
        if len == 0 {
            break;
        }
        process_input(&server, id, &mut pending, &chunk[..len]);
    }

    // Flush whatever was typed after the last newline.
    if !pending.is_empty() {
        broadcast_line(&server, id, &pending);
        pending.clear();
    }
    broadcast_system(&server, id, "left");

    let removed = remove_client(&server, id);
    if let Some(client) = removed {
        let _ = client.tx.close();
    }
}

/// Feeds freshly read bytes through the per-client line buffer and broadcasts
/// every completed span.
///
/// * `input` holds bytes of a line that has not been terminated yet.
/// * `data` is the chunk just read from the socket.
///
/// When nothing is buffered, everything in `data` up to and including its last
/// newline is broadcast straight from `data` without copying. Otherwise bytes
/// are appended to `input` (never beyond `INPUT_CAP`), and the buffer is
/// flushed either up to its last newline or, if it fills up without one, in
/// full. A flushed span may contain several lines; it is sent as one message.
fn process_input(server: &ChatServer, id: u64, input: &mut Vec<u8>, data: &[u8]) {
    let mut rest = data;

    while !rest.is_empty() {
        // Fast path: no partial line pending, so complete lines can be
        // forwarded directly from the read buffer.
        if input.is_empty() {
            if let Some(span) = complete_line_span(rest) {
                let (lines, remainder) = rest.split_at(span);
                broadcast_line(server, id, lines);
                rest = remainder;
                continue;
            }
        }

        // Slow path: top up the pending buffer, bounded by INPUT_CAP.
        let room = INPUT_CAP.saturating_sub(input.len());
        let take = rest.len().min(room);
        let (head, remainder) = rest.split_at(take);
        input.extend_from_slice(head);
        rest = remainder;

        match complete_line_span(input) {
            Some(span) => {
                // Keep the bytes after the last newline for the next round.
                let leftover = input.split_off(span);
                broadcast_line(server, id, input);
                *input = leftover;
            }
            None if input.len() == INPUT_CAP => {
                // Over-long line: emit what we have rather than grow further.
                broadcast_line(server, id, input);
                input.clear();
            }
            None => {}
        }

        // Nothing could be buffered; give up on the rest of this chunk
        // instead of spinning.
        if take == 0 {
            break;
        }
    }
}

/// Returns the length of the prefix of `data` that ends with its last newline
/// byte, or `None` when `data` contains no newline.
fn complete_line_span(data: &[u8]) -> Option<usize> {
    // Scan from the back: the distance of the first hit from the end tells us
    // how many trailing bytes belong to an unfinished line.
    data.iter()
        .rev()
        .position(|&byte| byte == b'\n')
        .map(|trailing| data.len() - trailing)
}

/// Prefixes `line` with `[client <id>] ` and broadcasts it to every client
/// except the sender.
fn broadcast_line(server: &ChatServer, sender_id: u64, line: &[u8]) {
    let prefix = format!("[client {sender_id}] ");

    let mut payload = Vec::with_capacity(prefix.len() + line.len());
    payload.extend_from_slice(prefix.as_bytes());
    payload.extend_from_slice(line);

    broadcast(server, Some(sender_id), Arc::new(payload));
}

fn broadcast_system(server: &ChatServer, client_id: u64, event: &str) {
    let message = Arc::new(format!("* client {client_id} {event}\n").into_bytes());
    broadcast(server, Some(client_id), Arc::clone(&message));
    if !server.quiet {
        print!("{}", String::from_utf8_lossy(message.as_ref().as_slice()));
    }
}

/// Queues `message` on the outbox of every registered client other than
/// `sender_id`, up to `MAX_BROADCAST_TARGETS` recipients.
///
/// The recipients' senders are copied out while the table lock is held and the
/// actual `try_send` calls happen after it is released. A failed `try_send` is
/// ignored, so that recipient simply does not get the message.
fn broadcast(server: &ChatServer, sender_id: Option<u64>, message: Message) {
    let mut outboxes = Vec::new();
    {
        let table = server.clients.lock().expect("client table poisoned");
        for entry in table.values() {
            if outboxes.len() == MAX_BROADCAST_TARGETS {
                break;
            }
            if sender_id == Some(entry.id) {
                continue;
            }
            outboxes.push(entry.tx.clone());
        }
    }

    for outbox in outboxes {
        let _ = outbox.try_send(Arc::clone(&message));
    }
}

/// Queues a text message on a single client's outbox; a failed `try_send` is
/// ignored.
fn queue_text(tx: &llam::channel::Sender<Message>, text: &str) {
    let payload: Message = Arc::new(text.as_bytes().to_owned());
    let _ = tx.try_send(payload);
}

/// Removes client `id` from the table, returning its entry if it was present.
///
/// Panics with "client table poisoned" if the table mutex is poisoned.
fn remove_client(server: &ChatServer, id: u64) -> Option<ClientEntry> {
    let mut table = server.clients.lock().expect("client table poisoned");
    table.remove(&id)
}

/// Server settings gathered by `Config::parse`.
struct Config {
    /// TCP port to listen on; `DEFAULT_PORT` unless a port argument is given.
    port: u16,
    /// When true the listener binds `0.0.0.0`; otherwise it binds `127.0.0.1`.
    public_bind: bool,
    /// When true the server does not print its listening/connect/join/leave
    /// notices to stdout.
    quiet: bool,
}

impl Config {
    /// Builds the configuration from the process arguments and environment.
    ///
    /// Recognised arguments (any order, later values win):
    /// * `--public` — bind on all interfaces instead of loopback,
    /// * `--quiet`  — suppress console notices,
    /// * `<port>`   — a non-zero `u16` port number.
    ///
    /// Quiet mode is also enabled when the `LLAM_CHAT_QUIET` environment
    /// variable is set to anything other than `0`.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when an argument is not valid Unicode,
    /// is an unrecognised `--option`, or is not a valid non-zero port.
    fn parse() -> Result<Self, String> {
        let mut port = DEFAULT_PORT;
        let mut public_bind = false;
        let mut quiet = std::env::var("LLAM_CHAT_QUIET").is_ok_and(|value| value != "0");

        // `args_os` rather than `args`: the latter panics while iterating if
        // any argument is not valid Unicode, which would bypass our own error
        // reporting.
        for raw in std::env::args_os().skip(1) {
            let Some(arg) = raw.to_str() else {
                return Err(format!("invalid argument: {}", raw.to_string_lossy()));
            };

            match arg {
                "--public" => public_bind = true,
                "--quiet" => quiet = true,
                // Report mistyped options as such instead of as a bad port.
                flag if flag.starts_with("--") => {
                    return Err(format!("unknown option: {flag}"));
                }
                value => {
                    let parsed = value
                        .parse::<u16>()
                        .ok()
                        .filter(|candidate| *candidate != 0);
                    port = parsed.ok_or_else(|| format!("invalid port: {value}"))?;
                }
            }
        }

        Ok(Self {
            port,
            public_bind,
            quiet,
        })
    }
}