Skip to main content

room_cli/broker/
daemon.rs

1//! Multi-room daemon: manages N rooms in a single process.
2//!
3//! `DaemonState` wraps a map of room_id → `RoomState` and provides room
4//! lifecycle (create/destroy/get). The daemon listens on a single UDS
5//! socket at a configurable path and dispatches connections to the correct
6//! room based on an extended handshake protocol.
7//!
8//! ## Handshake protocol
9//!
//! The first line of a UDS connection to the daemon can carry one of three
//! prefixes:
12//!
13//! - `ROOM:<room_id>:<rest>` — route to an existing room. The rest of the
14//!   line is the standard per-room handshake (`SEND:`, `TOKEN:`, `JOIN:`,
15//!   or plain username).
16//! - `CREATE:<room_id>` — create a new room. A second line carries the
17//!   room configuration as JSON (`{"visibility":"public","invite":[]}`).
//! - `DESTROY:<room_id>` — destroy a room. A second line carries the
//!   caller's token. Signals shutdown to connected clients and removes the
//!   room from the daemon's map.
20//!
21//! If no recognised prefix is present, the connection is rejected with an error.
22//!
23//! Examples:
24//! ```text
25//! ROOM:myroom:JOIN:alice       → join room "myroom" as "alice"
26//! ROOM:myroom:TOKEN:<uuid>     → authenticated send to "myroom"
27//! ROOM:myroom:SEND:bob         → legacy unauthenticated send to "myroom"
28//! ROOM:myroom:alice            → interactive join to "myroom" as "alice"
29//! CREATE:newroom               → create room "newroom" (config on next line)
30//! DESTROY:myroom               → destroy room "myroom"
31//! ```
32
33use std::{
34    collections::HashMap,
35    path::PathBuf,
36    sync::{
37        atomic::{AtomicU64, AtomicUsize, Ordering},
38        Arc,
39    },
40};
41
42use tokio::{
43    net::UnixListener,
44    sync::{broadcast, watch, Mutex},
45};
46
47use crate::registry::UserRegistry;
48
49use super::{
50    handle_oneshot_send,
51    state::{RoomState, TokenMap},
52    ws::{self, DaemonWsState},
53};
54
/// Characters rejected in room IDs because they are unsafe in filesystem
/// paths or shell contexts.
const UNSAFE_CHARS: &[char] = &[
    // Path separators.
    '/', '\\',
    // Also the field delimiter of the daemon handshake (`ROOM:<id>:<rest>`).
    ':',
    // Glob wildcards.
    '*', '?',
    // Quoting, redirection and pipe characters.
    '"', '<', '>', '|',
    // NUL byte.
    '\0',
];
57
58// ── PID file management ───────────────────────────────────────────────────────
59
/// Record the current process's PID in the file at `path`.
///
/// The file is created if it does not exist and overwritten if it does. The
/// PID is stored as a decimal string with no trailing newline.
pub fn write_pid_file(path: &std::path::Path) -> std::io::Result<()> {
    let pid = std::process::id();
    std::fs::write(path, pid.to_string().as_bytes())
}
64
65/// Returns `true` if the PID recorded in `path` belongs to a running process.
66///
67/// Returns `false` when the file is missing, unreadable, or unparseable, and
68/// when the process is confirmed dead (ESRCH).
69pub fn is_pid_alive(path: &std::path::Path) -> bool {
70    let Ok(contents) = std::fs::read_to_string(path) else {
71        return false;
72    };
73    let Ok(pid) = contents.trim().parse::<u32>() else {
74        return false;
75    };
76    pid_alive(pid)
77}
78
/// Delete the PID file at `path` as best-effort cleanup.
///
/// Any error (missing file, permission denied, ...) is deliberately ignored.
pub fn remove_pid_file(path: &std::path::Path) {
    match std::fs::remove_file(path) {
        // Nothing useful can be done about a failure during cleanup.
        Ok(()) | Err(_) => {}
    }
}
83
/// Check whether a process with the given PID is currently running.
///
/// Uses POSIX `kill(pid, 0)` — signal 0 never delivers a signal but the kernel
/// validates whether the calling process may signal `pid`, returning:
/// - `0`  → process exists
/// - `-1` with `EPERM`  → process exists, permission denied
/// - `-1` with `ESRCH`  → no such process
///
/// PIDs that are `0` or do not fit in a positive `i32` are reported as not
/// alive without calling `kill`. For `kill`, a PID of `0` means "the caller's
/// process group" and a negative PID means "a process group" (or, for `-1`,
/// every process the caller may signal), so passing such values through would
/// make a corrupt PID file look like a running daemon.
#[cfg(unix)]
fn pid_alive(pid: u32) -> bool {
    extern "C" {
        fn kill(pid: i32, sig: i32) -> i32;
    }
    // Only strictly positive values that survive the cast name one process.
    if pid == 0 || pid > i32::MAX as u32 {
        return false;
    }
    // SAFETY: kill(pid, 0) never delivers a signal; it only checks liveness.
    // `pid` was range-checked above, so the cast cannot wrap to a negative
    // (process-group) value.
    let ret = unsafe { kill(pid as i32, 0) };
    if ret == 0 {
        return true;
    }
    // EPERM: the process exists but we lack permission to signal it.
    std::io::Error::last_os_error().kind() == std::io::ErrorKind::PermissionDenied
}

/// Non-Unix fallback: there is no liveness probe here, so every PID is
/// reported as alive.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    // Conservative: assume the process is alive on non-Unix platforms.
    true
}
110
111/// Maximum allowed length for a room ID.
112const MAX_ROOM_ID_LEN: usize = 64;
113
114/// Validate a room ID for filesystem safety.
115///
116/// Rejects IDs that are empty, too long, contain path traversal sequences
117/// (`..`), whitespace, or filesystem-unsafe characters.
118pub fn validate_room_id(room_id: &str) -> Result<(), String> {
119    if room_id.is_empty() {
120        return Err("room ID cannot be empty".into());
121    }
122    if room_id.len() > MAX_ROOM_ID_LEN {
123        return Err(format!(
124            "room ID too long ({} chars, max {MAX_ROOM_ID_LEN})",
125            room_id.len()
126        ));
127    }
128    if room_id == "." || room_id == ".." || room_id.contains("..") {
129        return Err("room ID cannot contain '..'".into());
130    }
131    if room_id.chars().any(|c| c.is_whitespace()) {
132        return Err("room ID cannot contain whitespace".into());
133    }
134    if let Some(bad) = room_id.chars().find(|c| UNSAFE_CHARS.contains(c)) {
135        return Err(format!("room ID contains unsafe character: {bad:?}"));
136    }
137    Ok(())
138}
139
/// Settings that control where the daemon listens and where it stores data.
#[derive(Clone, Debug)]
pub struct DaemonConfig {
    /// Location of the daemon's UDS socket (ephemeral, platform-native temp
    /// dir).
    pub socket_path: PathBuf,
    /// Directory holding chat files; each room uses `<data_dir>/<room_id>.chat`.
    /// Defaults to `~/.room/data/`; overridable with `--data-dir`.
    pub data_dir: PathBuf,
    /// Directory holding state files (token maps, cursors, subscriptions).
    /// Defaults to `~/.room/state/`.
    pub state_dir: PathBuf,
    /// Port for the WebSocket/REST server; `None` disables it.
    pub ws_port: Option<u16>,
    /// How long, in seconds, to wait after the last connection closes before
    /// the daemon shuts itself down.
    ///
    /// The default is 30 seconds. A value of 0 means shutdown happens as soon
    /// as the last client disconnects. The setting has no effect while there
    /// is always at least one active connection.
    pub grace_period_secs: u64,
}
159
160impl DaemonConfig {
161    /// Resolve the chat file path for a given room.
162    pub fn chat_path(&self, room_id: &str) -> PathBuf {
163        self.data_dir.join(format!("{room_id}.chat"))
164    }
165
166    /// Resolve the token-map persistence path for a given room.
167    pub fn token_map_path(&self, room_id: &str) -> PathBuf {
168        crate::paths::broker_tokens_path(&self.state_dir, room_id)
169    }
170
171    /// System-level token persistence path: `<state_dir>/tokens.json`.
172    ///
173    /// Used by the daemon to share a single token store across all rooms.
174    /// Production default is `~/.room/state/tokens.json`; tests override
175    /// `state_dir` with a temp directory.
176    pub fn system_tokens_path(&self) -> PathBuf {
177        self.state_dir.join("tokens.json")
178    }
179
180    /// Resolve the subscription-map persistence path for a given room.
181    pub fn subscription_map_path(&self, room_id: &str) -> PathBuf {
182        crate::paths::broker_subscriptions_path(&self.state_dir, room_id)
183    }
184}
185
186impl Default for DaemonConfig {
187    fn default() -> Self {
188        Self {
189            socket_path: crate::paths::room_socket_path(),
190            data_dir: crate::paths::room_data_dir(),
191            state_dir: crate::paths::room_state_dir(),
192            ws_port: None,
193            grace_period_secs: 30,
194        }
195    }
196}
197
/// Registry of active rooms, keyed by room_id.
///
/// The map sits behind an `Arc<Mutex<..>>` so the same registry can be cloned
/// into spawned connection tasks and the WebSocket state.
pub(crate) type RoomMap = Arc<Mutex<HashMap<String, Arc<RoomState>>>>;
200
/// Multi-room daemon state.
///
/// Bundles the room registry, the daemon configuration, the shared token map,
/// the user registry, and the counters and shutdown channel used by the
/// daemon's accept loop.
pub struct DaemonState {
    /// Active rooms, keyed by room ID.
    pub(crate) rooms: RoomMap,
    /// Configuration this daemon was created with.
    pub(crate) config: DaemonConfig,
    /// Global client ID counter shared across all rooms.
    pub(crate) next_client_id: Arc<AtomicU64>,
    /// Daemon-level shutdown signal.
    pub(crate) shutdown: Arc<watch::Sender<bool>>,
    /// System-level token map shared across all rooms (runtime cache).
    ///
    /// A single `Arc<Mutex<HashMap>>` instance is cloned into every room's
    /// `token_map`. Tokens issued in any room are valid in all rooms managed
    /// by this daemon. Seeded from `user_registry` on startup; kept in sync
    /// by [`super::auth::issue_token_via_registry`].
    pub(crate) system_token_map: TokenMap,
    /// Daemon-level user registry — sole persistence layer for cross-room identity.
    ///
    /// Stores user profiles, room memberships, and tokens to
    /// `~/.room/state/users.json`. New sessions register/update here;
    /// `system_token_map` is derived from this registry at startup and kept
    /// in sync on every join.
    pub(crate) user_registry: Arc<tokio::sync::Mutex<UserRegistry>>,
    /// Number of currently active UDS connections.
    ///
    /// Incremented when a connection is accepted; decremented when the
    /// connection task completes. When the count drops to zero the daemon
    /// starts a grace period timer before sending the shutdown signal.
    pub(crate) connection_count: Arc<AtomicUsize>,
}
230
231impl DaemonState {
232    /// Create a new daemon with the given configuration and no rooms.
233    pub fn new(config: DaemonConfig) -> Self {
234        let (shutdown_tx, _) = watch::channel(false);
235
236        // Load UserRegistry from disk (sole source of truth for identity).
237        //
238        // Migration path: if `users.json` (UserRegistry) does not exist but
239        // the legacy `tokens.json` (system_token_map from #334) does, import
240        // the flat token map into a fresh registry so existing sessions survive
241        // the upgrade without requiring a forced re-join.
242        let registry = load_or_migrate_registry(&config);
243
244        // Seed the runtime token map from the registry so existing tokens remain
245        // valid across daemon restarts without requiring a fresh join.
246        let token_snapshot = registry.token_snapshot();
247
248        Self {
249            rooms: Arc::new(Mutex::new(HashMap::new())),
250            config,
251            next_client_id: Arc::new(AtomicU64::new(0)),
252            shutdown: Arc::new(shutdown_tx),
253            system_token_map: Arc::new(Mutex::new(token_snapshot)),
254            user_registry: Arc::new(tokio::sync::Mutex::new(registry)),
255            connection_count: Arc::new(AtomicUsize::new(0)),
256        }
257    }
258
259    /// Create a room and register it. Returns `Err` if the room ID is invalid
260    /// or the room already exists.
261    pub async fn create_room(&self, room_id: &str) -> Result<(), String> {
262        create_room_entry(
263            room_id,
264            None,
265            &self.rooms,
266            &self.config,
267            &self.system_token_map,
268            Some(self.user_registry.clone()),
269        )
270        .await
271    }
272
273    /// Create a room with explicit configuration. Returns `Err` if the room ID
274    /// is invalid or the room already exists.
275    pub async fn create_room_with_config(
276        &self,
277        room_id: &str,
278        config: room_protocol::RoomConfig,
279    ) -> Result<(), String> {
280        create_room_entry(
281            room_id,
282            Some(config),
283            &self.rooms,
284            &self.config,
285            &self.system_token_map,
286            Some(self.user_registry.clone()),
287        )
288        .await
289    }
290
291    /// Get a room's config, if it exists.
292    pub async fn get_room_config(&self, room_id: &str) -> Option<room_protocol::RoomConfig> {
293        self.rooms
294            .lock()
295            .await
296            .get(room_id)
297            .and_then(|s| s.config.clone())
298    }
299
300    /// Destroy a room. Returns `Err` if the room does not exist.
301    ///
302    /// Signals the room's shutdown so connected clients receive EOF.
303    pub async fn destroy_room(&self, room_id: &str) -> Result<(), String> {
304        let mut rooms = self.rooms.lock().await;
305        let state = rooms
306            .remove(room_id)
307            .ok_or_else(|| format!("room not found: {room_id}"))?;
308
309        // Signal the room's shutdown so any connected clients receive EOF.
310        let _ = state.shutdown.send(true);
311        Ok(())
312    }
313
314    /// Check if a room exists.
315    pub async fn has_room(&self, room_id: &str) -> bool {
316        self.rooms.lock().await.contains_key(room_id)
317    }
318
319    /// Get a handle to the daemon-level shutdown sender.
320    pub fn shutdown_handle(&self) -> Arc<watch::Sender<bool>> {
321        self.shutdown.clone()
322    }
323
324    /// List all active room IDs.
325    pub async fn list_rooms(&self) -> Vec<String> {
326        self.rooms.lock().await.keys().cloned().collect()
327    }
328
329    /// Insert a token directly into a room's token map, bypassing the join
330    /// permission check. Intended for integration tests only.
331    #[doc(hidden)]
332    pub async fn test_inject_token(
333        &self,
334        room_id: &str,
335        username: &str,
336        token: &str,
337    ) -> Result<(), String> {
338        let rooms = self.rooms.lock().await;
339        let room = rooms
340            .get(room_id)
341            .ok_or_else(|| format!("room not found: {room_id}"))?;
342        room.token_map
343            .lock()
344            .await
345            .insert(token.to_owned(), username.to_owned());
346        Ok(())
347    }
348
349    /// Run the daemon: listen on UDS, dispatch connections to rooms.
350    ///
351    /// When the last UDS connection closes, starts a grace period timer
352    /// (`config.grace_period_secs`). If no new connection arrives before the
353    /// timer fires, sends a shutdown signal. Any new connection during the
354    /// grace period cancels the timer. On exit, cleans up the PID file and
355    /// socket file.
356    pub async fn run(&self) -> anyhow::Result<()> {
357        // Write PID file only for the default daemon socket.  Daemons with an
358        // explicit socket override (tests, CI) are independent instances and
359        // must not clobber the system PID file.
360        let pid_path = if self.config.socket_path == crate::paths::room_socket_path() {
361            match write_pid_file(&crate::paths::room_pid_path()) {
362                Ok(()) => Some(crate::paths::room_pid_path()),
363                Err(e) => {
364                    eprintln!("[daemon] failed to write PID file: {e}");
365                    None
366                }
367            }
368        } else {
369            None
370        };
371
372        // Remove stale socket synchronously.
373        if self.config.socket_path.exists() {
374            std::fs::remove_file(&self.config.socket_path)?;
375        }
376
377        let listener = UnixListener::bind(&self.config.socket_path)?;
378        eprintln!(
379            "[daemon] listening on {}",
380            self.config.socket_path.display()
381        );
382
383        let mut shutdown_rx = self.shutdown.subscribe();
384        let grace_duration = tokio::time::Duration::from_secs(self.config.grace_period_secs);
385
386        // mpsc channel: connection tasks notify the main loop when they close.
387        let (close_tx, mut close_rx) = tokio::sync::mpsc::channel::<()>(64);
388
389        // Optional grace period sleep — active when the last connection closes.
390        let mut grace_sleep: Option<std::pin::Pin<Box<tokio::time::Sleep>>> = None;
391
392        // Start WebSocket/REST server if configured.
393        if let Some(port) = self.config.ws_port {
394            let ws_state = DaemonWsState {
395                rooms: self.rooms.clone(),
396                next_client_id: self.next_client_id.clone(),
397                config: self.config.clone(),
398                system_token_map: self.system_token_map.clone(),
399                user_registry: self.user_registry.clone(),
400            };
401            let app = ws::create_daemon_router(ws_state);
402            let tcp = tokio::net::TcpListener::bind(("0.0.0.0", port)).await?;
403            eprintln!("[daemon] WebSocket/REST listening on port {port}");
404            tokio::spawn(async move {
405                if let Err(e) = axum::serve(tcp, app).await {
406                    eprintln!("[daemon] WS server error: {e}");
407                }
408            });
409        }
410
411        let result = loop {
412            // Build the grace future: fires if a grace sleep is active,
413            // otherwise stays pending forever.
414            let grace_fut = async {
415                match grace_sleep.as_mut() {
416                    Some(s) => {
417                        s.await;
418                    }
419                    None => std::future::pending::<()>().await,
420                }
421            };
422
423            tokio::select! {
424                accept = listener.accept() => {
425                    let (stream, _) = match accept {
426                        Ok(a) => a,
427                        Err(e) => break Err(e.into()),
428                    };
429                    // Cancel any pending grace timer — we have a new connection.
430                    grace_sleep = None;
431
432                    let count = self.connection_count.clone();
433                    count.fetch_add(1, Ordering::SeqCst);
434                    let rooms = self.rooms.clone();
435                    let next_id = self.next_client_id.clone();
436                    let cfg = self.config.clone();
437                    let sys_tokens = self.system_token_map.clone();
438                    let registry = self.user_registry.clone();
439                    let tx = close_tx.clone();
440
441                    tokio::spawn(async move {
442                        if let Err(e) = dispatch_connection(stream, &rooms, &next_id, &cfg, &sys_tokens, &registry).await {
443                            eprintln!("[daemon] connection error: {e:#}");
444                        }
445                        count.fetch_sub(1, Ordering::SeqCst);
446                        // Notify main loop so it can start the grace timer.
447                        let _ = tx.send(()).await;
448                    });
449                }
450                Some(()) = close_rx.recv() => {
451                    // A connection closed. Start grace period if none remain.
452                    if self.connection_count.load(Ordering::SeqCst) == 0 {
453                        eprintln!(
454                            "[daemon] no connections — grace period {}s started",
455                            self.config.grace_period_secs
456                        );
457                        grace_sleep =
458                            Some(Box::pin(tokio::time::sleep(grace_duration)));
459                    }
460                }
461                _ = grace_fut => {
462                    eprintln!("[daemon] grace period expired, shutting down");
463                    let _ = self.shutdown.send(true);
464                    // The shutdown_rx arm will fire on the next iteration; break
465                    // here directly to avoid a double-exit path.
466                    break Ok(());
467                }
468                _ = shutdown_rx.changed() => {
469                    eprintln!("[daemon] shutdown requested, exiting");
470                    if let Some(ref p) = pid_path {
471                        remove_pid_file(p);
472                    }
473                    break Ok(());
474                }
475            }
476        };
477
478        // Clean up ephemeral files on exit.
479        let _ = std::fs::remove_file(&self.config.socket_path);
480        let _ = std::fs::remove_file(crate::paths::room_pid_path());
481        // Remove per-room meta files written during room creation.
482        for room_id in self.list_rooms().await {
483            let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
484        }
485
486        result
487    }
488}
489
490/// Load `UserRegistry` from `users.json`, or migrate from the legacy
491/// `tokens.json` (written by the #334 system-token-map implementation)
492/// if `users.json` does not yet exist.
493///
494/// After loading (or creating) the registry, always scans the legacy runtime
495/// directory for per-room `.token` files left by older `room join` invocations
496/// and imports any that are not already present. This lets clients that joined
497/// before the `~/.room/state/` migration continue to use their existing tokens
498/// without a forced re-join.
499fn load_or_migrate_registry(config: &DaemonConfig) -> UserRegistry {
500    let users_path = config.state_dir.join("users.json");
501
502    let mut registry = if users_path.exists() {
503        // Fast path: users.json exists — use it directly.
504        UserRegistry::load(config.state_dir.clone()).unwrap_or_else(|e| {
505            eprintln!("[daemon] failed to load user registry: {e}; starting empty");
506            UserRegistry::new(config.state_dir.clone())
507        })
508    } else {
509        // Migration path: import from legacy tokens.json if present.
510        let tokens_path = config.system_tokens_path();
511        if tokens_path.exists() {
512            let legacy = super::auth::load_token_map(&tokens_path);
513            if !legacy.is_empty() {
514                eprintln!(
515                    "[daemon] migrating {} token(s) from tokens.json to users.json",
516                    legacy.len()
517                );
518                let mut reg = UserRegistry::new(config.state_dir.clone());
519                for (token, username) in &legacy {
520                    // register_user_idempotent is a no-op if already present.
521                    if let Err(e) = reg.register_user_idempotent(username) {
522                        eprintln!("[daemon] migration: register {username}: {e}");
523                        continue;
524                    }
525                    // Re-insert the existing token directly via issue_token so the
526                    // UUID is preserved. Since UserRegistry.issue_token generates a
527                    // new UUID, we instead manipulate the token map via the public
528                    // API by revoking nothing and accepting the registry's new token.
529                    // Trade-off: legacy UUIDs are replaced; clients must re-join.
530                    // This is acceptable — migration is a one-time event.
531                    let _ = reg.issue_token(username);
532                    let _ = token; // legacy token not preserved — clients must re-join
533                }
534                if let Err(e) = reg.save() {
535                    eprintln!("[daemon] migration save failed: {e}");
536                }
537                reg
538            } else {
539                // tokens.json exists but is empty — start fresh.
540                UserRegistry::new(config.state_dir.clone())
541            }
542        } else {
543            // Neither file exists — start fresh.
544            UserRegistry::new(config.state_dir.clone())
545        }
546    };
547
548    // Always scan the legacy runtime dir for old per-room token files and
549    // import any that are not already in the registry. Idempotent — safe to
550    // run on every startup.
551    migrate_legacy_tmpdir_tokens(&mut registry);
552
553    registry
554}
555
556/// Scan the legacy runtime directory for per-room token files and import
557/// them into `registry`.
558///
559/// Before `~/.room/state/` was introduced, `room join` wrote token files to
560/// the platform runtime directory (`$TMPDIR` on macOS, `/tmp/` on Linux)
561/// as `room-<room_id>-<username>.token`. This function reads each such file,
562/// parses the `username` and `token` fields, and imports them — preserving
563/// the UUID so existing clients do not need to re-join. Files whose tokens
564/// are already in the registry are silently skipped (idempotent).
565fn migrate_legacy_tmpdir_tokens(registry: &mut UserRegistry) {
566    let legacy_dir = crate::paths::legacy_token_dir();
567    migrate_legacy_tmpdir_tokens_from(&legacy_dir, registry);
568}
569
570/// Inner implementation of [`migrate_legacy_tmpdir_tokens`] that accepts an
571/// explicit directory. Extracted so tests can pass a temp directory without
572/// modifying process environment variables.
573fn migrate_legacy_tmpdir_tokens_from(legacy_dir: &std::path::Path, registry: &mut UserRegistry) {
574    let entries = match std::fs::read_dir(legacy_dir) {
575        Ok(e) => e,
576        Err(_) => return,
577    };
578    let mut count = 0usize;
579    for entry in entries.filter_map(|e| e.ok()) {
580        let path = entry.path();
581        let name = match path.file_name().and_then(|n| n.to_str()) {
582            Some(n) => n.to_owned(),
583            None => continue,
584        };
585        if !name.starts_with("room-") || !name.ends_with(".token") {
586            continue;
587        }
588        let data = match std::fs::read_to_string(&path) {
589            Ok(d) => d,
590            Err(_) => continue,
591        };
592        let v: serde_json::Value = match serde_json::from_str(data.trim()) {
593            Ok(v) => v,
594            Err(_) => continue,
595        };
596        let (username, token) = match (v["username"].as_str(), v["token"].as_str()) {
597            (Some(u), Some(t)) if !u.is_empty() && !t.is_empty() => (u.to_owned(), t.to_owned()),
598            _ => continue,
599        };
600        if let Err(e) = registry.register_user_idempotent(&username) {
601            eprintln!("[daemon] legacy token migration: register {username}: {e}");
602            continue;
603        }
604        match registry.import_token(&username, &token) {
605            Ok(()) => count += 1,
606            Err(e) => {
607                eprintln!("[daemon] legacy token migration: import token for {username}: {e}")
608            }
609        }
610    }
611    if count > 0 {
612        eprintln!(
613            "[daemon] imported {count} legacy token(s) from {}",
614            legacy_dir.display()
615        );
616    }
617}
618
619/// Build the initial subscription map for a room based on its config.
620///
621/// DM rooms auto-subscribe both participants at `Full` so they receive all
622/// messages without an explicit `/subscribe` call. Other room types start
623/// with an empty subscription map (users subscribe explicitly or via
624/// auto-subscribe-on-mention).
625fn build_initial_subscriptions(
626    config: &room_protocol::RoomConfig,
627) -> HashMap<String, room_protocol::SubscriptionTier> {
628    let mut subs = HashMap::new();
629    if config.visibility == room_protocol::RoomVisibility::Dm {
630        for user in &config.invite_list {
631            subs.insert(user.clone(), room_protocol::SubscriptionTier::Full);
632        }
633    }
634    subs
635}
636
637/// Core room-creation logic shared by UDS and REST paths.
638///
639/// Validates the room ID, checks for duplicates, builds a [`RoomState`], and
640/// inserts it into the room map. Pass `config: None` to create a configless
641/// room (no invite list, no visibility constraint).
642///
643/// `registry` is attached to the [`RoomState`] via [`RoomState::set_registry`]
644/// so that admin commands (`/kick`, `/reauth`) can also revoke tokens from the
645/// daemon-level [`UserRegistry`] in addition to the in-memory token map.
646pub(crate) async fn create_room_entry(
647    room_id: &str,
648    config: Option<room_protocol::RoomConfig>,
649    rooms: &RoomMap,
650    daemon_config: &DaemonConfig,
651    system_token_map: &TokenMap,
652    registry: Option<Arc<tokio::sync::Mutex<UserRegistry>>>,
653) -> Result<(), String> {
654    validate_room_id(room_id)?;
655    {
656        let map = rooms.lock().await;
657        if map.contains_key(room_id) {
658            return Err(format!("room already exists: {room_id}"));
659        }
660    }
661
662    let chat_path = daemon_config.chat_path(room_id);
663    let subscription_map_path = daemon_config.subscription_map_path(room_id);
664
665    let persisted_subs = super::commands::load_subscription_map(&subscription_map_path);
666    let merged_subs = if let Some(ref cfg) = config {
667        let mut initial = build_initial_subscriptions(cfg);
668        initial.extend(persisted_subs);
669        initial
670    } else {
671        persisted_subs
672    };
673
674    // All rooms in this daemon share the same token map so a token
675    // issued in any room is valid in all rooms.
676    let state = RoomState::new(
677        room_id.to_owned(),
678        chat_path,
679        daemon_config.system_tokens_path(),
680        subscription_map_path,
681        Arc::clone(system_token_map),
682        Arc::new(Mutex::new(merged_subs)),
683        config,
684    )?;
685    if let Some(reg) = registry {
686        state.set_registry(reg);
687    }
688
689    rooms.lock().await.insert(room_id.to_owned(), state);
690
691    // Write a meta file so one-shot commands (poll, watch, pull) can find the
692    // chat file without a broker connection. The meta file lives in the
693    // platform runtime dir alongside the daemon socket.
694    let meta_path = crate::paths::room_meta_path(room_id);
695    let chat_path_str = daemon_config.chat_path(room_id);
696    let meta_json = serde_json::json!({ "chat_path": chat_path_str });
697    let _ = std::fs::write(&meta_path, meta_json.to_string());
698
699    Ok(())
700}
701
702/// Handle a `DESTROY:<room_id>` request: remove the room from the daemon.
703///
704/// Protocol:
705/// 1. Client sends `DESTROY:<room_id>\n`
706/// 2. Client sends `<token>\n` on the next line.
707/// 3. Daemon validates the token, then responds with
708///    `{"type":"room_destroyed","room":"<id>"}\n` or an error.
709///
710/// Connected clients receive EOF when the room's shutdown signal fires.
711/// Chat files are preserved on disk.
712async fn handle_destroy(
713    room_id: &str,
714    reader: &mut tokio::io::BufReader<tokio::net::unix::OwnedReadHalf>,
715    write_half: &mut tokio::net::unix::OwnedWriteHalf,
716    rooms: &RoomMap,
717    user_registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
718) -> anyhow::Result<()> {
719    use tokio::io::AsyncWriteExt;
720
721    if room_id.is_empty() {
722        let err = serde_json::json!({
723            "type": "error",
724            "code": "invalid_room_id",
725            "message": "room ID is empty"
726        });
727        write_half.write_all(format!("{err}\n").as_bytes()).await?;
728        return Ok(());
729    }
730
731    // Read the token from the second line.
732    let mut token_line = String::new();
733    super::read_line_limited(reader, &mut token_line).await?;
734    let token = token_line.trim();
735
736    if token.is_empty() {
737        let err = serde_json::json!({
738            "type": "error",
739            "code": "missing_token",
740            "message": "DESTROY requires a valid token on the second line"
741        });
742        write_half.write_all(format!("{err}\n").as_bytes()).await?;
743        return Ok(());
744    }
745
746    // Validate against UserRegistry.
747    {
748        let reg = user_registry.lock().await;
749        if reg.validate_token(token).is_none() {
750            let err = serde_json::json!({
751                "type": "error",
752                "code": "invalid_token",
753                "message": "token is not valid"
754            });
755            write_half.write_all(format!("{err}\n").as_bytes()).await?;
756            return Ok(());
757        }
758    }
759
760    // Remove the room and signal shutdown.
761    let state = {
762        let mut map = rooms.lock().await;
763        map.remove(room_id)
764    };
765
766    match state {
767        Some(s) => {
768            // Signal shutdown so connected clients receive EOF.
769            let _ = s.shutdown.send(true);
770            let ok = serde_json::json!({
771                "type": "room_destroyed",
772                "room": room_id
773            });
774            write_half.write_all(format!("{ok}\n").as_bytes()).await?;
775        }
776        None => {
777            let err = serde_json::json!({
778                "type": "error",
779                "code": "room_not_found",
780                "room": room_id
781            });
782            write_half.write_all(format!("{err}\n").as_bytes()).await?;
783        }
784    }
785
786    Ok(())
787}
788
789/// Handle a `CREATE:<room_id>` request: validate, read config, create the room.
790///
791/// Protocol:
792/// 1. Client sends `CREATE:<room_id>\n`
793/// 2. Client sends config JSON on the next line: `{"visibility":"public","invite":[]}\n`
794/// 3. Daemon responds with `{"type":"room_created","room":"<id>"}\n` or an error envelope.
795async fn handle_create(
796    room_id: &str,
797    reader: &mut tokio::io::BufReader<tokio::net::unix::OwnedReadHalf>,
798    write_half: &mut tokio::net::unix::OwnedWriteHalf,
799    rooms: &RoomMap,
800    daemon_config: &DaemonConfig,
801    system_token_map: &TokenMap,
802    user_registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
803) -> anyhow::Result<()> {
804    use tokio::io::AsyncWriteExt;
805
806    // Validate room ID.
807    if let Err(e) = validate_room_id(room_id) {
808        let err = serde_json::json!({
809            "type": "error",
810            "code": "invalid_room_id",
811            "message": e
812        });
813        write_half.write_all(format!("{err}\n").as_bytes()).await?;
814        return Ok(());
815    }
816
817    // Check for duplicate before reading config (fast-fail).
818    {
819        let map = rooms.lock().await;
820        if map.contains_key(room_id) {
821            let err = serde_json::json!({
822                "type": "error",
823                "code": "room_exists",
824                "message": format!("room already exists: {room_id}")
825            });
826            write_half.write_all(format!("{err}\n").as_bytes()).await?;
827            return Ok(());
828        }
829    }
830
831    // Read config JSON from second line.
832    let mut config_line = String::new();
833    super::read_line_limited(reader, &mut config_line).await?;
834    let config_str = config_line.trim();
835
836    let (visibility_str, invite, token): (String, Vec<String>, Option<String>) =
837        if config_str.is_empty() {
838            ("public".into(), vec![], None)
839        } else {
840            let v: serde_json::Value = match serde_json::from_str(config_str) {
841                Ok(v) => v,
842                Err(e) => {
843                    let err = serde_json::json!({
844                        "type": "error",
845                        "code": "invalid_config",
846                        "message": format!("invalid config JSON: {e}")
847                    });
848                    write_half.write_all(format!("{err}\n").as_bytes()).await?;
849                    return Ok(());
850                }
851            };
852            let vis = v["visibility"].as_str().unwrap_or("public").to_owned();
853            let inv = v["invite"]
854                .as_array()
855                .map(|arr| {
856                    arr.iter()
857                        .filter_map(|v| v.as_str().map(|s| s.to_owned()))
858                        .collect()
859                })
860                .unwrap_or_default();
861            let tok = v["token"].as_str().map(|s| s.to_owned());
862            (vis, inv, tok)
863        };
864
865    // Validate the token.
866    let token_str = match token.as_deref() {
867        Some(t) if !t.is_empty() => t,
868        _ => {
869            let err = serde_json::json!({
870                "type": "error",
871                "code": "missing_token",
872                "message": "CREATE requires a valid token in the config JSON"
873            });
874            write_half.write_all(format!("{err}\n").as_bytes()).await?;
875            return Ok(());
876        }
877    };
878    {
879        let reg = user_registry.lock().await;
880        if reg.validate_token(token_str).is_none() {
881            let err = serde_json::json!({
882                "type": "error",
883                "code": "invalid_token",
884                "message": "token is not valid"
885            });
886            write_half.write_all(format!("{err}\n").as_bytes()).await?;
887            return Ok(());
888        }
889    }
890
891    // Build RoomConfig from the parsed visibility + invite list.
892    let room_config = match visibility_str.as_str() {
893        "public" => room_protocol::RoomConfig {
894            visibility: room_protocol::RoomVisibility::Public,
895            max_members: None,
896            invite_list: invite.into_iter().collect(),
897            created_by: "system".to_owned(),
898            created_at: chrono::Utc::now().to_rfc3339(),
899        },
900        "private" => room_protocol::RoomConfig {
901            visibility: room_protocol::RoomVisibility::Private,
902            max_members: None,
903            invite_list: invite.into_iter().collect(),
904            created_by: "system".to_owned(),
905            created_at: chrono::Utc::now().to_rfc3339(),
906        },
907        "dm" => {
908            if invite.len() != 2 {
909                let err = serde_json::json!({
910                    "type": "error",
911                    "code": "invalid_config",
912                    "message": "dm visibility requires exactly 2 users in invite list"
913                });
914                write_half.write_all(format!("{err}\n").as_bytes()).await?;
915                return Ok(());
916            }
917            room_protocol::RoomConfig::dm(&invite[0], &invite[1])
918        }
919        other => {
920            let err = serde_json::json!({
921                "type": "error",
922                "code": "invalid_config",
923                "message": format!("unknown visibility: {other}")
924            });
925            write_half.write_all(format!("{err}\n").as_bytes()).await?;
926            return Ok(());
927        }
928    };
929
930    // Delegate to the shared room-creation helper.
931    if let Err(e) = create_room_entry(
932        room_id,
933        Some(room_config),
934        rooms,
935        daemon_config,
936        system_token_map,
937        Some(user_registry.clone()),
938    )
939    .await
940    {
941        let err = serde_json::json!({
942            "type": "error",
943            "code": "internal",
944            "message": e
945        });
946        write_half.write_all(format!("{err}\n").as_bytes()).await?;
947        return Ok(());
948    }
949
950    let ok = serde_json::json!({
951        "type": "room_created",
952        "room": room_id
953    });
954    write_half.write_all(format!("{ok}\n").as_bytes()).await?;
955    Ok(())
956}
957
958/// Handle a global `JOIN:<username>` request at daemon level.
959///
960/// Registers the user in the global UserRegistry (or returns the existing token
961/// if already registered) and writes the token response. No room association.
962async fn handle_global_join(
963    username: &str,
964    write_half: &mut tokio::net::unix::OwnedWriteHalf,
965    registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
966) -> anyhow::Result<()> {
967    use tokio::io::AsyncWriteExt;
968
969    let mut reg = registry.lock().await;
970
971    // If user already has a token, return it. Otherwise register and issue.
972    let token = if reg.has_token_for_user(username) {
973        // Find existing token via snapshot (reverse lookup: token→user).
974        reg.token_snapshot()
975            .into_iter()
976            .find(|(_, u)| u == username)
977            .map(|(t, _)| t)
978            .expect("has_token_for_user was true but no token found")
979    } else {
980        reg.register_user_idempotent(username)
981            .map_err(|e| anyhow::anyhow!("registration failed: {e}"))?;
982        reg.issue_token(username)
983            .map_err(|e| anyhow::anyhow!("token issuance failed: {e}"))?
984    };
985
986    let resp = serde_json::json!({
987        "type": "token",
988        "token": token,
989        "username": username
990    });
991    write_half.write_all(format!("{resp}\n").as_bytes()).await?;
992    Ok(())
993}
994
995/// Dispatch a raw UDS connection to the correct room based on the handshake.
996///
997/// Handles two top-level protocols:
998/// - `CREATE:<room_id>` — create a new room (reads config JSON from second line)
999/// - `ROOM:<room_id>:<rest>` — route to an existing room
1000async fn dispatch_connection(
1001    stream: tokio::net::UnixStream,
1002    rooms: &RoomMap,
1003    next_client_id: &Arc<AtomicU64>,
1004    daemon_config: &DaemonConfig,
1005    system_token_map: &TokenMap,
1006    user_registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
1007) -> anyhow::Result<()> {
1008    use tokio::io::{AsyncWriteExt, BufReader};
1009
1010    let (read_half, mut write_half) = stream.into_split();
1011    let mut reader = BufReader::new(read_half);
1012
1013    let mut first = String::new();
1014    super::read_line_limited(&mut reader, &mut first).await?;
1015    let first_line = first.trim();
1016
1017    if first_line.is_empty() {
1018        return Ok(());
1019    }
1020
1021    use super::handshake::{
1022        parse_client_handshake, parse_daemon_prefix, ClientHandshake, DaemonPrefix,
1023    };
1024    let (room_id, rest) = match parse_daemon_prefix(first_line) {
1025        DaemonPrefix::Destroy(room_id) => {
1026            return handle_destroy(&room_id, &mut reader, &mut write_half, rooms, user_registry)
1027                .await;
1028        }
1029        DaemonPrefix::Create(room_id) => {
1030            return handle_create(
1031                &room_id,
1032                &mut reader,
1033                &mut write_half,
1034                rooms,
1035                daemon_config,
1036                system_token_map,
1037                user_registry,
1038            )
1039            .await;
1040        }
1041        DaemonPrefix::Join(username) => {
1042            return handle_global_join(&username, &mut write_half, user_registry).await;
1043        }
1044        DaemonPrefix::Room { room_id, rest } => (room_id, rest),
1045        DaemonPrefix::Unknown => {
1046            let err = serde_json::json!({
1047                "type": "error",
1048                "code": "missing_room_prefix",
1049                "message": "daemon mode requires ROOM:<room_id>: or CREATE:<room_id> prefix"
1050            });
1051            write_half.write_all(format!("{err}\n").as_bytes()).await?;
1052            return Ok(());
1053        }
1054    };
1055
1056    // Look up the room.
1057    let state = {
1058        let map = rooms.lock().await;
1059        map.get(room_id.as_str()).cloned()
1060    };
1061
1062    let state = match state {
1063        Some(s) => s,
1064        None => {
1065            let err = serde_json::json!({
1066                "type": "error",
1067                "code": "room_not_found",
1068                "room": room_id
1069            });
1070            write_half.write_all(format!("{err}\n").as_bytes()).await?;
1071            return Ok(());
1072        }
1073    };
1074
1075    let cid = next_client_id.fetch_add(1, Ordering::SeqCst) + 1;
1076
1077    // Dispatch based on the per-room handshake after the ROOM: prefix.
1078    let username = match parse_client_handshake(&rest) {
1079        ClientHandshake::Send(u) => {
1080            eprintln!(
1081                "[broker/daemon] DEPRECATED: SEND:{u} handshake used — \
1082                 migrate to TOKEN:<uuid> (SEND: will be removed in a future version)"
1083            );
1084            return handle_oneshot_send(u, reader, write_half, &state).await;
1085        }
1086        ClientHandshake::Token(token) => {
1087            // Try room-level token map first, then fall back to global UserRegistry.
1088            let resolved = match super::auth::validate_token(&token, &state.token_map).await {
1089                Some(u) => Some(u),
1090                None => {
1091                    let reg = user_registry.lock().await;
1092                    reg.validate_token(&token).map(|u| u.to_owned())
1093                }
1094            };
1095            return match resolved {
1096                Some(u) => handle_oneshot_send(u, reader, write_half, &state).await,
1097                None => {
1098                    let err = serde_json::json!({"type":"error","code":"invalid_token"});
1099                    write_half
1100                        .write_all(format!("{err}\n").as_bytes())
1101                        .await
1102                        .map_err(Into::into)
1103                }
1104            };
1105        }
1106        ClientHandshake::Join(u) => {
1107            let result = super::auth::handle_oneshot_join_with_registry(
1108                u,
1109                write_half,
1110                user_registry,
1111                &state.token_map,
1112                &state.subscription_map,
1113                state.config.as_ref(),
1114            )
1115            .await;
1116            // Persist auto-subscription from join so it survives broker restart.
1117            super::commands::persist_subscriptions(&state).await;
1118            return result;
1119        }
1120        ClientHandshake::Session(token) => {
1121            // Resolve username from token (room-level first, then UserRegistry).
1122            let resolved = match super::auth::validate_token(&token, &state.token_map).await {
1123                Some(u) => Some(u),
1124                None => {
1125                    let reg = user_registry.lock().await;
1126                    reg.validate_token(&token).map(|u| u.to_owned())
1127                }
1128            };
1129            match resolved {
1130                Some(u) => u,
1131                None => {
1132                    let err = serde_json::json!({"type":"error","code":"invalid_token"});
1133                    write_half.write_all(format!("{err}\n").as_bytes()).await?;
1134                    return Ok(());
1135                }
1136            }
1137        }
1138        ClientHandshake::Interactive(u) => {
1139            eprintln!(
1140                "[broker/daemon] DEPRECATED: unauthenticated interactive join for '{u}' — \
1141                 migrate to SESSION:<token>"
1142            );
1143            u
1144        }
1145    };
1146
1147    // Interactive join (authenticated via SESSION: or deprecated plain username).
1148    if username.is_empty() {
1149        return Ok(());
1150    }
1151
1152    // Check join permission before entering interactive session.
1153    if let Err(reason) = super::auth::check_join_permission(&username, state.config.as_ref()) {
1154        let err = serde_json::json!({
1155            "type": "error",
1156            "code": "join_denied",
1157            "message": reason,
1158            "username": username
1159        });
1160        write_half.write_all(format!("{err}\n").as_bytes()).await?;
1161        return Ok(());
1162    }
1163
1164    // Register client in room, then hand off to the full interactive handler.
1165    let (tx, _) = broadcast::channel::<String>(256);
1166    state
1167        .clients
1168        .lock()
1169        .await
1170        .insert(cid, (String::new(), tx.clone()));
1171
1172    let result =
1173        super::run_interactive_session(cid, &username, reader, write_half, tx, &state).await;
1174
1175    state.clients.lock().await.remove(&cid);
1176    result
1177}
1178
1179// ── Tests ─────────────────────────────────────────────────────────────────────
1180
1181#[cfg(test)]
1182mod tests {
1183    use super::*;
1184
1185    // ── PID management ───────────────────────────────────────────────────
1186
1187    #[test]
1188    fn write_pid_file_creates_file_with_current_pid() {
1189        let dir = tempfile::TempDir::new().unwrap();
1190        let path = dir.path().join("test.pid");
1191        write_pid_file(&path).unwrap();
1192        let content = std::fs::read_to_string(&path).unwrap();
1193        let pid: u32 = content.trim().parse().expect("PID should be a number");
1194        assert_eq!(pid, std::process::id());
1195    }
1196
1197    #[test]
1198    fn is_pid_alive_true_for_current_process() {
1199        let dir = tempfile::TempDir::new().unwrap();
1200        let path = dir.path().join("test.pid");
1201        write_pid_file(&path).unwrap();
1202        assert!(is_pid_alive(&path), "current process should be alive");
1203    }
1204
1205    #[test]
1206    fn is_pid_alive_false_for_missing_file() {
1207        let path = std::path::Path::new("/tmp/nonexistent-room-test-99999999.pid");
1208        assert!(!is_pid_alive(path));
1209    }
1210
1211    #[test]
1212    fn remove_pid_file_deletes_file() {
1213        let dir = tempfile::TempDir::new().unwrap();
1214        let path = dir.path().join("remove.pid");
1215        write_pid_file(&path).unwrap();
1216        assert!(path.exists());
1217        remove_pid_file(&path);
1218        assert!(!path.exists());
1219    }
1220
1221    #[test]
1222    fn remove_pid_file_noop_when_missing() {
1223        // Should not panic if the file is already gone.
1224        let path = std::path::Path::new("/tmp/gone-99999999.pid");
1225        remove_pid_file(path); // must not panic
1226    }
1227
1228    // ── DaemonState lifecycle ─────────────────────────────────────────────
1229
1230    /// Test helper: look up a room's state by ID.
1231    async fn get_room(daemon: &DaemonState, room_id: &str) -> Arc<RoomState> {
1232        daemon
1233            .rooms
1234            .lock()
1235            .await
1236            .get(room_id)
1237            .cloned()
1238            .unwrap_or_else(|| panic!("room {room_id} not found"))
1239    }
1240
1241    #[tokio::test]
1242    async fn create_room_succeeds() {
1243        let daemon = DaemonState::new(DaemonConfig::default());
1244        assert!(daemon.create_room("test-room").await.is_ok());
1245        let state = get_room(&daemon, "test-room").await;
1246        assert_eq!(*state.room_id, "test-room");
1247    }
1248
1249    #[tokio::test]
1250    async fn create_duplicate_room_fails() {
1251        let daemon = DaemonState::new(DaemonConfig::default());
1252        daemon.create_room("dup").await.unwrap();
1253        let result = daemon.create_room("dup").await;
1254        assert!(result.is_err());
1255        assert!(result.unwrap_err().contains("already exists"));
1256    }
1257
1258    #[tokio::test]
1259    async fn has_room_returns_true_for_created() {
1260        let daemon = DaemonState::new(DaemonConfig::default());
1261        daemon.create_room("room-a").await.unwrap();
1262        assert!(daemon.has_room("room-a").await);
1263        assert!(!daemon.has_room("room-b").await);
1264    }
1265
1266    #[tokio::test]
1267    async fn destroy_room_removes_it() {
1268        let daemon = DaemonState::new(DaemonConfig::default());
1269        daemon.create_room("doomed").await.unwrap();
1270        assert!(daemon.destroy_room("doomed").await.is_ok());
1271        assert!(!daemon.has_room("doomed").await);
1272    }
1273
1274    #[tokio::test]
1275    async fn destroy_nonexistent_room_fails() {
1276        let daemon = DaemonState::new(DaemonConfig::default());
1277        let result = daemon.destroy_room("nope").await;
1278        assert!(result.is_err());
1279        assert!(result.unwrap_err().contains("not found"));
1280    }
1281
1282    #[tokio::test]
1283    async fn destroy_room_signals_shutdown() {
1284        let daemon = DaemonState::new(DaemonConfig::default());
1285        daemon.create_room("shutme").await.unwrap();
1286        let state = get_room(&daemon, "shutme").await;
1287        let rx = state.shutdown.subscribe();
1288        assert!(!*rx.borrow());
1289
1290        daemon.destroy_room("shutme").await.unwrap();
1291        // The shutdown signal should now be true.
1292        assert!(*rx.borrow());
1293    }
1294
1295    #[tokio::test]
1296    async fn list_rooms_returns_all() {
1297        let daemon = DaemonState::new(DaemonConfig::default());
1298        daemon.create_room("alpha").await.unwrap();
1299        daemon.create_room("beta").await.unwrap();
1300        daemon.create_room("gamma").await.unwrap();
1301
1302        let mut rooms = daemon.list_rooms().await;
1303        rooms.sort();
1304        assert_eq!(rooms, vec!["alpha", "beta", "gamma"]);
1305    }
1306
1307    #[tokio::test]
1308    async fn list_rooms_empty_initially() {
1309        let daemon = DaemonState::new(DaemonConfig::default());
1310        assert!(daemon.list_rooms().await.is_empty());
1311    }
1312
1313    #[tokio::test]
1314    async fn create_room_initializes_plugins() {
1315        let daemon = DaemonState::new(DaemonConfig::default());
1316        daemon.create_room("plugtest").await.unwrap();
1317        let state = get_room(&daemon, "plugtest").await;
1318        // help and stats should be registered
1319        assert!(state.plugin_registry.resolve("help").is_some());
1320        assert!(state.plugin_registry.resolve("stats").is_some());
1321    }
1322
1323    // ── DaemonConfig ──────────────────────────────────────────────────────
1324
1325    #[test]
1326    fn config_chat_path_format() {
1327        let config = DaemonConfig {
1328            data_dir: PathBuf::from("/var/room"),
1329            ..DaemonConfig::default()
1330        };
1331        assert_eq!(
1332            config.chat_path("myroom"),
1333            PathBuf::from("/var/room/myroom.chat")
1334        );
1335    }
1336
1337    #[test]
1338    fn config_default_socket_path() {
1339        let config = DaemonConfig::default();
1340        assert_eq!(config.socket_path, crate::paths::room_socket_path());
1341    }
1342
1343    // ── create_room_with_config ───────────────────────────────────────────
1344
1345    #[tokio::test]
1346    async fn create_room_with_dm_config() {
1347        let daemon = DaemonState::new(DaemonConfig::default());
1348        let config = room_protocol::RoomConfig::dm("alice", "bob");
1349        assert!(daemon
1350            .create_room_with_config("dm-alice-bob", config)
1351            .await
1352            .is_ok());
1353
1354        let state = get_room(&daemon, "dm-alice-bob").await;
1355        let cfg = state.config.as_ref().unwrap();
1356        assert_eq!(cfg.visibility, room_protocol::RoomVisibility::Dm);
1357        assert_eq!(cfg.max_members, Some(2));
1358        assert!(cfg.invite_list.contains("alice"));
1359        assert!(cfg.invite_list.contains("bob"));
1360    }
1361
1362    #[tokio::test]
1363    async fn create_room_with_config_duplicate_fails() {
1364        let daemon = DaemonState::new(DaemonConfig::default());
1365        let config = room_protocol::RoomConfig::public("owner");
1366        daemon
1367            .create_room_with_config("dup", config.clone())
1368            .await
1369            .unwrap();
1370        assert!(daemon.create_room_with_config("dup", config).await.is_err());
1371    }
1372
1373    #[tokio::test]
1374    async fn get_room_config_returns_none_for_unconfigured() {
1375        let daemon = DaemonState::new(DaemonConfig::default());
1376        daemon.create_room("plain").await.unwrap();
1377        assert!(daemon.get_room_config("plain").await.is_none());
1378    }
1379
1380    #[tokio::test]
1381    async fn get_room_config_returns_config_when_present() {
1382        let daemon = DaemonState::new(DaemonConfig::default());
1383        let config = room_protocol::RoomConfig::dm("alice", "bob");
1384        daemon
1385            .create_room_with_config("dm-alice-bob", config)
1386            .await
1387            .unwrap();
1388        let cfg = daemon.get_room_config("dm-alice-bob").await.unwrap();
1389        assert_eq!(cfg.visibility, room_protocol::RoomVisibility::Dm);
1390    }
1391
1392    #[tokio::test]
1393    async fn dm_room_id_deterministic_and_lookup_works() {
1394        let daemon = DaemonState::new(DaemonConfig::default());
1395        let room_id = room_protocol::dm_room_id("bob", "alice").unwrap();
1396        assert_eq!(room_id, "dm-alice-bob");
1397
1398        let config = room_protocol::RoomConfig::dm("bob", "alice");
1399        daemon
1400            .create_room_with_config(&room_id, config)
1401            .await
1402            .unwrap();
1403        assert!(daemon.has_room("dm-alice-bob").await);
1404        // Reverse order gives the same room_id
1405        assert_eq!(
1406            room_protocol::dm_room_id("alice", "bob").unwrap(),
1407            "dm-alice-bob"
1408        );
1409    }
1410
1411    // ── validate_room_id ──────────────────────────────────────────────────
1412
1413    #[test]
1414    fn valid_room_ids() {
1415        for id in [
1416            "lobby",
1417            "agent-room-2",
1418            "my_room",
1419            "Room.1",
1420            "dm-alice-bob",
1421            "a",
1422            &"x".repeat(MAX_ROOM_ID_LEN),
1423        ] {
1424            assert!(validate_room_id(id).is_ok(), "should accept: {id:?}");
1425        }
1426    }
1427
1428    #[test]
1429    fn empty_room_id_rejected() {
1430        let err = validate_room_id("").unwrap_err();
1431        assert!(err.contains("empty"), "{err}");
1432    }
1433
1434    #[test]
1435    fn room_id_too_long_rejected() {
1436        let long = "x".repeat(MAX_ROOM_ID_LEN + 1);
1437        let err = validate_room_id(&long).unwrap_err();
1438        assert!(err.contains("too long"), "{err}");
1439    }
1440
1441    #[test]
1442    fn dot_dot_traversal_rejected() {
1443        for id in ["..", "room/../etc", "..secret", "a..b"] {
1444            let err = validate_room_id(id).unwrap_err();
1445            assert!(err.contains(".."), "should reject {id:?}: {err}");
1446        }
1447    }
1448
1449    #[test]
1450    fn single_dot_rejected() {
1451        let err = validate_room_id(".").unwrap_err();
1452        assert!(err.contains(".."), "{err}");
1453    }
1454
1455    #[test]
1456    fn slash_rejected() {
1457        for id in ["room/sub", "/etc/passwd", "a/b/c"] {
1458            let err = validate_room_id(id).unwrap_err();
1459            assert!(err.contains("unsafe"), "should reject {id:?}: {err}");
1460        }
1461    }
1462
1463    #[test]
1464    fn backslash_rejected() {
1465        let err = validate_room_id("room\\sub").unwrap_err();
1466        assert!(err.contains("unsafe"), "{err}");
1467    }
1468
1469    #[test]
1470    fn null_byte_rejected() {
1471        let err = validate_room_id("room\0id").unwrap_err();
1472        assert!(err.contains("unsafe"), "{err}");
1473    }
1474
1475    #[test]
1476    fn whitespace_rejected() {
1477        for id in ["room name", "room\tid", "room\nid", " leading", "trailing "] {
1478            let err = validate_room_id(id).unwrap_err();
1479            assert!(err.contains("whitespace"), "should reject {id:?}: {err}");
1480        }
1481    }
1482
1483    #[test]
1484    fn other_unsafe_chars_rejected() {
1485        for ch in [':', '*', '?', '"', '<', '>', '|'] {
1486            let id = format!("room{ch}id");
1487            let err = validate_room_id(&id).unwrap_err();
1488            assert!(err.contains("unsafe"), "should reject {ch:?}: {err}");
1489        }
1490    }
1491
1492    #[tokio::test]
1493    async fn create_room_rejects_invalid_id() {
1494        let daemon = DaemonState::new(DaemonConfig::default());
1495        assert!(daemon.create_room("room/sub").await.is_err());
1496        assert!(daemon.create_room("..").await.is_err());
1497        assert!(daemon.create_room("").await.is_err());
1498        assert!(daemon.create_room("room name").await.is_err());
1499    }
1500
1501    #[tokio::test]
1502    async fn create_room_with_config_rejects_invalid_id() {
1503        let daemon = DaemonState::new(DaemonConfig::default());
1504        let config = room_protocol::RoomConfig::public("owner");
1505        assert!(daemon
1506            .create_room_with_config("../etc", config)
1507            .await
1508            .is_err());
1509    }
1510
1511    // ── DM auto-subscribe ─────────────────────────────────────────────────
1512
1513    #[tokio::test]
1514    async fn dm_room_auto_subscribes_both_participants() {
1515        let daemon = DaemonState::new(DaemonConfig::default());
1516        let config = room_protocol::RoomConfig::dm("alice", "bob");
1517        daemon
1518            .create_room_with_config("dm-alice-bob", config)
1519            .await
1520            .unwrap();
1521
1522        let state = get_room(&daemon, "dm-alice-bob").await;
1523        let subs = state.subscription_map.lock().await;
1524        assert_eq!(subs.len(), 2);
1525        assert_eq!(
1526            subs.get("alice"),
1527            Some(&room_protocol::SubscriptionTier::Full)
1528        );
1529        assert_eq!(
1530            subs.get("bob"),
1531            Some(&room_protocol::SubscriptionTier::Full)
1532        );
1533    }
1534
1535    #[tokio::test]
1536    async fn public_room_starts_with_no_subscriptions() {
1537        let daemon = DaemonState::new(DaemonConfig::default());
1538        let config = room_protocol::RoomConfig::public("owner");
1539        daemon
1540            .create_room_with_config("lobby", config)
1541            .await
1542            .unwrap();
1543
1544        let state = get_room(&daemon, "lobby").await;
1545        let subs = state.subscription_map.lock().await;
1546        assert!(subs.is_empty());
1547    }
1548
1549    #[tokio::test]
1550    async fn unconfigured_room_starts_with_no_subscriptions() {
1551        let daemon = DaemonState::new(DaemonConfig::default());
1552        daemon.create_room("plain").await.unwrap();
1553
1554        let state = get_room(&daemon, "plain").await;
1555        let subs = state.subscription_map.lock().await;
1556        assert!(subs.is_empty());
1557    }
1558
1559    #[tokio::test]
1560    async fn dm_auto_subscribe_uses_full_tier() {
1561        let daemon = DaemonState::new(DaemonConfig::default());
1562        let config = room_protocol::RoomConfig::dm("carol", "dave");
1563        daemon
1564            .create_room_with_config("dm-carol-dave", config)
1565            .await
1566            .unwrap();
1567
1568        let state = get_room(&daemon, "dm-carol-dave").await;
1569        let subs = state.subscription_map.lock().await;
1570        // Verify it's Full, not MentionsOnly
1571        for (_, tier) in subs.iter() {
1572            assert_eq!(*tier, room_protocol::SubscriptionTier::Full);
1573        }
1574    }
1575
1576    #[test]
1577    fn build_initial_subscriptions_dm_populates() {
1578        let config = room_protocol::RoomConfig::dm("alice", "bob");
1579        let subs = build_initial_subscriptions(&config);
1580        assert_eq!(subs.len(), 2);
1581        assert_eq!(subs["alice"], room_protocol::SubscriptionTier::Full);
1582        assert_eq!(subs["bob"], room_protocol::SubscriptionTier::Full);
1583    }
1584
1585    #[test]
1586    fn build_initial_subscriptions_public_empty() {
1587        let config = room_protocol::RoomConfig::public("owner");
1588        let subs = build_initial_subscriptions(&config);
1589        assert!(subs.is_empty());
1590    }
1591
1592    // ── DaemonConfig grace_period_secs ────────────────────────────────────
1593
1594    #[test]
1595    fn default_grace_period_is_30() {
1596        let config = DaemonConfig::default();
1597        assert_eq!(config.grace_period_secs, 30);
1598    }
1599
1600    #[test]
1601    fn custom_grace_period_preserved() {
1602        let config = DaemonConfig {
1603            grace_period_secs: 0,
1604            ..DaemonConfig::default()
1605        };
1606        assert_eq!(config.grace_period_secs, 0);
1607    }
1608
1609    // ── connection_count refcount ─────────────────────────────────────────
1610
1611    #[tokio::test]
1612    async fn connection_count_starts_at_zero() {
1613        let daemon = DaemonState::new(DaemonConfig::default());
1614        assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 0);
1615    }
1616
1617    #[tokio::test]
1618    async fn connection_count_increments_and_decrements() {
1619        let count = Arc::new(AtomicUsize::new(0));
1620        count.fetch_add(1, Ordering::SeqCst);
1621        count.fetch_add(1, Ordering::SeqCst);
1622        assert_eq!(count.load(Ordering::SeqCst), 2);
1623        count.fetch_sub(1, Ordering::SeqCst);
1624        assert_eq!(count.load(Ordering::SeqCst), 1);
1625        count.fetch_sub(1, Ordering::SeqCst);
1626        assert_eq!(count.load(Ordering::SeqCst), 0);
1627    }
1628
1629    /// Verify that the daemon exits cleanly when the shutdown signal is sent.
1630    /// Uses an Arc<DaemonState> so the run() task can hold a reference while
1631    /// the test also holds one to send the shutdown signal.
1632    #[tokio::test]
1633    async fn daemon_exits_on_shutdown_signal() {
1634        let dir = tempfile::TempDir::new().unwrap();
1635        let socket = dir.path().join("test-grace.sock");
1636        std::fs::create_dir_all(dir.path().join("data")).unwrap();
1637        std::fs::create_dir_all(dir.path().join("state")).unwrap();
1638
1639        let config = DaemonConfig {
1640            socket_path: socket.clone(),
1641            data_dir: dir.path().join("data"),
1642            state_dir: dir.path().join("state"),
1643            ws_port: None,
1644            grace_period_secs: 0,
1645        };
1646        let daemon = Arc::new(DaemonState::new(config));
1647        let shutdown = daemon.shutdown_handle();
1648
1649        let daemon2 = Arc::clone(&daemon);
1650        let handle = tokio::spawn(async move { daemon2.run().await });
1651
1652        // Wait for socket to become connectable (daemon is up).
1653        for _ in 0..100 {
1654            if tokio::net::UnixStream::connect(&socket).await.is_ok() {
1655                break;
1656            }
1657            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1658        }
1659        assert!(
1660            tokio::net::UnixStream::connect(&socket).await.is_ok(),
1661            "daemon socket not ready"
1662        );
1663
1664        // Send shutdown — daemon should exit quickly.
1665        let _ = shutdown.send(true);
1666        let result = tokio::time::timeout(tokio::time::Duration::from_secs(5), handle).await;
1667        assert!(result.is_ok(), "daemon did not exit within 5s");
1668        assert!(result.unwrap().unwrap().is_ok(), "run() returned error");
1669    }
1670
1671    /// Verify that a new connection during the grace period resets the timer.
1672    /// We check this by confirming connection_count goes from 0 → 1 → 0 without
1673    /// a premature shutdown.
1674    #[tokio::test]
1675    async fn grace_period_cancelled_by_new_connection() {
1676        let dir = tempfile::TempDir::new().unwrap();
1677        let socket = dir.path().join("test-cancel-grace.sock");
1678
1679        let config = DaemonConfig {
1680            socket_path: socket.clone(),
1681            data_dir: dir.path().join("data"),
1682            state_dir: dir.path().join("state"),
1683            ws_port: None,
1684            grace_period_secs: 60, // long grace — should not fire
1685        };
1686        let daemon = DaemonState::new(config);
1687
1688        // Manually exercise the counter: simulate connect + disconnect.
1689        daemon.connection_count.fetch_add(1, Ordering::SeqCst);
1690        assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 1);
1691        daemon.connection_count.fetch_sub(1, Ordering::SeqCst);
1692        assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 0);
1693
1694        // Simulate a second connection arriving (cancels grace timer).
1695        daemon.connection_count.fetch_add(1, Ordering::SeqCst);
1696        assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 1);
1697
1698        // Daemon has not shut down.
1699        assert!(!*daemon.shutdown.borrow());
1700    }
1701
1702    // ── migrate_legacy_tmpdir_tokens ──────────────────────────────────────
1703
1704    /// Write a token file to `dir` in the format written by old `room join`.
1705    fn write_legacy_token(dir: &std::path::Path, room_id: &str, username: &str, token: &str) {
1706        let name = format!("room-{room_id}-{username}.token");
1707        let data = serde_json::json!({"username": username, "token": token});
1708        std::fs::write(dir.join(name), format!("{data}\n")).unwrap();
1709    }
1710
1711    #[test]
1712    fn migrate_legacy_tmpdir_tokens_imports_token() {
1713        let token_dir = tempfile::TempDir::new().unwrap();
1714        let state_dir = tempfile::TempDir::new().unwrap();
1715        write_legacy_token(token_dir.path(), "lobby", "alice", "legacy-uuid-alice");
1716
1717        let mut registry = UserRegistry::new(state_dir.path().to_owned());
1718
1719        // Override the legacy dir by temporarily pointing TMPDIR at token_dir.
1720        // Because legacy_token_dir() reads env on macOS, we run the function
1721        // directly on the directory to avoid touching the process environment.
1722        // Instead we call the inner logic directly with a helper that accepts
1723        // a custom dir.
1724        migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1725
1726        assert_eq!(registry.validate_token("legacy-uuid-alice"), Some("alice"));
1727        assert!(registry.get_user("alice").is_some());
1728    }
1729
1730    #[test]
1731    fn migrate_legacy_tmpdir_tokens_idempotent() {
1732        let token_dir = tempfile::TempDir::new().unwrap();
1733        let state_dir = tempfile::TempDir::new().unwrap();
1734        write_legacy_token(token_dir.path(), "lobby", "bob", "tok-bob");
1735
1736        let mut registry = UserRegistry::new(state_dir.path().to_owned());
1737        migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1738        migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1739
1740        // Token still valid and exactly one entry for bob.
1741        assert_eq!(registry.validate_token("tok-bob"), Some("bob"));
1742        let snap = registry.token_snapshot();
1743        assert_eq!(snap.values().filter(|u| u.as_str() == "bob").count(), 1);
1744    }
1745
1746    #[test]
1747    fn migrate_legacy_tmpdir_tokens_skips_non_token_files() {
1748        let token_dir = tempfile::TempDir::new().unwrap();
1749        let state_dir = tempfile::TempDir::new().unwrap();
1750        std::fs::write(token_dir.path().join("roomd.sock"), "not a token").unwrap();
1751        std::fs::write(token_dir.path().join("something.json"), "{}").unwrap();
1752
1753        let mut registry = UserRegistry::new(state_dir.path().to_owned());
1754        migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1755
1756        assert!(registry.list_users().is_empty());
1757    }
1758
1759    #[test]
1760    fn migrate_legacy_tmpdir_tokens_skips_malformed_json() {
1761        let token_dir = tempfile::TempDir::new().unwrap();
1762        let state_dir = tempfile::TempDir::new().unwrap();
1763        std::fs::write(token_dir.path().join("room-x-bad.token"), "not-json{{{").unwrap();
1764
1765        let mut registry = UserRegistry::new(state_dir.path().to_owned());
1766        migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1767
1768        assert!(registry.list_users().is_empty());
1769    }
1770}