1mod config;
34mod dispatcher;
35mod lifecycle;
36mod migration;
37mod pid;
38
39pub use config::{validate_room_id, DaemonConfig};
40pub(crate) use lifecycle::create_room_entry;
41pub use pid::{is_pid_alive, remove_pid_file, write_pid_file};
42
43use std::{
44 collections::HashMap,
45 sync::{
46 atomic::{AtomicU64, AtomicUsize, Ordering},
47 Arc,
48 },
49};
50
51use tokio::{
52 net::UnixListener,
53 sync::{watch, Mutex},
54};
55
56use crate::registry::UserRegistry;
57
58use super::{
59 service::CrossRoomResolver,
60 state::{RoomState, TokenMap},
61 ws::{self, DaemonWsState},
62};
63
64use dispatcher::dispatch_connection;
65use migration::load_or_migrate_registry;
66
/// Shared, mutex-guarded map from room ID to that room's [`RoomState`].
///
/// Cloning the alias clones the outer `Arc`, so every holder sees the same map.
pub(crate) type RoomMap = Arc<Mutex<HashMap<String, Arc<RoomState>>>>;
69
70pub struct DaemonState {
72 pub(crate) rooms: RoomMap,
73 pub(crate) config: DaemonConfig,
74 pub(crate) next_client_id: Arc<AtomicU64>,
76 pub(crate) shutdown: Arc<watch::Sender<bool>>,
78 pub(crate) system_token_map: TokenMap,
85 pub(crate) user_registry: Arc<tokio::sync::Mutex<UserRegistry>>,
92 pub(crate) connection_count: Arc<AtomicUsize>,
98}
99
100impl DaemonState {
101 pub fn new(config: DaemonConfig) -> Self {
103 let (shutdown_tx, _) = watch::channel(false);
104
105 let registry = load_or_migrate_registry(&config);
112
113 let token_snapshot = registry.token_snapshot();
116
117 Self {
118 rooms: Arc::new(Mutex::new(HashMap::new())),
119 config,
120 next_client_id: Arc::new(AtomicU64::new(0)),
121 shutdown: Arc::new(shutdown_tx),
122 system_token_map: Arc::new(Mutex::new(token_snapshot)),
123 user_registry: Arc::new(tokio::sync::Mutex::new(registry)),
124 connection_count: Arc::new(AtomicUsize::new(0)),
125 }
126 }
127
128 pub async fn create_room(&self, room_id: &str) -> Result<(), String> {
131 create_room_entry(
132 room_id,
133 None,
134 &self.rooms,
135 &self.config,
136 &self.system_token_map,
137 Some(self.user_registry.clone()),
138 )
139 .await
140 }
141
142 pub async fn create_room_with_config(
145 &self,
146 room_id: &str,
147 config: room_protocol::RoomConfig,
148 ) -> Result<(), String> {
149 create_room_entry(
150 room_id,
151 Some(config),
152 &self.rooms,
153 &self.config,
154 &self.system_token_map,
155 Some(self.user_registry.clone()),
156 )
157 .await
158 }
159
160 pub async fn get_room_config(&self, room_id: &str) -> Option<room_protocol::RoomConfig> {
162 self.rooms
163 .lock()
164 .await
165 .get(room_id)
166 .and_then(|s| s.config.clone())
167 }
168
169 pub async fn destroy_room(&self, room_id: &str) -> Result<(), String> {
173 let mut rooms = self.rooms.lock().await;
174 let state = rooms
175 .remove(room_id)
176 .ok_or_else(|| format!("room not found: {room_id}"))?;
177
178 let _ = state.shutdown.send(true);
180 Ok(())
181 }
182
183 pub async fn has_room(&self, room_id: &str) -> bool {
185 self.rooms.lock().await.contains_key(room_id)
186 }
187
188 pub fn shutdown_handle(&self) -> Arc<watch::Sender<bool>> {
190 self.shutdown.clone()
191 }
192
193 pub async fn list_rooms(&self) -> Vec<String> {
195 self.rooms.lock().await.keys().cloned().collect()
196 }
197
198 #[doc(hidden)]
201 pub async fn test_inject_token(
202 &self,
203 room_id: &str,
204 username: &str,
205 token: &str,
206 ) -> Result<(), String> {
207 let rooms = self.rooms.lock().await;
208 let room = rooms
209 .get(room_id)
210 .ok_or_else(|| format!("room not found: {room_id}"))?;
211 room.auth
212 .token_map
213 .lock()
214 .await
215 .insert(token.to_owned(), username.to_owned());
216 Ok(())
217 }
218
219 pub async fn run(&self) -> anyhow::Result<()> {
227 let pid_path = if self.config.socket_path == crate::paths::room_socket_path() {
231 match write_pid_file(&crate::paths::room_pid_path()) {
232 Ok(()) => Some(crate::paths::room_pid_path()),
233 Err(e) => {
234 eprintln!("[daemon] failed to write PID file: {e}");
235 None
236 }
237 }
238 } else {
239 None
240 };
241
242 if self.config.socket_path.exists() {
244 std::fs::remove_file(&self.config.socket_path)?;
245 }
246
247 let listener = UnixListener::bind(&self.config.socket_path)?;
248 eprintln!(
249 "[daemon] listening on {}",
250 self.config.socket_path.display()
251 );
252
253 let mut shutdown_rx = self.shutdown.subscribe();
254 let grace_duration = tokio::time::Duration::from_secs(self.config.grace_period_secs);
255
256 let (close_tx, mut close_rx) = tokio::sync::mpsc::channel::<()>(64);
258
259 let mut grace_sleep: Option<std::pin::Pin<Box<tokio::time::Sleep>>> = None;
261
262 if let Some(port) = self.config.ws_port {
264 let ws_state = DaemonWsState {
265 rooms: self.rooms.clone(),
266 next_client_id: self.next_client_id.clone(),
267 config: self.config.clone(),
268 system_token_map: self.system_token_map.clone(),
269 user_registry: self.user_registry.clone(),
270 };
271 let app = ws::create_daemon_router(ws_state);
272 let tcp = tokio::net::TcpListener::bind(("0.0.0.0", port)).await?;
273 eprintln!("[daemon] WebSocket/REST listening on port {port}");
274 tokio::spawn(async move {
275 if let Err(e) = axum::serve(tcp, app).await {
276 eprintln!("[daemon] WS server error: {e}");
277 }
278 });
279 }
280
281 let result = loop {
282 let grace_fut = async {
285 match grace_sleep.as_mut() {
286 Some(s) => {
287 s.await;
288 }
289 None => std::future::pending::<()>().await,
290 }
291 };
292
293 tokio::select! {
294 accept = listener.accept() => {
295 let (stream, _) = match accept {
296 Ok(a) => a,
297 Err(e) => break Err(e.into()),
298 };
299 grace_sleep = None;
301
302 let count = self.connection_count.clone();
303 count.fetch_add(1, Ordering::SeqCst);
304 let rooms = self.rooms.clone();
305 let next_id = self.next_client_id.clone();
306 let cfg = self.config.clone();
307 let sys_tokens = self.system_token_map.clone();
308 let registry = self.user_registry.clone();
309 let tx = close_tx.clone();
310
311 tokio::spawn(async move {
312 if let Err(e) = dispatch_connection(stream, &rooms, &next_id, &cfg, &sys_tokens, ®istry).await {
313 eprintln!("[daemon] connection error: {e:#}");
314 }
315 count.fetch_sub(1, Ordering::SeqCst);
316 let _ = tx.send(()).await;
318 });
319 }
320 Some(()) = close_rx.recv() => {
321 if self.connection_count.load(Ordering::SeqCst) == 0 {
323 eprintln!(
324 "[daemon] no connections — grace period {}s started",
325 self.config.grace_period_secs
326 );
327 grace_sleep =
328 Some(Box::pin(tokio::time::sleep(grace_duration)));
329 }
330 }
331 _ = grace_fut => {
332 eprintln!("[daemon] grace period expired, shutting down");
333 let _ = self.shutdown.send(true);
334 break Ok(());
337 }
338 _ = shutdown_rx.changed() => {
339 eprintln!("[daemon] shutdown requested, exiting");
340 if let Some(ref p) = pid_path {
341 remove_pid_file(p);
342 }
343 break Ok(());
344 }
345 }
346 };
347
348 let _ = std::fs::remove_file(&self.config.socket_path);
350 let _ = std::fs::remove_file(crate::paths::room_pid_path());
351 for room_id in self.list_rooms().await {
353 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
354 }
355
356 result
357 }
358}
359
/// [`CrossRoomResolver`] implementation backed by the daemon's shared room map.
pub(crate) struct DaemonRoomResolver {
    /// Handle to the same room map the daemon itself uses.
    rooms: RoomMap,
}
368
369impl DaemonRoomResolver {
370 pub(crate) fn new(rooms: RoomMap) -> Self {
371 Self { rooms }
372 }
373}
374
375impl CrossRoomResolver for DaemonRoomResolver {
376 fn resolve_room(
377 &self,
378 room_id: &str,
379 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<Arc<RoomState>>> + Send + '_>>
380 {
381 let room_id = room_id.to_owned();
382 Box::pin(async move { self.rooms.lock().await.get(&room_id).cloned() })
383 }
384}
385
#[cfg(test)]
mod tests {
    use super::*;
    use config::MAX_ROOM_ID_LEN;
    use lifecycle::build_initial_subscriptions;
    use std::path::PathBuf;

    /// Daemon built from the default configuration.
    fn default_daemon() -> DaemonState {
        DaemonState::new(DaemonConfig::default())
    }

    // ── PID file helpers ────────────────────────────────────────────────

    #[test]
    fn write_pid_file_creates_file_with_current_pid() {
        let tmp = tempfile::TempDir::new().unwrap();
        let pid_file = tmp.path().join("test.pid");
        write_pid_file(&pid_file).unwrap();

        let written: u32 = std::fs::read_to_string(&pid_file)
            .unwrap()
            .trim()
            .parse()
            .expect("PID should be a number");
        assert_eq!(written, std::process::id());
    }

    #[test]
    fn is_pid_alive_true_for_current_process() {
        let tmp = tempfile::TempDir::new().unwrap();
        let pid_file = tmp.path().join("test.pid");
        write_pid_file(&pid_file).unwrap();
        assert!(is_pid_alive(&pid_file), "current process should be alive");
    }

    #[test]
    fn is_pid_alive_false_for_missing_file() {
        let missing = std::path::Path::new("/tmp/nonexistent-room-test-99999999.pid");
        assert!(!is_pid_alive(missing));
    }

    #[test]
    fn remove_pid_file_deletes_file() {
        let tmp = tempfile::TempDir::new().unwrap();
        let pid_file = tmp.path().join("remove.pid");
        write_pid_file(&pid_file).unwrap();
        assert!(pid_file.exists());

        remove_pid_file(&pid_file);
        assert!(!pid_file.exists());
    }

    #[test]
    fn remove_pid_file_noop_when_missing() {
        // Must not panic when there is nothing to remove.
        let missing = std::path::Path::new("/tmp/gone-99999999.pid");
        remove_pid_file(missing);
    }

    // ── Room lifecycle ──────────────────────────────────────────────────

    /// Fetches a room's state handle, panicking when the room is missing.
    async fn get_room(daemon: &DaemonState, room_id: &str) -> Arc<RoomState> {
        let rooms = daemon.rooms.lock().await;
        match rooms.get(room_id) {
            Some(state) => Arc::clone(state),
            None => panic!("room {room_id} not found"),
        }
    }

    #[tokio::test]
    async fn create_room_succeeds() {
        let daemon = default_daemon();
        let created = daemon.create_room("test-room").await;
        assert!(created.is_ok());

        let state = get_room(&daemon, "test-room").await;
        assert_eq!(*state.room_id, "test-room");
    }

    #[tokio::test]
    async fn create_duplicate_room_fails() {
        let daemon = default_daemon();
        daemon.create_room("dup").await.unwrap();

        let second = daemon.create_room("dup").await;
        assert!(second.is_err());
        assert!(second.unwrap_err().contains("already exists"));
    }

    #[tokio::test]
    async fn has_room_returns_true_for_created() {
        let daemon = default_daemon();
        daemon.create_room("room-a").await.unwrap();

        assert!(daemon.has_room("room-a").await);
        assert!(!daemon.has_room("room-b").await);
    }

    #[tokio::test]
    async fn destroy_room_removes_it() {
        let daemon = default_daemon();
        daemon.create_room("doomed").await.unwrap();

        let destroyed = daemon.destroy_room("doomed").await;
        assert!(destroyed.is_ok());
        assert!(!daemon.has_room("doomed").await);
    }

    #[tokio::test]
    async fn destroy_nonexistent_room_fails() {
        let daemon = default_daemon();
        let outcome = daemon.destroy_room("nope").await;
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().contains("not found"));
    }

    #[tokio::test]
    async fn destroy_room_signals_shutdown() {
        let daemon = default_daemon();
        daemon.create_room("shutme").await.unwrap();

        let state = get_room(&daemon, "shutme").await;
        let room_shutdown = state.shutdown.subscribe();
        assert!(!*room_shutdown.borrow());

        daemon.destroy_room("shutme").await.unwrap();
        assert!(*room_shutdown.borrow());
    }

    #[tokio::test]
    async fn list_rooms_returns_all() {
        let daemon = default_daemon();
        for id in ["alpha", "beta", "gamma"] {
            daemon.create_room(id).await.unwrap();
        }

        // The map has no defined order, so sort before comparing.
        let mut listed = daemon.list_rooms().await;
        listed.sort();
        assert_eq!(listed, vec!["alpha", "beta", "gamma"]);
    }

    #[tokio::test]
    async fn list_rooms_empty_initially() {
        let daemon = default_daemon();
        let listed = daemon.list_rooms().await;
        assert!(listed.is_empty());
    }

    #[tokio::test]
    async fn create_room_initializes_plugins() {
        let daemon = default_daemon();
        daemon.create_room("plugtest").await.unwrap();

        let state = get_room(&daemon, "plugtest").await;
        let plugins = &state.plugin_registry;
        // `help` is expected not to resolve through the registry; `stats` is.
        assert!(plugins.resolve("help").is_none());
        assert!(plugins.resolve("stats").is_some());
    }

    // ── DaemonConfig ────────────────────────────────────────────────────

    #[test]
    fn config_chat_path_format() {
        let config = DaemonConfig {
            data_dir: PathBuf::from("/var/room"),
            ..DaemonConfig::default()
        };
        let expected = PathBuf::from("/var/room/myroom.chat");
        assert_eq!(config.chat_path("myroom"), expected);
    }

    #[test]
    fn config_default_socket_path() {
        let defaults = DaemonConfig::default();
        assert_eq!(defaults.socket_path, crate::paths::room_socket_path());
    }

    // ── Rooms with configuration ────────────────────────────────────────

    #[tokio::test]
    async fn create_room_with_dm_config() {
        let daemon = default_daemon();
        let dm = room_protocol::RoomConfig::dm("alice", "bob");
        let created = daemon.create_room_with_config("dm-alice-bob", dm).await;
        assert!(created.is_ok());

        let state = get_room(&daemon, "dm-alice-bob").await;
        let stored = state.config.as_ref().unwrap();
        assert_eq!(stored.visibility, room_protocol::RoomVisibility::Dm);
        assert_eq!(stored.max_members, Some(2));
        assert!(stored.invite_list.contains("alice"));
        assert!(stored.invite_list.contains("bob"));
    }

    #[tokio::test]
    async fn create_room_with_config_duplicate_fails() {
        let daemon = default_daemon();
        let public = room_protocol::RoomConfig::public("owner");
        daemon
            .create_room_with_config("dup", public.clone())
            .await
            .unwrap();

        let second = daemon.create_room_with_config("dup", public).await;
        assert!(second.is_err());
    }

    #[tokio::test]
    async fn get_room_config_returns_none_for_unconfigured() {
        let daemon = default_daemon();
        daemon.create_room("plain").await.unwrap();

        let stored = daemon.get_room_config("plain").await;
        assert!(stored.is_none());
    }

    #[tokio::test]
    async fn get_room_config_returns_config_when_present() {
        let daemon = default_daemon();
        let dm = room_protocol::RoomConfig::dm("alice", "bob");
        daemon
            .create_room_with_config("dm-alice-bob", dm)
            .await
            .unwrap();

        let stored = daemon.get_room_config("dm-alice-bob").await.unwrap();
        assert_eq!(stored.visibility, room_protocol::RoomVisibility::Dm);
    }

    #[tokio::test]
    async fn dm_room_id_deterministic_and_lookup_works() {
        let daemon = default_daemon();
        let room_id = room_protocol::dm_room_id("bob", "alice").unwrap();
        assert_eq!(room_id, "dm-alice-bob");

        let dm = room_protocol::RoomConfig::dm("bob", "alice");
        daemon
            .create_room_with_config(&room_id, dm)
            .await
            .unwrap();
        assert!(daemon.has_room("dm-alice-bob").await);

        // Swapping the participants yields the same room ID.
        let swapped = room_protocol::dm_room_id("alice", "bob").unwrap();
        assert_eq!(swapped, "dm-alice-bob");
    }

    // ── Room ID validation ──────────────────────────────────────────────

    #[test]
    fn valid_room_ids() {
        let longest = "x".repeat(MAX_ROOM_ID_LEN);
        let accepted = [
            "lobby",
            "agent-room-2",
            "my_room",
            "Room.1",
            "dm-alice-bob",
            "a",
            longest.as_str(),
        ];
        for id in accepted {
            assert!(validate_room_id(id).is_ok(), "should accept: {id:?}");
        }
    }

    #[test]
    fn empty_room_id_rejected() {
        let err = validate_room_id("").unwrap_err();
        assert!(err.contains("empty"), "{err}");
    }

    #[test]
    fn room_id_too_long_rejected() {
        let oversized = "x".repeat(MAX_ROOM_ID_LEN + 1);
        let err = validate_room_id(&oversized).unwrap_err();
        assert!(err.contains("too long"), "{err}");
    }

    #[test]
    fn dot_dot_traversal_rejected() {
        let rejected = ["..", "room/../etc", "..secret", "a..b"];
        for id in rejected {
            let err = validate_room_id(id).unwrap_err();
            assert!(err.contains(".."), "should reject {id:?}: {err}");
        }
    }

    #[test]
    fn single_dot_rejected() {
        let err = validate_room_id(".").unwrap_err();
        assert!(err.contains(".."), "{err}");
    }

    #[test]
    fn slash_rejected() {
        let rejected = ["room/sub", "/etc/passwd", "a/b/c"];
        for id in rejected {
            let err = validate_room_id(id).unwrap_err();
            assert!(err.contains("unsafe"), "should reject {id:?}: {err}");
        }
    }

    #[test]
    fn backslash_rejected() {
        let err = validate_room_id("room\\sub").unwrap_err();
        assert!(err.contains("unsafe"), "{err}");
    }

    #[test]
    fn null_byte_rejected() {
        let err = validate_room_id("room\0id").unwrap_err();
        assert!(err.contains("unsafe"), "{err}");
    }

    #[test]
    fn whitespace_rejected() {
        let rejected = ["room name", "room\tid", "room\nid", " leading", "trailing "];
        for id in rejected {
            let err = validate_room_id(id).unwrap_err();
            assert!(err.contains("whitespace"), "should reject {id:?}: {err}");
        }
    }

    #[test]
    fn other_unsafe_chars_rejected() {
        let unsafe_chars = [':', '*', '?', '"', '<', '>', '|'];
        for ch in unsafe_chars {
            let id = format!("room{ch}id");
            let err = validate_room_id(&id).unwrap_err();
            assert!(err.contains("unsafe"), "should reject {ch:?}: {err}");
        }
    }

    #[tokio::test]
    async fn create_room_rejects_invalid_id() {
        let daemon = default_daemon();
        assert!(daemon.create_room("room/sub").await.is_err());
        assert!(daemon.create_room("..").await.is_err());
        assert!(daemon.create_room("").await.is_err());
        assert!(daemon.create_room("room name").await.is_err());
    }

    #[tokio::test]
    async fn create_room_with_config_rejects_invalid_id() {
        let daemon = default_daemon();
        let public = room_protocol::RoomConfig::public("owner");
        let outcome = daemon.create_room_with_config("../etc", public).await;
        assert!(outcome.is_err());
    }

    // ── Initial subscriptions ───────────────────────────────────────────

    #[tokio::test]
    async fn dm_room_auto_subscribes_both_participants() {
        let daemon = default_daemon();
        let dm = room_protocol::RoomConfig::dm("alice", "bob");
        daemon
            .create_room_with_config("dm-alice-bob", dm)
            .await
            .unwrap();

        let state = get_room(&daemon, "dm-alice-bob").await;
        let subs = state.filters.subscription_map.lock().await;
        let full = room_protocol::SubscriptionTier::Full;
        assert_eq!(subs.len(), 2);
        assert_eq!(subs.get("alice"), Some(&full));
        assert_eq!(subs.get("bob"), Some(&full));
    }

    #[tokio::test]
    async fn public_room_starts_with_no_subscriptions() {
        let daemon = default_daemon();
        let public = room_protocol::RoomConfig::public("owner");
        daemon
            .create_room_with_config("lobby", public)
            .await
            .unwrap();

        let state = get_room(&daemon, "lobby").await;
        let subs = state.filters.subscription_map.lock().await;
        assert!(subs.is_empty());
    }

    #[tokio::test]
    async fn unconfigured_room_starts_with_no_subscriptions() {
        let daemon = default_daemon();
        daemon.create_room("plain").await.unwrap();

        let state = get_room(&daemon, "plain").await;
        let subs = state.filters.subscription_map.lock().await;
        assert!(subs.is_empty());
    }

    #[tokio::test]
    async fn dm_auto_subscribe_uses_full_tier() {
        let daemon = default_daemon();
        let dm = room_protocol::RoomConfig::dm("carol", "dave");
        daemon
            .create_room_with_config("dm-carol-dave", dm)
            .await
            .unwrap();

        let state = get_room(&daemon, "dm-carol-dave").await;
        let subs = state.filters.subscription_map.lock().await;
        subs.iter().for_each(|(_user, tier)| {
            assert_eq!(*tier, room_protocol::SubscriptionTier::Full);
        });
    }

    #[test]
    fn build_initial_subscriptions_dm_populates() {
        let dm = room_protocol::RoomConfig::dm("alice", "bob");
        let subs = build_initial_subscriptions(&dm);
        assert_eq!(subs.len(), 2);
        assert_eq!(subs["alice"], room_protocol::SubscriptionTier::Full);
        assert_eq!(subs["bob"], room_protocol::SubscriptionTier::Full);
    }

    #[test]
    fn build_initial_subscriptions_public_empty() {
        let public = room_protocol::RoomConfig::public("owner");
        let subs = build_initial_subscriptions(&public);
        assert!(subs.is_empty());
    }

    // ── Grace period and connection counting ────────────────────────────

    #[test]
    fn default_grace_period_is_30() {
        let defaults = DaemonConfig::default();
        assert_eq!(defaults.grace_period_secs, 30);
    }

    #[test]
    fn custom_grace_period_preserved() {
        let custom = DaemonConfig {
            grace_period_secs: 0,
            ..DaemonConfig::default()
        };
        assert_eq!(custom.grace_period_secs, 0);
    }

    #[tokio::test]
    async fn connection_count_starts_at_zero() {
        let daemon = default_daemon();
        let open = daemon.connection_count.load(Ordering::SeqCst);
        assert_eq!(open, 0);
    }

    #[tokio::test]
    async fn connection_count_increments_and_decrements() {
        let counter = Arc::new(AtomicUsize::new(0));

        counter.fetch_add(1, Ordering::SeqCst);
        counter.fetch_add(1, Ordering::SeqCst);
        assert_eq!(counter.load(Ordering::SeqCst), 2);

        counter.fetch_sub(1, Ordering::SeqCst);
        assert_eq!(counter.load(Ordering::SeqCst), 1);

        counter.fetch_sub(1, Ordering::SeqCst);
        assert_eq!(counter.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn daemon_exits_on_shutdown_signal() {
        let tmp = tempfile::TempDir::new().unwrap();
        let socket = tmp.path().join("test-grace.sock");
        let data_dir = tmp.path().join("data");
        let state_dir = tmp.path().join("state");
        std::fs::create_dir_all(&data_dir).unwrap();
        std::fs::create_dir_all(&state_dir).unwrap();

        let config = DaemonConfig {
            socket_path: socket.clone(),
            data_dir,
            state_dir,
            ws_port: None,
            grace_period_secs: 0,
        };
        let daemon = Arc::new(DaemonState::new(config));
        let shutdown = daemon.shutdown_handle();

        let runner = Arc::clone(&daemon);
        let handle = tokio::spawn(async move { runner.run().await });

        // Poll until the socket accepts connections, for at most 100 attempts.
        let mut attempts = 0;
        while attempts < 100 && tokio::net::UnixStream::connect(&socket).await.is_err() {
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
            attempts += 1;
        }
        assert!(
            tokio::net::UnixStream::connect(&socket).await.is_ok(),
            "daemon socket not ready"
        );

        let _ = shutdown.send(true);

        let deadline = tokio::time::Duration::from_secs(5);
        let joined = tokio::time::timeout(deadline, handle).await;
        assert!(joined.is_ok(), "daemon did not exit within 5s");
        assert!(joined.unwrap().unwrap().is_ok(), "run() returned error");
    }

    #[tokio::test]
    async fn grace_period_cancelled_by_new_connection() {
        let tmp = tempfile::TempDir::new().unwrap();
        let socket = tmp.path().join("test-cancel-grace.sock");

        let config = DaemonConfig {
            socket_path: socket.clone(),
            data_dir: tmp.path().join("data"),
            state_dir: tmp.path().join("state"),
            ws_port: None,
            grace_period_secs: 60,
        };
        let daemon = DaemonState::new(config);
        let open = &daemon.connection_count;

        // One connection opens and closes again.
        open.fetch_add(1, Ordering::SeqCst);
        assert_eq!(open.load(Ordering::SeqCst), 1);
        open.fetch_sub(1, Ordering::SeqCst);
        assert_eq!(open.load(Ordering::SeqCst), 0);

        // A new connection arrives.
        open.fetch_add(1, Ordering::SeqCst);
        assert_eq!(open.load(Ordering::SeqCst), 1);

        // No shutdown has been signalled.
        assert!(!*daemon.shutdown.borrow());
    }

    // ── Legacy token migration ──────────────────────────────────────────

    /// Writes a legacy `room-<room>-<user>.token` JSON file into `dir`.
    fn write_legacy_token(dir: &std::path::Path, room_id: &str, username: &str, token: &str) {
        let file_name = format!("room-{room_id}-{username}.token");
        let data = serde_json::json!({"username": username, "token": token});
        let target = dir.join(file_name);
        std::fs::write(target, format!("{data}\n")).unwrap();
    }

    #[test]
    fn migrate_legacy_tmpdir_tokens_imports_token() {
        let token_dir = tempfile::TempDir::new().unwrap();
        let state_dir = tempfile::TempDir::new().unwrap();
        write_legacy_token(token_dir.path(), "lobby", "alice", "legacy-uuid-alice");

        let mut registry = UserRegistry::new(state_dir.path().to_owned());
        migration::migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);

        assert_eq!(registry.validate_token("legacy-uuid-alice"), Some("alice"));
        assert!(registry.get_user("alice").is_some());
    }

    #[test]
    fn migrate_legacy_tmpdir_tokens_idempotent() {
        let token_dir = tempfile::TempDir::new().unwrap();
        let state_dir = tempfile::TempDir::new().unwrap();
        write_legacy_token(token_dir.path(), "lobby", "bob", "tok-bob");

        let mut registry = UserRegistry::new(state_dir.path().to_owned());
        // Run the migration twice over the same directory.
        for _ in 0..2 {
            migration::migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
        }

        assert_eq!(registry.validate_token("tok-bob"), Some("bob"));
        let snapshot = registry.token_snapshot();
        let bob_tokens = snapshot.values().filter(|u| u.as_str() == "bob").count();
        assert_eq!(bob_tokens, 1);
    }

    #[test]
    fn migrate_legacy_tmpdir_tokens_skips_non_token_files() {
        let token_dir = tempfile::TempDir::new().unwrap();
        let state_dir = tempfile::TempDir::new().unwrap();
        let unrelated = [("roomd.sock", "not a token"), ("something.json", "{}")];
        for (name, contents) in unrelated {
            std::fs::write(token_dir.path().join(name), contents).unwrap();
        }

        let mut registry = UserRegistry::new(state_dir.path().to_owned());
        migration::migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);

        assert!(registry.list_users().is_empty());
    }

    #[test]
    fn migrate_legacy_tmpdir_tokens_skips_malformed_json() {
        let token_dir = tempfile::TempDir::new().unwrap();
        let state_dir = tempfile::TempDir::new().unwrap();
        let broken = token_dir.path().join("room-x-bad.token");
        std::fs::write(broken, "not-json{{{").unwrap();

        let mut registry = UserRegistry::new(state_dir.path().to_owned());
        migration::migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);

        assert!(registry.list_users().is_empty());
    }

    // ── Cross-room resolver ─────────────────────────────────────────────

    #[tokio::test]
    async fn daemon_room_resolver_finds_existing_room() {
        let daemon = default_daemon();
        daemon.create_room("target").await.unwrap();

        let resolver = DaemonRoomResolver::new(daemon.rooms.clone());
        let found = resolver.resolve_room("target").await;
        assert!(found.is_some());
        assert_eq!(found.unwrap().room_id.as_str(), "target");
    }

    #[tokio::test]
    async fn daemon_room_resolver_returns_none_for_unknown() {
        let daemon = default_daemon();
        let resolver = DaemonRoomResolver::new(daemon.rooms.clone());
        let found = resolver.resolve_room("nonexistent").await;
        assert!(found.is_none());
    }

    #[tokio::test]
    async fn created_rooms_have_resolver_attached() {
        let daemon = default_daemon();
        daemon.create_room("room-a").await.unwrap();

        let state = get_room(&daemon, "room-a").await;
        let attached = state.cross_room_resolver.get();
        assert!(
            attached.is_some(),
            "daemon-created rooms must have a cross-room resolver attached"
        );
    }

    #[tokio::test]
    async fn cross_room_resolver_resolves_sibling_room() {
        let daemon = default_daemon();
        daemon.create_room("room-a").await.unwrap();
        daemon.create_room("room-b").await.unwrap();

        let state_a = get_room(&daemon, "room-a").await;
        let resolver = state_a.cross_room_resolver.get().unwrap();
        let sibling = resolver.resolve_room("room-b").await;
        assert!(sibling.is_some(), "resolver on room-a should find room-b");
        assert_eq!(sibling.unwrap().room_id.as_str(), "room-b");
    }
}