use std::{collections::HashMap, path::Path};
use room_protocol::SubscriptionTier;
use crate::{
message::{make_system, Message},
plugin::{
builtin_command_infos, ChatWriter, CommandContext, CommandInfo, HistoryReader, ParamType,
PluginResult, RoomMetadata,
},
};
use super::{
admin::{handle_admin_cmd, ADMIN_CMD_NAMES},
fanout::broadcast_and_persist,
state::RoomState,
};
pub(crate) fn save_subscription_map(
map: &HashMap<String, SubscriptionTier>,
path: &Path,
) -> Result<(), String> {
let json =
serde_json::to_string_pretty(map).map_err(|e| format!("serialize subscriptions: {e}"))?;
std::fs::write(path, json).map_err(|e| format!("write {}: {e}", path.display()))
}
pub(crate) fn load_subscription_map(path: &Path) -> HashMap<String, SubscriptionTier> {
let contents = match std::fs::read_to_string(path) {
Ok(c) => c,
Err(_) => return HashMap::new(),
};
serde_json::from_str(&contents).unwrap_or_else(|e| {
eprintln!(
"[broker] corrupt subscription file {}: {e} — starting empty",
path.display()
);
HashMap::new()
})
}
pub(crate) async fn persist_subscriptions(state: &RoomState) {
let snapshot = state.subscription_snapshot().await;
if let Err(e) = save_subscription_map(&snapshot, &state.subscription_map_path) {
eprintln!("[broker] subscription persist failed: {e}");
}
}
pub(crate) enum CommandResult {
Handled,
HandledWithReply(String),
Reply(String),
Shutdown,
Passthrough(Message),
}
pub(crate) async fn route_command(
msg: Message,
username: &str,
state: &RoomState,
) -> anyhow::Result<CommandResult> {
if let Message::Command {
ref cmd,
ref params,
..
} = msg
{
let builtins = builtin_command_infos();
if let Some(cmd_info) = builtins.iter().find(|c| c.name == *cmd) {
if let Err(err_msg) = validate_params(params, cmd_info) {
let sys = make_system(&state.room_id, "broker", err_msg);
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::Reply(json));
}
}
if cmd == "set_status" {
let status = params.first().cloned().unwrap_or_default();
state.set_status(username, status.clone()).await;
let display = if status.is_empty() {
format!("{username} cleared their status")
} else {
format!("{username} set status: {status}")
};
let sys = make_system(&state.room_id, "broker", display);
broadcast_and_persist(&sys, &state.clients, &state.chat_path, &state.seq_counter)
.await?;
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::HandledWithReply(json));
}
if cmd == "who" {
let entries: Vec<String> = state
.status_entries()
.await
.into_iter()
.map(|(u, s)| if s.is_empty() { u } else { format!("{u}: {s}") })
.collect();
let content = if entries.is_empty() {
"no users online".to_owned()
} else {
format!("online — {}", entries.join(", "))
};
let sys = make_system(&state.room_id, "broker", content);
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::Reply(json));
}
if cmd == "claim" {
let task = params.join(" ");
state.set_claim(username, task.clone()).await;
let display = format!("{username} claimed: {task}");
let sys = make_system(&state.room_id, "broker", display);
broadcast_and_persist(&sys, &state.clients, &state.chat_path, &state.seq_counter)
.await?;
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::HandledWithReply(json));
}
if cmd == "unclaim" {
let removed = state.remove_claim(username).await;
let display = match removed {
Some(task) => format!("{username} released claim: {task}"),
None => format!("{username} has no active claim"),
};
let sys = make_system(&state.room_id, "broker", display);
broadcast_and_persist(&sys, &state.clients, &state.chat_path, &state.seq_counter)
.await?;
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::HandledWithReply(json));
}
if cmd == "claimed" {
let raw = state.claim_entries().await;
let content = if raw.is_empty() {
"no active claims".to_owned()
} else {
let entries: Vec<String> =
raw.into_iter().map(|(u, t)| format!("{u}: {t}")).collect();
format!("claimed — {}", entries.join(", "))
};
let sys = make_system(&state.room_id, "broker", content);
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::Reply(json));
}
if cmd == "subscribe" || cmd == "set_subscription" {
let tier_str = params.first().map(String::as_str).unwrap_or("full");
let tier: SubscriptionTier = match tier_str.parse() {
Ok(t) => t,
Err(e) => {
let sys = make_system(&state.room_id, "broker", e);
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::Reply(json));
}
};
state.set_subscription(username, tier).await;
persist_subscriptions(state).await;
let display = format!("{username} subscribed to {} (tier: {tier})", state.room_id);
let sys = make_system(&state.room_id, "broker", display);
broadcast_and_persist(&sys, &state.clients, &state.chat_path, &state.seq_counter)
.await?;
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::HandledWithReply(json));
}
if cmd == "unsubscribe" {
state
.set_subscription(username, SubscriptionTier::Unsubscribed)
.await;
persist_subscriptions(state).await;
let display = format!("{username} unsubscribed from {}", state.room_id);
let sys = make_system(&state.room_id, "broker", display);
broadcast_and_persist(&sys, &state.clients, &state.chat_path, &state.seq_counter)
.await?;
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::HandledWithReply(json));
}
if cmd == "subscriptions" {
let raw = state.subscription_entries().await;
let content = if raw.is_empty() {
"no subscriptions".to_owned()
} else {
let entries: Vec<String> =
raw.into_iter().map(|(u, t)| format!("{u}: {t}")).collect();
format!("subscriptions — {}", entries.join(", "))
};
let sys = make_system(&state.room_id, "broker", content);
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::Reply(json));
}
if cmd == "room-info" {
let result = handle_room_info(state).await;
let sys = make_system(&state.room_id, "broker", result);
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::Reply(json));
}
if ADMIN_CMD_NAMES.contains(&cmd.as_str()) {
let cmd_line = format!("{cmd} {}", params.join(" "));
let error = handle_admin_cmd(&cmd_line, username, state).await;
if let Some(err) = error {
let sys = make_system(&state.room_id, "broker", err);
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::Reply(json));
}
if cmd == "exit" {
return Ok(CommandResult::Shutdown);
}
return Ok(CommandResult::Handled);
}
if let Some(plugin) = state.plugin_registry.resolve(cmd) {
let plugin_name = plugin.name().to_owned();
match dispatch_plugin(plugin, &msg, username, state).await {
Ok(result) => return Ok(result),
Err(e) => {
let err_msg = format!("plugin:{plugin_name} error: {e}");
let sys = make_system(&state.room_id, "broker", err_msg);
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::Reply(json));
}
}
}
}
Ok(CommandResult::Passthrough(msg))
}
fn validate_params(params: &[String], schema: &CommandInfo) -> Result<(), String> {
for (i, ps) in schema.params.iter().enumerate() {
let value = params.get(i).map(String::as_str).unwrap_or("");
if ps.required && value.is_empty() {
return Err(format!(
"/{}: missing required parameter <{}>",
schema.name, ps.name
));
}
if value.is_empty() {
continue;
}
match &ps.param_type {
ParamType::Choice(allowed) => {
if !allowed.iter().any(|a| a == value) {
return Err(format!(
"/{}: <{}> must be one of: {}",
schema.name,
ps.name,
allowed.join(", ")
));
}
}
ParamType::Number { min, max } => {
let Ok(n) = value.parse::<i64>() else {
return Err(format!(
"/{}: <{}> must be a number, got '{}'",
schema.name, ps.name, value
));
};
if let Some(lo) = min {
if n < *lo {
return Err(format!("/{}: <{}> must be >= {lo}", schema.name, ps.name));
}
}
if let Some(hi) = max {
if n > *hi {
return Err(format!("/{}: <{}> must be <= {hi}", schema.name, ps.name));
}
}
}
ParamType::Text | ParamType::Username => {}
}
}
Ok(())
}
async fn dispatch_plugin(
plugin: &dyn crate::plugin::Plugin,
msg: &Message,
username: &str,
state: &RoomState,
) -> anyhow::Result<CommandResult> {
let (cmd, params, id, ts) = match msg {
Message::Command {
cmd,
params,
id,
ts,
..
} => (cmd, params, id, ts),
_ => return Ok(CommandResult::Passthrough(msg.clone())),
};
if let Some(cmd_info) = plugin.commands().iter().find(|c| c.name == *cmd) {
if let Err(err_msg) = validate_params(params, cmd_info) {
let sys = make_system(
&state.room_id,
&format!("plugin:{}", plugin.name()),
err_msg,
);
let json = serde_json::to_string(&sys)?;
return Ok(CommandResult::Reply(json));
}
}
let history = HistoryReader::new(&state.chat_path, username);
let writer = ChatWriter::new(
&state.clients,
&state.chat_path,
&state.room_id,
&state.seq_counter,
plugin.name(),
);
let metadata =
RoomMetadata::snapshot(&state.status_map, &state.host_user, &state.chat_path).await;
let available_commands = state.plugin_registry.all_commands();
let ctx = CommandContext {
command: cmd.clone(),
params: params.clone(),
sender: username.to_owned(),
room_id: state.room_id.as_ref().clone(),
message_id: id.clone(),
timestamp: *ts,
history,
writer,
metadata,
available_commands,
};
let result = plugin.handle(ctx).await?;
Ok(match result {
PluginResult::Reply(text) => {
let sys = make_system(&state.room_id, &format!("plugin:{}", plugin.name()), text);
let json = serde_json::to_string(&sys)?;
CommandResult::Reply(json)
}
PluginResult::Broadcast(text) => {
let sys = make_system(&state.room_id, &format!("plugin:{}", plugin.name()), text);
broadcast_and_persist(&sys, &state.clients, &state.chat_path, &state.seq_counter)
.await?;
CommandResult::Handled
}
PluginResult::Handled => CommandResult::Handled,
})
}
async fn handle_room_info(state: &RoomState) -> String {
let member_count = state.status_count().await;
let sub_count = state.subscription_count().await;
match &state.config {
Some(config) => {
let vis = serde_json::to_string(&config.visibility).unwrap_or_default();
let max = config
.max_members
.map(|n| n.to_string())
.unwrap_or_else(|| "unlimited".to_owned());
let invites: Vec<&str> = config.invite_list.iter().map(|s| s.as_str()).collect();
format!(
"room: {} | visibility: {} | max members: {} | members online: {} | subscribers: {} | invited: [{}] | created by: {}",
state.room_id, vis, max, member_count, sub_count, invites.join(", "), config.created_by
)
}
None => {
format!(
"room: {} | visibility: public (legacy) | members online: {} | subscribers: {}",
state.room_id, member_count, sub_count
)
}
}
}
#[cfg(test)]
mod tests {
use super::{handle_room_info, route_command, CommandResult};
use crate::{
broker::state::RoomState,
message::{make_command, make_dm, make_message},
};
use room_protocol::SubscriptionTier;
use std::{collections::HashMap, sync::Arc};
use tempfile::NamedTempFile;
use tokio::sync::Mutex;
fn make_state(chat_path: std::path::PathBuf) -> Arc<RoomState> {
let token_map_path = chat_path.with_extension("tokens");
let subscription_map_path = chat_path.with_extension("subscriptions");
RoomState::new(
"test-room".to_owned(),
chat_path,
token_map_path,
subscription_map_path,
Arc::new(Mutex::new(HashMap::new())),
Arc::new(Mutex::new(HashMap::new())),
None,
)
.unwrap()
}
#[tokio::test]
async fn route_command_regular_message_is_passthrough() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_message("test-room", "alice", "hello");
let result = route_command(msg, "alice", &state).await.unwrap();
assert!(matches!(result, CommandResult::Passthrough(_)));
}
#[tokio::test]
async fn route_command_dm_message_is_passthrough() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_dm("test-room", "alice", "bob", "secret");
let result = route_command(msg, "alice", &state).await.unwrap();
assert!(matches!(result, CommandResult::Passthrough(_)));
}
#[tokio::test]
async fn route_command_set_status_returns_handled_with_reply_and_updates_map() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "set_status", vec!["busy".to_owned()]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::HandledWithReply(json) = result else {
panic!("expected HandledWithReply, got Handled or other");
};
assert!(
json.contains("set status"),
"reply JSON should contain status announcement"
);
assert!(
json.contains("busy"),
"reply JSON should contain the status text"
);
assert_eq!(
state
.status_map
.lock()
.await
.get("alice")
.map(String::as_str),
Some("busy")
);
}
#[tokio::test]
async fn route_command_set_status_empty_params_clears_status() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
state
.status_map
.lock()
.await
.insert("alice".to_owned(), "busy".to_owned());
let msg = make_command("test-room", "alice", "set_status", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
assert!(matches!(result, CommandResult::HandledWithReply(_)));
assert_eq!(
state
.status_map
.lock()
.await
.get("alice")
.map(String::as_str),
Some("")
);
}
#[tokio::test]
async fn route_command_who_with_online_user_in_reply() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
state
.status_map
.lock()
.await
.insert("alice".to_owned(), String::new());
let msg = make_command("test-room", "alice", "who", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::Reply(json) = result else {
panic!("expected Reply");
};
assert!(json.contains("alice"), "reply should list alice");
}
#[tokio::test]
async fn route_command_who_empty_room_says_no_users_online() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "who", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::Reply(json) = result else {
panic!("expected Reply");
};
assert!(json.contains("no users online"));
}
#[tokio::test]
async fn route_command_who_shows_status_alongside_name() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
state
.status_map
.lock()
.await
.insert("alice".to_owned(), "reviewing PR".to_owned());
let msg = make_command("test-room", "alice", "who", vec![]);
let CommandResult::Reply(json) = route_command(msg, "alice", &state).await.unwrap() else {
panic!("expected Reply");
};
assert!(json.contains("reviewing PR"));
}
#[tokio::test]
async fn route_command_admin_as_non_host_gets_permission_denied_reply() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
*state.host_user.lock().await = Some("host-user".to_owned());
let msg = make_command("test-room", "alice", "kick", vec!["bob".to_owned()]);
let CommandResult::Reply(json) = route_command(msg, "alice", &state).await.unwrap() else {
panic!("expected Reply");
};
assert!(json.contains("permission denied"));
}
#[tokio::test]
async fn route_command_admin_when_no_host_set_gets_permission_denied() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "exit", vec![]);
let CommandResult::Reply(json) = route_command(msg, "alice", &state).await.unwrap() else {
panic!("expected Reply");
};
assert!(json.contains("permission denied"));
}
#[tokio::test]
async fn route_command_kick_as_host_returns_handled_and_invalidates_token() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
*state.host_user.lock().await = Some("alice".to_owned());
state
.token_map
.lock()
.await
.insert("some-uuid".to_owned(), "bob".to_owned());
let msg = make_command("test-room", "alice", "kick", vec!["bob".to_owned()]);
let result = route_command(msg, "alice", &state).await.unwrap();
assert!(matches!(result, CommandResult::Handled));
let guard = state.token_map.lock().await;
assert!(
!guard.contains_key("some-uuid"),
"original token must be revoked"
);
assert_eq!(
guard.get("KICKED:bob").map(String::as_str),
Some("bob"),
"KICKED sentinel must be inserted"
);
}
#[tokio::test]
async fn route_command_exit_as_host_returns_shutdown() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
*state.host_user.lock().await = Some("alice".to_owned());
let msg = make_command("test-room", "alice", "exit", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
assert!(matches!(result, CommandResult::Shutdown));
}
#[tokio::test]
async fn route_command_kick_missing_user_gets_validation_error() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
*state.host_user.lock().await = Some("alice".to_owned());
let msg = make_command("test-room", "alice", "kick", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::Reply(json) = result else {
panic!("expected Reply with validation error");
};
assert!(
json.contains("missing required"),
"should report missing param"
);
assert!(json.contains("<user>"), "should name the missing param");
}
#[tokio::test]
async fn route_command_reauth_missing_user_gets_validation_error() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
*state.host_user.lock().await = Some("alice".to_owned());
let msg = make_command("test-room", "alice", "reauth", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::Reply(json) = result else {
panic!("expected Reply with validation error");
};
assert!(json.contains("missing required"));
}
#[tokio::test]
async fn route_command_kick_with_valid_params_passes_validation() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
*state.host_user.lock().await = Some("alice".to_owned());
let msg = make_command("test-room", "alice", "kick", vec!["bob".to_owned()]);
let result = route_command(msg, "alice", &state).await.unwrap();
assert!(matches!(result, CommandResult::Handled));
}
#[tokio::test]
async fn route_command_claim_missing_task_gets_validation_error() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "claim", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::Reply(json) = result else {
panic!("expected Reply with validation error");
};
assert!(json.contains("missing required"));
assert!(json.contains("<task>"));
}
#[tokio::test]
async fn route_command_claim_with_task_is_handled() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "claim", vec!["fix bug".to_owned()]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::HandledWithReply(json) = result else {
panic!("expected HandledWithReply for /claim");
};
assert!(json.contains("alice claimed: fix bug"));
}
#[tokio::test]
async fn route_command_who_no_params_passes_validation() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "who", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
assert!(matches!(result, CommandResult::Reply(_)));
}
#[tokio::test]
async fn route_command_reply_missing_params_gets_validation_error() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "reply", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::Reply(json) = result else {
panic!("expected Reply with validation error");
};
assert!(json.contains("missing required"));
}
#[tokio::test]
async fn route_command_nonbuiltin_command_skips_validation() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "unknown_cmd", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
assert!(matches!(result, CommandResult::Passthrough(_)));
}
mod validation_tests {
use super::super::validate_params;
use crate::plugin::{CommandInfo, ParamSchema, ParamType};
fn cmd_with_params(params: Vec<ParamSchema>) -> CommandInfo {
CommandInfo {
name: "test".to_owned(),
description: "test".to_owned(),
usage: "/test".to_owned(),
params,
}
}
#[test]
fn validate_empty_schema_always_passes() {
let cmd = cmd_with_params(vec![]);
assert!(validate_params(&[], &cmd).is_ok());
assert!(validate_params(&["extra".to_owned()], &cmd).is_ok());
}
#[test]
fn validate_required_param_missing() {
let cmd = cmd_with_params(vec![ParamSchema {
name: "user".to_owned(),
param_type: ParamType::Text,
required: true,
description: "target user".to_owned(),
}]);
let err = validate_params(&[], &cmd).unwrap_err();
assert!(err.contains("missing required"));
assert!(err.contains("<user>"));
}
#[test]
fn validate_required_param_present() {
let cmd = cmd_with_params(vec![ParamSchema {
name: "user".to_owned(),
param_type: ParamType::Text,
required: true,
description: "target user".to_owned(),
}]);
assert!(validate_params(&["alice".to_owned()], &cmd).is_ok());
}
#[test]
fn validate_optional_param_missing_is_ok() {
let cmd = cmd_with_params(vec![ParamSchema {
name: "count".to_owned(),
param_type: ParamType::Number {
min: None,
max: None,
},
required: false,
description: "count".to_owned(),
}]);
assert!(validate_params(&[], &cmd).is_ok());
}
#[test]
fn validate_choice_valid_value() {
let cmd = cmd_with_params(vec![ParamSchema {
name: "color".to_owned(),
param_type: ParamType::Choice(vec!["red".to_owned(), "blue".to_owned()]),
required: true,
description: "pick a color".to_owned(),
}]);
assert!(validate_params(&["red".to_owned()], &cmd).is_ok());
assert!(validate_params(&["blue".to_owned()], &cmd).is_ok());
}
#[test]
fn validate_choice_invalid_value() {
let cmd = cmd_with_params(vec![ParamSchema {
name: "color".to_owned(),
param_type: ParamType::Choice(vec!["red".to_owned(), "blue".to_owned()]),
required: true,
description: "pick a color".to_owned(),
}]);
let err = validate_params(&["green".to_owned()], &cmd).unwrap_err();
assert!(err.contains("must be one of"));
assert!(err.contains("red"));
assert!(err.contains("blue"));
}
#[test]
fn validate_number_valid() {
let cmd = cmd_with_params(vec![ParamSchema {
name: "count".to_owned(),
param_type: ParamType::Number {
min: Some(1),
max: Some(100),
},
required: true,
description: "count".to_owned(),
}]);
assert!(validate_params(&["50".to_owned()], &cmd).is_ok());
assert!(validate_params(&["1".to_owned()], &cmd).is_ok());
assert!(validate_params(&["100".to_owned()], &cmd).is_ok());
}
#[test]
fn validate_number_not_a_number() {
let cmd = cmd_with_params(vec![ParamSchema {
name: "count".to_owned(),
param_type: ParamType::Number {
min: None,
max: None,
},
required: true,
description: "count".to_owned(),
}]);
let err = validate_params(&["abc".to_owned()], &cmd).unwrap_err();
assert!(err.contains("must be a number"));
}
#[test]
fn validate_number_below_min() {
let cmd = cmd_with_params(vec![ParamSchema {
name: "count".to_owned(),
param_type: ParamType::Number {
min: Some(10),
max: None,
},
required: true,
description: "count".to_owned(),
}]);
let err = validate_params(&["5".to_owned()], &cmd).unwrap_err();
assert!(err.contains("must be >= 10"));
}
#[test]
fn validate_number_above_max() {
let cmd = cmd_with_params(vec![ParamSchema {
name: "count".to_owned(),
param_type: ParamType::Number {
min: None,
max: Some(50),
},
required: true,
description: "count".to_owned(),
}]);
let err = validate_params(&["100".to_owned()], &cmd).unwrap_err();
assert!(err.contains("must be <= 50"));
}
#[test]
fn validate_text_always_passes() {
let cmd = cmd_with_params(vec![ParamSchema {
name: "msg".to_owned(),
param_type: ParamType::Text,
required: true,
description: "message".to_owned(),
}]);
assert!(validate_params(&["anything at all".to_owned()], &cmd).is_ok());
}
#[test]
fn validate_username_always_passes() {
let cmd = cmd_with_params(vec![ParamSchema {
name: "user".to_owned(),
param_type: ParamType::Username,
required: true,
description: "user".to_owned(),
}]);
assert!(validate_params(&["alice".to_owned()], &cmd).is_ok());
}
#[test]
fn validate_multiple_params() {
let cmd = cmd_with_params(vec![
ParamSchema {
name: "user".to_owned(),
param_type: ParamType::Username,
required: true,
description: "target".to_owned(),
},
ParamSchema {
name: "count".to_owned(),
param_type: ParamType::Number {
min: Some(1),
max: Some(100),
},
required: false,
description: "count".to_owned(),
},
]);
assert!(validate_params(&["alice".to_owned(), "50".to_owned()], &cmd).is_ok());
assert!(validate_params(&["alice".to_owned()], &cmd).is_ok());
assert!(validate_params(&[], &cmd).is_err());
}
#[test]
fn validate_choice_optional_missing_is_ok() {
let cmd = cmd_with_params(vec![ParamSchema {
name: "level".to_owned(),
param_type: ParamType::Choice(vec!["low".to_owned(), "high".to_owned()]),
required: false,
description: "level".to_owned(),
}]);
assert!(validate_params(&[], &cmd).is_ok());
}
}
fn make_state_with_config(
chat_path: std::path::PathBuf,
config: room_protocol::RoomConfig,
) -> Arc<RoomState> {
let token_map_path = chat_path.with_extension("tokens");
let subscription_map_path = chat_path.with_extension("subscriptions");
RoomState::new(
"test-room".to_owned(),
chat_path,
token_map_path,
subscription_map_path,
Arc::new(Mutex::new(HashMap::new())),
Arc::new(Mutex::new(HashMap::new())),
Some(config),
)
.unwrap()
}
#[tokio::test]
async fn room_info_no_config() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let result = handle_room_info(&state).await;
assert!(result.contains("legacy"));
assert!(result.contains("test-room"));
}
#[tokio::test]
async fn room_info_with_config() {
let tmp = NamedTempFile::new().unwrap();
let config = room_protocol::RoomConfig::dm("alice", "bob");
let state = make_state_with_config(tmp.path().to_path_buf(), config);
let result = handle_room_info(&state).await;
assert!(result.contains("dm"));
assert!(result.contains("alice"));
}
#[tokio::test]
async fn route_command_room_info_returns_reply() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "room-info", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
assert!(matches!(result, CommandResult::Reply(_)));
}
#[tokio::test]
async fn route_command_claim_stores_task_and_broadcasts() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command(
"test-room",
"alice",
"claim",
vec!["fix bug #42".to_owned()],
);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::HandledWithReply(json) = result else {
panic!("expected HandledWithReply");
};
assert!(json.contains("claimed"));
assert!(json.contains("fix bug #42"));
assert_eq!(
state
.claim_map
.lock()
.await
.get("alice")
.map(String::as_str),
Some("fix bug #42")
);
}
#[tokio::test]
async fn route_command_claim_overwrites_previous() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg1 = make_command("test-room", "alice", "claim", vec!["task A".to_owned()]);
route_command(msg1, "alice", &state).await.unwrap();
let msg2 = make_command("test-room", "alice", "claim", vec!["task B".to_owned()]);
route_command(msg2, "alice", &state).await.unwrap();
assert_eq!(
state
.claim_map
.lock()
.await
.get("alice")
.map(String::as_str),
Some("task B"),
"new claim should overwrite the old one"
);
}
#[tokio::test]
async fn route_command_unclaim_removes_claim() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
state
.claim_map
.lock()
.await
.insert("alice".to_owned(), "task A".to_owned());
let msg = make_command("test-room", "alice", "unclaim", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::HandledWithReply(json) = result else {
panic!("expected HandledWithReply");
};
assert!(json.contains("released"));
assert!(json.contains("task A"));
assert!(state.claim_map.lock().await.get("alice").is_none());
}
#[tokio::test]
async fn route_command_unclaim_no_active_claim() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "unclaim", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::HandledWithReply(json) = result else {
panic!("expected HandledWithReply");
};
assert!(json.contains("no active claim"));
}
#[tokio::test]
async fn route_command_claimed_empty_board() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "claimed", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::Reply(json) = result else {
panic!("expected Reply");
};
assert!(json.contains("no active claims"));
}
#[tokio::test]
async fn route_command_claimed_shows_all_claims() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
{
let mut map = state.claim_map.lock().await;
map.insert("alice".to_owned(), "task A".to_owned());
map.insert("bob".to_owned(), "task B".to_owned());
}
let msg = make_command("test-room", "alice", "claimed", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::Reply(json) = result else {
panic!("expected Reply");
};
assert!(json.contains("alice: task A"));
assert!(json.contains("bob: task B"));
}
#[tokio::test]
async fn route_command_claimed_is_sorted() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
{
let mut map = state.claim_map.lock().await;
map.insert("zara".to_owned(), "z-task".to_owned());
map.insert("alice".to_owned(), "a-task".to_owned());
}
let msg = make_command("test-room", "alice", "claimed", vec![]);
let CommandResult::Reply(json) = route_command(msg, "alice", &state).await.unwrap() else {
panic!("expected Reply");
};
let alice_pos = json.find("alice: a-task").unwrap();
let zara_pos = json.find("zara: z-task").unwrap();
assert!(alice_pos < zara_pos, "claims should be sorted by username");
}
#[tokio::test]
async fn set_subscription_alias_works() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command(
"test-room",
"alice",
"set_subscription",
vec!["full".to_owned()],
);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::HandledWithReply(json) = result else {
panic!("expected HandledWithReply");
};
assert!(json.contains("subscribed"));
assert!(json.contains("full"));
assert_eq!(
*state.subscription_map.lock().await.get("alice").unwrap(),
SubscriptionTier::Full
);
}
#[tokio::test]
async fn set_subscription_alias_mentions_only() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command(
"test-room",
"bob",
"set_subscription",
vec!["mentions_only".to_owned()],
);
let result = route_command(msg, "bob", &state).await.unwrap();
let CommandResult::HandledWithReply(json) = result else {
panic!("expected HandledWithReply");
};
assert!(json.contains("subscribed"));
assert_eq!(
*state.subscription_map.lock().await.get("bob").unwrap(),
SubscriptionTier::MentionsOnly
);
}
#[tokio::test]
async fn subscribe_default_tier_is_full() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "subscribe", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::HandledWithReply(json) = result else {
panic!("expected HandledWithReply");
};
assert!(json.contains("subscribed"));
assert!(json.contains("full"));
assert_eq!(
*state.subscription_map.lock().await.get("alice").unwrap(),
SubscriptionTier::Full
);
}
#[tokio::test]
async fn subscribe_explicit_mentions_only() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command(
"test-room",
"bob",
"subscribe",
vec!["mentions_only".to_owned()],
);
let result = route_command(msg, "bob", &state).await.unwrap();
let CommandResult::HandledWithReply(json) = result else {
panic!("expected HandledWithReply");
};
assert!(json.contains("mentions_only"));
assert_eq!(
*state.subscription_map.lock().await.get("bob").unwrap(),
SubscriptionTier::MentionsOnly
);
}
#[tokio::test]
async fn subscribe_overwrites_previous_tier() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg1 = make_command("test-room", "alice", "subscribe", vec!["full".to_owned()]);
route_command(msg1, "alice", &state).await.unwrap();
let msg2 = make_command(
"test-room",
"alice",
"subscribe",
vec!["mentions_only".to_owned()],
);
route_command(msg2, "alice", &state).await.unwrap();
assert_eq!(
*state.subscription_map.lock().await.get("alice").unwrap(),
SubscriptionTier::MentionsOnly,
"second subscribe should overwrite the first"
);
}
#[tokio::test]
async fn unsubscribe_sets_tier_to_unsubscribed() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "subscribe", vec!["full".to_owned()]);
route_command(msg, "alice", &state).await.unwrap();
let msg = make_command("test-room", "alice", "unsubscribe", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::HandledWithReply(json) = result else {
panic!("expected HandledWithReply");
};
assert!(json.contains("unsubscribed"));
assert_eq!(
*state.subscription_map.lock().await.get("alice").unwrap(),
SubscriptionTier::Unsubscribed
);
}
#[tokio::test]
async fn unsubscribe_without_prior_subscription() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "unsubscribe", vec![]);
let result = route_command(msg, "alice", &state).await.unwrap();
assert!(matches!(result, CommandResult::HandledWithReply(_)));
assert_eq!(
*state.subscription_map.lock().await.get("alice").unwrap(),
SubscriptionTier::Unsubscribed
);
}
#[tokio::test]
async fn subscriptions_empty() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "subscriptions", vec![]);
let CommandResult::Reply(json) = route_command(msg, "alice", &state).await.unwrap() else {
panic!("expected Reply");
};
assert!(json.contains("no subscriptions"));
}
#[tokio::test]
async fn subscriptions_lists_all_sorted() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
{
let mut map = state.subscription_map.lock().await;
map.insert("zara".to_owned(), SubscriptionTier::Full);
map.insert("alice".to_owned(), SubscriptionTier::MentionsOnly);
}
let msg = make_command("test-room", "alice", "subscriptions", vec![]);
let CommandResult::Reply(json) = route_command(msg, "alice", &state).await.unwrap() else {
panic!("expected Reply");
};
assert!(json.contains("alice: mentions_only"));
assert!(json.contains("zara: full"));
let alice_pos = json.find("alice: mentions_only").unwrap();
let zara_pos = json.find("zara: full").unwrap();
assert!(
alice_pos < zara_pos,
"subscriptions should be sorted by username"
);
}
#[tokio::test]
async fn subscribe_invalid_tier_returns_error() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "subscribe", vec!["banana".to_owned()]);
let result = route_command(msg, "alice", &state).await.unwrap();
let CommandResult::Reply(json) = result else {
panic!("expected Reply for invalid tier");
};
assert!(json.contains("must be one of"));
assert!(state.subscription_map.lock().await.get("alice").is_none());
}
#[tokio::test]
async fn subscribe_broadcasts_system_message() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "subscribe", vec![]);
route_command(msg, "alice", &state).await.unwrap();
let history = std::fs::read_to_string(tmp.path()).unwrap();
assert!(history.contains("subscribed"));
assert!(history.contains("alice"));
}
#[tokio::test]
async fn unsubscribe_broadcasts_system_message() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "unsubscribe", vec![]);
route_command(msg, "alice", &state).await.unwrap();
let history = std::fs::read_to_string(tmp.path()).unwrap();
assert!(history.contains("unsubscribed"));
assert!(history.contains("alice"));
}
#[test]
fn load_subscription_map_missing_file_returns_empty() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("nonexistent.subscriptions");
let map = super::load_subscription_map(&path);
assert!(map.is_empty());
}
#[test]
fn save_and_load_subscription_map_round_trip() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("test.subscriptions");
let mut original = HashMap::new();
original.insert("alice".to_owned(), SubscriptionTier::Full);
original.insert("bob".to_owned(), SubscriptionTier::MentionsOnly);
original.insert("carol".to_owned(), SubscriptionTier::Unsubscribed);
super::save_subscription_map(&original, &path).unwrap();
let loaded = super::load_subscription_map(&path);
assert_eq!(loaded, original);
}
#[test]
fn load_subscription_map_corrupt_file_returns_empty() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("corrupt.subscriptions");
std::fs::write(&path, "not json{{{").unwrap();
let map = super::load_subscription_map(&path);
assert!(map.is_empty());
}
#[tokio::test]
async fn subscribe_persists_to_disk() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "subscribe", vec![]);
route_command(msg, "alice", &state).await.unwrap();
let loaded = super::load_subscription_map(&state.subscription_map_path);
assert_eq!(loaded.get("alice"), Some(&SubscriptionTier::Full));
}
#[tokio::test]
async fn unsubscribe_persists_to_disk() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "subscribe", vec![]);
route_command(msg, "alice", &state).await.unwrap();
let msg = make_command("test-room", "alice", "unsubscribe", vec![]);
route_command(msg, "alice", &state).await.unwrap();
let loaded = super::load_subscription_map(&state.subscription_map_path);
assert_eq!(loaded.get("alice"), Some(&SubscriptionTier::Unsubscribed));
}
#[tokio::test]
async fn subscribe_accumulates_on_disk() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "subscribe", vec![]);
route_command(msg, "alice", &state).await.unwrap();
let msg = make_command(
"test-room",
"bob",
"subscribe",
vec!["mentions_only".to_owned()],
);
route_command(msg, "bob", &state).await.unwrap();
let loaded = super::load_subscription_map(&state.subscription_map_path);
assert_eq!(loaded.len(), 2);
assert_eq!(loaded.get("alice"), Some(&SubscriptionTier::Full));
assert_eq!(loaded.get("bob"), Some(&SubscriptionTier::MentionsOnly));
}
#[tokio::test]
async fn subscribe_survives_simulated_restart() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "subscribe", vec![]);
route_command(msg, "alice", &state).await.unwrap();
let loaded = super::load_subscription_map(&state.subscription_map_path);
assert_eq!(loaded.get("alice"), Some(&SubscriptionTier::Full));
let state2 = RoomState::new(
state.room_id.as_ref().clone(),
state.chat_path.as_ref().clone(),
state.token_map_path.as_ref().clone(),
state.subscription_map_path.as_ref().clone(),
Arc::new(Mutex::new(HashMap::new())),
Arc::new(Mutex::new(loaded)),
None,
)
.unwrap();
assert_eq!(
*state2.subscription_map.lock().await.get("alice").unwrap(),
SubscriptionTier::Full
);
}
#[tokio::test]
async fn subscribe_overwrite_persists_latest_tier() {
let tmp = NamedTempFile::new().unwrap();
let state = make_state(tmp.path().to_path_buf());
let msg = make_command("test-room", "alice", "subscribe", vec![]);
route_command(msg, "alice", &state).await.unwrap();
let msg = make_command(
"test-room",
"alice",
"subscribe",
vec!["mentions_only".to_owned()],
);
route_command(msg, "alice", &state).await.unwrap();
let loaded = super::load_subscription_map(&state.subscription_map_path);
assert_eq!(loaded.get("alice"), Some(&SubscriptionTier::MentionsOnly));
}
}