1use std::{
34 collections::HashMap,
35 path::PathBuf,
36 sync::{
37 atomic::{AtomicU64, AtomicUsize, Ordering},
38 Arc,
39 },
40};
41
42use tokio::{
43 net::UnixListener,
44 sync::{broadcast, watch, Mutex},
45};
46
47use crate::registry::UserRegistry;
48
49use super::{
50 handle_oneshot_send,
51 state::{RoomState, TokenMap},
52 ws::{self, DaemonWsState},
53};
54
55const UNSAFE_CHARS: &[char] = &['/', '\\', ':', '*', '?', '"', '<', '>', '|', '\0'];
57
58pub fn write_pid_file(path: &std::path::Path) -> std::io::Result<()> {
62 std::fs::write(path, std::process::id().to_string())
63}
64
65pub fn is_pid_alive(path: &std::path::Path) -> bool {
70 let Ok(contents) = std::fs::read_to_string(path) else {
71 return false;
72 };
73 let Ok(pid) = contents.trim().parse::<u32>() else {
74 return false;
75 };
76 pid_alive(pid)
77}
78
79pub fn remove_pid_file(path: &std::path::Path) {
81 let _ = std::fs::remove_file(path);
82}
83
84#[cfg(unix)]
92fn pid_alive(pid: u32) -> bool {
93 extern "C" {
94 fn kill(pid: i32, sig: i32) -> i32;
95 }
96 let ret = unsafe { kill(pid as i32, 0) };
98 if ret == 0 {
99 return true;
100 }
101 std::io::Error::last_os_error().raw_os_error() == Some(1)
103}
104
105#[cfg(not(unix))]
106fn pid_alive(_pid: u32) -> bool {
107 true
109}
110
111const MAX_ROOM_ID_LEN: usize = 64;
113
114pub fn validate_room_id(room_id: &str) -> Result<(), String> {
119 if room_id.is_empty() {
120 return Err("room ID cannot be empty".into());
121 }
122 if room_id.len() > MAX_ROOM_ID_LEN {
123 return Err(format!(
124 "room ID too long ({} chars, max {MAX_ROOM_ID_LEN})",
125 room_id.len()
126 ));
127 }
128 if room_id == "." || room_id == ".." || room_id.contains("..") {
129 return Err("room ID cannot contain '..'".into());
130 }
131 if room_id.chars().any(|c| c.is_whitespace()) {
132 return Err("room ID cannot contain whitespace".into());
133 }
134 if let Some(bad) = room_id.chars().find(|c| UNSAFE_CHARS.contains(c)) {
135 return Err(format!("room ID contains unsafe character: {bad:?}"));
136 }
137 Ok(())
138}
139
140#[derive(Debug, Clone)]
142pub struct DaemonConfig {
143 pub socket_path: PathBuf,
145 pub data_dir: PathBuf,
148 pub state_dir: PathBuf,
151 pub ws_port: Option<u16>,
153 pub grace_period_secs: u64,
158}
159
160impl DaemonConfig {
161 pub fn chat_path(&self, room_id: &str) -> PathBuf {
163 self.data_dir.join(format!("{room_id}.chat"))
164 }
165
166 pub fn token_map_path(&self, room_id: &str) -> PathBuf {
168 crate::paths::broker_tokens_path(&self.state_dir, room_id)
169 }
170
171 pub fn system_tokens_path(&self) -> PathBuf {
177 self.state_dir.join("tokens.json")
178 }
179
180 pub fn subscription_map_path(&self, room_id: &str) -> PathBuf {
182 crate::paths::broker_subscriptions_path(&self.state_dir, room_id)
183 }
184
185 pub fn event_filter_map_path(&self, room_id: &str) -> PathBuf {
187 crate::paths::broker_event_filters_path(&self.state_dir, room_id)
188 }
189}
190
191impl Default for DaemonConfig {
192 fn default() -> Self {
193 Self {
194 socket_path: crate::paths::room_socket_path(),
195 data_dir: crate::paths::room_data_dir(),
196 state_dir: crate::paths::room_state_dir(),
197 ws_port: None,
198 grace_period_secs: 30,
199 }
200 }
201}
202
203pub(crate) type RoomMap = Arc<Mutex<HashMap<String, Arc<RoomState>>>>;
205
206pub struct DaemonState {
208 pub(crate) rooms: RoomMap,
209 pub(crate) config: DaemonConfig,
210 pub(crate) next_client_id: Arc<AtomicU64>,
212 pub(crate) shutdown: Arc<watch::Sender<bool>>,
214 pub(crate) system_token_map: TokenMap,
221 pub(crate) user_registry: Arc<tokio::sync::Mutex<UserRegistry>>,
228 pub(crate) connection_count: Arc<AtomicUsize>,
234}
235
236impl DaemonState {
237 pub fn new(config: DaemonConfig) -> Self {
239 let (shutdown_tx, _) = watch::channel(false);
240
241 let registry = load_or_migrate_registry(&config);
248
249 let token_snapshot = registry.token_snapshot();
252
253 Self {
254 rooms: Arc::new(Mutex::new(HashMap::new())),
255 config,
256 next_client_id: Arc::new(AtomicU64::new(0)),
257 shutdown: Arc::new(shutdown_tx),
258 system_token_map: Arc::new(Mutex::new(token_snapshot)),
259 user_registry: Arc::new(tokio::sync::Mutex::new(registry)),
260 connection_count: Arc::new(AtomicUsize::new(0)),
261 }
262 }
263
264 pub async fn create_room(&self, room_id: &str) -> Result<(), String> {
267 create_room_entry(
268 room_id,
269 None,
270 &self.rooms,
271 &self.config,
272 &self.system_token_map,
273 Some(self.user_registry.clone()),
274 )
275 .await
276 }
277
278 pub async fn create_room_with_config(
281 &self,
282 room_id: &str,
283 config: room_protocol::RoomConfig,
284 ) -> Result<(), String> {
285 create_room_entry(
286 room_id,
287 Some(config),
288 &self.rooms,
289 &self.config,
290 &self.system_token_map,
291 Some(self.user_registry.clone()),
292 )
293 .await
294 }
295
296 pub async fn get_room_config(&self, room_id: &str) -> Option<room_protocol::RoomConfig> {
298 self.rooms
299 .lock()
300 .await
301 .get(room_id)
302 .and_then(|s| s.config.clone())
303 }
304
305 pub async fn destroy_room(&self, room_id: &str) -> Result<(), String> {
309 let mut rooms = self.rooms.lock().await;
310 let state = rooms
311 .remove(room_id)
312 .ok_or_else(|| format!("room not found: {room_id}"))?;
313
314 let _ = state.shutdown.send(true);
316 Ok(())
317 }
318
319 pub async fn has_room(&self, room_id: &str) -> bool {
321 self.rooms.lock().await.contains_key(room_id)
322 }
323
324 pub fn shutdown_handle(&self) -> Arc<watch::Sender<bool>> {
326 self.shutdown.clone()
327 }
328
329 pub async fn list_rooms(&self) -> Vec<String> {
331 self.rooms.lock().await.keys().cloned().collect()
332 }
333
334 #[doc(hidden)]
337 pub async fn test_inject_token(
338 &self,
339 room_id: &str,
340 username: &str,
341 token: &str,
342 ) -> Result<(), String> {
343 let rooms = self.rooms.lock().await;
344 let room = rooms
345 .get(room_id)
346 .ok_or_else(|| format!("room not found: {room_id}"))?;
347 room.token_map
348 .lock()
349 .await
350 .insert(token.to_owned(), username.to_owned());
351 Ok(())
352 }
353
354 pub async fn run(&self) -> anyhow::Result<()> {
362 let pid_path = if self.config.socket_path == crate::paths::room_socket_path() {
366 match write_pid_file(&crate::paths::room_pid_path()) {
367 Ok(()) => Some(crate::paths::room_pid_path()),
368 Err(e) => {
369 eprintln!("[daemon] failed to write PID file: {e}");
370 None
371 }
372 }
373 } else {
374 None
375 };
376
377 if self.config.socket_path.exists() {
379 std::fs::remove_file(&self.config.socket_path)?;
380 }
381
382 let listener = UnixListener::bind(&self.config.socket_path)?;
383 eprintln!(
384 "[daemon] listening on {}",
385 self.config.socket_path.display()
386 );
387
388 let mut shutdown_rx = self.shutdown.subscribe();
389 let grace_duration = tokio::time::Duration::from_secs(self.config.grace_period_secs);
390
391 let (close_tx, mut close_rx) = tokio::sync::mpsc::channel::<()>(64);
393
394 let mut grace_sleep: Option<std::pin::Pin<Box<tokio::time::Sleep>>> = None;
396
397 if let Some(port) = self.config.ws_port {
399 let ws_state = DaemonWsState {
400 rooms: self.rooms.clone(),
401 next_client_id: self.next_client_id.clone(),
402 config: self.config.clone(),
403 system_token_map: self.system_token_map.clone(),
404 user_registry: self.user_registry.clone(),
405 };
406 let app = ws::create_daemon_router(ws_state);
407 let tcp = tokio::net::TcpListener::bind(("0.0.0.0", port)).await?;
408 eprintln!("[daemon] WebSocket/REST listening on port {port}");
409 tokio::spawn(async move {
410 if let Err(e) = axum::serve(tcp, app).await {
411 eprintln!("[daemon] WS server error: {e}");
412 }
413 });
414 }
415
416 let result = loop {
417 let grace_fut = async {
420 match grace_sleep.as_mut() {
421 Some(s) => {
422 s.await;
423 }
424 None => std::future::pending::<()>().await,
425 }
426 };
427
428 tokio::select! {
429 accept = listener.accept() => {
430 let (stream, _) = match accept {
431 Ok(a) => a,
432 Err(e) => break Err(e.into()),
433 };
434 grace_sleep = None;
436
437 let count = self.connection_count.clone();
438 count.fetch_add(1, Ordering::SeqCst);
439 let rooms = self.rooms.clone();
440 let next_id = self.next_client_id.clone();
441 let cfg = self.config.clone();
442 let sys_tokens = self.system_token_map.clone();
443 let registry = self.user_registry.clone();
444 let tx = close_tx.clone();
445
446 tokio::spawn(async move {
447 if let Err(e) = dispatch_connection(stream, &rooms, &next_id, &cfg, &sys_tokens, ®istry).await {
448 eprintln!("[daemon] connection error: {e:#}");
449 }
450 count.fetch_sub(1, Ordering::SeqCst);
451 let _ = tx.send(()).await;
453 });
454 }
455 Some(()) = close_rx.recv() => {
456 if self.connection_count.load(Ordering::SeqCst) == 0 {
458 eprintln!(
459 "[daemon] no connections — grace period {}s started",
460 self.config.grace_period_secs
461 );
462 grace_sleep =
463 Some(Box::pin(tokio::time::sleep(grace_duration)));
464 }
465 }
466 _ = grace_fut => {
467 eprintln!("[daemon] grace period expired, shutting down");
468 let _ = self.shutdown.send(true);
469 break Ok(());
472 }
473 _ = shutdown_rx.changed() => {
474 eprintln!("[daemon] shutdown requested, exiting");
475 if let Some(ref p) = pid_path {
476 remove_pid_file(p);
477 }
478 break Ok(());
479 }
480 }
481 };
482
483 let _ = std::fs::remove_file(&self.config.socket_path);
485 let _ = std::fs::remove_file(crate::paths::room_pid_path());
486 for room_id in self.list_rooms().await {
488 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
489 }
490
491 result
492 }
493}
494
495fn load_or_migrate_registry(config: &DaemonConfig) -> UserRegistry {
505 let users_path = config.state_dir.join("users.json");
506
507 let mut registry = if users_path.exists() {
508 UserRegistry::load(config.state_dir.clone()).unwrap_or_else(|e| {
510 eprintln!("[daemon] failed to load user registry: {e}; starting empty");
511 UserRegistry::new(config.state_dir.clone())
512 })
513 } else {
514 let tokens_path = config.system_tokens_path();
516 if tokens_path.exists() {
517 let legacy = super::auth::load_token_map(&tokens_path);
518 if !legacy.is_empty() {
519 eprintln!(
520 "[daemon] migrating {} token(s) from tokens.json to users.json",
521 legacy.len()
522 );
523 let mut reg = UserRegistry::new(config.state_dir.clone());
524 for (token, username) in &legacy {
525 if let Err(e) = reg.register_user_idempotent(username) {
527 eprintln!("[daemon] migration: register {username}: {e}");
528 continue;
529 }
530 let _ = reg.issue_token(username);
537 let _ = token; }
539 if let Err(e) = reg.save() {
540 eprintln!("[daemon] migration save failed: {e}");
541 }
542 reg
543 } else {
544 UserRegistry::new(config.state_dir.clone())
546 }
547 } else {
548 UserRegistry::new(config.state_dir.clone())
550 }
551 };
552
553 migrate_legacy_tmpdir_tokens(&mut registry);
557
558 registry
559}
560
561fn migrate_legacy_tmpdir_tokens(registry: &mut UserRegistry) {
571 let legacy_dir = crate::paths::legacy_token_dir();
572 migrate_legacy_tmpdir_tokens_from(&legacy_dir, registry);
573}
574
575fn migrate_legacy_tmpdir_tokens_from(legacy_dir: &std::path::Path, registry: &mut UserRegistry) {
579 let entries = match std::fs::read_dir(legacy_dir) {
580 Ok(e) => e,
581 Err(_) => return,
582 };
583 let mut count = 0usize;
584 for entry in entries.filter_map(|e| e.ok()) {
585 let path = entry.path();
586 let name = match path.file_name().and_then(|n| n.to_str()) {
587 Some(n) => n.to_owned(),
588 None => continue,
589 };
590 if !name.starts_with("room-") || !name.ends_with(".token") {
591 continue;
592 }
593 let data = match std::fs::read_to_string(&path) {
594 Ok(d) => d,
595 Err(_) => continue,
596 };
597 let v: serde_json::Value = match serde_json::from_str(data.trim()) {
598 Ok(v) => v,
599 Err(_) => continue,
600 };
601 let (username, token) = match (v["username"].as_str(), v["token"].as_str()) {
602 (Some(u), Some(t)) if !u.is_empty() && !t.is_empty() => (u.to_owned(), t.to_owned()),
603 _ => continue,
604 };
605 if let Err(e) = registry.register_user_idempotent(&username) {
606 eprintln!("[daemon] legacy token migration: register {username}: {e}");
607 continue;
608 }
609 match registry.import_token(&username, &token) {
610 Ok(()) => count += 1,
611 Err(e) => {
612 eprintln!("[daemon] legacy token migration: import token for {username}: {e}")
613 }
614 }
615 }
616 if count > 0 {
617 eprintln!(
618 "[daemon] imported {count} legacy token(s) from {}",
619 legacy_dir.display()
620 );
621 }
622}
623
624fn build_initial_subscriptions(
631 config: &room_protocol::RoomConfig,
632) -> HashMap<String, room_protocol::SubscriptionTier> {
633 let mut subs = HashMap::new();
634 if config.visibility == room_protocol::RoomVisibility::Dm {
635 for user in &config.invite_list {
636 subs.insert(user.clone(), room_protocol::SubscriptionTier::Full);
637 }
638 }
639 subs
640}
641
642pub(crate) async fn create_room_entry(
652 room_id: &str,
653 config: Option<room_protocol::RoomConfig>,
654 rooms: &RoomMap,
655 daemon_config: &DaemonConfig,
656 system_token_map: &TokenMap,
657 registry: Option<Arc<tokio::sync::Mutex<UserRegistry>>>,
658) -> Result<(), String> {
659 validate_room_id(room_id)?;
660 {
661 let map = rooms.lock().await;
662 if map.contains_key(room_id) {
663 return Err(format!("room already exists: {room_id}"));
664 }
665 }
666
667 let chat_path = daemon_config.chat_path(room_id);
668 let subscription_map_path = daemon_config.subscription_map_path(room_id);
669
670 let persisted_subs = super::commands::load_subscription_map(&subscription_map_path);
671 let merged_subs = if let Some(ref cfg) = config {
672 let mut initial = build_initial_subscriptions(cfg);
673 initial.extend(persisted_subs);
674 initial
675 } else {
676 persisted_subs
677 };
678
679 let state = RoomState::new(
682 room_id.to_owned(),
683 chat_path,
684 daemon_config.system_tokens_path(),
685 subscription_map_path,
686 Arc::clone(system_token_map),
687 Arc::new(Mutex::new(merged_subs)),
688 config,
689 )?;
690 if let Some(reg) = registry {
691 state.set_registry(reg);
692 }
693
694 let ef_path = daemon_config.event_filter_map_path(room_id);
696 let persisted_ef = super::commands::load_event_filter_map(&ef_path);
697 state.set_event_filter_map(Arc::new(Mutex::new(persisted_ef)), ef_path);
698
699 rooms.lock().await.insert(room_id.to_owned(), state);
700
701 let meta_path = crate::paths::room_meta_path(room_id);
705 let chat_path_str = daemon_config.chat_path(room_id);
706 let meta_json = serde_json::json!({ "chat_path": chat_path_str });
707 let _ = std::fs::write(&meta_path, meta_json.to_string());
708
709 Ok(())
710}
711
712async fn handle_destroy(
723 room_id: &str,
724 reader: &mut tokio::io::BufReader<tokio::net::unix::OwnedReadHalf>,
725 write_half: &mut tokio::net::unix::OwnedWriteHalf,
726 rooms: &RoomMap,
727 user_registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
728) -> anyhow::Result<()> {
729 use tokio::io::AsyncWriteExt;
730
731 if room_id.is_empty() {
732 let err = serde_json::json!({
733 "type": "error",
734 "code": "invalid_room_id",
735 "message": "room ID is empty"
736 });
737 write_half.write_all(format!("{err}\n").as_bytes()).await?;
738 return Ok(());
739 }
740
741 let mut token_line = String::new();
743 super::read_line_limited(reader, &mut token_line).await?;
744 let token = token_line.trim();
745
746 if token.is_empty() {
747 let err = serde_json::json!({
748 "type": "error",
749 "code": "missing_token",
750 "message": "DESTROY requires a valid token on the second line"
751 });
752 write_half.write_all(format!("{err}\n").as_bytes()).await?;
753 return Ok(());
754 }
755
756 {
758 let reg = user_registry.lock().await;
759 if reg.validate_token(token).is_none() {
760 let err = serde_json::json!({
761 "type": "error",
762 "code": "invalid_token",
763 "message": "token is not valid"
764 });
765 write_half.write_all(format!("{err}\n").as_bytes()).await?;
766 return Ok(());
767 }
768 }
769
770 let state = {
772 let mut map = rooms.lock().await;
773 map.remove(room_id)
774 };
775
776 match state {
777 Some(s) => {
778 let _ = s.shutdown.send(true);
780 let ok = serde_json::json!({
781 "type": "room_destroyed",
782 "room": room_id
783 });
784 write_half.write_all(format!("{ok}\n").as_bytes()).await?;
785 }
786 None => {
787 let err = serde_json::json!({
788 "type": "error",
789 "code": "room_not_found",
790 "room": room_id
791 });
792 write_half.write_all(format!("{err}\n").as_bytes()).await?;
793 }
794 }
795
796 Ok(())
797}
798
799async fn handle_create(
806 room_id: &str,
807 reader: &mut tokio::io::BufReader<tokio::net::unix::OwnedReadHalf>,
808 write_half: &mut tokio::net::unix::OwnedWriteHalf,
809 rooms: &RoomMap,
810 daemon_config: &DaemonConfig,
811 system_token_map: &TokenMap,
812 user_registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
813) -> anyhow::Result<()> {
814 use tokio::io::AsyncWriteExt;
815
816 if let Err(e) = validate_room_id(room_id) {
818 let err = serde_json::json!({
819 "type": "error",
820 "code": "invalid_room_id",
821 "message": e
822 });
823 write_half.write_all(format!("{err}\n").as_bytes()).await?;
824 return Ok(());
825 }
826
827 {
829 let map = rooms.lock().await;
830 if map.contains_key(room_id) {
831 let err = serde_json::json!({
832 "type": "error",
833 "code": "room_exists",
834 "message": format!("room already exists: {room_id}")
835 });
836 write_half.write_all(format!("{err}\n").as_bytes()).await?;
837 return Ok(());
838 }
839 }
840
841 let mut config_line = String::new();
843 super::read_line_limited(reader, &mut config_line).await?;
844 let config_str = config_line.trim();
845
846 let (visibility_str, invite, token): (String, Vec<String>, Option<String>) =
847 if config_str.is_empty() {
848 ("public".into(), vec![], None)
849 } else {
850 let v: serde_json::Value = match serde_json::from_str(config_str) {
851 Ok(v) => v,
852 Err(e) => {
853 let err = serde_json::json!({
854 "type": "error",
855 "code": "invalid_config",
856 "message": format!("invalid config JSON: {e}")
857 });
858 write_half.write_all(format!("{err}\n").as_bytes()).await?;
859 return Ok(());
860 }
861 };
862 let vis = v["visibility"].as_str().unwrap_or("public").to_owned();
863 let inv = v["invite"]
864 .as_array()
865 .map(|arr| {
866 arr.iter()
867 .filter_map(|v| v.as_str().map(|s| s.to_owned()))
868 .collect()
869 })
870 .unwrap_or_default();
871 let tok = v["token"].as_str().map(|s| s.to_owned());
872 (vis, inv, tok)
873 };
874
875 let token_str = match token.as_deref() {
877 Some(t) if !t.is_empty() => t,
878 _ => {
879 let err = serde_json::json!({
880 "type": "error",
881 "code": "missing_token",
882 "message": "CREATE requires a valid token in the config JSON"
883 });
884 write_half.write_all(format!("{err}\n").as_bytes()).await?;
885 return Ok(());
886 }
887 };
888 {
889 let reg = user_registry.lock().await;
890 if reg.validate_token(token_str).is_none() {
891 let err = serde_json::json!({
892 "type": "error",
893 "code": "invalid_token",
894 "message": "token is not valid"
895 });
896 write_half.write_all(format!("{err}\n").as_bytes()).await?;
897 return Ok(());
898 }
899 }
900
901 let room_config = match visibility_str.as_str() {
903 "public" => room_protocol::RoomConfig {
904 visibility: room_protocol::RoomVisibility::Public,
905 max_members: None,
906 invite_list: invite.into_iter().collect(),
907 created_by: "system".to_owned(),
908 created_at: chrono::Utc::now().to_rfc3339(),
909 },
910 "private" => room_protocol::RoomConfig {
911 visibility: room_protocol::RoomVisibility::Private,
912 max_members: None,
913 invite_list: invite.into_iter().collect(),
914 created_by: "system".to_owned(),
915 created_at: chrono::Utc::now().to_rfc3339(),
916 },
917 "dm" => {
918 if invite.len() != 2 {
919 let err = serde_json::json!({
920 "type": "error",
921 "code": "invalid_config",
922 "message": "dm visibility requires exactly 2 users in invite list"
923 });
924 write_half.write_all(format!("{err}\n").as_bytes()).await?;
925 return Ok(());
926 }
927 room_protocol::RoomConfig::dm(&invite[0], &invite[1])
928 }
929 other => {
930 let err = serde_json::json!({
931 "type": "error",
932 "code": "invalid_config",
933 "message": format!("unknown visibility: {other}")
934 });
935 write_half.write_all(format!("{err}\n").as_bytes()).await?;
936 return Ok(());
937 }
938 };
939
940 if let Err(e) = create_room_entry(
942 room_id,
943 Some(room_config),
944 rooms,
945 daemon_config,
946 system_token_map,
947 Some(user_registry.clone()),
948 )
949 .await
950 {
951 let err = serde_json::json!({
952 "type": "error",
953 "code": "internal",
954 "message": e
955 });
956 write_half.write_all(format!("{err}\n").as_bytes()).await?;
957 return Ok(());
958 }
959
960 let ok = serde_json::json!({
961 "type": "room_created",
962 "room": room_id
963 });
964 write_half.write_all(format!("{ok}\n").as_bytes()).await?;
965 Ok(())
966}
967
968async fn handle_global_join(
973 username: &str,
974 write_half: &mut tokio::net::unix::OwnedWriteHalf,
975 registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
976) -> anyhow::Result<()> {
977 use tokio::io::AsyncWriteExt;
978
979 let mut reg = registry.lock().await;
980
981 let token = if reg.has_token_for_user(username) {
983 reg.token_snapshot()
985 .into_iter()
986 .find(|(_, u)| u == username)
987 .map(|(t, _)| t)
988 .expect("has_token_for_user was true but no token found")
989 } else {
990 reg.register_user_idempotent(username)
991 .map_err(|e| anyhow::anyhow!("registration failed: {e}"))?;
992 reg.issue_token(username)
993 .map_err(|e| anyhow::anyhow!("token issuance failed: {e}"))?
994 };
995
996 let resp = serde_json::json!({
997 "type": "token",
998 "token": token,
999 "username": username
1000 });
1001 write_half.write_all(format!("{resp}\n").as_bytes()).await?;
1002 Ok(())
1003}
1004
1005async fn dispatch_connection(
1011 stream: tokio::net::UnixStream,
1012 rooms: &RoomMap,
1013 next_client_id: &Arc<AtomicU64>,
1014 daemon_config: &DaemonConfig,
1015 system_token_map: &TokenMap,
1016 user_registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
1017) -> anyhow::Result<()> {
1018 use tokio::io::{AsyncWriteExt, BufReader};
1019
1020 let (read_half, mut write_half) = stream.into_split();
1021 let mut reader = BufReader::new(read_half);
1022
1023 let mut first = String::new();
1024 super::read_line_limited(&mut reader, &mut first).await?;
1025 let first_line = first.trim();
1026
1027 if first_line.is_empty() {
1028 return Ok(());
1029 }
1030
1031 use super::handshake::{
1032 parse_client_handshake, parse_daemon_prefix, ClientHandshake, DaemonPrefix,
1033 };
1034 let (room_id, rest) = match parse_daemon_prefix(first_line) {
1035 DaemonPrefix::Destroy(room_id) => {
1036 return handle_destroy(&room_id, &mut reader, &mut write_half, rooms, user_registry)
1037 .await;
1038 }
1039 DaemonPrefix::Create(room_id) => {
1040 return handle_create(
1041 &room_id,
1042 &mut reader,
1043 &mut write_half,
1044 rooms,
1045 daemon_config,
1046 system_token_map,
1047 user_registry,
1048 )
1049 .await;
1050 }
1051 DaemonPrefix::Join(username) => {
1052 return handle_global_join(&username, &mut write_half, user_registry).await;
1053 }
1054 DaemonPrefix::Room { room_id, rest } => (room_id, rest),
1055 DaemonPrefix::Unknown => {
1056 let err = serde_json::json!({
1057 "type": "error",
1058 "code": "missing_room_prefix",
1059 "message": "daemon mode requires ROOM:<room_id>: or CREATE:<room_id> prefix"
1060 });
1061 write_half.write_all(format!("{err}\n").as_bytes()).await?;
1062 return Ok(());
1063 }
1064 };
1065
1066 let state = {
1068 let map = rooms.lock().await;
1069 map.get(room_id.as_str()).cloned()
1070 };
1071
1072 let state = match state {
1073 Some(s) => s,
1074 None => {
1075 let err = serde_json::json!({
1076 "type": "error",
1077 "code": "room_not_found",
1078 "room": room_id
1079 });
1080 write_half.write_all(format!("{err}\n").as_bytes()).await?;
1081 return Ok(());
1082 }
1083 };
1084
1085 let cid = next_client_id.fetch_add(1, Ordering::SeqCst) + 1;
1086
1087 let username = match parse_client_handshake(&rest) {
1089 ClientHandshake::Send(u) => {
1090 eprintln!(
1091 "[broker/daemon] DEPRECATED: SEND:{u} handshake used — \
1092 migrate to TOKEN:<uuid> (SEND: will be removed in a future version)"
1093 );
1094 return handle_oneshot_send(u, reader, write_half, &state).await;
1095 }
1096 ClientHandshake::Token(token) => {
1097 let resolved = match super::auth::validate_token(&token, &state.token_map).await {
1099 Some(u) => Some(u),
1100 None => {
1101 let reg = user_registry.lock().await;
1102 reg.validate_token(&token).map(|u| u.to_owned())
1103 }
1104 };
1105 return match resolved {
1106 Some(u) => handle_oneshot_send(u, reader, write_half, &state).await,
1107 None => {
1108 let err = serde_json::json!({"type":"error","code":"invalid_token"});
1109 write_half
1110 .write_all(format!("{err}\n").as_bytes())
1111 .await
1112 .map_err(Into::into)
1113 }
1114 };
1115 }
1116 ClientHandshake::Join(u) => {
1117 let result = super::auth::handle_oneshot_join_with_registry(
1118 u,
1119 write_half,
1120 user_registry,
1121 &state.token_map,
1122 &state.subscription_map,
1123 state.config.as_ref(),
1124 )
1125 .await;
1126 super::commands::persist_subscriptions(&state).await;
1128 return result;
1129 }
1130 ClientHandshake::Session(token) => {
1131 let resolved = match super::auth::validate_token(&token, &state.token_map).await {
1133 Some(u) => Some(u),
1134 None => {
1135 let reg = user_registry.lock().await;
1136 reg.validate_token(&token).map(|u| u.to_owned())
1137 }
1138 };
1139 match resolved {
1140 Some(u) => u,
1141 None => {
1142 let err = serde_json::json!({"type":"error","code":"invalid_token"});
1143 write_half.write_all(format!("{err}\n").as_bytes()).await?;
1144 return Ok(());
1145 }
1146 }
1147 }
1148 ClientHandshake::Interactive(u) => {
1149 eprintln!(
1150 "[broker/daemon] DEPRECATED: unauthenticated interactive join for '{u}' — \
1151 migrate to SESSION:<token>"
1152 );
1153 u
1154 }
1155 };
1156
1157 if username.is_empty() {
1159 return Ok(());
1160 }
1161
1162 if let Err(reason) = super::auth::check_join_permission(&username, state.config.as_ref()) {
1164 let err = serde_json::json!({
1165 "type": "error",
1166 "code": "join_denied",
1167 "message": reason,
1168 "username": username
1169 });
1170 write_half.write_all(format!("{err}\n").as_bytes()).await?;
1171 return Ok(());
1172 }
1173
1174 let (tx, _) = broadcast::channel::<String>(256);
1176 state
1177 .clients
1178 .lock()
1179 .await
1180 .insert(cid, (String::new(), tx.clone()));
1181
1182 let result =
1183 super::run_interactive_session(cid, &username, reader, write_half, tx, &state).await;
1184
1185 state.clients.lock().await.remove(&cid);
1186 result
1187}
1188
1189#[cfg(test)]
1192mod tests {
1193 use super::*;
1194
1195 #[test]
1198 fn write_pid_file_creates_file_with_current_pid() {
1199 let dir = tempfile::TempDir::new().unwrap();
1200 let path = dir.path().join("test.pid");
1201 write_pid_file(&path).unwrap();
1202 let content = std::fs::read_to_string(&path).unwrap();
1203 let pid: u32 = content.trim().parse().expect("PID should be a number");
1204 assert_eq!(pid, std::process::id());
1205 }
1206
1207 #[test]
1208 fn is_pid_alive_true_for_current_process() {
1209 let dir = tempfile::TempDir::new().unwrap();
1210 let path = dir.path().join("test.pid");
1211 write_pid_file(&path).unwrap();
1212 assert!(is_pid_alive(&path), "current process should be alive");
1213 }
1214
1215 #[test]
1216 fn is_pid_alive_false_for_missing_file() {
1217 let path = std::path::Path::new("/tmp/nonexistent-room-test-99999999.pid");
1218 assert!(!is_pid_alive(path));
1219 }
1220
1221 #[test]
1222 fn remove_pid_file_deletes_file() {
1223 let dir = tempfile::TempDir::new().unwrap();
1224 let path = dir.path().join("remove.pid");
1225 write_pid_file(&path).unwrap();
1226 assert!(path.exists());
1227 remove_pid_file(&path);
1228 assert!(!path.exists());
1229 }
1230
1231 #[test]
1232 fn remove_pid_file_noop_when_missing() {
1233 let path = std::path::Path::new("/tmp/gone-99999999.pid");
1235 remove_pid_file(path); }
1237
1238 async fn get_room(daemon: &DaemonState, room_id: &str) -> Arc<RoomState> {
1242 daemon
1243 .rooms
1244 .lock()
1245 .await
1246 .get(room_id)
1247 .cloned()
1248 .unwrap_or_else(|| panic!("room {room_id} not found"))
1249 }
1250
1251 #[tokio::test]
1252 async fn create_room_succeeds() {
1253 let daemon = DaemonState::new(DaemonConfig::default());
1254 assert!(daemon.create_room("test-room").await.is_ok());
1255 let state = get_room(&daemon, "test-room").await;
1256 assert_eq!(*state.room_id, "test-room");
1257 }
1258
1259 #[tokio::test]
1260 async fn create_duplicate_room_fails() {
1261 let daemon = DaemonState::new(DaemonConfig::default());
1262 daemon.create_room("dup").await.unwrap();
1263 let result = daemon.create_room("dup").await;
1264 assert!(result.is_err());
1265 assert!(result.unwrap_err().contains("already exists"));
1266 }
1267
1268 #[tokio::test]
1269 async fn has_room_returns_true_for_created() {
1270 let daemon = DaemonState::new(DaemonConfig::default());
1271 daemon.create_room("room-a").await.unwrap();
1272 assert!(daemon.has_room("room-a").await);
1273 assert!(!daemon.has_room("room-b").await);
1274 }
1275
1276 #[tokio::test]
1277 async fn destroy_room_removes_it() {
1278 let daemon = DaemonState::new(DaemonConfig::default());
1279 daemon.create_room("doomed").await.unwrap();
1280 assert!(daemon.destroy_room("doomed").await.is_ok());
1281 assert!(!daemon.has_room("doomed").await);
1282 }
1283
1284 #[tokio::test]
1285 async fn destroy_nonexistent_room_fails() {
1286 let daemon = DaemonState::new(DaemonConfig::default());
1287 let result = daemon.destroy_room("nope").await;
1288 assert!(result.is_err());
1289 assert!(result.unwrap_err().contains("not found"));
1290 }
1291
1292 #[tokio::test]
1293 async fn destroy_room_signals_shutdown() {
1294 let daemon = DaemonState::new(DaemonConfig::default());
1295 daemon.create_room("shutme").await.unwrap();
1296 let state = get_room(&daemon, "shutme").await;
1297 let rx = state.shutdown.subscribe();
1298 assert!(!*rx.borrow());
1299
1300 daemon.destroy_room("shutme").await.unwrap();
1301 assert!(*rx.borrow());
1303 }
1304
1305 #[tokio::test]
1306 async fn list_rooms_returns_all() {
1307 let daemon = DaemonState::new(DaemonConfig::default());
1308 daemon.create_room("alpha").await.unwrap();
1309 daemon.create_room("beta").await.unwrap();
1310 daemon.create_room("gamma").await.unwrap();
1311
1312 let mut rooms = daemon.list_rooms().await;
1313 rooms.sort();
1314 assert_eq!(rooms, vec!["alpha", "beta", "gamma"]);
1315 }
1316
1317 #[tokio::test]
1318 async fn list_rooms_empty_initially() {
1319 let daemon = DaemonState::new(DaemonConfig::default());
1320 assert!(daemon.list_rooms().await.is_empty());
1321 }
1322
1323 #[tokio::test]
1324 async fn create_room_initializes_plugins() {
1325 let daemon = DaemonState::new(DaemonConfig::default());
1326 daemon.create_room("plugtest").await.unwrap();
1327 let state = get_room(&daemon, "plugtest").await;
1328 assert!(state.plugin_registry.resolve("help").is_none());
1330 assert!(state.plugin_registry.resolve("stats").is_some());
1331 }
1332
1333 #[test]
1336 fn config_chat_path_format() {
1337 let config = DaemonConfig {
1338 data_dir: PathBuf::from("/var/room"),
1339 ..DaemonConfig::default()
1340 };
1341 assert_eq!(
1342 config.chat_path("myroom"),
1343 PathBuf::from("/var/room/myroom.chat")
1344 );
1345 }
1346
1347 #[test]
1348 fn config_default_socket_path() {
1349 let config = DaemonConfig::default();
1350 assert_eq!(config.socket_path, crate::paths::room_socket_path());
1351 }
1352
1353 #[tokio::test]
1356 async fn create_room_with_dm_config() {
1357 let daemon = DaemonState::new(DaemonConfig::default());
1358 let config = room_protocol::RoomConfig::dm("alice", "bob");
1359 assert!(daemon
1360 .create_room_with_config("dm-alice-bob", config)
1361 .await
1362 .is_ok());
1363
1364 let state = get_room(&daemon, "dm-alice-bob").await;
1365 let cfg = state.config.as_ref().unwrap();
1366 assert_eq!(cfg.visibility, room_protocol::RoomVisibility::Dm);
1367 assert_eq!(cfg.max_members, Some(2));
1368 assert!(cfg.invite_list.contains("alice"));
1369 assert!(cfg.invite_list.contains("bob"));
1370 }
1371
1372 #[tokio::test]
1373 async fn create_room_with_config_duplicate_fails() {
1374 let daemon = DaemonState::new(DaemonConfig::default());
1375 let config = room_protocol::RoomConfig::public("owner");
1376 daemon
1377 .create_room_with_config("dup", config.clone())
1378 .await
1379 .unwrap();
1380 assert!(daemon.create_room_with_config("dup", config).await.is_err());
1381 }
1382
1383 #[tokio::test]
1384 async fn get_room_config_returns_none_for_unconfigured() {
1385 let daemon = DaemonState::new(DaemonConfig::default());
1386 daemon.create_room("plain").await.unwrap();
1387 assert!(daemon.get_room_config("plain").await.is_none());
1388 }
1389
1390 #[tokio::test]
1391 async fn get_room_config_returns_config_when_present() {
1392 let daemon = DaemonState::new(DaemonConfig::default());
1393 let config = room_protocol::RoomConfig::dm("alice", "bob");
1394 daemon
1395 .create_room_with_config("dm-alice-bob", config)
1396 .await
1397 .unwrap();
1398 let cfg = daemon.get_room_config("dm-alice-bob").await.unwrap();
1399 assert_eq!(cfg.visibility, room_protocol::RoomVisibility::Dm);
1400 }
1401
1402 #[tokio::test]
1403 async fn dm_room_id_deterministic_and_lookup_works() {
1404 let daemon = DaemonState::new(DaemonConfig::default());
1405 let room_id = room_protocol::dm_room_id("bob", "alice").unwrap();
1406 assert_eq!(room_id, "dm-alice-bob");
1407
1408 let config = room_protocol::RoomConfig::dm("bob", "alice");
1409 daemon
1410 .create_room_with_config(&room_id, config)
1411 .await
1412 .unwrap();
1413 assert!(daemon.has_room("dm-alice-bob").await);
1414 assert_eq!(
1416 room_protocol::dm_room_id("alice", "bob").unwrap(),
1417 "dm-alice-bob"
1418 );
1419 }
1420
1421 #[test]
1424 fn valid_room_ids() {
1425 for id in [
1426 "lobby",
1427 "agent-room-2",
1428 "my_room",
1429 "Room.1",
1430 "dm-alice-bob",
1431 "a",
1432 &"x".repeat(MAX_ROOM_ID_LEN),
1433 ] {
1434 assert!(validate_room_id(id).is_ok(), "should accept: {id:?}");
1435 }
1436 }
1437
1438 #[test]
1439 fn empty_room_id_rejected() {
1440 let err = validate_room_id("").unwrap_err();
1441 assert!(err.contains("empty"), "{err}");
1442 }
1443
1444 #[test]
1445 fn room_id_too_long_rejected() {
1446 let long = "x".repeat(MAX_ROOM_ID_LEN + 1);
1447 let err = validate_room_id(&long).unwrap_err();
1448 assert!(err.contains("too long"), "{err}");
1449 }
1450
1451 #[test]
1452 fn dot_dot_traversal_rejected() {
1453 for id in ["..", "room/../etc", "..secret", "a..b"] {
1454 let err = validate_room_id(id).unwrap_err();
1455 assert!(err.contains(".."), "should reject {id:?}: {err}");
1456 }
1457 }
1458
1459 #[test]
1460 fn single_dot_rejected() {
1461 let err = validate_room_id(".").unwrap_err();
1462 assert!(err.contains(".."), "{err}");
1463 }
1464
1465 #[test]
1466 fn slash_rejected() {
1467 for id in ["room/sub", "/etc/passwd", "a/b/c"] {
1468 let err = validate_room_id(id).unwrap_err();
1469 assert!(err.contains("unsafe"), "should reject {id:?}: {err}");
1470 }
1471 }
1472
1473 #[test]
1474 fn backslash_rejected() {
1475 let err = validate_room_id("room\\sub").unwrap_err();
1476 assert!(err.contains("unsafe"), "{err}");
1477 }
1478
1479 #[test]
1480 fn null_byte_rejected() {
1481 let err = validate_room_id("room\0id").unwrap_err();
1482 assert!(err.contains("unsafe"), "{err}");
1483 }
1484
1485 #[test]
1486 fn whitespace_rejected() {
1487 for id in ["room name", "room\tid", "room\nid", " leading", "trailing "] {
1488 let err = validate_room_id(id).unwrap_err();
1489 assert!(err.contains("whitespace"), "should reject {id:?}: {err}");
1490 }
1491 }
1492
1493 #[test]
1494 fn other_unsafe_chars_rejected() {
1495 for ch in [':', '*', '?', '"', '<', '>', '|'] {
1496 let id = format!("room{ch}id");
1497 let err = validate_room_id(&id).unwrap_err();
1498 assert!(err.contains("unsafe"), "should reject {ch:?}: {err}");
1499 }
1500 }
1501
1502 #[tokio::test]
1503 async fn create_room_rejects_invalid_id() {
1504 let daemon = DaemonState::new(DaemonConfig::default());
1505 assert!(daemon.create_room("room/sub").await.is_err());
1506 assert!(daemon.create_room("..").await.is_err());
1507 assert!(daemon.create_room("").await.is_err());
1508 assert!(daemon.create_room("room name").await.is_err());
1509 }
1510
1511 #[tokio::test]
1512 async fn create_room_with_config_rejects_invalid_id() {
1513 let daemon = DaemonState::new(DaemonConfig::default());
1514 let config = room_protocol::RoomConfig::public("owner");
1515 assert!(daemon
1516 .create_room_with_config("../etc", config)
1517 .await
1518 .is_err());
1519 }
1520
1521 #[tokio::test]
1524 async fn dm_room_auto_subscribes_both_participants() {
1525 let daemon = DaemonState::new(DaemonConfig::default());
1526 let config = room_protocol::RoomConfig::dm("alice", "bob");
1527 daemon
1528 .create_room_with_config("dm-alice-bob", config)
1529 .await
1530 .unwrap();
1531
1532 let state = get_room(&daemon, "dm-alice-bob").await;
1533 let subs = state.subscription_map.lock().await;
1534 assert_eq!(subs.len(), 2);
1535 assert_eq!(
1536 subs.get("alice"),
1537 Some(&room_protocol::SubscriptionTier::Full)
1538 );
1539 assert_eq!(
1540 subs.get("bob"),
1541 Some(&room_protocol::SubscriptionTier::Full)
1542 );
1543 }
1544
1545 #[tokio::test]
1546 async fn public_room_starts_with_no_subscriptions() {
1547 let daemon = DaemonState::new(DaemonConfig::default());
1548 let config = room_protocol::RoomConfig::public("owner");
1549 daemon
1550 .create_room_with_config("lobby", config)
1551 .await
1552 .unwrap();
1553
1554 let state = get_room(&daemon, "lobby").await;
1555 let subs = state.subscription_map.lock().await;
1556 assert!(subs.is_empty());
1557 }
1558
1559 #[tokio::test]
1560 async fn unconfigured_room_starts_with_no_subscriptions() {
1561 let daemon = DaemonState::new(DaemonConfig::default());
1562 daemon.create_room("plain").await.unwrap();
1563
1564 let state = get_room(&daemon, "plain").await;
1565 let subs = state.subscription_map.lock().await;
1566 assert!(subs.is_empty());
1567 }
1568
1569 #[tokio::test]
1570 async fn dm_auto_subscribe_uses_full_tier() {
1571 let daemon = DaemonState::new(DaemonConfig::default());
1572 let config = room_protocol::RoomConfig::dm("carol", "dave");
1573 daemon
1574 .create_room_with_config("dm-carol-dave", config)
1575 .await
1576 .unwrap();
1577
1578 let state = get_room(&daemon, "dm-carol-dave").await;
1579 let subs = state.subscription_map.lock().await;
1580 for (_, tier) in subs.iter() {
1582 assert_eq!(*tier, room_protocol::SubscriptionTier::Full);
1583 }
1584 }
1585
1586 #[test]
1587 fn build_initial_subscriptions_dm_populates() {
1588 let config = room_protocol::RoomConfig::dm("alice", "bob");
1589 let subs = build_initial_subscriptions(&config);
1590 assert_eq!(subs.len(), 2);
1591 assert_eq!(subs["alice"], room_protocol::SubscriptionTier::Full);
1592 assert_eq!(subs["bob"], room_protocol::SubscriptionTier::Full);
1593 }
1594
1595 #[test]
1596 fn build_initial_subscriptions_public_empty() {
1597 let config = room_protocol::RoomConfig::public("owner");
1598 let subs = build_initial_subscriptions(&config);
1599 assert!(subs.is_empty());
1600 }
1601
1602 #[test]
1605 fn default_grace_period_is_30() {
1606 let config = DaemonConfig::default();
1607 assert_eq!(config.grace_period_secs, 30);
1608 }
1609
1610 #[test]
1611 fn custom_grace_period_preserved() {
1612 let config = DaemonConfig {
1613 grace_period_secs: 0,
1614 ..DaemonConfig::default()
1615 };
1616 assert_eq!(config.grace_period_secs, 0);
1617 }
1618
1619 #[tokio::test]
1622 async fn connection_count_starts_at_zero() {
1623 let daemon = DaemonState::new(DaemonConfig::default());
1624 assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 0);
1625 }
1626
1627 #[tokio::test]
1628 async fn connection_count_increments_and_decrements() {
1629 let count = Arc::new(AtomicUsize::new(0));
1630 count.fetch_add(1, Ordering::SeqCst);
1631 count.fetch_add(1, Ordering::SeqCst);
1632 assert_eq!(count.load(Ordering::SeqCst), 2);
1633 count.fetch_sub(1, Ordering::SeqCst);
1634 assert_eq!(count.load(Ordering::SeqCst), 1);
1635 count.fetch_sub(1, Ordering::SeqCst);
1636 assert_eq!(count.load(Ordering::SeqCst), 0);
1637 }
1638
1639 #[tokio::test]
1643 async fn daemon_exits_on_shutdown_signal() {
1644 let dir = tempfile::TempDir::new().unwrap();
1645 let socket = dir.path().join("test-grace.sock");
1646 std::fs::create_dir_all(dir.path().join("data")).unwrap();
1647 std::fs::create_dir_all(dir.path().join("state")).unwrap();
1648
1649 let config = DaemonConfig {
1650 socket_path: socket.clone(),
1651 data_dir: dir.path().join("data"),
1652 state_dir: dir.path().join("state"),
1653 ws_port: None,
1654 grace_period_secs: 0,
1655 };
1656 let daemon = Arc::new(DaemonState::new(config));
1657 let shutdown = daemon.shutdown_handle();
1658
1659 let daemon2 = Arc::clone(&daemon);
1660 let handle = tokio::spawn(async move { daemon2.run().await });
1661
1662 for _ in 0..100 {
1664 if tokio::net::UnixStream::connect(&socket).await.is_ok() {
1665 break;
1666 }
1667 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1668 }
1669 assert!(
1670 tokio::net::UnixStream::connect(&socket).await.is_ok(),
1671 "daemon socket not ready"
1672 );
1673
1674 let _ = shutdown.send(true);
1676 let result = tokio::time::timeout(tokio::time::Duration::from_secs(5), handle).await;
1677 assert!(result.is_ok(), "daemon did not exit within 5s");
1678 assert!(result.unwrap().unwrap().is_ok(), "run() returned error");
1679 }
1680
1681 #[tokio::test]
1685 async fn grace_period_cancelled_by_new_connection() {
1686 let dir = tempfile::TempDir::new().unwrap();
1687 let socket = dir.path().join("test-cancel-grace.sock");
1688
1689 let config = DaemonConfig {
1690 socket_path: socket.clone(),
1691 data_dir: dir.path().join("data"),
1692 state_dir: dir.path().join("state"),
1693 ws_port: None,
1694 grace_period_secs: 60, };
1696 let daemon = DaemonState::new(config);
1697
1698 daemon.connection_count.fetch_add(1, Ordering::SeqCst);
1700 assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 1);
1701 daemon.connection_count.fetch_sub(1, Ordering::SeqCst);
1702 assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 0);
1703
1704 daemon.connection_count.fetch_add(1, Ordering::SeqCst);
1706 assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 1);
1707
1708 assert!(!*daemon.shutdown.borrow());
1710 }
1711
1712 fn write_legacy_token(dir: &std::path::Path, room_id: &str, username: &str, token: &str) {
1716 let name = format!("room-{room_id}-{username}.token");
1717 let data = serde_json::json!({"username": username, "token": token});
1718 std::fs::write(dir.join(name), format!("{data}\n")).unwrap();
1719 }
1720
1721 #[test]
1722 fn migrate_legacy_tmpdir_tokens_imports_token() {
1723 let token_dir = tempfile::TempDir::new().unwrap();
1724 let state_dir = tempfile::TempDir::new().unwrap();
1725 write_legacy_token(token_dir.path(), "lobby", "alice", "legacy-uuid-alice");
1726
1727 let mut registry = UserRegistry::new(state_dir.path().to_owned());
1728
1729 migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1735
1736 assert_eq!(registry.validate_token("legacy-uuid-alice"), Some("alice"));
1737 assert!(registry.get_user("alice").is_some());
1738 }
1739
1740 #[test]
1741 fn migrate_legacy_tmpdir_tokens_idempotent() {
1742 let token_dir = tempfile::TempDir::new().unwrap();
1743 let state_dir = tempfile::TempDir::new().unwrap();
1744 write_legacy_token(token_dir.path(), "lobby", "bob", "tok-bob");
1745
1746 let mut registry = UserRegistry::new(state_dir.path().to_owned());
1747 migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1748 migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1749
1750 assert_eq!(registry.validate_token("tok-bob"), Some("bob"));
1752 let snap = registry.token_snapshot();
1753 assert_eq!(snap.values().filter(|u| u.as_str() == "bob").count(), 1);
1754 }
1755
1756 #[test]
1757 fn migrate_legacy_tmpdir_tokens_skips_non_token_files() {
1758 let token_dir = tempfile::TempDir::new().unwrap();
1759 let state_dir = tempfile::TempDir::new().unwrap();
1760 std::fs::write(token_dir.path().join("roomd.sock"), "not a token").unwrap();
1761 std::fs::write(token_dir.path().join("something.json"), "{}").unwrap();
1762
1763 let mut registry = UserRegistry::new(state_dir.path().to_owned());
1764 migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1765
1766 assert!(registry.list_users().is_empty());
1767 }
1768
1769 #[test]
1770 fn migrate_legacy_tmpdir_tokens_skips_malformed_json() {
1771 let token_dir = tempfile::TempDir::new().unwrap();
1772 let state_dir = tempfile::TempDir::new().unwrap();
1773 std::fs::write(token_dir.path().join("room-x-bad.token"), "not-json{{{").unwrap();
1774
1775 let mut registry = UserRegistry::new(state_dir.path().to_owned());
1776 migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1777
1778 assert!(registry.list_users().is_empty());
1779 }
1780}