use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
/// TCP port the server listens on when no port argument is given.
const DEFAULT_PORT: u16 = 7777;
/// Capacity of each client's bounded outgoing-message channel.
const OUTBOX_CAP: usize = 64;
/// Size in bytes of the local buffer each reader task reads the socket into.
const READ_BUF: usize = 2048;
/// Initial capacity of a client's pending-input buffer, and the size at which
/// input that still has no newline is flushed as a message.
const INPUT_CAP: usize = 4096;
/// Upper bound on the number of recipients a single broadcast is handed to.
const MAX_BROADCAST_TARGETS: usize = 1024;
/// Reference-counted message payload; each recipient gets an `Arc` clone
/// rather than a copy of the bytes.
type Message = Arc<Vec<u8>>;
/// A connected client as stored in the server's client table.
#[derive(Clone)]
struct ClientEntry {
/// Identifier assigned when the connection was accepted; also the client's
/// key in the table.
id: u64,
/// Sending half of the client's outbox channel; the receiving half is
/// consumed by that client's writer task.
tx: llam::channel::Sender<Message>,
}
/// State shared by the accept loop and every client task.
struct ChatServer {
/// Currently registered clients, keyed by client id.
clients: Mutex<HashMap<u64, ClientEntry>>,
/// Counter the next client id is taken from; it starts at 1.
next_id: AtomicU64,
/// When true, the informational lines normally printed to stdout are skipped.
quiet: bool,
}
/// Program entry point: parses the command line and runs the chat server
/// inside an LLAM runtime.
///
/// An invalid command line is reported on stderr and ends the process with
/// exit status 2. If the runtime's `run` call does not succeed, `expect`
/// aborts with "LLAM chat server failed".
fn main() {
    let config = match Config::parse() {
        Ok(config) => config,
        Err(error) => {
            eprintln!("{error}");
            std::process::exit(2);
        }
    };

    llam::Runtime::builder()
        .profile(llam::Profile::IoLatency)
        .lockfree_normq(true)
        .run(move || run_chat(config))
        .expect("LLAM chat server failed");
}
fn run_chat(config: Config) -> llam::Result<()> {
let bind_addr = if config.public_bind {
format!("0.0.0.0:{}", config.port)
} else {
format!("127.0.0.1:{}", config.port)
};
let listener = llam::net::TcpListener::bind(&bind_addr).map_err(llam::Error::from)?;
let server = Arc::new(ChatServer {
clients: Mutex::new(HashMap::new()),
next_id: AtomicU64::new(1),
quiet: config.quiet,
});
if !config.quiet {
println!("LLAM chat server listening on {bind_addr}");
}
loop {
let (stream, peer) = listener.accept().map_err(llam::Error::from)?;
accept_client(Arc::clone(&server), stream, peer)?;
}
}
fn accept_client(
server: Arc<ChatServer>,
stream: llam::net::TcpStream,
peer: Option<SocketAddr>,
) -> llam::Result<()> {
let id = server.next_id.fetch_add(1, Ordering::Relaxed);
let peer = peer
.map(|addr| addr.to_string())
.unwrap_or_else(|| "unknown".to_string());
let (tx, rx) = llam::channel::bounded::<Message>(OUTBOX_CAP)?;
let writer_stream = stream.try_clone().map_err(llam::Error::from)?;
{
let mut clients = server.clients.lock().expect("client table poisoned");
clients.insert(id, ClientEntry { id, tx: tx.clone() });
}
let writer = llam::try_spawn!(stack = llam::StackClass::Huge, move {
writer_task(writer_stream, rx);
})?;
writer.detach()?;
let reader_server = Arc::clone(&server);
let reader = llam::try_spawn!(stack = llam::StackClass::Huge, move {
reader_task(reader_server, id, stream);
})?;
reader.detach()?;
if !server.quiet {
println!("client {id} connected from {peer}");
}
queue_text(&tx, "Welcome to LLAM chat. Type and press enter.\n");
broadcast_system(&server, id, "joined");
Ok(())
}
/// Copies a client's queued messages from its outbox to its socket.
///
/// Runs until `recv_option` no longer yields a message or a write to the
/// stream fails.
fn writer_task(mut stream: llam::net::TcpStream, rx: llam::channel::Receiver<Message>) {
    loop {
        let Ok(Some(message)) = rx.recv_option() else {
            break;
        };
        if stream.write_all(message.as_slice()).is_err() {
            break;
        }
    }
}
/// Reads a client's socket and relays what it sends until the connection
/// ends.
///
/// The loop stops on a zero-length read or on any read error. Input still
/// buffered at that point is broadcast as a final message, the departure is
/// announced to the other clients, and the client's table entry is removed
/// and its outbox closed.
fn reader_task(server: Arc<ChatServer>, id: u64, mut stream: llam::net::TcpStream) {
    let mut chunk = [0u8; READ_BUF];
    let mut pending = Vec::with_capacity(INPUT_CAP);

    while let Ok(received) = stream.read(&mut chunk) {
        if received == 0 {
            break;
        }
        process_input(&server, id, &mut pending, &chunk[..received]);
    }

    if !pending.is_empty() {
        broadcast_line(&server, id, &pending);
        pending.clear();
    }

    broadcast_system(&server, id, "left");

    if let Some(entry) = remove_client(&server, id) {
        let _ = entry.tx.close();
    }
}
/// Feeds freshly read bytes through a client's line buffer and broadcasts
/// every complete line.
///
/// * `input` holds bytes received earlier that are not yet terminated by a
///   newline; it is updated in place and keeps its allocation.
/// * `data` is the chunk that was just read.
///
/// When nothing is buffered, everything in `data` up to and including its
/// last newline is broadcast straight from `data` without copying. Otherwise
/// bytes are appended to `input` (never growing it past `INPUT_CAP`) and
/// everything up to the last newline in the buffer is broadcast. A buffer
/// that fills up without containing a newline is broadcast as it is and
/// emptied.
fn process_input(server: &ChatServer, id: u64, input: &mut Vec<u8>, data: &[u8]) {
    let mut off = 0usize;
    while off < data.len() {
        if input.is_empty() {
            if let Some(span) = complete_line_span(&data[off..]) {
                broadcast_line(server, id, &data[off..off + span]);
                off += span;
                continue;
            }
        }
        let space = INPUT_CAP.saturating_sub(input.len());
        let copy_len = (data.len() - off).min(space);
        input.extend_from_slice(&data[off..off + copy_len]);
        off += copy_len;
        if let Some(span) = complete_line_span(input) {
            broadcast_line(server, id, &input[..span]);
            // Remove the sent prefix in place: unlike `split_off`, this
            // allocates nothing and keeps the buffer's reserved capacity.
            input.drain(..span);
        } else if input.len() >= INPUT_CAP {
            // Full buffer without a newline: flush it so it cannot block
            // further input.
            broadcast_line(server, id, input);
            input.clear();
        }
        if copy_len == 0 {
            // Nothing could be copied, so another pass would not make
            // progress; give up on the rest of this chunk.
            off = data.len();
        }
    }
}
/// Returns the length of the prefix of `data` that ends with its last
/// newline byte, or `None` when `data` contains no newline.
fn complete_line_span(data: &[u8]) -> Option<usize> {
    // Distance of the last newline from the end of the slice.
    let from_end = data.iter().rev().position(|&byte| byte == b'\n')?;
    Some(data.len() - from_end)
}
/// Broadcasts `line`, prefixed with `[client <sender_id>] `, to every client
/// except the sender.
fn broadcast_line(server: &ChatServer, sender_id: u64, line: &[u8]) {
    let tag = format!("[client {sender_id}] ");
    let mut payload = Vec::with_capacity(tag.len() + line.len());
    payload.extend_from_slice(tag.as_bytes());
    payload.extend_from_slice(line);
    broadcast(server, Some(sender_id), Arc::new(payload));
}
/// Announces `* client <client_id> <event>` to every client except the one
/// the event is about, and echoes the same line to stdout unless the server
/// is quiet.
fn broadcast_system(server: &ChatServer, client_id: u64, event: &str) {
    let notice = Arc::new(format!("* client {client_id} {event}\n").into_bytes());
    broadcast(server, Some(client_id), Arc::clone(&notice));
    if server.quiet {
        return;
    }
    print!("{}", String::from_utf8_lossy(notice.as_slice()));
}
/// Hands `message` to the outbox of every registered client other than
/// `sender_id`, for at most `MAX_BROADCAST_TARGETS` recipients.
///
/// Recipients are copied out of the table first so the table lock is released
/// before any send is attempted. Sends use `try_send` and their result is
/// discarded. When more clients qualify than the limit allows, which ones are
/// picked follows the map's iteration order.
fn broadcast(server: &ChatServer, sender_id: Option<u64>, message: Message) {
    let recipients = {
        let table = server.clients.lock().expect("client table poisoned");
        let mut picked = Vec::new();
        for entry in table.values() {
            if picked.len() == MAX_BROADCAST_TARGETS {
                break;
            }
            if sender_id == Some(entry.id) {
                continue;
            }
            picked.push(entry.clone());
        }
        picked
    };

    recipients.into_iter().for_each(|recipient| {
        let _ = recipient.tx.try_send(Arc::clone(&message));
    });
}
/// Queues `text` on a single client's outbox.
///
/// The send uses `try_send` and its result is discarded.
fn queue_text(tx: &llam::channel::Sender<Message>, text: &str) {
    let payload = Arc::new(Vec::from(text.as_bytes()));
    let _ = tx.try_send(payload);
}
/// Removes the client with the given id from the server's table and returns
/// its entry, or `None` if no such client is registered.
///
/// # Panics
///
/// Panics with "client table poisoned" if the table's mutex is poisoned.
fn remove_client(server: &ChatServer, id: u64) -> Option<ClientEntry> {
    let mut table = server.clients.lock().expect("client table poisoned");
    table.remove(&id)
}
/// Settings collected from the command line and environment.
struct Config {
/// TCP port the server listens on.
port: u16,
/// Listen on `0.0.0.0` instead of `127.0.0.1`.
public_bind: bool,
/// Skip the informational lines normally printed to stdout.
quiet: bool,
}
impl Config {
    /// Builds the configuration from the process arguments and environment.
    ///
    /// Starts from `DEFAULT_PORT`, a loopback-only bind, and a `quiet` flag
    /// that is on when `LLAM_CHAT_QUIET` is set to anything other than `0`.
    /// `--public` and `--quiet` turn on the matching flag; every other
    /// argument is taken as the port, and the last one given wins.
    ///
    /// # Errors
    ///
    /// Returns `invalid port: <arg>` for an argument that is neither a known
    /// flag nor a non-zero `u16`.
    fn parse() -> Result<Self, String> {
        let mut config = Self {
            port: DEFAULT_PORT,
            public_bind: false,
            quiet: std::env::var("LLAM_CHAT_QUIET").map_or(false, |value| value != "0"),
        };

        for arg in std::env::args().skip(1) {
            if arg == "--public" {
                config.public_bind = true;
                continue;
            }
            if arg == "--quiet" {
                config.quiet = true;
                continue;
            }
            match arg.parse::<u16>() {
                Ok(port) if port != 0 => config.port = port,
                _ => return Err(format!("invalid port: {arg}")),
            }
        }

        Ok(config)
    }
}