use std::path::Path;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::UnixStream,
};
use crate::message::Message;
#[derive(Debug, Clone)]
pub struct SocketTarget {
pub path: std::path::PathBuf,
pub daemon_room: Option<String>,
}
impl SocketTarget {
fn handshake_line(&self, token_line: &str) -> String {
match &self.daemon_room {
Some(room_id) => format!("ROOM:{room_id}:{token_line}"),
None => token_line.to_owned(),
}
}
}
pub fn resolve_socket_target(room_id: &str, explicit: Option<&Path>) -> SocketTarget {
let per_room = crate::paths::room_single_socket_path(room_id);
let daemon = crate::paths::effective_socket_path(None);
if let Some(path) = explicit {
if path == per_room {
return SocketTarget {
path: path.to_owned(),
daemon_room: None,
};
}
return SocketTarget {
path: path.to_owned(),
daemon_room: Some(room_id.to_owned()),
};
}
if daemon.exists() {
SocketTarget {
path: daemon,
daemon_room: Some(room_id.to_owned()),
}
} else {
SocketTarget {
path: per_room,
daemon_room: None,
}
}
}
const DAEMON_POLL_INTERVAL_MS: u64 = 50;
const DAEMON_START_TIMEOUT_MS: u64 = 5_000;
pub async fn ensure_daemon_running() -> anyhow::Result<()> {
let exe = resolve_daemon_binary()?;
ensure_daemon_running_impl(&crate::paths::effective_socket_path(None), &exe).await
}
fn resolve_daemon_binary() -> anyhow::Result<std::path::PathBuf> {
if let Ok(p) = std::env::var("ROOM_BINARY") {
let path = std::path::PathBuf::from(&p);
if path.exists() {
return Ok(path);
}
}
if let Ok(output) = std::process::Command::new("which").arg("room").output() {
if output.status.success() {
let path_str = String::from_utf8_lossy(&output.stdout);
let path = std::path::PathBuf::from(path_str.trim());
if path.exists() {
return Ok(path);
}
}
}
std::env::current_exe().map_err(|e| anyhow::anyhow!("cannot resolve daemon binary: {e}"))
}
#[cfg(test)]
pub(crate) async fn ensure_daemon_running_at(
socket: &Path,
exe: &std::path::Path,
) -> anyhow::Result<()> {
ensure_daemon_running_impl(socket, exe).await
}
async fn ensure_daemon_running_impl(socket: &Path, exe: &Path) -> anyhow::Result<()> {
if UnixStream::connect(socket).await.is_ok() {
return Ok(());
}
let child = std::process::Command::new(exe)
.arg("daemon")
.arg("--socket")
.arg(socket)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.map_err(|e| anyhow::anyhow!("failed to spawn daemon ({}): {e}", exe.display()))?;
let pid_path = crate::paths::room_pid_path();
let _ = std::fs::write(&pid_path, child.id().to_string());
let deadline =
tokio::time::Instant::now() + tokio::time::Duration::from_millis(DAEMON_START_TIMEOUT_MS);
loop {
if UnixStream::connect(socket).await.is_ok() {
return Ok(());
}
if tokio::time::Instant::now() >= deadline {
anyhow::bail!(
"daemon failed to start within {}ms (socket: {})",
DAEMON_START_TIMEOUT_MS,
socket.display()
);
}
tokio::time::sleep(tokio::time::Duration::from_millis(DAEMON_POLL_INTERVAL_MS)).await;
}
}
pub async fn send_message(
socket_path: &Path,
username: &str,
content: &str,
) -> anyhow::Result<Message> {
let stream = UnixStream::connect(socket_path).await.map_err(|e| {
anyhow::anyhow!("cannot connect to broker at {}: {e}", socket_path.display())
})?;
let (r, mut w) = stream.into_split();
w.write_all(format!("SEND:{username}\n").as_bytes()).await?;
w.write_all(format!("{content}\n").as_bytes()).await?;
let mut reader = BufReader::new(r);
let mut line = String::new();
reader.read_line(&mut line).await?;
let msg: Message = serde_json::from_str(line.trim())
.map_err(|e| anyhow::anyhow!("broker returned invalid JSON: {e}: {:?}", line.trim()))?;
Ok(msg)
}
pub async fn send_message_with_token(
socket_path: &Path,
token: &str,
content: &str,
) -> anyhow::Result<Message> {
send_message_with_token_target(
&SocketTarget {
path: socket_path.to_owned(),
daemon_room: None,
},
token,
content,
)
.await
}
pub async fn send_message_with_token_target(
target: &SocketTarget,
token: &str,
content: &str,
) -> anyhow::Result<Message> {
let stream = UnixStream::connect(&target.path).await.map_err(|e| {
anyhow::anyhow!("cannot connect to broker at {}: {e}", target.path.display())
})?;
let (r, mut w) = stream.into_split();
let handshake = target.handshake_line(&format!("TOKEN:{token}"));
w.write_all(format!("{handshake}\n").as_bytes()).await?;
w.write_all(format!("{content}\n").as_bytes()).await?;
let mut reader = BufReader::new(r);
let mut line = String::new();
reader.read_line(&mut line).await?;
let v: serde_json::Value = serde_json::from_str(line.trim())
.map_err(|e| anyhow::anyhow!("broker returned invalid JSON: {e}: {:?}", line.trim()))?;
if v["type"] == "error" {
let code = v["code"].as_str().unwrap_or("unknown");
if code == "invalid_token" {
anyhow::bail!("invalid token — run: room join {}", target.path.display());
}
anyhow::bail!("broker error: {code}");
}
let msg: Message = serde_json::from_value(v)
.map_err(|e| anyhow::anyhow!("broker returned unexpected JSON: {e}"))?;
Ok(msg)
}
pub async fn join_session(socket_path: &Path, username: &str) -> anyhow::Result<(String, String)> {
join_session_target(
&SocketTarget {
path: socket_path.to_owned(),
daemon_room: None,
},
username,
)
.await
}
pub async fn join_session_target(
target: &SocketTarget,
username: &str,
) -> anyhow::Result<(String, String)> {
let stream = UnixStream::connect(&target.path).await.map_err(|e| {
anyhow::anyhow!("cannot connect to broker at {}: {e}", target.path.display())
})?;
let (r, mut w) = stream.into_split();
let handshake = target.handshake_line(&format!("JOIN:{username}"));
w.write_all(format!("{handshake}\n").as_bytes()).await?;
let mut reader = BufReader::new(r);
let mut line = String::new();
reader.read_line(&mut line).await?;
let v: serde_json::Value = serde_json::from_str(line.trim())
.map_err(|e| anyhow::anyhow!("broker returned invalid JSON: {e}: {:?}", line.trim()))?;
if v["type"] == "error" {
let code = v["code"].as_str().unwrap_or("unknown");
if code == "username_taken" {
anyhow::bail!("username '{}' is already in use in this room", username);
}
anyhow::bail!("broker error: {code}");
}
let token = v["token"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("broker response missing 'token' field"))?
.to_owned();
let returned_user = v["username"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("broker response missing 'username' field"))?
.to_owned();
Ok((returned_user, token))
}
pub async fn global_join_session(
socket_path: &Path,
username: &str,
) -> anyhow::Result<(String, String)> {
let stream = UnixStream::connect(socket_path).await.map_err(|e| {
anyhow::anyhow!("cannot connect to daemon at {}: {e}", socket_path.display())
})?;
let (r, mut w) = stream.into_split();
w.write_all(format!("JOIN:{username}\n").as_bytes()).await?;
let mut reader = BufReader::new(r);
let mut line = String::new();
reader.read_line(&mut line).await?;
let v: serde_json::Value = serde_json::from_str(line.trim())
.map_err(|e| anyhow::anyhow!("daemon returned invalid JSON: {e}: {:?}", line.trim()))?;
if v["type"] == "error" {
let code = v["code"].as_str().unwrap_or("unknown");
anyhow::bail!("daemon error: {code}");
}
let token = v["token"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("daemon response missing 'token' field"))?
.to_owned();
let returned_user = v["username"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("daemon response missing 'username' field"))?
.to_owned();
Ok((returned_user, token))
}
pub async fn create_room(
socket_path: &Path,
room_id: &str,
config_json: &str,
) -> anyhow::Result<serde_json::Value> {
let stream = UnixStream::connect(socket_path).await.map_err(|e| {
anyhow::anyhow!("cannot connect to daemon at {}: {e}", socket_path.display())
})?;
let (r, mut w) = stream.into_split();
w.write_all(format!("CREATE:{room_id}\n").as_bytes())
.await?;
w.write_all(format!("{config_json}\n").as_bytes()).await?;
let mut reader = BufReader::new(r);
let mut line = String::new();
reader.read_line(&mut line).await?;
let v: serde_json::Value = serde_json::from_str(line.trim())
.map_err(|e| anyhow::anyhow!("daemon returned invalid JSON: {e}: {:?}", line.trim()))?;
if v["type"] == "error" {
let message = v["message"].as_str().unwrap_or("unknown error");
anyhow::bail!("{message}");
}
Ok(v)
}
pub async fn destroy_room(socket_path: &Path, room_id: &str) -> anyhow::Result<serde_json::Value> {
let stream = UnixStream::connect(socket_path).await.map_err(|e| {
anyhow::anyhow!("cannot connect to daemon at {}: {e}", socket_path.display())
})?;
let (r, mut w) = stream.into_split();
w.write_all(format!("DESTROY:{room_id}\n").as_bytes())
.await?;
let mut reader = BufReader::new(r);
let mut line = String::new();
reader.read_line(&mut line).await?;
let v: serde_json::Value = serde_json::from_str(line.trim())
.map_err(|e| anyhow::anyhow!("daemon returned invalid JSON: {e}: {:?}", line.trim()))?;
if v["type"] == "error" {
let message = v["message"]
.as_str()
.unwrap_or(v["code"].as_str().unwrap_or("unknown error"));
anyhow::bail!("{message}");
}
Ok(v)
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
fn per_room_target(room_id: &str) -> SocketTarget {
SocketTarget {
path: PathBuf::from(format!("/tmp/room-{room_id}.sock")),
daemon_room: None,
}
}
fn daemon_target(room_id: &str) -> SocketTarget {
SocketTarget {
path: PathBuf::from("/tmp/roomd.sock"),
daemon_room: Some(room_id.to_owned()),
}
}
#[test]
fn per_room_token_handshake_no_prefix() {
let t = per_room_target("myroom");
assert_eq!(t.handshake_line("TOKEN:abc-123"), "TOKEN:abc-123");
}
#[test]
fn daemon_token_handshake_has_room_prefix() {
let t = daemon_target("myroom");
assert_eq!(
t.handshake_line("TOKEN:abc-123"),
"ROOM:myroom:TOKEN:abc-123"
);
}
#[test]
fn per_room_join_handshake_no_prefix() {
let t = per_room_target("chat");
assert_eq!(t.handshake_line("JOIN:alice"), "JOIN:alice");
}
#[test]
fn daemon_join_handshake_has_room_prefix() {
let t = daemon_target("chat");
assert_eq!(t.handshake_line("JOIN:alice"), "ROOM:chat:JOIN:alice");
}
#[test]
fn daemon_handshake_with_hyphen_room_id() {
let t = daemon_target("agent-room-2");
assert_eq!(
t.handshake_line("TOKEN:uuid"),
"ROOM:agent-room-2:TOKEN:uuid"
);
}
fn room_bin() -> PathBuf {
let bin = std::env::current_exe()
.unwrap()
.parent()
.unwrap()
.parent()
.unwrap()
.join("room");
assert!(bin.exists(), "room binary not found at {}", bin.display());
bin
}
#[tokio::test]
#[ignore = "spawns a real daemon process; run explicitly with `cargo test -- --ignored`"]
async fn ensure_daemon_noop_when_socket_connectable() {
let dir = tempfile::TempDir::new().unwrap();
let socket = dir.path().join("roomd.sock");
let exe = room_bin();
let mut child = tokio::process::Command::new(&exe)
.args(["daemon", "--socket"])
.arg(&socket)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.expect("failed to spawn room daemon");
for _ in 0..200 {
if tokio::net::UnixStream::connect(&socket).await.is_ok() {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
assert!(
tokio::net::UnixStream::connect(&socket).await.is_ok(),
"daemon socket not ready"
);
ensure_daemon_running_at(&socket, &exe).await.unwrap();
child.kill().await.ok();
}
#[tokio::test]
#[ignore = "spawns a real daemon process; run explicitly with `cargo test -- --ignored`"]
async fn ensure_daemon_starts_daemon_and_writes_pid() {
let dir = tempfile::TempDir::new().unwrap();
let socket = dir.path().join("autostart.sock");
let exe = room_bin();
ensure_daemon_running_at(&socket, &exe).await.unwrap();
assert!(
tokio::net::UnixStream::connect(&socket).await.is_ok(),
"daemon socket not connectable after auto-start"
);
}
#[test]
fn resolve_explicit_per_room_socket_is_not_daemon() {
let per_room = crate::paths::room_single_socket_path("myroom");
let target = resolve_socket_target("myroom", Some(&per_room));
assert_eq!(target.path, per_room);
assert!(
target.daemon_room.is_none(),
"per-room socket should not set daemon_room"
);
}
#[test]
fn resolve_explicit_daemon_socket_is_daemon() {
let daemon_sock = PathBuf::from("/tmp/roomd.sock");
let target = resolve_socket_target("myroom", Some(&daemon_sock));
assert_eq!(target.path, daemon_sock);
assert_eq!(target.daemon_room.as_deref(), Some("myroom"));
}
#[test]
fn resolve_explicit_custom_path_is_daemon() {
let custom = PathBuf::from("/var/run/roomd-test.sock");
let target = resolve_socket_target("chat", Some(&custom));
assert_eq!(target.path, custom);
assert_eq!(target.daemon_room.as_deref(), Some("chat"));
}
#[test]
fn resolve_auto_no_daemon_falls_back_to_per_room() {
let daemon_path = crate::paths::room_socket_path();
if !daemon_path.exists() {
let target = resolve_socket_target("myroom", None);
assert_eq!(target.path, crate::paths::room_single_socket_path("myroom"));
assert!(target.daemon_room.is_none());
}
}
static TRANSPORT_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
#[test]
fn resolve_daemon_binary_uses_room_binary_env() {
let _lock = TRANSPORT_ENV_LOCK.lock().unwrap();
let key = "ROOM_BINARY";
let prev = std::env::var(key).ok();
let target = std::env::current_exe().unwrap();
std::env::set_var(key, &target);
let result = resolve_daemon_binary().unwrap();
assert_eq!(result, target, "should use ROOM_BINARY when set");
match prev {
Some(v) => std::env::set_var(key, v),
None => std::env::remove_var(key),
}
}
#[test]
fn resolve_daemon_binary_ignores_nonexistent_room_binary() {
let _lock = TRANSPORT_ENV_LOCK.lock().unwrap();
let key = "ROOM_BINARY";
let prev = std::env::var(key).ok();
std::env::set_var(key, "/nonexistent/path/to/room");
let result = resolve_daemon_binary().unwrap();
assert_ne!(
result,
std::path::PathBuf::from("/nonexistent/path/to/room"),
"should skip ROOM_BINARY when path does not exist"
);
match prev {
Some(v) => std::env::set_var(key, v),
None => std::env::remove_var(key),
}
}
#[test]
fn resolve_daemon_binary_falls_back_without_env() {
let _lock = TRANSPORT_ENV_LOCK.lock().unwrap();
let key = "ROOM_BINARY";
let prev = std::env::var(key).ok();
std::env::remove_var(key);
let result = resolve_daemon_binary().unwrap();
assert!(result.exists(), "resolved binary should exist: {result:?}");
match prev {
Some(v) => std::env::set_var(key, v),
None => std::env::remove_var(key),
}
}
}