1use std::path::Path;
2
3use tokio::{
4 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
5 net::UnixStream,
6};
7
8use crate::message::Message;
9
10#[derive(Debug, Clone)]
21pub struct SocketTarget {
22 pub path: std::path::PathBuf,
24 pub daemon_room: Option<String>,
26}
27
28impl SocketTarget {
29 fn handshake_line(&self, token_line: &str) -> String {
34 match &self.daemon_room {
35 Some(room_id) => format!("ROOM:{room_id}:{token_line}"),
36 None => token_line.to_owned(),
37 }
38 }
39}
40
41pub fn resolve_socket_target(room_id: &str, explicit: Option<&Path>) -> SocketTarget {
49 let path = explicit
50 .map(|p| p.to_owned())
51 .unwrap_or_else(|| crate::paths::effective_socket_path(None));
52 SocketTarget {
53 path,
54 daemon_room: Some(room_id.to_owned()),
55 }
56}
57
58const DAEMON_POLL_INTERVAL_MS: u64 = 50;
61const DAEMON_START_TIMEOUT_MS: u64 = 5_000;
62
63pub async fn ensure_daemon_running() -> anyhow::Result<()> {
78 let exe = resolve_daemon_binary()?;
79 ensure_daemon_running_impl(&crate::paths::effective_socket_path(None), &exe).await
81}
82
83fn resolve_daemon_binary() -> anyhow::Result<std::path::PathBuf> {
94 if let Ok(p) = std::env::var("ROOM_BINARY") {
96 let path = std::path::PathBuf::from(&p);
97 if path.exists() {
98 return Ok(path);
99 }
100 }
101
102 if let Ok(output) = std::process::Command::new("which").arg("room").output() {
104 if output.status.success() {
105 let path_str = String::from_utf8_lossy(&output.stdout);
106 let path = std::path::PathBuf::from(path_str.trim());
107 if path.exists() {
108 return Ok(path);
109 }
110 }
111 }
112
113 std::env::current_exe().map_err(|e| anyhow::anyhow!("cannot resolve daemon binary: {e}"))
115}
116
117#[cfg(test)]
120pub(crate) async fn ensure_daemon_running_at(
121 socket: &Path,
122 exe: &std::path::Path,
123) -> anyhow::Result<()> {
124 ensure_daemon_running_impl(socket, exe).await
125}
126
127async fn ensure_daemon_running_impl(socket: &Path, exe: &Path) -> anyhow::Result<()> {
128 if UnixStream::connect(socket).await.is_ok() {
130 return Ok(());
131 }
132
133 let child = std::process::Command::new(exe)
134 .arg("daemon")
135 .arg("--socket")
136 .arg(socket)
137 .stdin(std::process::Stdio::null())
138 .stdout(std::process::Stdio::null())
139 .stderr(std::process::Stdio::null())
140 .spawn()
141 .map_err(|e| anyhow::anyhow!("failed to spawn daemon ({}): {e}", exe.display()))?;
142
143 let pid_path = crate::paths::room_pid_path();
145 let _ = std::fs::write(&pid_path, child.id().to_string());
146
147 let deadline =
149 tokio::time::Instant::now() + tokio::time::Duration::from_millis(DAEMON_START_TIMEOUT_MS);
150
151 loop {
152 if UnixStream::connect(socket).await.is_ok() {
153 return Ok(());
154 }
155 if tokio::time::Instant::now() >= deadline {
156 anyhow::bail!(
157 "daemon failed to start within {}ms (socket: {})",
158 DAEMON_START_TIMEOUT_MS,
159 socket.display()
160 );
161 }
162 tokio::time::sleep(tokio::time::Duration::from_millis(DAEMON_POLL_INTERVAL_MS)).await;
163 }
164}
165
166#[deprecated(
176 since = "3.1.0",
177 note = "SEND: handshake is unauthenticated; use send_message_with_token instead"
178)]
179pub async fn send_message(
180 socket_path: &Path,
181 username: &str,
182 content: &str,
183) -> anyhow::Result<Message> {
184 let stream = UnixStream::connect(socket_path).await.map_err(|e| {
185 anyhow::anyhow!("cannot connect to broker at {}: {e}", socket_path.display())
186 })?;
187 let (r, mut w) = stream.into_split();
188 w.write_all(format!("SEND:{username}\n").as_bytes()).await?;
189 w.write_all(format!("{content}\n").as_bytes()).await?;
190
191 let mut reader = BufReader::new(r);
192 let mut line = String::new();
193 reader.read_line(&mut line).await?;
194 let msg: Message = serde_json::from_str(line.trim())
195 .map_err(|e| anyhow::anyhow!("broker returned invalid JSON: {e}: {:?}", line.trim()))?;
196 Ok(msg)
197}
198
199pub async fn send_message_with_token(
206 socket_path: &Path,
207 token: &str,
208 content: &str,
209) -> anyhow::Result<Message> {
210 send_message_with_token_target(
211 &SocketTarget {
212 path: socket_path.to_owned(),
213 daemon_room: None,
214 },
215 token,
216 content,
217 )
218 .await
219}
220
221pub async fn send_message_with_token_target(
224 target: &SocketTarget,
225 token: &str,
226 content: &str,
227) -> anyhow::Result<Message> {
228 let stream = UnixStream::connect(&target.path).await.map_err(|e| {
229 anyhow::anyhow!("cannot connect to broker at {}: {e}", target.path.display())
230 })?;
231 let (r, mut w) = stream.into_split();
232 let handshake = target.handshake_line(&format!("TOKEN:{token}"));
233 w.write_all(format!("{handshake}\n").as_bytes()).await?;
234 w.write_all(format!("{content}\n").as_bytes()).await?;
236
237 let mut reader = BufReader::new(r);
238 let mut line = String::new();
239 reader.read_line(&mut line).await?;
240 let v: serde_json::Value = serde_json::from_str(line.trim())
242 .map_err(|e| anyhow::anyhow!("broker returned invalid JSON: {e}: {:?}", line.trim()))?;
243 if v["type"] == "error" {
244 let code = v["code"].as_str().unwrap_or("unknown");
245 if code == "invalid_token" {
246 anyhow::bail!("invalid token — run: room join {}", target.path.display());
247 }
248 anyhow::bail!("broker error: {code}");
249 }
250 let msg: Message = serde_json::from_value(v)
251 .map_err(|e| anyhow::anyhow!("broker returned unexpected JSON: {e}"))?;
252 Ok(msg)
253}
254
255pub async fn join_session(socket_path: &Path, username: &str) -> anyhow::Result<(String, String)> {
260 join_session_target(
261 &SocketTarget {
262 path: socket_path.to_owned(),
263 daemon_room: None,
264 },
265 username,
266 )
267 .await
268}
269
270pub async fn join_session_target(
272 target: &SocketTarget,
273 username: &str,
274) -> anyhow::Result<(String, String)> {
275 let stream = UnixStream::connect(&target.path).await.map_err(|e| {
276 anyhow::anyhow!("cannot connect to broker at {}: {e}", target.path.display())
277 })?;
278 let (r, mut w) = stream.into_split();
279 let handshake = target.handshake_line(&format!("JOIN:{username}"));
280 w.write_all(format!("{handshake}\n").as_bytes()).await?;
281
282 let mut reader = BufReader::new(r);
283 let mut line = String::new();
284 reader.read_line(&mut line).await?;
285 let v: serde_json::Value = serde_json::from_str(line.trim())
286 .map_err(|e| anyhow::anyhow!("broker returned invalid JSON: {e}: {:?}", line.trim()))?;
287 if v["type"] == "error" {
288 let code = v["code"].as_str().unwrap_or("unknown");
289 if code == "username_taken" {
290 anyhow::bail!("username '{}' is already in use in this room", username);
291 }
292 anyhow::bail!("broker error: {code}");
293 }
294 let token = v["token"]
295 .as_str()
296 .ok_or_else(|| anyhow::anyhow!("broker response missing 'token' field"))?
297 .to_owned();
298 let returned_user = v["username"]
299 .as_str()
300 .ok_or_else(|| anyhow::anyhow!("broker response missing 'username' field"))?
301 .to_owned();
302 Ok((returned_user, token))
303}
304
305pub async fn global_join_session(
311 socket_path: &Path,
312 username: &str,
313) -> anyhow::Result<(String, String)> {
314 let stream = UnixStream::connect(socket_path).await.map_err(|e| {
315 anyhow::anyhow!("cannot connect to daemon at {}: {e}", socket_path.display())
316 })?;
317 let (r, mut w) = stream.into_split();
318 w.write_all(format!("JOIN:{username}\n").as_bytes()).await?;
319
320 let mut reader = BufReader::new(r);
321 let mut line = String::new();
322 reader.read_line(&mut line).await?;
323 let v: serde_json::Value = serde_json::from_str(line.trim())
324 .map_err(|e| anyhow::anyhow!("daemon returned invalid JSON: {e}: {:?}", line.trim()))?;
325 if v["type"] == "error" {
326 let code = v["code"].as_str().unwrap_or("unknown");
327 anyhow::bail!("daemon error: {code}");
328 }
329 let token = v["token"]
330 .as_str()
331 .ok_or_else(|| anyhow::anyhow!("daemon response missing 'token' field"))?
332 .to_owned();
333 let returned_user = v["username"]
334 .as_str()
335 .ok_or_else(|| anyhow::anyhow!("daemon response missing 'username' field"))?
336 .to_owned();
337 Ok((returned_user, token))
338}
339
340pub async fn create_room(
351 socket_path: &Path,
352 room_id: &str,
353 config_json: &str,
354) -> anyhow::Result<serde_json::Value> {
355 let stream = UnixStream::connect(socket_path).await.map_err(|e| {
356 anyhow::anyhow!("cannot connect to daemon at {}: {e}", socket_path.display())
357 })?;
358 let (r, mut w) = stream.into_split();
359 w.write_all(format!("CREATE:{room_id}\n").as_bytes())
360 .await?;
361 w.write_all(format!("{config_json}\n").as_bytes()).await?;
362
363 let mut reader = BufReader::new(r);
364 let mut line = String::new();
365 reader.read_line(&mut line).await?;
366 let v: serde_json::Value = serde_json::from_str(line.trim())
367 .map_err(|e| anyhow::anyhow!("daemon returned invalid JSON: {e}: {:?}", line.trim()))?;
368 if v["type"] == "error" {
369 let message = v["message"].as_str().unwrap_or("unknown error");
370 anyhow::bail!("{message}");
371 }
372 Ok(v)
373}
374
375pub async fn destroy_room(
383 socket_path: &Path,
384 room_id: &str,
385 token: &str,
386) -> anyhow::Result<serde_json::Value> {
387 let stream = UnixStream::connect(socket_path).await.map_err(|e| {
388 anyhow::anyhow!("cannot connect to daemon at {}: {e}", socket_path.display())
389 })?;
390 let (r, mut w) = stream.into_split();
391 w.write_all(format!("DESTROY:{room_id}\n").as_bytes())
392 .await?;
393 w.write_all(format!("{token}\n").as_bytes()).await?;
394
395 let mut reader = BufReader::new(r);
396 let mut line = String::new();
397 reader.read_line(&mut line).await?;
398 let v: serde_json::Value = serde_json::from_str(line.trim())
399 .map_err(|e| anyhow::anyhow!("daemon returned invalid JSON: {e}: {:?}", line.trim()))?;
400 if v["type"] == "error" {
401 let message = v["message"]
402 .as_str()
403 .unwrap_or(v["code"].as_str().unwrap_or("unknown error"));
404 anyhow::bail!("{message}");
405 }
406 Ok(v)
407}
408
409pub fn inject_token_into_config(config_json: &str, token: &str) -> String {
414 if let Ok(mut v) = serde_json::from_str::<serde_json::Value>(config_json) {
415 if let Some(obj) = v.as_object_mut() {
416 obj.insert(
417 "token".to_owned(),
418 serde_json::Value::String(token.to_owned()),
419 );
420 return serde_json::to_string(&v).unwrap_or_default();
421 }
422 }
423 serde_json::json!({"token": token}).to_string()
425}
426
427#[cfg(test)]
430mod tests {
431 use super::*;
432 use std::path::PathBuf;
433
434 fn per_room_target(room_id: &str) -> SocketTarget {
435 SocketTarget {
436 path: PathBuf::from(format!("/tmp/room-{room_id}.sock")),
437 daemon_room: None,
438 }
439 }
440
441 fn daemon_target(room_id: &str) -> SocketTarget {
442 SocketTarget {
443 path: PathBuf::from("/tmp/roomd.sock"),
444 daemon_room: Some(room_id.to_owned()),
445 }
446 }
447
448 #[test]
451 fn per_room_token_handshake_no_prefix() {
452 let t = per_room_target("myroom");
453 assert_eq!(t.handshake_line("TOKEN:abc-123"), "TOKEN:abc-123");
454 }
455
456 #[test]
457 fn daemon_token_handshake_has_room_prefix() {
458 let t = daemon_target("myroom");
459 assert_eq!(
460 t.handshake_line("TOKEN:abc-123"),
461 "ROOM:myroom:TOKEN:abc-123"
462 );
463 }
464
465 #[test]
466 fn per_room_join_handshake_no_prefix() {
467 let t = per_room_target("chat");
468 assert_eq!(t.handshake_line("JOIN:alice"), "JOIN:alice");
469 }
470
471 #[test]
472 fn daemon_join_handshake_has_room_prefix() {
473 let t = daemon_target("chat");
474 assert_eq!(t.handshake_line("JOIN:alice"), "ROOM:chat:JOIN:alice");
475 }
476
477 #[test]
478 fn daemon_handshake_with_hyphen_room_id() {
479 let t = daemon_target("agent-room-2");
480 assert_eq!(
481 t.handshake_line("TOKEN:uuid"),
482 "ROOM:agent-room-2:TOKEN:uuid"
483 );
484 }
485
486 fn room_bin() -> PathBuf {
495 let bin = std::env::current_exe()
496 .unwrap()
497 .parent()
498 .unwrap()
499 .parent()
500 .unwrap()
501 .join("room");
502 assert!(bin.exists(), "room binary not found at {}", bin.display());
503 bin
504 }
505
506 #[tokio::test]
509 #[ignore = "spawns a real daemon process; run explicitly with `cargo test -- --ignored`"]
510 async fn ensure_daemon_noop_when_socket_connectable() {
511 let dir = tempfile::TempDir::new().unwrap();
514 let socket = dir.path().join("roomd.sock");
515 let exe = room_bin();
516
517 let mut child = tokio::process::Command::new(&exe)
518 .args(["daemon", "--socket"])
519 .arg(&socket)
520 .stdin(std::process::Stdio::null())
521 .stdout(std::process::Stdio::null())
522 .stderr(std::process::Stdio::null())
523 .spawn()
524 .expect("failed to spawn room daemon");
525
526 for _ in 0..200 {
528 if tokio::net::UnixStream::connect(&socket).await.is_ok() {
529 break;
530 }
531 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
532 }
533 assert!(
534 tokio::net::UnixStream::connect(&socket).await.is_ok(),
535 "daemon socket not ready"
536 );
537
538 ensure_daemon_running_at(&socket, &exe).await.unwrap();
540
541 child.kill().await.ok();
542 }
543
544 #[tokio::test]
547 #[ignore = "spawns a real daemon process; run explicitly with `cargo test -- --ignored`"]
548 async fn ensure_daemon_starts_daemon_and_writes_pid() {
549 let dir = tempfile::TempDir::new().unwrap();
550 let socket = dir.path().join("autostart.sock");
551 let exe = room_bin();
552
553 ensure_daemon_running_at(&socket, &exe).await.unwrap();
555
556 assert!(
558 tokio::net::UnixStream::connect(&socket).await.is_ok(),
559 "daemon socket not connectable after auto-start"
560 );
561 }
568
569 #[test]
572 fn resolve_explicit_socket_is_daemon() {
573 let daemon_sock = PathBuf::from("/tmp/roomd.sock");
574 let target = resolve_socket_target("myroom", Some(&daemon_sock));
575 assert_eq!(target.path, daemon_sock);
576 assert_eq!(target.daemon_room.as_deref(), Some("myroom"));
577 }
578
579 #[test]
580 fn resolve_explicit_custom_path_is_daemon() {
581 let custom = PathBuf::from("/var/run/roomd-test.sock");
582 let target = resolve_socket_target("chat", Some(&custom));
583 assert_eq!(target.path, custom);
584 assert_eq!(target.daemon_room.as_deref(), Some("chat"));
585 }
586
587 #[test]
588 fn resolve_auto_uses_daemon_socket() {
589 let target = resolve_socket_target("myroom", None);
590 assert_eq!(target.daemon_room.as_deref(), Some("myroom"));
592 }
593
594 static TRANSPORT_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
598
599 #[test]
600 fn resolve_daemon_binary_uses_room_binary_env() {
601 let _lock = TRANSPORT_ENV_LOCK.lock().unwrap();
602 let key = "ROOM_BINARY";
603 let prev = std::env::var(key).ok();
604
605 let target = std::env::current_exe().unwrap();
607 std::env::set_var(key, &target);
608 let result = resolve_daemon_binary().unwrap();
609 assert_eq!(result, target, "should use ROOM_BINARY when set");
610
611 match prev {
612 Some(v) => std::env::set_var(key, v),
613 None => std::env::remove_var(key),
614 }
615 }
616
617 #[test]
618 fn resolve_daemon_binary_ignores_nonexistent_room_binary() {
619 let _lock = TRANSPORT_ENV_LOCK.lock().unwrap();
620 let key = "ROOM_BINARY";
621 let prev = std::env::var(key).ok();
622
623 std::env::set_var(key, "/nonexistent/path/to/room");
624 let result = resolve_daemon_binary().unwrap();
625 assert_ne!(
627 result,
628 std::path::PathBuf::from("/nonexistent/path/to/room"),
629 "should skip ROOM_BINARY when path does not exist"
630 );
631
632 match prev {
633 Some(v) => std::env::set_var(key, v),
634 None => std::env::remove_var(key),
635 }
636 }
637
638 #[test]
639 fn resolve_daemon_binary_falls_back_without_env() {
640 let _lock = TRANSPORT_ENV_LOCK.lock().unwrap();
641 let key = "ROOM_BINARY";
642 let prev = std::env::var(key).ok();
643
644 std::env::remove_var(key);
645 let result = resolve_daemon_binary().unwrap();
646 assert!(result.exists(), "resolved binary should exist: {result:?}");
648
649 match prev {
650 Some(v) => std::env::set_var(key, v),
651 None => std::env::remove_var(key),
652 }
653 }
654}