1use std::path::Path;
2
3use tokio::{
4 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
5 net::UnixStream,
6};
7
8use crate::message::Message;
9
/// A resolved socket endpoint together with the routing information needed
/// to address a specific room over it.
#[derive(Debug, Clone)]
pub struct SocketTarget {
    /// Filesystem path of the Unix socket to connect to.
    pub path: std::path::PathBuf,
    /// Room id to route to when set; `None` means handshake lines are sent
    /// without any `ROOM:` prefix.
    pub daemon_room: Option<String>,
}

impl SocketTarget {
    /// Builds the first line sent on a new connection.
    ///
    /// When `daemon_room` is set, `token_line` is wrapped as
    /// `ROOM:<room_id>:<token_line>`; otherwise it is returned unchanged.
    fn handshake_line(&self, token_line: &str) -> String {
        if let Some(room_id) = self.daemon_room.as_deref() {
            return format!("ROOM:{room_id}:{token_line}");
        }
        token_line.to_owned()
    }
}
40
41pub fn resolve_socket_target(room_id: &str, explicit: Option<&Path>) -> SocketTarget {
52 let per_room = crate::paths::room_single_socket_path(room_id);
53 let daemon = crate::paths::effective_socket_path(None);
55
56 if let Some(path) = explicit {
57 if path == per_room {
60 return SocketTarget {
61 path: path.to_owned(),
62 daemon_room: None,
63 };
64 }
65 return SocketTarget {
66 path: path.to_owned(),
67 daemon_room: Some(room_id.to_owned()),
68 };
69 }
70
71 if daemon.exists() {
73 SocketTarget {
74 path: daemon,
75 daemon_room: Some(room_id.to_owned()),
76 }
77 } else {
78 SocketTarget {
79 path: per_room,
80 daemon_room: None,
81 }
82 }
83}
84
/// Pause between connection attempts while waiting for a spawned daemon's
/// socket to become connectable, in milliseconds.
const DAEMON_POLL_INTERVAL_MS: u64 = 50;
/// Upper bound on how long to wait for a spawned daemon's socket to become
/// connectable before giving up, in milliseconds.
const DAEMON_START_TIMEOUT_MS: u64 = 5_000;
89
90pub async fn ensure_daemon_running() -> anyhow::Result<()> {
105 let exe = resolve_daemon_binary()?;
106 ensure_daemon_running_impl(&crate::paths::effective_socket_path(None), &exe).await
108}
109
110fn resolve_daemon_binary() -> anyhow::Result<std::path::PathBuf> {
121 if let Ok(p) = std::env::var("ROOM_BINARY") {
123 let path = std::path::PathBuf::from(&p);
124 if path.exists() {
125 return Ok(path);
126 }
127 }
128
129 if let Ok(output) = std::process::Command::new("which").arg("room").output() {
131 if output.status.success() {
132 let path_str = String::from_utf8_lossy(&output.stdout);
133 let path = std::path::PathBuf::from(path_str.trim());
134 if path.exists() {
135 return Ok(path);
136 }
137 }
138 }
139
140 std::env::current_exe().map_err(|e| anyhow::anyhow!("cannot resolve daemon binary: {e}"))
142}
143
/// Test-only entry point that runs the daemon auto-start logic against an
/// arbitrary `socket` path and `exe` binary instead of the resolved defaults.
#[cfg(test)]
pub(crate) async fn ensure_daemon_running_at(socket: &Path, exe: &Path) -> anyhow::Result<()> {
    ensure_daemon_running_impl(socket, exe).await
}
153
154async fn ensure_daemon_running_impl(socket: &Path, exe: &Path) -> anyhow::Result<()> {
155 if UnixStream::connect(socket).await.is_ok() {
157 return Ok(());
158 }
159
160 let child = std::process::Command::new(exe)
161 .arg("daemon")
162 .arg("--socket")
163 .arg(socket)
164 .stdin(std::process::Stdio::null())
165 .stdout(std::process::Stdio::null())
166 .stderr(std::process::Stdio::null())
167 .spawn()
168 .map_err(|e| anyhow::anyhow!("failed to spawn daemon ({}): {e}", exe.display()))?;
169
170 let pid_path = crate::paths::room_pid_path();
172 let _ = std::fs::write(&pid_path, child.id().to_string());
173
174 let deadline =
176 tokio::time::Instant::now() + tokio::time::Duration::from_millis(DAEMON_START_TIMEOUT_MS);
177
178 loop {
179 if UnixStream::connect(socket).await.is_ok() {
180 return Ok(());
181 }
182 if tokio::time::Instant::now() >= deadline {
183 anyhow::bail!(
184 "daemon failed to start within {}ms (socket: {})",
185 DAEMON_START_TIMEOUT_MS,
186 socket.display()
187 );
188 }
189 tokio::time::sleep(tokio::time::Duration::from_millis(DAEMON_POLL_INTERVAL_MS)).await;
190 }
191}
192
193#[deprecated(
203 since = "3.1.0",
204 note = "SEND: handshake is unauthenticated; use send_message_with_token instead"
205)]
206pub async fn send_message(
207 socket_path: &Path,
208 username: &str,
209 content: &str,
210) -> anyhow::Result<Message> {
211 let stream = UnixStream::connect(socket_path).await.map_err(|e| {
212 anyhow::anyhow!("cannot connect to broker at {}: {e}", socket_path.display())
213 })?;
214 let (r, mut w) = stream.into_split();
215 w.write_all(format!("SEND:{username}\n").as_bytes()).await?;
216 w.write_all(format!("{content}\n").as_bytes()).await?;
217
218 let mut reader = BufReader::new(r);
219 let mut line = String::new();
220 reader.read_line(&mut line).await?;
221 let msg: Message = serde_json::from_str(line.trim())
222 .map_err(|e| anyhow::anyhow!("broker returned invalid JSON: {e}: {:?}", line.trim()))?;
223 Ok(msg)
224}
225
226pub async fn send_message_with_token(
233 socket_path: &Path,
234 token: &str,
235 content: &str,
236) -> anyhow::Result<Message> {
237 send_message_with_token_target(
238 &SocketTarget {
239 path: socket_path.to_owned(),
240 daemon_room: None,
241 },
242 token,
243 content,
244 )
245 .await
246}
247
248pub async fn send_message_with_token_target(
251 target: &SocketTarget,
252 token: &str,
253 content: &str,
254) -> anyhow::Result<Message> {
255 let stream = UnixStream::connect(&target.path).await.map_err(|e| {
256 anyhow::anyhow!("cannot connect to broker at {}: {e}", target.path.display())
257 })?;
258 let (r, mut w) = stream.into_split();
259 let handshake = target.handshake_line(&format!("TOKEN:{token}"));
260 w.write_all(format!("{handshake}\n").as_bytes()).await?;
261 w.write_all(format!("{content}\n").as_bytes()).await?;
263
264 let mut reader = BufReader::new(r);
265 let mut line = String::new();
266 reader.read_line(&mut line).await?;
267 let v: serde_json::Value = serde_json::from_str(line.trim())
269 .map_err(|e| anyhow::anyhow!("broker returned invalid JSON: {e}: {:?}", line.trim()))?;
270 if v["type"] == "error" {
271 let code = v["code"].as_str().unwrap_or("unknown");
272 if code == "invalid_token" {
273 anyhow::bail!("invalid token — run: room join {}", target.path.display());
274 }
275 anyhow::bail!("broker error: {code}");
276 }
277 let msg: Message = serde_json::from_value(v)
278 .map_err(|e| anyhow::anyhow!("broker returned unexpected JSON: {e}"))?;
279 Ok(msg)
280}
281
282pub async fn join_session(socket_path: &Path, username: &str) -> anyhow::Result<(String, String)> {
287 join_session_target(
288 &SocketTarget {
289 path: socket_path.to_owned(),
290 daemon_room: None,
291 },
292 username,
293 )
294 .await
295}
296
297pub async fn join_session_target(
299 target: &SocketTarget,
300 username: &str,
301) -> anyhow::Result<(String, String)> {
302 let stream = UnixStream::connect(&target.path).await.map_err(|e| {
303 anyhow::anyhow!("cannot connect to broker at {}: {e}", target.path.display())
304 })?;
305 let (r, mut w) = stream.into_split();
306 let handshake = target.handshake_line(&format!("JOIN:{username}"));
307 w.write_all(format!("{handshake}\n").as_bytes()).await?;
308
309 let mut reader = BufReader::new(r);
310 let mut line = String::new();
311 reader.read_line(&mut line).await?;
312 let v: serde_json::Value = serde_json::from_str(line.trim())
313 .map_err(|e| anyhow::anyhow!("broker returned invalid JSON: {e}: {:?}", line.trim()))?;
314 if v["type"] == "error" {
315 let code = v["code"].as_str().unwrap_or("unknown");
316 if code == "username_taken" {
317 anyhow::bail!("username '{}' is already in use in this room", username);
318 }
319 anyhow::bail!("broker error: {code}");
320 }
321 let token = v["token"]
322 .as_str()
323 .ok_or_else(|| anyhow::anyhow!("broker response missing 'token' field"))?
324 .to_owned();
325 let returned_user = v["username"]
326 .as_str()
327 .ok_or_else(|| anyhow::anyhow!("broker response missing 'username' field"))?
328 .to_owned();
329 Ok((returned_user, token))
330}
331
332pub async fn global_join_session(
338 socket_path: &Path,
339 username: &str,
340) -> anyhow::Result<(String, String)> {
341 let stream = UnixStream::connect(socket_path).await.map_err(|e| {
342 anyhow::anyhow!("cannot connect to daemon at {}: {e}", socket_path.display())
343 })?;
344 let (r, mut w) = stream.into_split();
345 w.write_all(format!("JOIN:{username}\n").as_bytes()).await?;
346
347 let mut reader = BufReader::new(r);
348 let mut line = String::new();
349 reader.read_line(&mut line).await?;
350 let v: serde_json::Value = serde_json::from_str(line.trim())
351 .map_err(|e| anyhow::anyhow!("daemon returned invalid JSON: {e}: {:?}", line.trim()))?;
352 if v["type"] == "error" {
353 let code = v["code"].as_str().unwrap_or("unknown");
354 anyhow::bail!("daemon error: {code}");
355 }
356 let token = v["token"]
357 .as_str()
358 .ok_or_else(|| anyhow::anyhow!("daemon response missing 'token' field"))?
359 .to_owned();
360 let returned_user = v["username"]
361 .as_str()
362 .ok_or_else(|| anyhow::anyhow!("daemon response missing 'username' field"))?
363 .to_owned();
364 Ok((returned_user, token))
365}
366
367pub async fn create_room(
378 socket_path: &Path,
379 room_id: &str,
380 config_json: &str,
381) -> anyhow::Result<serde_json::Value> {
382 let stream = UnixStream::connect(socket_path).await.map_err(|e| {
383 anyhow::anyhow!("cannot connect to daemon at {}: {e}", socket_path.display())
384 })?;
385 let (r, mut w) = stream.into_split();
386 w.write_all(format!("CREATE:{room_id}\n").as_bytes())
387 .await?;
388 w.write_all(format!("{config_json}\n").as_bytes()).await?;
389
390 let mut reader = BufReader::new(r);
391 let mut line = String::new();
392 reader.read_line(&mut line).await?;
393 let v: serde_json::Value = serde_json::from_str(line.trim())
394 .map_err(|e| anyhow::anyhow!("daemon returned invalid JSON: {e}: {:?}", line.trim()))?;
395 if v["type"] == "error" {
396 let message = v["message"].as_str().unwrap_or("unknown error");
397 anyhow::bail!("{message}");
398 }
399 Ok(v)
400}
401
402pub async fn destroy_room(
410 socket_path: &Path,
411 room_id: &str,
412 token: &str,
413) -> anyhow::Result<serde_json::Value> {
414 let stream = UnixStream::connect(socket_path).await.map_err(|e| {
415 anyhow::anyhow!("cannot connect to daemon at {}: {e}", socket_path.display())
416 })?;
417 let (r, mut w) = stream.into_split();
418 w.write_all(format!("DESTROY:{room_id}\n").as_bytes())
419 .await?;
420 w.write_all(format!("{token}\n").as_bytes()).await?;
421
422 let mut reader = BufReader::new(r);
423 let mut line = String::new();
424 reader.read_line(&mut line).await?;
425 let v: serde_json::Value = serde_json::from_str(line.trim())
426 .map_err(|e| anyhow::anyhow!("daemon returned invalid JSON: {e}: {:?}", line.trim()))?;
427 if v["type"] == "error" {
428 let message = v["message"]
429 .as_str()
430 .unwrap_or(v["code"].as_str().unwrap_or("unknown error"));
431 anyhow::bail!("{message}");
432 }
433 Ok(v)
434}
435
436pub fn inject_token_into_config(config_json: &str, token: &str) -> String {
441 if let Ok(mut v) = serde_json::from_str::<serde_json::Value>(config_json) {
442 if let Some(obj) = v.as_object_mut() {
443 obj.insert(
444 "token".to_owned(),
445 serde_json::Value::String(token.to_owned()),
446 );
447 return serde_json::to_string(&v).unwrap_or_default();
448 }
449 }
450 serde_json::json!({"token": token}).to_string()
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Target with no room id, so handshake lines carry no prefix.
    fn per_room_target(room_id: &str) -> SocketTarget {
        SocketTarget {
            path: PathBuf::from(format!("/tmp/room-{room_id}.sock")),
            daemon_room: None,
        }
    }

    /// Target carrying `room_id`, so handshake lines get the `ROOM:` prefix.
    fn daemon_target(room_id: &str) -> SocketTarget {
        SocketTarget {
            path: PathBuf::from("/tmp/roomd.sock"),
            daemon_room: Some(room_id.to_owned()),
        }
    }

    // --- handshake_line ---------------------------------------------------

    #[test]
    fn per_room_token_handshake_no_prefix() {
        let t = per_room_target("myroom");
        assert_eq!(t.handshake_line("TOKEN:abc-123"), "TOKEN:abc-123");
    }

    #[test]
    fn daemon_token_handshake_has_room_prefix() {
        let t = daemon_target("myroom");
        assert_eq!(
            t.handshake_line("TOKEN:abc-123"),
            "ROOM:myroom:TOKEN:abc-123"
        );
    }

    #[test]
    fn per_room_join_handshake_no_prefix() {
        let t = per_room_target("chat");
        assert_eq!(t.handshake_line("JOIN:alice"), "JOIN:alice");
    }

    #[test]
    fn daemon_join_handshake_has_room_prefix() {
        let t = daemon_target("chat");
        assert_eq!(t.handshake_line("JOIN:alice"), "ROOM:chat:JOIN:alice");
    }

    #[test]
    fn daemon_handshake_with_hyphen_room_id() {
        let t = daemon_target("agent-room-2");
        assert_eq!(
            t.handshake_line("TOKEN:uuid"),
            "ROOM:agent-room-2:TOKEN:uuid"
        );
    }

    // --- daemon auto-start (real process; ignored by default) --------------

    /// Path of the `room` binary, looked up two directories above the running
    /// test executable. Panics when no file exists there.
    fn room_bin() -> PathBuf {
        let bin = std::env::current_exe()
            .unwrap()
            .parent()
            .unwrap()
            .parent()
            .unwrap()
            .join("room");
        assert!(bin.exists(), "room binary not found at {}", bin.display());
        bin
    }

    #[tokio::test]
    #[ignore = "spawns a real daemon process; run explicitly with `cargo test -- --ignored`"]
    async fn ensure_daemon_noop_when_socket_connectable() {
        let dir = tempfile::TempDir::new().unwrap();
        let socket = dir.path().join("roomd.sock");
        let exe = room_bin();

        // `kill_on_drop` makes sure the daemon does not outlive the test when
        // one of the assertions below panics before the explicit `kill`.
        let mut child = tokio::process::Command::new(&exe)
            .args(["daemon", "--socket"])
            .arg(&socket)
            .stdin(std::process::Stdio::null())
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::null())
            .kill_on_drop(true)
            .spawn()
            .expect("failed to spawn room daemon");

        // Wait (up to 200 * 50ms) for the manually started daemon to listen.
        for _ in 0..200 {
            if tokio::net::UnixStream::connect(&socket).await.is_ok() {
                break;
            }
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }
        assert!(
            tokio::net::UnixStream::connect(&socket).await.is_ok(),
            "daemon socket not ready"
        );

        // With a live socket this must succeed without spawning anything.
        ensure_daemon_running_at(&socket, &exe).await.unwrap();

        child.kill().await.ok();
    }

    // NOTE(review): the daemon started by this test is never terminated here
    // (no handle to it is available), and the PID file is not asserted on
    // despite the test's name.
    #[tokio::test]
    #[ignore = "spawns a real daemon process; run explicitly with `cargo test -- --ignored`"]
    async fn ensure_daemon_starts_daemon_and_writes_pid() {
        let dir = tempfile::TempDir::new().unwrap();
        let socket = dir.path().join("autostart.sock");
        let exe = room_bin();

        ensure_daemon_running_at(&socket, &exe).await.unwrap();

        assert!(
            tokio::net::UnixStream::connect(&socket).await.is_ok(),
            "daemon socket not connectable after auto-start"
        );
    }

    // --- resolve_socket_target --------------------------------------------

    #[test]
    fn resolve_explicit_per_room_socket_is_not_daemon() {
        let per_room = crate::paths::room_single_socket_path("myroom");
        let target = resolve_socket_target("myroom", Some(&per_room));
        assert_eq!(target.path, per_room);
        assert!(
            target.daemon_room.is_none(),
            "per-room socket should not set daemon_room"
        );
    }

    #[test]
    fn resolve_explicit_daemon_socket_is_daemon() {
        let daemon_sock = PathBuf::from("/tmp/roomd.sock");
        let target = resolve_socket_target("myroom", Some(&daemon_sock));
        assert_eq!(target.path, daemon_sock);
        assert_eq!(target.daemon_room.as_deref(), Some("myroom"));
    }

    #[test]
    fn resolve_explicit_custom_path_is_daemon() {
        let custom = PathBuf::from("/var/run/roomd-test.sock");
        let target = resolve_socket_target("chat", Some(&custom));
        assert_eq!(target.path, custom);
        assert_eq!(target.daemon_room.as_deref(), Some("chat"));
    }

    #[test]
    fn resolve_auto_no_daemon_falls_back_to_per_room() {
        let daemon_path = crate::paths::room_socket_path();
        // Only meaningful when no daemon socket exists on this machine;
        // otherwise the test passes vacuously.
        if !daemon_path.exists() {
            let target = resolve_socket_target("myroom", None);
            assert_eq!(target.path, crate::paths::room_single_socket_path("myroom"));
            assert!(target.daemon_room.is_none());
        }
    }

    // --- resolve_daemon_binary --------------------------------------------

    /// Serializes the tests that mutate the process-wide `ROOM_BINARY`
    /// variable, since the test harness runs tests on multiple threads.
    static TRANSPORT_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Name of the environment variable these tests manipulate.
    const ROOM_BINARY_VAR: &str = "ROOM_BINARY";

    /// Holds the env lock and restores the previous `ROOM_BINARY` value when
    /// dropped, including when a test assertion panics.
    struct RoomBinaryEnvGuard {
        previous: Option<std::ffi::OsString>,
        // Declared last so it is released only after `Drop::drop` has put the
        // variable back.
        _lock: std::sync::MutexGuard<'static, ()>,
    }

    impl RoomBinaryEnvGuard {
        fn acquire() -> Self {
            // A poisoned lock only means an earlier test failed; the guarded
            // unit value cannot be left in a bad state, so keep going instead
            // of cascading the failure.
            let lock = TRANSPORT_ENV_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            Self {
                previous: std::env::var_os(ROOM_BINARY_VAR),
                _lock: lock,
            }
        }
    }

    impl Drop for RoomBinaryEnvGuard {
        fn drop(&mut self) {
            match self.previous.take() {
                Some(value) => std::env::set_var(ROOM_BINARY_VAR, value),
                None => std::env::remove_var(ROOM_BINARY_VAR),
            }
        }
    }

    #[test]
    fn resolve_daemon_binary_uses_room_binary_env() {
        let _env = RoomBinaryEnvGuard::acquire();

        // Any file that certainly exists will do; use the test binary itself.
        let target = std::env::current_exe().unwrap();
        std::env::set_var(ROOM_BINARY_VAR, &target);
        let result = resolve_daemon_binary().unwrap();
        assert_eq!(result, target, "should use ROOM_BINARY when set");
    }

    #[test]
    fn resolve_daemon_binary_ignores_nonexistent_room_binary() {
        let _env = RoomBinaryEnvGuard::acquire();

        std::env::set_var(ROOM_BINARY_VAR, "/nonexistent/path/to/room");
        let result = resolve_daemon_binary().unwrap();
        assert_ne!(
            result,
            std::path::PathBuf::from("/nonexistent/path/to/room"),
            "should skip ROOM_BINARY when path does not exist"
        );
    }

    #[test]
    fn resolve_daemon_binary_falls_back_without_env() {
        let _env = RoomBinaryEnvGuard::acquire();

        std::env::remove_var(ROOM_BINARY_VAR);
        let result = resolve_daemon_binary().unwrap();
        assert!(result.exists(), "resolved binary should exist: {result:?}");
    }
}