Skip to main content

room_daemon/broker/daemon/
mod.rs

1//! Multi-room daemon: manages N rooms in a single process.
2//!
3//! `DaemonState` wraps a map of room_id → `RoomState` and provides room
4//! lifecycle (create/destroy/get). The daemon listens on a single UDS
5//! socket at a configurable path and dispatches connections to the correct
6//! room based on an extended handshake protocol.
7//!
8//! ## Handshake protocol
9//!
//! The first line of a UDS connection to the daemon can carry one of three
//! prefixes:
12//!
13//! - `ROOM:<room_id>:<rest>` — route to an existing room. The rest of the
14//!   line is the standard per-room handshake (`SEND:`, `TOKEN:`, `JOIN:`,
15//!   or plain username).
16//! - `CREATE:<room_id>` — create a new room. A second line carries the
17//!   room configuration as JSON (`{"visibility":"public","invite":[]}`).
18//! - `DESTROY:<room_id>` — destroy a room. Signals shutdown to connected
19//!   clients and removes the room from the daemon's map.
20//!
21//! If no recognised prefix is present, the connection is rejected with an error.
22//!
23//! Examples:
24//! ```text
25//! ROOM:myroom:JOIN:alice       → join room "myroom" as "alice"
26//! ROOM:myroom:TOKEN:<uuid>     → authenticated send to "myroom"
27//! ROOM:myroom:SEND:bob         → legacy unauthenticated send to "myroom"
28//! ROOM:myroom:alice            → interactive join to "myroom" as "alice"
29//! CREATE:newroom               → create room "newroom" (config on next line)
30//! DESTROY:myroom               → destroy room "myroom"
31//! ```
32
33mod config;
34mod dispatcher;
35mod lifecycle;
36mod migration;
37mod pid;
38
39pub use config::{validate_room_id, DaemonConfig};
40pub(crate) use lifecycle::create_room_entry;
41pub use pid::{is_pid_alive, remove_pid_file, write_pid_file};
42
43use std::{
44    collections::HashMap,
45    sync::{
46        atomic::{AtomicU64, AtomicUsize, Ordering},
47        Arc,
48    },
49};
50
51use tokio::{
52    net::UnixListener,
53    sync::{watch, Mutex},
54};
55
56use crate::registry::UserRegistry;
57
58use super::{
59    service::CrossRoomResolver,
60    state::{RoomState, TokenMap},
61    ws::{self, DaemonWsState},
62};
63
64use dispatcher::dispatch_connection;
65use migration::load_or_migrate_registry;
66
67/// Registry of active rooms, keyed by room_id.
68pub(crate) type RoomMap = Arc<Mutex<HashMap<String, Arc<RoomState>>>>;
69
70/// Multi-room daemon state.
71pub struct DaemonState {
72    pub(crate) rooms: RoomMap,
73    pub(crate) config: DaemonConfig,
74    /// Global client ID counter shared across all rooms.
75    pub(crate) next_client_id: Arc<AtomicU64>,
76    /// Daemon-level shutdown signal.
77    pub(crate) shutdown: Arc<watch::Sender<bool>>,
78    /// System-level token map shared across all rooms (runtime cache).
79    ///
80    /// A single `Arc<Mutex<HashMap>>` instance is cloned into every room's
81    /// `token_map`. Tokens issued in any room are valid in all rooms managed
82    /// by this daemon. Seeded from `user_registry` on startup; kept in sync
83    /// by [`super::auth::issue_token_via_registry`].
84    pub(crate) system_token_map: TokenMap,
85    /// Daemon-level user registry — sole persistence layer for cross-room identity.
86    ///
87    /// Stores user profiles, room memberships, and tokens to
88    /// `~/.room/state/users.json`. New sessions register/update here;
89    /// `system_token_map` is derived from this registry at startup and kept
90    /// in sync on every join.
91    pub(crate) user_registry: Arc<tokio::sync::Mutex<UserRegistry>>,
92    /// Number of currently active UDS connections.
93    ///
94    /// Incremented when a connection is accepted; decremented when the
95    /// connection task completes. When the count drops to zero the daemon
96    /// starts a grace period timer before sending the shutdown signal.
97    pub(crate) connection_count: Arc<AtomicUsize>,
98}
99
100impl DaemonState {
101    /// Create a new daemon with the given configuration and no rooms.
102    pub fn new(config: DaemonConfig) -> Self {
103        let (shutdown_tx, _) = watch::channel(false);
104
105        // Load UserRegistry from disk (sole source of truth for identity).
106        //
107        // Migration path: if `users.json` (UserRegistry) does not exist but
108        // the legacy `tokens.json` (system_token_map from #334) does, import
109        // the flat token map into a fresh registry so existing sessions survive
110        // the upgrade without requiring a forced re-join.
111        let registry = load_or_migrate_registry(&config);
112
113        // Seed the runtime token map from the registry so existing tokens remain
114        // valid across daemon restarts without requiring a fresh join.
115        let token_snapshot = registry.token_snapshot();
116
117        Self {
118            rooms: Arc::new(Mutex::new(HashMap::new())),
119            config,
120            next_client_id: Arc::new(AtomicU64::new(0)),
121            shutdown: Arc::new(shutdown_tx),
122            system_token_map: Arc::new(Mutex::new(token_snapshot)),
123            user_registry: Arc::new(tokio::sync::Mutex::new(registry)),
124            connection_count: Arc::new(AtomicUsize::new(0)),
125        }
126    }
127
128    /// Create a room and register it. Returns `Err` if the room ID is invalid
129    /// or the room already exists.
130    pub async fn create_room(&self, room_id: &str) -> Result<(), String> {
131        create_room_entry(
132            room_id,
133            None,
134            &self.rooms,
135            &self.config,
136            &self.system_token_map,
137            Some(self.user_registry.clone()),
138        )
139        .await
140    }
141
142    /// Create a room with explicit configuration. Returns `Err` if the room ID
143    /// is invalid or the room already exists.
144    pub async fn create_room_with_config(
145        &self,
146        room_id: &str,
147        config: room_protocol::RoomConfig,
148    ) -> Result<(), String> {
149        create_room_entry(
150            room_id,
151            Some(config),
152            &self.rooms,
153            &self.config,
154            &self.system_token_map,
155            Some(self.user_registry.clone()),
156        )
157        .await
158    }
159
160    /// Get a room's config, if it exists.
161    pub async fn get_room_config(&self, room_id: &str) -> Option<room_protocol::RoomConfig> {
162        self.rooms
163            .lock()
164            .await
165            .get(room_id)
166            .and_then(|s| s.config.clone())
167    }
168
169    /// Destroy a room. Returns `Err` if the room does not exist.
170    ///
171    /// Signals the room's shutdown so connected clients receive EOF.
172    pub async fn destroy_room(&self, room_id: &str) -> Result<(), String> {
173        let mut rooms = self.rooms.lock().await;
174        let state = rooms
175            .remove(room_id)
176            .ok_or_else(|| format!("room not found: {room_id}"))?;
177
178        // Signal the room's shutdown so any connected clients receive EOF.
179        let _ = state.shutdown.send(true);
180        Ok(())
181    }
182
183    /// Check if a room exists.
184    pub async fn has_room(&self, room_id: &str) -> bool {
185        self.rooms.lock().await.contains_key(room_id)
186    }
187
188    /// Get a handle to the daemon-level shutdown sender.
189    pub fn shutdown_handle(&self) -> Arc<watch::Sender<bool>> {
190        self.shutdown.clone()
191    }
192
193    /// List all active room IDs.
194    pub async fn list_rooms(&self) -> Vec<String> {
195        self.rooms.lock().await.keys().cloned().collect()
196    }
197
198    /// Insert a token directly into a room's token map, bypassing the join
199    /// permission check. Intended for integration tests only.
200    #[doc(hidden)]
201    pub async fn test_inject_token(
202        &self,
203        room_id: &str,
204        username: &str,
205        token: &str,
206    ) -> Result<(), String> {
207        let rooms = self.rooms.lock().await;
208        let room = rooms
209            .get(room_id)
210            .ok_or_else(|| format!("room not found: {room_id}"))?;
211        room.auth
212            .token_map
213            .lock()
214            .await
215            .insert(token.to_owned(), username.to_owned());
216        Ok(())
217    }
218
219    /// Run the daemon: listen on UDS, dispatch connections to rooms.
220    ///
221    /// When the last UDS connection closes, starts a grace period timer
222    /// (`config.grace_period_secs`). If no new connection arrives before the
223    /// timer fires, sends a shutdown signal. Any new connection during the
224    /// grace period cancels the timer. On exit, cleans up the PID file and
225    /// socket file.
226    pub async fn run(&self) -> anyhow::Result<()> {
227        // Write PID file only for the default daemon socket.  Daemons with an
228        // explicit socket override (tests, CI) are independent instances and
229        // must not clobber the system PID file.
230        let pid_path = if self.config.socket_path == crate::paths::room_socket_path() {
231            match write_pid_file(&crate::paths::room_pid_path()) {
232                Ok(()) => Some(crate::paths::room_pid_path()),
233                Err(e) => {
234                    eprintln!("[daemon] failed to write PID file: {e}");
235                    None
236                }
237            }
238        } else {
239            None
240        };
241
242        // Remove stale socket synchronously.
243        if self.config.socket_path.exists() {
244            std::fs::remove_file(&self.config.socket_path)?;
245        }
246
247        let listener = UnixListener::bind(&self.config.socket_path)?;
248        eprintln!(
249            "[daemon] listening on {}",
250            self.config.socket_path.display()
251        );
252
253        let mut shutdown_rx = self.shutdown.subscribe();
254        let grace_duration = tokio::time::Duration::from_secs(self.config.grace_period_secs);
255
256        // mpsc channel: connection tasks notify the main loop when they close.
257        let (close_tx, mut close_rx) = tokio::sync::mpsc::channel::<()>(64);
258
259        // Optional grace period sleep — active when the last connection closes.
260        let mut grace_sleep: Option<std::pin::Pin<Box<tokio::time::Sleep>>> = None;
261
262        // Start WebSocket/REST server if configured.
263        if let Some(port) = self.config.ws_port {
264            let ws_state = DaemonWsState {
265                rooms: self.rooms.clone(),
266                next_client_id: self.next_client_id.clone(),
267                config: self.config.clone(),
268                system_token_map: self.system_token_map.clone(),
269                user_registry: self.user_registry.clone(),
270            };
271            let app = ws::create_daemon_router(ws_state);
272            let tcp = tokio::net::TcpListener::bind(("0.0.0.0", port)).await?;
273            eprintln!("[daemon] WebSocket/REST listening on port {port}");
274            tokio::spawn(async move {
275                if let Err(e) = axum::serve(tcp, app).await {
276                    eprintln!("[daemon] WS server error: {e}");
277                }
278            });
279        }
280
281        let result = loop {
282            // Build the grace future: fires if a grace sleep is active,
283            // otherwise stays pending forever.
284            let grace_fut = async {
285                match grace_sleep.as_mut() {
286                    Some(s) => {
287                        s.await;
288                    }
289                    None => std::future::pending::<()>().await,
290                }
291            };
292
293            tokio::select! {
294                accept = listener.accept() => {
295                    let (stream, _) = match accept {
296                        Ok(a) => a,
297                        Err(e) => break Err(e.into()),
298                    };
299                    // Cancel any pending grace timer — we have a new connection.
300                    grace_sleep = None;
301
302                    let count = self.connection_count.clone();
303                    count.fetch_add(1, Ordering::SeqCst);
304                    let rooms = self.rooms.clone();
305                    let next_id = self.next_client_id.clone();
306                    let cfg = self.config.clone();
307                    let sys_tokens = self.system_token_map.clone();
308                    let registry = self.user_registry.clone();
309                    let tx = close_tx.clone();
310
311                    tokio::spawn(async move {
312                        if let Err(e) = dispatch_connection(stream, &rooms, &next_id, &cfg, &sys_tokens, &registry).await {
313                            eprintln!("[daemon] connection error: {e:#}");
314                        }
315                        count.fetch_sub(1, Ordering::SeqCst);
316                        // Notify main loop so it can start the grace timer.
317                        let _ = tx.send(()).await;
318                    });
319                }
320                Some(()) = close_rx.recv() => {
321                    // A connection closed. Start grace period if none remain.
322                    if self.connection_count.load(Ordering::SeqCst) == 0 {
323                        eprintln!(
324                            "[daemon] no connections — grace period {}s started",
325                            self.config.grace_period_secs
326                        );
327                        grace_sleep =
328                            Some(Box::pin(tokio::time::sleep(grace_duration)));
329                    }
330                }
331                _ = grace_fut => {
332                    eprintln!("[daemon] grace period expired, shutting down");
333                    let _ = self.shutdown.send(true);
334                    // The shutdown_rx arm will fire on the next iteration; break
335                    // here directly to avoid a double-exit path.
336                    break Ok(());
337                }
338                _ = shutdown_rx.changed() => {
339                    eprintln!("[daemon] shutdown requested, exiting");
340                    if let Some(ref p) = pid_path {
341                        remove_pid_file(p);
342                    }
343                    break Ok(());
344                }
345            }
346        };
347
348        // Clean up ephemeral files on exit.
349        let _ = std::fs::remove_file(&self.config.socket_path);
350        let _ = std::fs::remove_file(crate::paths::room_pid_path());
351        // Remove per-room meta files written during room creation.
352        for room_id in self.list_rooms().await {
353            let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
354        }
355
356        result
357    }
358}
359
360// ── CrossRoomResolver for daemon mode ────────────────────────────────────────
361
362/// Resolves a room ID to its [`RoomState`] by looking it up in the daemon's
363/// room map. Attached to every room created by the daemon so that plugin
364/// commands can target a different room via `--room <id>`.
365pub(crate) struct DaemonRoomResolver {
366    rooms: RoomMap,
367}
368
369impl DaemonRoomResolver {
370    pub(crate) fn new(rooms: RoomMap) -> Self {
371        Self { rooms }
372    }
373}
374
375impl CrossRoomResolver for DaemonRoomResolver {
376    fn resolve_room(
377        &self,
378        room_id: &str,
379    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<Arc<RoomState>>> + Send + '_>>
380    {
381        let room_id = room_id.to_owned();
382        Box::pin(async move { self.rooms.lock().await.get(&room_id).cloned() })
383    }
384}
385
386// ── Tests ─────────────────────────────────────────────────────────────────────
387
388#[cfg(test)]
389mod tests {
390    use super::*;
391    use config::MAX_ROOM_ID_LEN;
392    use lifecycle::build_initial_subscriptions;
393    use std::path::PathBuf;
394
395    // ── PID management ───────────────────────────────────────────────────
396
397    #[test]
398    fn write_pid_file_creates_file_with_current_pid() {
399        let dir = tempfile::TempDir::new().unwrap();
400        let path = dir.path().join("test.pid");
401        write_pid_file(&path).unwrap();
402        let content = std::fs::read_to_string(&path).unwrap();
403        let pid: u32 = content.trim().parse().expect("PID should be a number");
404        assert_eq!(pid, std::process::id());
405    }
406
407    #[test]
408    fn is_pid_alive_true_for_current_process() {
409        let dir = tempfile::TempDir::new().unwrap();
410        let path = dir.path().join("test.pid");
411        write_pid_file(&path).unwrap();
412        assert!(is_pid_alive(&path), "current process should be alive");
413    }
414
415    #[test]
416    fn is_pid_alive_false_for_missing_file() {
417        let path = std::path::Path::new("/tmp/nonexistent-room-test-99999999.pid");
418        assert!(!is_pid_alive(path));
419    }
420
421    #[test]
422    fn remove_pid_file_deletes_file() {
423        let dir = tempfile::TempDir::new().unwrap();
424        let path = dir.path().join("remove.pid");
425        write_pid_file(&path).unwrap();
426        assert!(path.exists());
427        remove_pid_file(&path);
428        assert!(!path.exists());
429    }
430
431    #[test]
432    fn remove_pid_file_noop_when_missing() {
433        // Should not panic if the file is already gone.
434        let path = std::path::Path::new("/tmp/gone-99999999.pid");
435        remove_pid_file(path); // must not panic
436    }
437
438    // ── DaemonState lifecycle ─────────────────────────────────────────────
439
440    /// Test helper: look up a room's state by ID.
441    async fn get_room(daemon: &DaemonState, room_id: &str) -> Arc<RoomState> {
442        daemon
443            .rooms
444            .lock()
445            .await
446            .get(room_id)
447            .cloned()
448            .unwrap_or_else(|| panic!("room {room_id} not found"))
449    }
450
451    #[tokio::test]
452    async fn create_room_succeeds() {
453        let daemon = DaemonState::new(DaemonConfig::default());
454        assert!(daemon.create_room("test-room").await.is_ok());
455        let state = get_room(&daemon, "test-room").await;
456        assert_eq!(*state.room_id, "test-room");
457    }
458
459    #[tokio::test]
460    async fn create_duplicate_room_fails() {
461        let daemon = DaemonState::new(DaemonConfig::default());
462        daemon.create_room("dup").await.unwrap();
463        let result = daemon.create_room("dup").await;
464        assert!(result.is_err());
465        assert!(result.unwrap_err().contains("already exists"));
466    }
467
468    #[tokio::test]
469    async fn has_room_returns_true_for_created() {
470        let daemon = DaemonState::new(DaemonConfig::default());
471        daemon.create_room("room-a").await.unwrap();
472        assert!(daemon.has_room("room-a").await);
473        assert!(!daemon.has_room("room-b").await);
474    }
475
476    #[tokio::test]
477    async fn destroy_room_removes_it() {
478        let daemon = DaemonState::new(DaemonConfig::default());
479        daemon.create_room("doomed").await.unwrap();
480        assert!(daemon.destroy_room("doomed").await.is_ok());
481        assert!(!daemon.has_room("doomed").await);
482    }
483
484    #[tokio::test]
485    async fn destroy_nonexistent_room_fails() {
486        let daemon = DaemonState::new(DaemonConfig::default());
487        let result = daemon.destroy_room("nope").await;
488        assert!(result.is_err());
489        assert!(result.unwrap_err().contains("not found"));
490    }
491
492    #[tokio::test]
493    async fn destroy_room_signals_shutdown() {
494        let daemon = DaemonState::new(DaemonConfig::default());
495        daemon.create_room("shutme").await.unwrap();
496        let state = get_room(&daemon, "shutme").await;
497        let rx = state.shutdown.subscribe();
498        assert!(!*rx.borrow());
499
500        daemon.destroy_room("shutme").await.unwrap();
501        // The shutdown signal should now be true.
502        assert!(*rx.borrow());
503    }
504
505    #[tokio::test]
506    async fn list_rooms_returns_all() {
507        let daemon = DaemonState::new(DaemonConfig::default());
508        daemon.create_room("alpha").await.unwrap();
509        daemon.create_room("beta").await.unwrap();
510        daemon.create_room("gamma").await.unwrap();
511
512        let mut rooms = daemon.list_rooms().await;
513        rooms.sort();
514        assert_eq!(rooms, vec!["alpha", "beta", "gamma"]);
515    }
516
517    #[tokio::test]
518    async fn list_rooms_empty_initially() {
519        let daemon = DaemonState::new(DaemonConfig::default());
520        assert!(daemon.list_rooms().await.is_empty());
521    }
522
523    #[tokio::test]
524    async fn create_room_initializes_plugins() {
525        let daemon = DaemonState::new(DaemonConfig::default());
526        daemon.create_room("plugtest").await.unwrap();
527        let state = get_room(&daemon, "plugtest").await;
528        // help is a builtin (not in plugin registry), stats should be registered
529        assert!(state.plugin_registry.resolve("help").is_none());
530        assert!(state.plugin_registry.resolve("stats").is_some());
531    }
532
533    // ── DaemonConfig ──────────────────────────────────────────────────────
534
535    #[test]
536    fn config_chat_path_format() {
537        let config = DaemonConfig {
538            data_dir: PathBuf::from("/var/room"),
539            ..DaemonConfig::default()
540        };
541        assert_eq!(
542            config.chat_path("myroom"),
543            PathBuf::from("/var/room/myroom.chat")
544        );
545    }
546
547    #[test]
548    fn config_default_socket_path() {
549        let config = DaemonConfig::default();
550        assert_eq!(config.socket_path, crate::paths::room_socket_path());
551    }
552
553    // ── create_room_with_config ───────────────────────────────────────────
554
555    #[tokio::test]
556    async fn create_room_with_dm_config() {
557        let daemon = DaemonState::new(DaemonConfig::default());
558        let config = room_protocol::RoomConfig::dm("alice", "bob");
559        assert!(daemon
560            .create_room_with_config("dm-alice-bob", config)
561            .await
562            .is_ok());
563
564        let state = get_room(&daemon, "dm-alice-bob").await;
565        let cfg = state.config.as_ref().unwrap();
566        assert_eq!(cfg.visibility, room_protocol::RoomVisibility::Dm);
567        assert_eq!(cfg.max_members, Some(2));
568        assert!(cfg.invite_list.contains("alice"));
569        assert!(cfg.invite_list.contains("bob"));
570    }
571
572    #[tokio::test]
573    async fn create_room_with_config_duplicate_fails() {
574        let daemon = DaemonState::new(DaemonConfig::default());
575        let config = room_protocol::RoomConfig::public("owner");
576        daemon
577            .create_room_with_config("dup", config.clone())
578            .await
579            .unwrap();
580        assert!(daemon.create_room_with_config("dup", config).await.is_err());
581    }
582
583    #[tokio::test]
584    async fn get_room_config_returns_none_for_unconfigured() {
585        let daemon = DaemonState::new(DaemonConfig::default());
586        daemon.create_room("plain").await.unwrap();
587        assert!(daemon.get_room_config("plain").await.is_none());
588    }
589
590    #[tokio::test]
591    async fn get_room_config_returns_config_when_present() {
592        let daemon = DaemonState::new(DaemonConfig::default());
593        let config = room_protocol::RoomConfig::dm("alice", "bob");
594        daemon
595            .create_room_with_config("dm-alice-bob", config)
596            .await
597            .unwrap();
598        let cfg = daemon.get_room_config("dm-alice-bob").await.unwrap();
599        assert_eq!(cfg.visibility, room_protocol::RoomVisibility::Dm);
600    }
601
602    #[tokio::test]
603    async fn dm_room_id_deterministic_and_lookup_works() {
604        let daemon = DaemonState::new(DaemonConfig::default());
605        let room_id = room_protocol::dm_room_id("bob", "alice").unwrap();
606        assert_eq!(room_id, "dm-alice-bob");
607
608        let config = room_protocol::RoomConfig::dm("bob", "alice");
609        daemon
610            .create_room_with_config(&room_id, config)
611            .await
612            .unwrap();
613        assert!(daemon.has_room("dm-alice-bob").await);
614        // Reverse order gives the same room_id
615        assert_eq!(
616            room_protocol::dm_room_id("alice", "bob").unwrap(),
617            "dm-alice-bob"
618        );
619    }
620
621    // ── validate_room_id ──────────────────────────────────────────────────
622
623    #[test]
624    fn valid_room_ids() {
625        for id in [
626            "lobby",
627            "agent-room-2",
628            "my_room",
629            "Room.1",
630            "dm-alice-bob",
631            "a",
632            &"x".repeat(MAX_ROOM_ID_LEN),
633        ] {
634            assert!(validate_room_id(id).is_ok(), "should accept: {id:?}");
635        }
636    }
637
638    #[test]
639    fn empty_room_id_rejected() {
640        let err = validate_room_id("").unwrap_err();
641        assert!(err.contains("empty"), "{err}");
642    }
643
644    #[test]
645    fn room_id_too_long_rejected() {
646        let long = "x".repeat(MAX_ROOM_ID_LEN + 1);
647        let err = validate_room_id(&long).unwrap_err();
648        assert!(err.contains("too long"), "{err}");
649    }
650
651    #[test]
652    fn dot_dot_traversal_rejected() {
653        for id in ["..", "room/../etc", "..secret", "a..b"] {
654            let err = validate_room_id(id).unwrap_err();
655            assert!(err.contains(".."), "should reject {id:?}: {err}");
656        }
657    }
658
659    #[test]
660    fn single_dot_rejected() {
661        let err = validate_room_id(".").unwrap_err();
662        assert!(err.contains(".."), "{err}");
663    }
664
665    #[test]
666    fn slash_rejected() {
667        for id in ["room/sub", "/etc/passwd", "a/b/c"] {
668            let err = validate_room_id(id).unwrap_err();
669            assert!(err.contains("unsafe"), "should reject {id:?}: {err}");
670        }
671    }
672
673    #[test]
674    fn backslash_rejected() {
675        let err = validate_room_id("room\\sub").unwrap_err();
676        assert!(err.contains("unsafe"), "{err}");
677    }
678
679    #[test]
680    fn null_byte_rejected() {
681        let err = validate_room_id("room\0id").unwrap_err();
682        assert!(err.contains("unsafe"), "{err}");
683    }
684
685    #[test]
686    fn whitespace_rejected() {
687        for id in ["room name", "room\tid", "room\nid", " leading", "trailing "] {
688            let err = validate_room_id(id).unwrap_err();
689            assert!(err.contains("whitespace"), "should reject {id:?}: {err}");
690        }
691    }
692
693    #[test]
694    fn other_unsafe_chars_rejected() {
695        for ch in [':', '*', '?', '"', '<', '>', '|'] {
696            let id = format!("room{ch}id");
697            let err = validate_room_id(&id).unwrap_err();
698            assert!(err.contains("unsafe"), "should reject {ch:?}: {err}");
699        }
700    }
701
702    #[tokio::test]
703    async fn create_room_rejects_invalid_id() {
704        let daemon = DaemonState::new(DaemonConfig::default());
705        assert!(daemon.create_room("room/sub").await.is_err());
706        assert!(daemon.create_room("..").await.is_err());
707        assert!(daemon.create_room("").await.is_err());
708        assert!(daemon.create_room("room name").await.is_err());
709    }
710
711    #[tokio::test]
712    async fn create_room_with_config_rejects_invalid_id() {
713        let daemon = DaemonState::new(DaemonConfig::default());
714        let config = room_protocol::RoomConfig::public("owner");
715        assert!(daemon
716            .create_room_with_config("../etc", config)
717            .await
718            .is_err());
719    }
720
721    // ── DM auto-subscribe ─────────────────────────────────────────────────
722
723    #[tokio::test]
724    async fn dm_room_auto_subscribes_both_participants() {
725        let daemon = DaemonState::new(DaemonConfig::default());
726        let config = room_protocol::RoomConfig::dm("alice", "bob");
727        daemon
728            .create_room_with_config("dm-alice-bob", config)
729            .await
730            .unwrap();
731
732        let state = get_room(&daemon, "dm-alice-bob").await;
733        let subs = state.filters.subscription_map.lock().await;
734        assert_eq!(subs.len(), 2);
735        assert_eq!(
736            subs.get("alice"),
737            Some(&room_protocol::SubscriptionTier::Full)
738        );
739        assert_eq!(
740            subs.get("bob"),
741            Some(&room_protocol::SubscriptionTier::Full)
742        );
743    }
744
745    #[tokio::test]
746    async fn public_room_starts_with_no_subscriptions() {
747        let daemon = DaemonState::new(DaemonConfig::default());
748        let config = room_protocol::RoomConfig::public("owner");
749        daemon
750            .create_room_with_config("lobby", config)
751            .await
752            .unwrap();
753
754        let state = get_room(&daemon, "lobby").await;
755        let subs = state.filters.subscription_map.lock().await;
756        assert!(subs.is_empty());
757    }
758
759    #[tokio::test]
760    async fn unconfigured_room_starts_with_no_subscriptions() {
761        let daemon = DaemonState::new(DaemonConfig::default());
762        daemon.create_room("plain").await.unwrap();
763
764        let state = get_room(&daemon, "plain").await;
765        let subs = state.filters.subscription_map.lock().await;
766        assert!(subs.is_empty());
767    }
768
769    #[tokio::test]
770    async fn dm_auto_subscribe_uses_full_tier() {
771        let daemon = DaemonState::new(DaemonConfig::default());
772        let config = room_protocol::RoomConfig::dm("carol", "dave");
773        daemon
774            .create_room_with_config("dm-carol-dave", config)
775            .await
776            .unwrap();
777
778        let state = get_room(&daemon, "dm-carol-dave").await;
779        let subs = state.filters.subscription_map.lock().await;
780        // Verify it's Full, not MentionsOnly
781        for (_, tier) in subs.iter() {
782            assert_eq!(*tier, room_protocol::SubscriptionTier::Full);
783        }
784    }
785
786    #[test]
787    fn build_initial_subscriptions_dm_populates() {
788        let config = room_protocol::RoomConfig::dm("alice", "bob");
789        let subs = build_initial_subscriptions(&config);
790        assert_eq!(subs.len(), 2);
791        assert_eq!(subs["alice"], room_protocol::SubscriptionTier::Full);
792        assert_eq!(subs["bob"], room_protocol::SubscriptionTier::Full);
793    }
794
795    #[test]
796    fn build_initial_subscriptions_public_empty() {
797        let config = room_protocol::RoomConfig::public("owner");
798        let subs = build_initial_subscriptions(&config);
799        assert!(subs.is_empty());
800    }
801
802    // ── DaemonConfig grace_period_secs ────────────────────────────────────
803
804    #[test]
805    fn default_grace_period_is_30() {
806        let config = DaemonConfig::default();
807        assert_eq!(config.grace_period_secs, 30);
808    }
809
810    #[test]
811    fn custom_grace_period_preserved() {
812        let config = DaemonConfig {
813            grace_period_secs: 0,
814            ..DaemonConfig::default()
815        };
816        assert_eq!(config.grace_period_secs, 0);
817    }
818
819    // ── connection_count refcount ─────────────────────────────────────────
820
821    #[tokio::test]
822    async fn connection_count_starts_at_zero() {
823        let daemon = DaemonState::new(DaemonConfig::default());
824        assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 0);
825    }
826
827    #[tokio::test]
828    async fn connection_count_increments_and_decrements() {
829        let count = Arc::new(AtomicUsize::new(0));
830        count.fetch_add(1, Ordering::SeqCst);
831        count.fetch_add(1, Ordering::SeqCst);
832        assert_eq!(count.load(Ordering::SeqCst), 2);
833        count.fetch_sub(1, Ordering::SeqCst);
834        assert_eq!(count.load(Ordering::SeqCst), 1);
835        count.fetch_sub(1, Ordering::SeqCst);
836        assert_eq!(count.load(Ordering::SeqCst), 0);
837    }
838
839    /// Verify that the daemon exits cleanly when the shutdown signal is sent.
840    /// Uses an Arc<DaemonState> so the run() task can hold a reference while
841    /// the test also holds one to send the shutdown signal.
842    #[tokio::test]
843    async fn daemon_exits_on_shutdown_signal() {
844        let dir = tempfile::TempDir::new().unwrap();
845        let socket = dir.path().join("test-grace.sock");
846        std::fs::create_dir_all(dir.path().join("data")).unwrap();
847        std::fs::create_dir_all(dir.path().join("state")).unwrap();
848
849        let config = DaemonConfig {
850            socket_path: socket.clone(),
851            data_dir: dir.path().join("data"),
852            state_dir: dir.path().join("state"),
853            ws_port: None,
854            grace_period_secs: 0,
855        };
856        let daemon = Arc::new(DaemonState::new(config));
857        let shutdown = daemon.shutdown_handle();
858
859        let daemon2 = Arc::clone(&daemon);
860        let handle = tokio::spawn(async move { daemon2.run().await });
861
862        // Wait for socket to become connectable (daemon is up).
863        for _ in 0..100 {
864            if tokio::net::UnixStream::connect(&socket).await.is_ok() {
865                break;
866            }
867            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
868        }
869        assert!(
870            tokio::net::UnixStream::connect(&socket).await.is_ok(),
871            "daemon socket not ready"
872        );
873
874        // Send shutdown — daemon should exit quickly.
875        let _ = shutdown.send(true);
876        let result = tokio::time::timeout(tokio::time::Duration::from_secs(5), handle).await;
877        assert!(result.is_ok(), "daemon did not exit within 5s");
878        assert!(result.unwrap().unwrap().is_ok(), "run() returned error");
879    }
880
881    /// Verify that a new connection during the grace period resets the timer.
882    /// We check this by confirming connection_count goes from 0 → 1 → 0 without
883    /// a premature shutdown.
884    #[tokio::test]
885    async fn grace_period_cancelled_by_new_connection() {
886        let dir = tempfile::TempDir::new().unwrap();
887        let socket = dir.path().join("test-cancel-grace.sock");
888
889        let config = DaemonConfig {
890            socket_path: socket.clone(),
891            data_dir: dir.path().join("data"),
892            state_dir: dir.path().join("state"),
893            ws_port: None,
894            grace_period_secs: 60, // long grace — should not fire
895        };
896        let daemon = DaemonState::new(config);
897
898        // Manually exercise the counter: simulate connect + disconnect.
899        daemon.connection_count.fetch_add(1, Ordering::SeqCst);
900        assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 1);
901        daemon.connection_count.fetch_sub(1, Ordering::SeqCst);
902        assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 0);
903
904        // Simulate a second connection arriving (cancels grace timer).
905        daemon.connection_count.fetch_add(1, Ordering::SeqCst);
906        assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 1);
907
908        // Daemon has not shut down.
909        assert!(!*daemon.shutdown.borrow());
910    }
911
912    // ── migrate_legacy_tmpdir_tokens ──────────────────────────────────────
913
914    /// Write a token file to `dir` in the format written by old `room join`.
915    fn write_legacy_token(dir: &std::path::Path, room_id: &str, username: &str, token: &str) {
916        let name = format!("room-{room_id}-{username}.token");
917        let data = serde_json::json!({"username": username, "token": token});
918        std::fs::write(dir.join(name), format!("{data}\n")).unwrap();
919    }
920
921    #[test]
922    fn migrate_legacy_tmpdir_tokens_imports_token() {
923        let token_dir = tempfile::TempDir::new().unwrap();
924        let state_dir = tempfile::TempDir::new().unwrap();
925        write_legacy_token(token_dir.path(), "lobby", "alice", "legacy-uuid-alice");
926
927        let mut registry = UserRegistry::new(state_dir.path().to_owned());
928
929        migration::migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
930
931        assert_eq!(registry.validate_token("legacy-uuid-alice"), Some("alice"));
932        assert!(registry.get_user("alice").is_some());
933    }
934
935    #[test]
936    fn migrate_legacy_tmpdir_tokens_idempotent() {
937        let token_dir = tempfile::TempDir::new().unwrap();
938        let state_dir = tempfile::TempDir::new().unwrap();
939        write_legacy_token(token_dir.path(), "lobby", "bob", "tok-bob");
940
941        let mut registry = UserRegistry::new(state_dir.path().to_owned());
942        migration::migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
943        migration::migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
944
945        // Token still valid and exactly one entry for bob.
946        assert_eq!(registry.validate_token("tok-bob"), Some("bob"));
947        let snap = registry.token_snapshot();
948        assert_eq!(snap.values().filter(|u| u.as_str() == "bob").count(), 1);
949    }
950
951    #[test]
952    fn migrate_legacy_tmpdir_tokens_skips_non_token_files() {
953        let token_dir = tempfile::TempDir::new().unwrap();
954        let state_dir = tempfile::TempDir::new().unwrap();
955        std::fs::write(token_dir.path().join("roomd.sock"), "not a token").unwrap();
956        std::fs::write(token_dir.path().join("something.json"), "{}").unwrap();
957
958        let mut registry = UserRegistry::new(state_dir.path().to_owned());
959        migration::migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
960
961        assert!(registry.list_users().is_empty());
962    }
963
964    #[test]
965    fn migrate_legacy_tmpdir_tokens_skips_malformed_json() {
966        let token_dir = tempfile::TempDir::new().unwrap();
967        let state_dir = tempfile::TempDir::new().unwrap();
968        std::fs::write(token_dir.path().join("room-x-bad.token"), "not-json{{{").unwrap();
969
970        let mut registry = UserRegistry::new(state_dir.path().to_owned());
971        migration::migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
972
973        assert!(registry.list_users().is_empty());
974    }
975
976    // ── DaemonRoomResolver ───────────────────────────────────────────────
977
978    #[tokio::test]
979    async fn daemon_room_resolver_finds_existing_room() {
980        let daemon = DaemonState::new(DaemonConfig::default());
981        daemon.create_room("target").await.unwrap();
982
983        let resolver = DaemonRoomResolver::new(daemon.rooms.clone());
984        let result = resolver.resolve_room("target").await;
985        assert!(result.is_some());
986        assert_eq!(result.unwrap().room_id.as_str(), "target");
987    }
988
989    #[tokio::test]
990    async fn daemon_room_resolver_returns_none_for_unknown() {
991        let daemon = DaemonState::new(DaemonConfig::default());
992        let resolver = DaemonRoomResolver::new(daemon.rooms.clone());
993        assert!(resolver.resolve_room("nonexistent").await.is_none());
994    }
995
996    #[tokio::test]
997    async fn created_rooms_have_resolver_attached() {
998        let daemon = DaemonState::new(DaemonConfig::default());
999        daemon.create_room("room-a").await.unwrap();
1000
1001        let state = get_room(&daemon, "room-a").await;
1002        assert!(
1003            state.cross_room_resolver.get().is_some(),
1004            "daemon-created rooms must have a cross-room resolver attached"
1005        );
1006    }
1007
1008    #[tokio::test]
1009    async fn cross_room_resolver_resolves_sibling_room() {
1010        let daemon = DaemonState::new(DaemonConfig::default());
1011        daemon.create_room("room-a").await.unwrap();
1012        daemon.create_room("room-b").await.unwrap();
1013
1014        let state_a = get_room(&daemon, "room-a").await;
1015        let resolver = state_a.cross_room_resolver.get().unwrap();
1016        let resolved = resolver.resolve_room("room-b").await;
1017        assert!(resolved.is_some(), "resolver on room-a should find room-b");
1018        assert_eq!(resolved.unwrap().room_id.as_str(), "room-b");
1019    }
1020}