Skip to main content

room_cli/broker/
daemon.rs

1//! Multi-room daemon: manages N rooms in a single process.
2//!
3//! `DaemonState` wraps a map of room_id → `RoomState` and provides room
4//! lifecycle (create/destroy/get). The daemon listens on a single UDS
5//! socket at a configurable path and dispatches connections to the correct
6//! room based on an extended handshake protocol.
7//!
8//! ## Handshake protocol
9//!
10//! The first line of a UDS connection to the daemon can carry one of two
11//! prefixes:
12//!
13//! - `ROOM:<room_id>:<rest>` — route to an existing room. The rest of the
14//!   line is the standard per-room handshake (`SEND:`, `TOKEN:`, `JOIN:`,
15//!   or plain username).
16//! - `CREATE:<room_id>` — create a new room. A second line carries the
17//!   room configuration as JSON (`{"visibility":"public","invite":[]}`).
18//! - `DESTROY:<room_id>` — destroy a room. Signals shutdown to connected
19//!   clients and removes the room from the daemon's map.
20//!
21//! If no recognised prefix is present, the connection is rejected with an error.
22//!
23//! Examples:
24//! ```text
25//! ROOM:myroom:JOIN:alice       → join room "myroom" as "alice"
26//! ROOM:myroom:TOKEN:<uuid>     → authenticated send to "myroom"
27//! ROOM:myroom:SEND:bob         → legacy unauthenticated send to "myroom"
28//! ROOM:myroom:alice            → interactive join to "myroom" as "alice"
29//! CREATE:newroom               → create room "newroom" (config on next line)
30//! DESTROY:myroom               → destroy room "myroom"
31//! ```
32
33use std::{
34    collections::HashMap,
35    path::PathBuf,
36    sync::{
37        atomic::{AtomicU64, AtomicUsize, Ordering},
38        Arc,
39    },
40};
41
42use tokio::{
43    net::UnixListener,
44    sync::{broadcast, watch, Mutex},
45};
46
47use crate::registry::UserRegistry;
48
49use super::{
50    handle_oneshot_send,
51    state::{RoomState, TokenMap},
52    ws::{self, DaemonWsState},
53};
54
55/// Characters that are unsafe in filesystem paths or shell contexts.
56const UNSAFE_CHARS: &[char] = &['/', '\\', ':', '*', '?', '"', '<', '>', '|', '\0'];
57
58// ── PID file management ───────────────────────────────────────────────────────
59
60/// Write the current process's PID to `path` (creates or overwrites).
61pub fn write_pid_file(path: &std::path::Path) -> std::io::Result<()> {
62    std::fs::write(path, std::process::id().to_string())
63}
64
65/// Returns `true` if the PID recorded in `path` belongs to a running process.
66///
67/// Returns `false` when the file is missing, unreadable, or unparseable, and
68/// when the process is confirmed dead (ESRCH).
69pub fn is_pid_alive(path: &std::path::Path) -> bool {
70    let Ok(contents) = std::fs::read_to_string(path) else {
71        return false;
72    };
73    let Ok(pid) = contents.trim().parse::<u32>() else {
74        return false;
75    };
76    pid_alive(pid)
77}
78
79/// Remove the PID file, ignoring errors (best-effort cleanup).
80pub fn remove_pid_file(path: &std::path::Path) {
81    let _ = std::fs::remove_file(path);
82}
83
84/// Check whether a process with the given PID is currently running.
85///
86/// Uses POSIX `kill(pid, 0)` — signal 0 never delivers a signal but the kernel
87/// validates whether the calling process may signal `pid`, returning:
88/// - `0`  → process exists
89/// - `-1` with `EPERM` (errno 1)  → process exists, permission denied
90/// - `-1` with `ESRCH` (errno 3)  → no such process
91#[cfg(unix)]
92fn pid_alive(pid: u32) -> bool {
93    extern "C" {
94        fn kill(pid: i32, sig: i32) -> i32;
95    }
96    // SAFETY: kill(pid, 0) never delivers a signal; it only checks liveness.
97    let ret = unsafe { kill(pid as i32, 0) };
98    if ret == 0 {
99        return true;
100    }
101    // EPERM == 1 on Linux and macOS: process exists but we lack permission.
102    std::io::Error::last_os_error().raw_os_error() == Some(1)
103}
104
105#[cfg(not(unix))]
106fn pid_alive(_pid: u32) -> bool {
107    // Conservative: assume the process is alive on non-Unix platforms.
108    true
109}
110
111/// Maximum allowed length for a room ID.
112const MAX_ROOM_ID_LEN: usize = 64;
113
114/// Validate a room ID for filesystem safety.
115///
116/// Rejects IDs that are empty, too long, contain path traversal sequences
117/// (`..`), whitespace, or filesystem-unsafe characters.
118pub fn validate_room_id(room_id: &str) -> Result<(), String> {
119    if room_id.is_empty() {
120        return Err("room ID cannot be empty".into());
121    }
122    if room_id.len() > MAX_ROOM_ID_LEN {
123        return Err(format!(
124            "room ID too long ({} chars, max {MAX_ROOM_ID_LEN})",
125            room_id.len()
126        ));
127    }
128    if room_id == "." || room_id == ".." || room_id.contains("..") {
129        return Err("room ID cannot contain '..'".into());
130    }
131    if room_id.chars().any(|c| c.is_whitespace()) {
132        return Err("room ID cannot contain whitespace".into());
133    }
134    if let Some(bad) = room_id.chars().find(|c| UNSAFE_CHARS.contains(c)) {
135        return Err(format!("room ID contains unsafe character: {bad:?}"));
136    }
137    Ok(())
138}
139
140/// Configuration for the daemon.
141#[derive(Debug, Clone)]
142pub struct DaemonConfig {
143    /// Path to the daemon UDS socket (ephemeral, platform-native temp dir).
144    pub socket_path: PathBuf,
145    /// Directory for chat files. Each room gets `<data_dir>/<room_id>.chat`.
146    /// Defaults to `~/.room/data/`; overridable with `--data-dir`.
147    pub data_dir: PathBuf,
148    /// Directory for state files (token maps, cursors, subscriptions).
149    /// Defaults to `~/.room/state/`.
150    pub state_dir: PathBuf,
151    /// Optional WebSocket/REST port.
152    pub ws_port: Option<u16>,
153    /// Seconds to wait after the last connection closes before shutting down.
154    ///
155    /// Default is 30 seconds. Set to 0 for immediate shutdown when the last
156    /// client disconnects. Has no effect if there are always active connections.
157    pub grace_period_secs: u64,
158}
159
160impl DaemonConfig {
161    /// Resolve the chat file path for a given room.
162    pub fn chat_path(&self, room_id: &str) -> PathBuf {
163        self.data_dir.join(format!("{room_id}.chat"))
164    }
165
166    /// Resolve the token-map persistence path for a given room.
167    pub fn token_map_path(&self, room_id: &str) -> PathBuf {
168        crate::paths::broker_tokens_path(&self.state_dir, room_id)
169    }
170
171    /// System-level token persistence path: `<state_dir>/tokens.json`.
172    ///
173    /// Used by the daemon to share a single token store across all rooms.
174    /// Production default is `~/.room/state/tokens.json`; tests override
175    /// `state_dir` with a temp directory.
176    pub fn system_tokens_path(&self) -> PathBuf {
177        self.state_dir.join("tokens.json")
178    }
179
180    /// Resolve the subscription-map persistence path for a given room.
181    pub fn subscription_map_path(&self, room_id: &str) -> PathBuf {
182        crate::paths::broker_subscriptions_path(&self.state_dir, room_id)
183    }
184
185    /// Resolve the event-filter-map persistence path for a given room.
186    pub fn event_filter_map_path(&self, room_id: &str) -> PathBuf {
187        crate::paths::broker_event_filters_path(&self.state_dir, room_id)
188    }
189}
190
191impl Default for DaemonConfig {
192    fn default() -> Self {
193        Self {
194            socket_path: crate::paths::room_socket_path(),
195            data_dir: crate::paths::room_data_dir(),
196            state_dir: crate::paths::room_state_dir(),
197            ws_port: None,
198            grace_period_secs: 30,
199        }
200    }
201}
202
203/// Registry of active rooms, keyed by room_id.
204pub(crate) type RoomMap = Arc<Mutex<HashMap<String, Arc<RoomState>>>>;
205
206/// Multi-room daemon state.
207pub struct DaemonState {
208    pub(crate) rooms: RoomMap,
209    pub(crate) config: DaemonConfig,
210    /// Global client ID counter shared across all rooms.
211    pub(crate) next_client_id: Arc<AtomicU64>,
212    /// Daemon-level shutdown signal.
213    pub(crate) shutdown: Arc<watch::Sender<bool>>,
214    /// System-level token map shared across all rooms (runtime cache).
215    ///
216    /// A single `Arc<Mutex<HashMap>>` instance is cloned into every room's
217    /// `token_map`. Tokens issued in any room are valid in all rooms managed
218    /// by this daemon. Seeded from `user_registry` on startup; kept in sync
219    /// by [`super::auth::issue_token_via_registry`].
220    pub(crate) system_token_map: TokenMap,
221    /// Daemon-level user registry — sole persistence layer for cross-room identity.
222    ///
223    /// Stores user profiles, room memberships, and tokens to
224    /// `~/.room/state/users.json`. New sessions register/update here;
225    /// `system_token_map` is derived from this registry at startup and kept
226    /// in sync on every join.
227    pub(crate) user_registry: Arc<tokio::sync::Mutex<UserRegistry>>,
228    /// Number of currently active UDS connections.
229    ///
230    /// Incremented when a connection is accepted; decremented when the
231    /// connection task completes. When the count drops to zero the daemon
232    /// starts a grace period timer before sending the shutdown signal.
233    pub(crate) connection_count: Arc<AtomicUsize>,
234}
235
236impl DaemonState {
237    /// Create a new daemon with the given configuration and no rooms.
238    pub fn new(config: DaemonConfig) -> Self {
239        let (shutdown_tx, _) = watch::channel(false);
240
241        // Load UserRegistry from disk (sole source of truth for identity).
242        //
243        // Migration path: if `users.json` (UserRegistry) does not exist but
244        // the legacy `tokens.json` (system_token_map from #334) does, import
245        // the flat token map into a fresh registry so existing sessions survive
246        // the upgrade without requiring a forced re-join.
247        let registry = load_or_migrate_registry(&config);
248
249        // Seed the runtime token map from the registry so existing tokens remain
250        // valid across daemon restarts without requiring a fresh join.
251        let token_snapshot = registry.token_snapshot();
252
253        Self {
254            rooms: Arc::new(Mutex::new(HashMap::new())),
255            config,
256            next_client_id: Arc::new(AtomicU64::new(0)),
257            shutdown: Arc::new(shutdown_tx),
258            system_token_map: Arc::new(Mutex::new(token_snapshot)),
259            user_registry: Arc::new(tokio::sync::Mutex::new(registry)),
260            connection_count: Arc::new(AtomicUsize::new(0)),
261        }
262    }
263
264    /// Create a room and register it. Returns `Err` if the room ID is invalid
265    /// or the room already exists.
266    pub async fn create_room(&self, room_id: &str) -> Result<(), String> {
267        create_room_entry(
268            room_id,
269            None,
270            &self.rooms,
271            &self.config,
272            &self.system_token_map,
273            Some(self.user_registry.clone()),
274        )
275        .await
276    }
277
278    /// Create a room with explicit configuration. Returns `Err` if the room ID
279    /// is invalid or the room already exists.
280    pub async fn create_room_with_config(
281        &self,
282        room_id: &str,
283        config: room_protocol::RoomConfig,
284    ) -> Result<(), String> {
285        create_room_entry(
286            room_id,
287            Some(config),
288            &self.rooms,
289            &self.config,
290            &self.system_token_map,
291            Some(self.user_registry.clone()),
292        )
293        .await
294    }
295
296    /// Get a room's config, if it exists.
297    pub async fn get_room_config(&self, room_id: &str) -> Option<room_protocol::RoomConfig> {
298        self.rooms
299            .lock()
300            .await
301            .get(room_id)
302            .and_then(|s| s.config.clone())
303    }
304
305    /// Destroy a room. Returns `Err` if the room does not exist.
306    ///
307    /// Signals the room's shutdown so connected clients receive EOF.
308    pub async fn destroy_room(&self, room_id: &str) -> Result<(), String> {
309        let mut rooms = self.rooms.lock().await;
310        let state = rooms
311            .remove(room_id)
312            .ok_or_else(|| format!("room not found: {room_id}"))?;
313
314        // Signal the room's shutdown so any connected clients receive EOF.
315        let _ = state.shutdown.send(true);
316        Ok(())
317    }
318
319    /// Check if a room exists.
320    pub async fn has_room(&self, room_id: &str) -> bool {
321        self.rooms.lock().await.contains_key(room_id)
322    }
323
324    /// Get a handle to the daemon-level shutdown sender.
325    pub fn shutdown_handle(&self) -> Arc<watch::Sender<bool>> {
326        self.shutdown.clone()
327    }
328
329    /// List all active room IDs.
330    pub async fn list_rooms(&self) -> Vec<String> {
331        self.rooms.lock().await.keys().cloned().collect()
332    }
333
334    /// Insert a token directly into a room's token map, bypassing the join
335    /// permission check. Intended for integration tests only.
336    #[doc(hidden)]
337    pub async fn test_inject_token(
338        &self,
339        room_id: &str,
340        username: &str,
341        token: &str,
342    ) -> Result<(), String> {
343        let rooms = self.rooms.lock().await;
344        let room = rooms
345            .get(room_id)
346            .ok_or_else(|| format!("room not found: {room_id}"))?;
347        room.token_map
348            .lock()
349            .await
350            .insert(token.to_owned(), username.to_owned());
351        Ok(())
352    }
353
354    /// Run the daemon: listen on UDS, dispatch connections to rooms.
355    ///
356    /// When the last UDS connection closes, starts a grace period timer
357    /// (`config.grace_period_secs`). If no new connection arrives before the
358    /// timer fires, sends a shutdown signal. Any new connection during the
359    /// grace period cancels the timer. On exit, cleans up the PID file and
360    /// socket file.
361    pub async fn run(&self) -> anyhow::Result<()> {
362        // Write PID file only for the default daemon socket.  Daemons with an
363        // explicit socket override (tests, CI) are independent instances and
364        // must not clobber the system PID file.
365        let pid_path = if self.config.socket_path == crate::paths::room_socket_path() {
366            match write_pid_file(&crate::paths::room_pid_path()) {
367                Ok(()) => Some(crate::paths::room_pid_path()),
368                Err(e) => {
369                    eprintln!("[daemon] failed to write PID file: {e}");
370                    None
371                }
372            }
373        } else {
374            None
375        };
376
377        // Remove stale socket synchronously.
378        if self.config.socket_path.exists() {
379            std::fs::remove_file(&self.config.socket_path)?;
380        }
381
382        let listener = UnixListener::bind(&self.config.socket_path)?;
383        eprintln!(
384            "[daemon] listening on {}",
385            self.config.socket_path.display()
386        );
387
388        let mut shutdown_rx = self.shutdown.subscribe();
389        let grace_duration = tokio::time::Duration::from_secs(self.config.grace_period_secs);
390
391        // mpsc channel: connection tasks notify the main loop when they close.
392        let (close_tx, mut close_rx) = tokio::sync::mpsc::channel::<()>(64);
393
394        // Optional grace period sleep — active when the last connection closes.
395        let mut grace_sleep: Option<std::pin::Pin<Box<tokio::time::Sleep>>> = None;
396
397        // Start WebSocket/REST server if configured.
398        if let Some(port) = self.config.ws_port {
399            let ws_state = DaemonWsState {
400                rooms: self.rooms.clone(),
401                next_client_id: self.next_client_id.clone(),
402                config: self.config.clone(),
403                system_token_map: self.system_token_map.clone(),
404                user_registry: self.user_registry.clone(),
405            };
406            let app = ws::create_daemon_router(ws_state);
407            let tcp = tokio::net::TcpListener::bind(("0.0.0.0", port)).await?;
408            eprintln!("[daemon] WebSocket/REST listening on port {port}");
409            tokio::spawn(async move {
410                if let Err(e) = axum::serve(tcp, app).await {
411                    eprintln!("[daemon] WS server error: {e}");
412                }
413            });
414        }
415
416        let result = loop {
417            // Build the grace future: fires if a grace sleep is active,
418            // otherwise stays pending forever.
419            let grace_fut = async {
420                match grace_sleep.as_mut() {
421                    Some(s) => {
422                        s.await;
423                    }
424                    None => std::future::pending::<()>().await,
425                }
426            };
427
428            tokio::select! {
429                accept = listener.accept() => {
430                    let (stream, _) = match accept {
431                        Ok(a) => a,
432                        Err(e) => break Err(e.into()),
433                    };
434                    // Cancel any pending grace timer — we have a new connection.
435                    grace_sleep = None;
436
437                    let count = self.connection_count.clone();
438                    count.fetch_add(1, Ordering::SeqCst);
439                    let rooms = self.rooms.clone();
440                    let next_id = self.next_client_id.clone();
441                    let cfg = self.config.clone();
442                    let sys_tokens = self.system_token_map.clone();
443                    let registry = self.user_registry.clone();
444                    let tx = close_tx.clone();
445
446                    tokio::spawn(async move {
447                        if let Err(e) = dispatch_connection(stream, &rooms, &next_id, &cfg, &sys_tokens, &registry).await {
448                            eprintln!("[daemon] connection error: {e:#}");
449                        }
450                        count.fetch_sub(1, Ordering::SeqCst);
451                        // Notify main loop so it can start the grace timer.
452                        let _ = tx.send(()).await;
453                    });
454                }
455                Some(()) = close_rx.recv() => {
456                    // A connection closed. Start grace period if none remain.
457                    if self.connection_count.load(Ordering::SeqCst) == 0 {
458                        eprintln!(
459                            "[daemon] no connections — grace period {}s started",
460                            self.config.grace_period_secs
461                        );
462                        grace_sleep =
463                            Some(Box::pin(tokio::time::sleep(grace_duration)));
464                    }
465                }
466                _ = grace_fut => {
467                    eprintln!("[daemon] grace period expired, shutting down");
468                    let _ = self.shutdown.send(true);
469                    // The shutdown_rx arm will fire on the next iteration; break
470                    // here directly to avoid a double-exit path.
471                    break Ok(());
472                }
473                _ = shutdown_rx.changed() => {
474                    eprintln!("[daemon] shutdown requested, exiting");
475                    if let Some(ref p) = pid_path {
476                        remove_pid_file(p);
477                    }
478                    break Ok(());
479                }
480            }
481        };
482
483        // Clean up ephemeral files on exit.
484        let _ = std::fs::remove_file(&self.config.socket_path);
485        let _ = std::fs::remove_file(crate::paths::room_pid_path());
486        // Remove per-room meta files written during room creation.
487        for room_id in self.list_rooms().await {
488            let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
489        }
490
491        result
492    }
493}
494
495/// Load `UserRegistry` from `users.json`, or migrate from the legacy
496/// `tokens.json` (written by the #334 system-token-map implementation)
497/// if `users.json` does not yet exist.
498///
499/// After loading (or creating) the registry, always scans the legacy runtime
500/// directory for per-room `.token` files left by older `room join` invocations
501/// and imports any that are not already present. This lets clients that joined
502/// before the `~/.room/state/` migration continue to use their existing tokens
503/// without a forced re-join.
504fn load_or_migrate_registry(config: &DaemonConfig) -> UserRegistry {
505    let users_path = config.state_dir.join("users.json");
506
507    let mut registry = if users_path.exists() {
508        // Fast path: users.json exists — use it directly.
509        UserRegistry::load(config.state_dir.clone()).unwrap_or_else(|e| {
510            eprintln!("[daemon] failed to load user registry: {e}; starting empty");
511            UserRegistry::new(config.state_dir.clone())
512        })
513    } else {
514        // Migration path: import from legacy tokens.json if present.
515        let tokens_path = config.system_tokens_path();
516        if tokens_path.exists() {
517            let legacy = super::auth::load_token_map(&tokens_path);
518            if !legacy.is_empty() {
519                eprintln!(
520                    "[daemon] migrating {} token(s) from tokens.json to users.json",
521                    legacy.len()
522                );
523                let mut reg = UserRegistry::new(config.state_dir.clone());
524                for (token, username) in &legacy {
525                    // register_user_idempotent is a no-op if already present.
526                    if let Err(e) = reg.register_user_idempotent(username) {
527                        eprintln!("[daemon] migration: register {username}: {e}");
528                        continue;
529                    }
530                    // Re-insert the existing token directly via issue_token so the
531                    // UUID is preserved. Since UserRegistry.issue_token generates a
532                    // new UUID, we instead manipulate the token map via the public
533                    // API by revoking nothing and accepting the registry's new token.
534                    // Trade-off: legacy UUIDs are replaced; clients must re-join.
535                    // This is acceptable — migration is a one-time event.
536                    let _ = reg.issue_token(username);
537                    let _ = token; // legacy token not preserved — clients must re-join
538                }
539                if let Err(e) = reg.save() {
540                    eprintln!("[daemon] migration save failed: {e}");
541                }
542                reg
543            } else {
544                // tokens.json exists but is empty — start fresh.
545                UserRegistry::new(config.state_dir.clone())
546            }
547        } else {
548            // Neither file exists — start fresh.
549            UserRegistry::new(config.state_dir.clone())
550        }
551    };
552
553    // Always scan the legacy runtime dir for old per-room token files and
554    // import any that are not already in the registry. Idempotent — safe to
555    // run on every startup.
556    migrate_legacy_tmpdir_tokens(&mut registry);
557
558    registry
559}
560
561/// Scan the legacy runtime directory for per-room token files and import
562/// them into `registry`.
563///
564/// Before `~/.room/state/` was introduced, `room join` wrote token files to
565/// the platform runtime directory (`$TMPDIR` on macOS, `/tmp/` on Linux)
566/// as `room-<room_id>-<username>.token`. This function reads each such file,
567/// parses the `username` and `token` fields, and imports them — preserving
568/// the UUID so existing clients do not need to re-join. Files whose tokens
569/// are already in the registry are silently skipped (idempotent).
570fn migrate_legacy_tmpdir_tokens(registry: &mut UserRegistry) {
571    let legacy_dir = crate::paths::legacy_token_dir();
572    migrate_legacy_tmpdir_tokens_from(&legacy_dir, registry);
573}
574
575/// Inner implementation of [`migrate_legacy_tmpdir_tokens`] that accepts an
576/// explicit directory. Extracted so tests can pass a temp directory without
577/// modifying process environment variables.
578fn migrate_legacy_tmpdir_tokens_from(legacy_dir: &std::path::Path, registry: &mut UserRegistry) {
579    let entries = match std::fs::read_dir(legacy_dir) {
580        Ok(e) => e,
581        Err(_) => return,
582    };
583    let mut count = 0usize;
584    for entry in entries.filter_map(|e| e.ok()) {
585        let path = entry.path();
586        let name = match path.file_name().and_then(|n| n.to_str()) {
587            Some(n) => n.to_owned(),
588            None => continue,
589        };
590        if !name.starts_with("room-") || !name.ends_with(".token") {
591            continue;
592        }
593        let data = match std::fs::read_to_string(&path) {
594            Ok(d) => d,
595            Err(_) => continue,
596        };
597        let v: serde_json::Value = match serde_json::from_str(data.trim()) {
598            Ok(v) => v,
599            Err(_) => continue,
600        };
601        let (username, token) = match (v["username"].as_str(), v["token"].as_str()) {
602            (Some(u), Some(t)) if !u.is_empty() && !t.is_empty() => (u.to_owned(), t.to_owned()),
603            _ => continue,
604        };
605        if let Err(e) = registry.register_user_idempotent(&username) {
606            eprintln!("[daemon] legacy token migration: register {username}: {e}");
607            continue;
608        }
609        match registry.import_token(&username, &token) {
610            Ok(()) => count += 1,
611            Err(e) => {
612                eprintln!("[daemon] legacy token migration: import token for {username}: {e}")
613            }
614        }
615    }
616    if count > 0 {
617        eprintln!(
618            "[daemon] imported {count} legacy token(s) from {}",
619            legacy_dir.display()
620        );
621    }
622}
623
624/// Build the initial subscription map for a room based on its config.
625///
626/// DM rooms auto-subscribe both participants at `Full` so they receive all
627/// messages without an explicit `/subscribe` call. Other room types start
628/// with an empty subscription map (users subscribe explicitly or via
629/// auto-subscribe-on-mention).
630fn build_initial_subscriptions(
631    config: &room_protocol::RoomConfig,
632) -> HashMap<String, room_protocol::SubscriptionTier> {
633    let mut subs = HashMap::new();
634    if config.visibility == room_protocol::RoomVisibility::Dm {
635        for user in &config.invite_list {
636            subs.insert(user.clone(), room_protocol::SubscriptionTier::Full);
637        }
638    }
639    subs
640}
641
642/// Core room-creation logic shared by UDS and REST paths.
643///
644/// Validates the room ID, checks for duplicates, builds a [`RoomState`], and
645/// inserts it into the room map. Pass `config: None` to create a configless
646/// room (no invite list, no visibility constraint).
647///
648/// `registry` is attached to the [`RoomState`] via [`RoomState::set_registry`]
649/// so that admin commands (`/kick`, `/reauth`) can also revoke tokens from the
650/// daemon-level [`UserRegistry`] in addition to the in-memory token map.
651pub(crate) async fn create_room_entry(
652    room_id: &str,
653    config: Option<room_protocol::RoomConfig>,
654    rooms: &RoomMap,
655    daemon_config: &DaemonConfig,
656    system_token_map: &TokenMap,
657    registry: Option<Arc<tokio::sync::Mutex<UserRegistry>>>,
658) -> Result<(), String> {
659    validate_room_id(room_id)?;
660    {
661        let map = rooms.lock().await;
662        if map.contains_key(room_id) {
663            return Err(format!("room already exists: {room_id}"));
664        }
665    }
666
667    let chat_path = daemon_config.chat_path(room_id);
668    let subscription_map_path = daemon_config.subscription_map_path(room_id);
669
670    let persisted_subs = super::commands::load_subscription_map(&subscription_map_path);
671    let merged_subs = if let Some(ref cfg) = config {
672        let mut initial = build_initial_subscriptions(cfg);
673        initial.extend(persisted_subs);
674        initial
675    } else {
676        persisted_subs
677    };
678
679    // All rooms in this daemon share the same token map so a token
680    // issued in any room is valid in all rooms.
681    let state = RoomState::new(
682        room_id.to_owned(),
683        chat_path,
684        daemon_config.system_tokens_path(),
685        subscription_map_path,
686        Arc::clone(system_token_map),
687        Arc::new(Mutex::new(merged_subs)),
688        config,
689    )?;
690    if let Some(reg) = registry {
691        state.set_registry(reg);
692    }
693
694    // Attach persisted event filters (parallel to subscription map).
695    let ef_path = daemon_config.event_filter_map_path(room_id);
696    let persisted_ef = super::commands::load_event_filter_map(&ef_path);
697    state.set_event_filter_map(Arc::new(Mutex::new(persisted_ef)), ef_path);
698
699    rooms.lock().await.insert(room_id.to_owned(), state);
700
701    // Write a meta file so one-shot commands (poll, watch, pull) can find the
702    // chat file without a broker connection. The meta file lives in the
703    // platform runtime dir alongside the daemon socket.
704    let meta_path = crate::paths::room_meta_path(room_id);
705    let chat_path_str = daemon_config.chat_path(room_id);
706    let meta_json = serde_json::json!({ "chat_path": chat_path_str });
707    let _ = std::fs::write(&meta_path, meta_json.to_string());
708
709    Ok(())
710}
711
712/// Handle a `DESTROY:<room_id>` request: remove the room from the daemon.
713///
714/// Protocol:
715/// 1. Client sends `DESTROY:<room_id>\n`
716/// 2. Client sends `<token>\n` on the next line.
717/// 3. Daemon validates the token, then responds with
718///    `{"type":"room_destroyed","room":"<id>"}\n` or an error.
719///
720/// Connected clients receive EOF when the room's shutdown signal fires.
721/// Chat files are preserved on disk.
722async fn handle_destroy(
723    room_id: &str,
724    reader: &mut tokio::io::BufReader<tokio::net::unix::OwnedReadHalf>,
725    write_half: &mut tokio::net::unix::OwnedWriteHalf,
726    rooms: &RoomMap,
727    user_registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
728) -> anyhow::Result<()> {
729    use tokio::io::AsyncWriteExt;
730
731    if room_id.is_empty() {
732        let err = serde_json::json!({
733            "type": "error",
734            "code": "invalid_room_id",
735            "message": "room ID is empty"
736        });
737        write_half.write_all(format!("{err}\n").as_bytes()).await?;
738        return Ok(());
739    }
740
741    // Read the token from the second line.
742    let mut token_line = String::new();
743    super::read_line_limited(reader, &mut token_line).await?;
744    let token = token_line.trim();
745
746    if token.is_empty() {
747        let err = serde_json::json!({
748            "type": "error",
749            "code": "missing_token",
750            "message": "DESTROY requires a valid token on the second line"
751        });
752        write_half.write_all(format!("{err}\n").as_bytes()).await?;
753        return Ok(());
754    }
755
756    // Validate against UserRegistry.
757    {
758        let reg = user_registry.lock().await;
759        if reg.validate_token(token).is_none() {
760            let err = serde_json::json!({
761                "type": "error",
762                "code": "invalid_token",
763                "message": "token is not valid"
764            });
765            write_half.write_all(format!("{err}\n").as_bytes()).await?;
766            return Ok(());
767        }
768    }
769
770    // Remove the room and signal shutdown.
771    let state = {
772        let mut map = rooms.lock().await;
773        map.remove(room_id)
774    };
775
776    match state {
777        Some(s) => {
778            // Signal shutdown so connected clients receive EOF.
779            let _ = s.shutdown.send(true);
780            let ok = serde_json::json!({
781                "type": "room_destroyed",
782                "room": room_id
783            });
784            write_half.write_all(format!("{ok}\n").as_bytes()).await?;
785        }
786        None => {
787            let err = serde_json::json!({
788                "type": "error",
789                "code": "room_not_found",
790                "room": room_id
791            });
792            write_half.write_all(format!("{err}\n").as_bytes()).await?;
793        }
794    }
795
796    Ok(())
797}
798
799/// Handle a `CREATE:<room_id>` request: validate, read config, create the room.
800///
801/// Protocol:
802/// 1. Client sends `CREATE:<room_id>\n`
803/// 2. Client sends config JSON on the next line: `{"visibility":"public","invite":[]}\n`
804/// 3. Daemon responds with `{"type":"room_created","room":"<id>"}\n` or an error envelope.
805async fn handle_create(
806    room_id: &str,
807    reader: &mut tokio::io::BufReader<tokio::net::unix::OwnedReadHalf>,
808    write_half: &mut tokio::net::unix::OwnedWriteHalf,
809    rooms: &RoomMap,
810    daemon_config: &DaemonConfig,
811    system_token_map: &TokenMap,
812    user_registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
813) -> anyhow::Result<()> {
814    use tokio::io::AsyncWriteExt;
815
816    // Validate room ID.
817    if let Err(e) = validate_room_id(room_id) {
818        let err = serde_json::json!({
819            "type": "error",
820            "code": "invalid_room_id",
821            "message": e
822        });
823        write_half.write_all(format!("{err}\n").as_bytes()).await?;
824        return Ok(());
825    }
826
827    // Check for duplicate before reading config (fast-fail).
828    {
829        let map = rooms.lock().await;
830        if map.contains_key(room_id) {
831            let err = serde_json::json!({
832                "type": "error",
833                "code": "room_exists",
834                "message": format!("room already exists: {room_id}")
835            });
836            write_half.write_all(format!("{err}\n").as_bytes()).await?;
837            return Ok(());
838        }
839    }
840
841    // Read config JSON from second line.
842    let mut config_line = String::new();
843    super::read_line_limited(reader, &mut config_line).await?;
844    let config_str = config_line.trim();
845
846    let (visibility_str, invite, token): (String, Vec<String>, Option<String>) =
847        if config_str.is_empty() {
848            ("public".into(), vec![], None)
849        } else {
850            let v: serde_json::Value = match serde_json::from_str(config_str) {
851                Ok(v) => v,
852                Err(e) => {
853                    let err = serde_json::json!({
854                        "type": "error",
855                        "code": "invalid_config",
856                        "message": format!("invalid config JSON: {e}")
857                    });
858                    write_half.write_all(format!("{err}\n").as_bytes()).await?;
859                    return Ok(());
860                }
861            };
862            let vis = v["visibility"].as_str().unwrap_or("public").to_owned();
863            let inv = v["invite"]
864                .as_array()
865                .map(|arr| {
866                    arr.iter()
867                        .filter_map(|v| v.as_str().map(|s| s.to_owned()))
868                        .collect()
869                })
870                .unwrap_or_default();
871            let tok = v["token"].as_str().map(|s| s.to_owned());
872            (vis, inv, tok)
873        };
874
875    // Validate the token.
876    let token_str = match token.as_deref() {
877        Some(t) if !t.is_empty() => t,
878        _ => {
879            let err = serde_json::json!({
880                "type": "error",
881                "code": "missing_token",
882                "message": "CREATE requires a valid token in the config JSON"
883            });
884            write_half.write_all(format!("{err}\n").as_bytes()).await?;
885            return Ok(());
886        }
887    };
888    {
889        let reg = user_registry.lock().await;
890        if reg.validate_token(token_str).is_none() {
891            let err = serde_json::json!({
892                "type": "error",
893                "code": "invalid_token",
894                "message": "token is not valid"
895            });
896            write_half.write_all(format!("{err}\n").as_bytes()).await?;
897            return Ok(());
898        }
899    }
900
901    // Build RoomConfig from the parsed visibility + invite list.
902    let room_config = match visibility_str.as_str() {
903        "public" => room_protocol::RoomConfig {
904            visibility: room_protocol::RoomVisibility::Public,
905            max_members: None,
906            invite_list: invite.into_iter().collect(),
907            created_by: "system".to_owned(),
908            created_at: chrono::Utc::now().to_rfc3339(),
909        },
910        "private" => room_protocol::RoomConfig {
911            visibility: room_protocol::RoomVisibility::Private,
912            max_members: None,
913            invite_list: invite.into_iter().collect(),
914            created_by: "system".to_owned(),
915            created_at: chrono::Utc::now().to_rfc3339(),
916        },
917        "dm" => {
918            if invite.len() != 2 {
919                let err = serde_json::json!({
920                    "type": "error",
921                    "code": "invalid_config",
922                    "message": "dm visibility requires exactly 2 users in invite list"
923                });
924                write_half.write_all(format!("{err}\n").as_bytes()).await?;
925                return Ok(());
926            }
927            room_protocol::RoomConfig::dm(&invite[0], &invite[1])
928        }
929        other => {
930            let err = serde_json::json!({
931                "type": "error",
932                "code": "invalid_config",
933                "message": format!("unknown visibility: {other}")
934            });
935            write_half.write_all(format!("{err}\n").as_bytes()).await?;
936            return Ok(());
937        }
938    };
939
940    // Delegate to the shared room-creation helper.
941    if let Err(e) = create_room_entry(
942        room_id,
943        Some(room_config),
944        rooms,
945        daemon_config,
946        system_token_map,
947        Some(user_registry.clone()),
948    )
949    .await
950    {
951        let err = serde_json::json!({
952            "type": "error",
953            "code": "internal",
954            "message": e
955        });
956        write_half.write_all(format!("{err}\n").as_bytes()).await?;
957        return Ok(());
958    }
959
960    let ok = serde_json::json!({
961        "type": "room_created",
962        "room": room_id
963    });
964    write_half.write_all(format!("{ok}\n").as_bytes()).await?;
965    Ok(())
966}
967
968/// Handle a global `JOIN:<username>` request at daemon level.
969///
970/// Registers the user in the global UserRegistry (or returns the existing token
971/// if already registered) and writes the token response. No room association.
972async fn handle_global_join(
973    username: &str,
974    write_half: &mut tokio::net::unix::OwnedWriteHalf,
975    registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
976) -> anyhow::Result<()> {
977    use tokio::io::AsyncWriteExt;
978
979    let mut reg = registry.lock().await;
980
981    // If user already has a token, return it. Otherwise register and issue.
982    let token = if reg.has_token_for_user(username) {
983        // Find existing token via snapshot (reverse lookup: token→user).
984        reg.token_snapshot()
985            .into_iter()
986            .find(|(_, u)| u == username)
987            .map(|(t, _)| t)
988            .expect("has_token_for_user was true but no token found")
989    } else {
990        reg.register_user_idempotent(username)
991            .map_err(|e| anyhow::anyhow!("registration failed: {e}"))?;
992        reg.issue_token(username)
993            .map_err(|e| anyhow::anyhow!("token issuance failed: {e}"))?
994    };
995
996    let resp = serde_json::json!({
997        "type": "token",
998        "token": token,
999        "username": username
1000    });
1001    write_half.write_all(format!("{resp}\n").as_bytes()).await?;
1002    Ok(())
1003}
1004
1005/// Dispatch a raw UDS connection to the correct room based on the handshake.
1006///
1007/// Handles two top-level protocols:
1008/// - `CREATE:<room_id>` — create a new room (reads config JSON from second line)
1009/// - `ROOM:<room_id>:<rest>` — route to an existing room
1010async fn dispatch_connection(
1011    stream: tokio::net::UnixStream,
1012    rooms: &RoomMap,
1013    next_client_id: &Arc<AtomicU64>,
1014    daemon_config: &DaemonConfig,
1015    system_token_map: &TokenMap,
1016    user_registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
1017) -> anyhow::Result<()> {
1018    use tokio::io::{AsyncWriteExt, BufReader};
1019
1020    let (read_half, mut write_half) = stream.into_split();
1021    let mut reader = BufReader::new(read_half);
1022
1023    let mut first = String::new();
1024    super::read_line_limited(&mut reader, &mut first).await?;
1025    let first_line = first.trim();
1026
1027    if first_line.is_empty() {
1028        return Ok(());
1029    }
1030
1031    use super::handshake::{
1032        parse_client_handshake, parse_daemon_prefix, ClientHandshake, DaemonPrefix,
1033    };
1034    let (room_id, rest) = match parse_daemon_prefix(first_line) {
1035        DaemonPrefix::Destroy(room_id) => {
1036            return handle_destroy(&room_id, &mut reader, &mut write_half, rooms, user_registry)
1037                .await;
1038        }
1039        DaemonPrefix::Create(room_id) => {
1040            return handle_create(
1041                &room_id,
1042                &mut reader,
1043                &mut write_half,
1044                rooms,
1045                daemon_config,
1046                system_token_map,
1047                user_registry,
1048            )
1049            .await;
1050        }
1051        DaemonPrefix::Join(username) => {
1052            return handle_global_join(&username, &mut write_half, user_registry).await;
1053        }
1054        DaemonPrefix::Room { room_id, rest } => (room_id, rest),
1055        DaemonPrefix::Unknown => {
1056            let err = serde_json::json!({
1057                "type": "error",
1058                "code": "missing_room_prefix",
1059                "message": "daemon mode requires ROOM:<room_id>: or CREATE:<room_id> prefix"
1060            });
1061            write_half.write_all(format!("{err}\n").as_bytes()).await?;
1062            return Ok(());
1063        }
1064    };
1065
1066    // Look up the room.
1067    let state = {
1068        let map = rooms.lock().await;
1069        map.get(room_id.as_str()).cloned()
1070    };
1071
1072    let state = match state {
1073        Some(s) => s,
1074        None => {
1075            let err = serde_json::json!({
1076                "type": "error",
1077                "code": "room_not_found",
1078                "room": room_id
1079            });
1080            write_half.write_all(format!("{err}\n").as_bytes()).await?;
1081            return Ok(());
1082        }
1083    };
1084
1085    let cid = next_client_id.fetch_add(1, Ordering::SeqCst) + 1;
1086
1087    // Dispatch based on the per-room handshake after the ROOM: prefix.
1088    let username = match parse_client_handshake(&rest) {
1089        ClientHandshake::Send(u) => {
1090            eprintln!(
1091                "[broker/daemon] DEPRECATED: SEND:{u} handshake used — \
1092                 migrate to TOKEN:<uuid> (SEND: will be removed in a future version)"
1093            );
1094            return handle_oneshot_send(u, reader, write_half, &state).await;
1095        }
1096        ClientHandshake::Token(token) => {
1097            // Try room-level token map first, then fall back to global UserRegistry.
1098            let resolved = match super::auth::validate_token(&token, &state.token_map).await {
1099                Some(u) => Some(u),
1100                None => {
1101                    let reg = user_registry.lock().await;
1102                    reg.validate_token(&token).map(|u| u.to_owned())
1103                }
1104            };
1105            return match resolved {
1106                Some(u) => handle_oneshot_send(u, reader, write_half, &state).await,
1107                None => {
1108                    let err = serde_json::json!({"type":"error","code":"invalid_token"});
1109                    write_half
1110                        .write_all(format!("{err}\n").as_bytes())
1111                        .await
1112                        .map_err(Into::into)
1113                }
1114            };
1115        }
1116        ClientHandshake::Join(u) => {
1117            let result = super::auth::handle_oneshot_join_with_registry(
1118                u,
1119                write_half,
1120                user_registry,
1121                &state.token_map,
1122                &state.subscription_map,
1123                state.config.as_ref(),
1124            )
1125            .await;
1126            // Persist auto-subscription from join so it survives broker restart.
1127            super::commands::persist_subscriptions(&state).await;
1128            return result;
1129        }
1130        ClientHandshake::Session(token) => {
1131            // Resolve username from token (room-level first, then UserRegistry).
1132            let resolved = match super::auth::validate_token(&token, &state.token_map).await {
1133                Some(u) => Some(u),
1134                None => {
1135                    let reg = user_registry.lock().await;
1136                    reg.validate_token(&token).map(|u| u.to_owned())
1137                }
1138            };
1139            match resolved {
1140                Some(u) => u,
1141                None => {
1142                    let err = serde_json::json!({"type":"error","code":"invalid_token"});
1143                    write_half.write_all(format!("{err}\n").as_bytes()).await?;
1144                    return Ok(());
1145                }
1146            }
1147        }
1148        ClientHandshake::Interactive(u) => {
1149            eprintln!(
1150                "[broker/daemon] DEPRECATED: unauthenticated interactive join for '{u}' — \
1151                 migrate to SESSION:<token>"
1152            );
1153            u
1154        }
1155    };
1156
1157    // Interactive join (authenticated via SESSION: or deprecated plain username).
1158    if username.is_empty() {
1159        return Ok(());
1160    }
1161
1162    // Check join permission before entering interactive session.
1163    if let Err(reason) = super::auth::check_join_permission(&username, state.config.as_ref()) {
1164        let err = serde_json::json!({
1165            "type": "error",
1166            "code": "join_denied",
1167            "message": reason,
1168            "username": username
1169        });
1170        write_half.write_all(format!("{err}\n").as_bytes()).await?;
1171        return Ok(());
1172    }
1173
1174    // Register client in room, then hand off to the full interactive handler.
1175    let (tx, _) = broadcast::channel::<String>(256);
1176    state
1177        .clients
1178        .lock()
1179        .await
1180        .insert(cid, (String::new(), tx.clone()));
1181
1182    let result =
1183        super::run_interactive_session(cid, &username, reader, write_half, tx, &state).await;
1184
1185    state.clients.lock().await.remove(&cid);
1186    result
1187}
1188
1189// ── Tests ─────────────────────────────────────────────────────────────────────
1190
1191#[cfg(test)]
1192mod tests {
1193    use super::*;
1194
1195    // ── PID management ───────────────────────────────────────────────────
1196
1197    #[test]
1198    fn write_pid_file_creates_file_with_current_pid() {
1199        let dir = tempfile::TempDir::new().unwrap();
1200        let path = dir.path().join("test.pid");
1201        write_pid_file(&path).unwrap();
1202        let content = std::fs::read_to_string(&path).unwrap();
1203        let pid: u32 = content.trim().parse().expect("PID should be a number");
1204        assert_eq!(pid, std::process::id());
1205    }
1206
1207    #[test]
1208    fn is_pid_alive_true_for_current_process() {
1209        let dir = tempfile::TempDir::new().unwrap();
1210        let path = dir.path().join("test.pid");
1211        write_pid_file(&path).unwrap();
1212        assert!(is_pid_alive(&path), "current process should be alive");
1213    }
1214
1215    #[test]
1216    fn is_pid_alive_false_for_missing_file() {
1217        let path = std::path::Path::new("/tmp/nonexistent-room-test-99999999.pid");
1218        assert!(!is_pid_alive(path));
1219    }
1220
1221    #[test]
1222    fn remove_pid_file_deletes_file() {
1223        let dir = tempfile::TempDir::new().unwrap();
1224        let path = dir.path().join("remove.pid");
1225        write_pid_file(&path).unwrap();
1226        assert!(path.exists());
1227        remove_pid_file(&path);
1228        assert!(!path.exists());
1229    }
1230
1231    #[test]
1232    fn remove_pid_file_noop_when_missing() {
1233        // Should not panic if the file is already gone.
1234        let path = std::path::Path::new("/tmp/gone-99999999.pid");
1235        remove_pid_file(path); // must not panic
1236    }
1237
1238    // ── DaemonState lifecycle ─────────────────────────────────────────────
1239
1240    /// Test helper: look up a room's state by ID.
1241    async fn get_room(daemon: &DaemonState, room_id: &str) -> Arc<RoomState> {
1242        daemon
1243            .rooms
1244            .lock()
1245            .await
1246            .get(room_id)
1247            .cloned()
1248            .unwrap_or_else(|| panic!("room {room_id} not found"))
1249    }
1250
1251    #[tokio::test]
1252    async fn create_room_succeeds() {
1253        let daemon = DaemonState::new(DaemonConfig::default());
1254        assert!(daemon.create_room("test-room").await.is_ok());
1255        let state = get_room(&daemon, "test-room").await;
1256        assert_eq!(*state.room_id, "test-room");
1257    }
1258
1259    #[tokio::test]
1260    async fn create_duplicate_room_fails() {
1261        let daemon = DaemonState::new(DaemonConfig::default());
1262        daemon.create_room("dup").await.unwrap();
1263        let result = daemon.create_room("dup").await;
1264        assert!(result.is_err());
1265        assert!(result.unwrap_err().contains("already exists"));
1266    }
1267
1268    #[tokio::test]
1269    async fn has_room_returns_true_for_created() {
1270        let daemon = DaemonState::new(DaemonConfig::default());
1271        daemon.create_room("room-a").await.unwrap();
1272        assert!(daemon.has_room("room-a").await);
1273        assert!(!daemon.has_room("room-b").await);
1274    }
1275
1276    #[tokio::test]
1277    async fn destroy_room_removes_it() {
1278        let daemon = DaemonState::new(DaemonConfig::default());
1279        daemon.create_room("doomed").await.unwrap();
1280        assert!(daemon.destroy_room("doomed").await.is_ok());
1281        assert!(!daemon.has_room("doomed").await);
1282    }
1283
1284    #[tokio::test]
1285    async fn destroy_nonexistent_room_fails() {
1286        let daemon = DaemonState::new(DaemonConfig::default());
1287        let result = daemon.destroy_room("nope").await;
1288        assert!(result.is_err());
1289        assert!(result.unwrap_err().contains("not found"));
1290    }
1291
1292    #[tokio::test]
1293    async fn destroy_room_signals_shutdown() {
1294        let daemon = DaemonState::new(DaemonConfig::default());
1295        daemon.create_room("shutme").await.unwrap();
1296        let state = get_room(&daemon, "shutme").await;
1297        let rx = state.shutdown.subscribe();
1298        assert!(!*rx.borrow());
1299
1300        daemon.destroy_room("shutme").await.unwrap();
1301        // The shutdown signal should now be true.
1302        assert!(*rx.borrow());
1303    }
1304
1305    #[tokio::test]
1306    async fn list_rooms_returns_all() {
1307        let daemon = DaemonState::new(DaemonConfig::default());
1308        daemon.create_room("alpha").await.unwrap();
1309        daemon.create_room("beta").await.unwrap();
1310        daemon.create_room("gamma").await.unwrap();
1311
1312        let mut rooms = daemon.list_rooms().await;
1313        rooms.sort();
1314        assert_eq!(rooms, vec!["alpha", "beta", "gamma"]);
1315    }
1316
1317    #[tokio::test]
1318    async fn list_rooms_empty_initially() {
1319        let daemon = DaemonState::new(DaemonConfig::default());
1320        assert!(daemon.list_rooms().await.is_empty());
1321    }
1322
1323    #[tokio::test]
1324    async fn create_room_initializes_plugins() {
1325        let daemon = DaemonState::new(DaemonConfig::default());
1326        daemon.create_room("plugtest").await.unwrap();
1327        let state = get_room(&daemon, "plugtest").await;
1328        // help is a builtin (not in plugin registry), stats should be registered
1329        assert!(state.plugin_registry.resolve("help").is_none());
1330        assert!(state.plugin_registry.resolve("stats").is_some());
1331    }
1332
1333    // ── DaemonConfig ──────────────────────────────────────────────────────
1334
1335    #[test]
1336    fn config_chat_path_format() {
1337        let config = DaemonConfig {
1338            data_dir: PathBuf::from("/var/room"),
1339            ..DaemonConfig::default()
1340        };
1341        assert_eq!(
1342            config.chat_path("myroom"),
1343            PathBuf::from("/var/room/myroom.chat")
1344        );
1345    }
1346
1347    #[test]
1348    fn config_default_socket_path() {
1349        let config = DaemonConfig::default();
1350        assert_eq!(config.socket_path, crate::paths::room_socket_path());
1351    }
1352
1353    // ── create_room_with_config ───────────────────────────────────────────
1354
1355    #[tokio::test]
1356    async fn create_room_with_dm_config() {
1357        let daemon = DaemonState::new(DaemonConfig::default());
1358        let config = room_protocol::RoomConfig::dm("alice", "bob");
1359        assert!(daemon
1360            .create_room_with_config("dm-alice-bob", config)
1361            .await
1362            .is_ok());
1363
1364        let state = get_room(&daemon, "dm-alice-bob").await;
1365        let cfg = state.config.as_ref().unwrap();
1366        assert_eq!(cfg.visibility, room_protocol::RoomVisibility::Dm);
1367        assert_eq!(cfg.max_members, Some(2));
1368        assert!(cfg.invite_list.contains("alice"));
1369        assert!(cfg.invite_list.contains("bob"));
1370    }
1371
1372    #[tokio::test]
1373    async fn create_room_with_config_duplicate_fails() {
1374        let daemon = DaemonState::new(DaemonConfig::default());
1375        let config = room_protocol::RoomConfig::public("owner");
1376        daemon
1377            .create_room_with_config("dup", config.clone())
1378            .await
1379            .unwrap();
1380        assert!(daemon.create_room_with_config("dup", config).await.is_err());
1381    }
1382
1383    #[tokio::test]
1384    async fn get_room_config_returns_none_for_unconfigured() {
1385        let daemon = DaemonState::new(DaemonConfig::default());
1386        daemon.create_room("plain").await.unwrap();
1387        assert!(daemon.get_room_config("plain").await.is_none());
1388    }
1389
1390    #[tokio::test]
1391    async fn get_room_config_returns_config_when_present() {
1392        let daemon = DaemonState::new(DaemonConfig::default());
1393        let config = room_protocol::RoomConfig::dm("alice", "bob");
1394        daemon
1395            .create_room_with_config("dm-alice-bob", config)
1396            .await
1397            .unwrap();
1398        let cfg = daemon.get_room_config("dm-alice-bob").await.unwrap();
1399        assert_eq!(cfg.visibility, room_protocol::RoomVisibility::Dm);
1400    }
1401
1402    #[tokio::test]
1403    async fn dm_room_id_deterministic_and_lookup_works() {
1404        let daemon = DaemonState::new(DaemonConfig::default());
1405        let room_id = room_protocol::dm_room_id("bob", "alice").unwrap();
1406        assert_eq!(room_id, "dm-alice-bob");
1407
1408        let config = room_protocol::RoomConfig::dm("bob", "alice");
1409        daemon
1410            .create_room_with_config(&room_id, config)
1411            .await
1412            .unwrap();
1413        assert!(daemon.has_room("dm-alice-bob").await);
1414        // Reverse order gives the same room_id
1415        assert_eq!(
1416            room_protocol::dm_room_id("alice", "bob").unwrap(),
1417            "dm-alice-bob"
1418        );
1419    }
1420
1421    // ── validate_room_id ──────────────────────────────────────────────────
1422
1423    #[test]
1424    fn valid_room_ids() {
1425        for id in [
1426            "lobby",
1427            "agent-room-2",
1428            "my_room",
1429            "Room.1",
1430            "dm-alice-bob",
1431            "a",
1432            &"x".repeat(MAX_ROOM_ID_LEN),
1433        ] {
1434            assert!(validate_room_id(id).is_ok(), "should accept: {id:?}");
1435        }
1436    }
1437
1438    #[test]
1439    fn empty_room_id_rejected() {
1440        let err = validate_room_id("").unwrap_err();
1441        assert!(err.contains("empty"), "{err}");
1442    }
1443
1444    #[test]
1445    fn room_id_too_long_rejected() {
1446        let long = "x".repeat(MAX_ROOM_ID_LEN + 1);
1447        let err = validate_room_id(&long).unwrap_err();
1448        assert!(err.contains("too long"), "{err}");
1449    }
1450
1451    #[test]
1452    fn dot_dot_traversal_rejected() {
1453        for id in ["..", "room/../etc", "..secret", "a..b"] {
1454            let err = validate_room_id(id).unwrap_err();
1455            assert!(err.contains(".."), "should reject {id:?}: {err}");
1456        }
1457    }
1458
1459    #[test]
1460    fn single_dot_rejected() {
1461        let err = validate_room_id(".").unwrap_err();
1462        assert!(err.contains(".."), "{err}");
1463    }
1464
1465    #[test]
1466    fn slash_rejected() {
1467        for id in ["room/sub", "/etc/passwd", "a/b/c"] {
1468            let err = validate_room_id(id).unwrap_err();
1469            assert!(err.contains("unsafe"), "should reject {id:?}: {err}");
1470        }
1471    }
1472
1473    #[test]
1474    fn backslash_rejected() {
1475        let err = validate_room_id("room\\sub").unwrap_err();
1476        assert!(err.contains("unsafe"), "{err}");
1477    }
1478
1479    #[test]
1480    fn null_byte_rejected() {
1481        let err = validate_room_id("room\0id").unwrap_err();
1482        assert!(err.contains("unsafe"), "{err}");
1483    }
1484
1485    #[test]
1486    fn whitespace_rejected() {
1487        for id in ["room name", "room\tid", "room\nid", " leading", "trailing "] {
1488            let err = validate_room_id(id).unwrap_err();
1489            assert!(err.contains("whitespace"), "should reject {id:?}: {err}");
1490        }
1491    }
1492
1493    #[test]
1494    fn other_unsafe_chars_rejected() {
1495        for ch in [':', '*', '?', '"', '<', '>', '|'] {
1496            let id = format!("room{ch}id");
1497            let err = validate_room_id(&id).unwrap_err();
1498            assert!(err.contains("unsafe"), "should reject {ch:?}: {err}");
1499        }
1500    }
1501
1502    #[tokio::test]
1503    async fn create_room_rejects_invalid_id() {
1504        let daemon = DaemonState::new(DaemonConfig::default());
1505        assert!(daemon.create_room("room/sub").await.is_err());
1506        assert!(daemon.create_room("..").await.is_err());
1507        assert!(daemon.create_room("").await.is_err());
1508        assert!(daemon.create_room("room name").await.is_err());
1509    }
1510
1511    #[tokio::test]
1512    async fn create_room_with_config_rejects_invalid_id() {
1513        let daemon = DaemonState::new(DaemonConfig::default());
1514        let config = room_protocol::RoomConfig::public("owner");
1515        assert!(daemon
1516            .create_room_with_config("../etc", config)
1517            .await
1518            .is_err());
1519    }
1520
1521    // ── DM auto-subscribe ─────────────────────────────────────────────────
1522
1523    #[tokio::test]
1524    async fn dm_room_auto_subscribes_both_participants() {
1525        let daemon = DaemonState::new(DaemonConfig::default());
1526        let config = room_protocol::RoomConfig::dm("alice", "bob");
1527        daemon
1528            .create_room_with_config("dm-alice-bob", config)
1529            .await
1530            .unwrap();
1531
1532        let state = get_room(&daemon, "dm-alice-bob").await;
1533        let subs = state.subscription_map.lock().await;
1534        assert_eq!(subs.len(), 2);
1535        assert_eq!(
1536            subs.get("alice"),
1537            Some(&room_protocol::SubscriptionTier::Full)
1538        );
1539        assert_eq!(
1540            subs.get("bob"),
1541            Some(&room_protocol::SubscriptionTier::Full)
1542        );
1543    }
1544
1545    #[tokio::test]
1546    async fn public_room_starts_with_no_subscriptions() {
1547        let daemon = DaemonState::new(DaemonConfig::default());
1548        let config = room_protocol::RoomConfig::public("owner");
1549        daemon
1550            .create_room_with_config("lobby", config)
1551            .await
1552            .unwrap();
1553
1554        let state = get_room(&daemon, "lobby").await;
1555        let subs = state.subscription_map.lock().await;
1556        assert!(subs.is_empty());
1557    }
1558
1559    #[tokio::test]
1560    async fn unconfigured_room_starts_with_no_subscriptions() {
1561        let daemon = DaemonState::new(DaemonConfig::default());
1562        daemon.create_room("plain").await.unwrap();
1563
1564        let state = get_room(&daemon, "plain").await;
1565        let subs = state.subscription_map.lock().await;
1566        assert!(subs.is_empty());
1567    }
1568
1569    #[tokio::test]
1570    async fn dm_auto_subscribe_uses_full_tier() {
1571        let daemon = DaemonState::new(DaemonConfig::default());
1572        let config = room_protocol::RoomConfig::dm("carol", "dave");
1573        daemon
1574            .create_room_with_config("dm-carol-dave", config)
1575            .await
1576            .unwrap();
1577
1578        let state = get_room(&daemon, "dm-carol-dave").await;
1579        let subs = state.subscription_map.lock().await;
1580        // Verify it's Full, not MentionsOnly
1581        for (_, tier) in subs.iter() {
1582            assert_eq!(*tier, room_protocol::SubscriptionTier::Full);
1583        }
1584    }
1585
1586    #[test]
1587    fn build_initial_subscriptions_dm_populates() {
1588        let config = room_protocol::RoomConfig::dm("alice", "bob");
1589        let subs = build_initial_subscriptions(&config);
1590        assert_eq!(subs.len(), 2);
1591        assert_eq!(subs["alice"], room_protocol::SubscriptionTier::Full);
1592        assert_eq!(subs["bob"], room_protocol::SubscriptionTier::Full);
1593    }
1594
1595    #[test]
1596    fn build_initial_subscriptions_public_empty() {
1597        let config = room_protocol::RoomConfig::public("owner");
1598        let subs = build_initial_subscriptions(&config);
1599        assert!(subs.is_empty());
1600    }
1601
1602    // ── DaemonConfig grace_period_secs ────────────────────────────────────
1603
1604    #[test]
1605    fn default_grace_period_is_30() {
1606        let config = DaemonConfig::default();
1607        assert_eq!(config.grace_period_secs, 30);
1608    }
1609
1610    #[test]
1611    fn custom_grace_period_preserved() {
1612        let config = DaemonConfig {
1613            grace_period_secs: 0,
1614            ..DaemonConfig::default()
1615        };
1616        assert_eq!(config.grace_period_secs, 0);
1617    }
1618
1619    // ── connection_count refcount ─────────────────────────────────────────
1620
1621    #[tokio::test]
1622    async fn connection_count_starts_at_zero() {
1623        let daemon = DaemonState::new(DaemonConfig::default());
1624        assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 0);
1625    }
1626
1627    #[tokio::test]
1628    async fn connection_count_increments_and_decrements() {
1629        let count = Arc::new(AtomicUsize::new(0));
1630        count.fetch_add(1, Ordering::SeqCst);
1631        count.fetch_add(1, Ordering::SeqCst);
1632        assert_eq!(count.load(Ordering::SeqCst), 2);
1633        count.fetch_sub(1, Ordering::SeqCst);
1634        assert_eq!(count.load(Ordering::SeqCst), 1);
1635        count.fetch_sub(1, Ordering::SeqCst);
1636        assert_eq!(count.load(Ordering::SeqCst), 0);
1637    }
1638
1639    /// Verify that the daemon exits cleanly when the shutdown signal is sent.
1640    /// Uses an Arc<DaemonState> so the run() task can hold a reference while
1641    /// the test also holds one to send the shutdown signal.
1642    #[tokio::test]
1643    async fn daemon_exits_on_shutdown_signal() {
1644        let dir = tempfile::TempDir::new().unwrap();
1645        let socket = dir.path().join("test-grace.sock");
1646        std::fs::create_dir_all(dir.path().join("data")).unwrap();
1647        std::fs::create_dir_all(dir.path().join("state")).unwrap();
1648
1649        let config = DaemonConfig {
1650            socket_path: socket.clone(),
1651            data_dir: dir.path().join("data"),
1652            state_dir: dir.path().join("state"),
1653            ws_port: None,
1654            grace_period_secs: 0,
1655        };
1656        let daemon = Arc::new(DaemonState::new(config));
1657        let shutdown = daemon.shutdown_handle();
1658
1659        let daemon2 = Arc::clone(&daemon);
1660        let handle = tokio::spawn(async move { daemon2.run().await });
1661
1662        // Wait for socket to become connectable (daemon is up).
1663        for _ in 0..100 {
1664            if tokio::net::UnixStream::connect(&socket).await.is_ok() {
1665                break;
1666            }
1667            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1668        }
1669        assert!(
1670            tokio::net::UnixStream::connect(&socket).await.is_ok(),
1671            "daemon socket not ready"
1672        );
1673
1674        // Send shutdown — daemon should exit quickly.
1675        let _ = shutdown.send(true);
1676        let result = tokio::time::timeout(tokio::time::Duration::from_secs(5), handle).await;
1677        assert!(result.is_ok(), "daemon did not exit within 5s");
1678        assert!(result.unwrap().unwrap().is_ok(), "run() returned error");
1679    }
1680
1681    /// Verify that a new connection during the grace period resets the timer.
1682    /// We check this by confirming connection_count goes from 0 → 1 → 0 without
1683    /// a premature shutdown.
1684    #[tokio::test]
1685    async fn grace_period_cancelled_by_new_connection() {
1686        let dir = tempfile::TempDir::new().unwrap();
1687        let socket = dir.path().join("test-cancel-grace.sock");
1688
1689        let config = DaemonConfig {
1690            socket_path: socket.clone(),
1691            data_dir: dir.path().join("data"),
1692            state_dir: dir.path().join("state"),
1693            ws_port: None,
1694            grace_period_secs: 60, // long grace — should not fire
1695        };
1696        let daemon = DaemonState::new(config);
1697
1698        // Manually exercise the counter: simulate connect + disconnect.
1699        daemon.connection_count.fetch_add(1, Ordering::SeqCst);
1700        assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 1);
1701        daemon.connection_count.fetch_sub(1, Ordering::SeqCst);
1702        assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 0);
1703
1704        // Simulate a second connection arriving (cancels grace timer).
1705        daemon.connection_count.fetch_add(1, Ordering::SeqCst);
1706        assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 1);
1707
1708        // Daemon has not shut down.
1709        assert!(!*daemon.shutdown.borrow());
1710    }
1711
1712    // ── migrate_legacy_tmpdir_tokens ──────────────────────────────────────
1713
1714    /// Write a token file to `dir` in the format written by old `room join`.
1715    fn write_legacy_token(dir: &std::path::Path, room_id: &str, username: &str, token: &str) {
1716        let name = format!("room-{room_id}-{username}.token");
1717        let data = serde_json::json!({"username": username, "token": token});
1718        std::fs::write(dir.join(name), format!("{data}\n")).unwrap();
1719    }
1720
1721    #[test]
1722    fn migrate_legacy_tmpdir_tokens_imports_token() {
1723        let token_dir = tempfile::TempDir::new().unwrap();
1724        let state_dir = tempfile::TempDir::new().unwrap();
1725        write_legacy_token(token_dir.path(), "lobby", "alice", "legacy-uuid-alice");
1726
1727        let mut registry = UserRegistry::new(state_dir.path().to_owned());
1728
1729        // Override the legacy dir by temporarily pointing TMPDIR at token_dir.
1730        // Because legacy_token_dir() reads env on macOS, we run the function
1731        // directly on the directory to avoid touching the process environment.
1732        // Instead we call the inner logic directly with a helper that accepts
1733        // a custom dir.
1734        migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1735
1736        assert_eq!(registry.validate_token("legacy-uuid-alice"), Some("alice"));
1737        assert!(registry.get_user("alice").is_some());
1738    }
1739
1740    #[test]
1741    fn migrate_legacy_tmpdir_tokens_idempotent() {
1742        let token_dir = tempfile::TempDir::new().unwrap();
1743        let state_dir = tempfile::TempDir::new().unwrap();
1744        write_legacy_token(token_dir.path(), "lobby", "bob", "tok-bob");
1745
1746        let mut registry = UserRegistry::new(state_dir.path().to_owned());
1747        migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1748        migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1749
1750        // Token still valid and exactly one entry for bob.
1751        assert_eq!(registry.validate_token("tok-bob"), Some("bob"));
1752        let snap = registry.token_snapshot();
1753        assert_eq!(snap.values().filter(|u| u.as_str() == "bob").count(), 1);
1754    }
1755
1756    #[test]
1757    fn migrate_legacy_tmpdir_tokens_skips_non_token_files() {
1758        let token_dir = tempfile::TempDir::new().unwrap();
1759        let state_dir = tempfile::TempDir::new().unwrap();
1760        std::fs::write(token_dir.path().join("roomd.sock"), "not a token").unwrap();
1761        std::fs::write(token_dir.path().join("something.json"), "{}").unwrap();
1762
1763        let mut registry = UserRegistry::new(state_dir.path().to_owned());
1764        migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1765
1766        assert!(registry.list_users().is_empty());
1767    }
1768
1769    #[test]
1770    fn migrate_legacy_tmpdir_tokens_skips_malformed_json() {
1771        let token_dir = tempfile::TempDir::new().unwrap();
1772        let state_dir = tempfile::TempDir::new().unwrap();
1773        std::fs::write(token_dir.path().join("room-x-bad.token"), "not-json{{{").unwrap();
1774
1775        let mut registry = UserRegistry::new(state_dir.path().to_owned());
1776        migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1777
1778        assert!(registry.list_users().is_empty());
1779    }
1780}