1use std::{
34 collections::HashMap,
35 path::PathBuf,
36 sync::{
37 atomic::{AtomicU64, AtomicUsize, Ordering},
38 Arc,
39 },
40};
41
42use tokio::{
43 net::UnixListener,
44 sync::{broadcast, watch, Mutex},
45};
46
47use crate::registry::UserRegistry;
48
49use super::{
50 handle_oneshot_send,
51 state::{RoomState, TokenMap},
52 ws::{self, DaemonWsState},
53};
54
55const UNSAFE_CHARS: &[char] = &['/', '\\', ':', '*', '?', '"', '<', '>', '|', '\0'];
57
58pub fn write_pid_file(path: &std::path::Path) -> std::io::Result<()> {
62 std::fs::write(path, std::process::id().to_string())
63}
64
65pub fn is_pid_alive(path: &std::path::Path) -> bool {
70 let Ok(contents) = std::fs::read_to_string(path) else {
71 return false;
72 };
73 let Ok(pid) = contents.trim().parse::<u32>() else {
74 return false;
75 };
76 pid_alive(pid)
77}
78
79pub fn remove_pid_file(path: &std::path::Path) {
81 let _ = std::fs::remove_file(path);
82}
83
84#[cfg(unix)]
92fn pid_alive(pid: u32) -> bool {
93 extern "C" {
94 fn kill(pid: i32, sig: i32) -> i32;
95 }
96 let ret = unsafe { kill(pid as i32, 0) };
98 if ret == 0 {
99 return true;
100 }
101 std::io::Error::last_os_error().raw_os_error() == Some(1)
103}
104
105#[cfg(not(unix))]
106fn pid_alive(_pid: u32) -> bool {
107 true
109}
110
111const MAX_ROOM_ID_LEN: usize = 64;
113
114pub fn validate_room_id(room_id: &str) -> Result<(), String> {
119 if room_id.is_empty() {
120 return Err("room ID cannot be empty".into());
121 }
122 if room_id.len() > MAX_ROOM_ID_LEN {
123 return Err(format!(
124 "room ID too long ({} chars, max {MAX_ROOM_ID_LEN})",
125 room_id.len()
126 ));
127 }
128 if room_id == "." || room_id == ".." || room_id.contains("..") {
129 return Err("room ID cannot contain '..'".into());
130 }
131 if room_id.chars().any(|c| c.is_whitespace()) {
132 return Err("room ID cannot contain whitespace".into());
133 }
134 if let Some(bad) = room_id.chars().find(|c| UNSAFE_CHARS.contains(c)) {
135 return Err(format!("room ID contains unsafe character: {bad:?}"));
136 }
137 Ok(())
138}
139
140#[derive(Debug, Clone)]
142pub struct DaemonConfig {
143 pub socket_path: PathBuf,
145 pub data_dir: PathBuf,
148 pub state_dir: PathBuf,
151 pub ws_port: Option<u16>,
153 pub grace_period_secs: u64,
158}
159
160impl DaemonConfig {
161 pub fn chat_path(&self, room_id: &str) -> PathBuf {
163 self.data_dir.join(format!("{room_id}.chat"))
164 }
165
166 pub fn token_map_path(&self, room_id: &str) -> PathBuf {
168 crate::paths::broker_tokens_path(&self.state_dir, room_id)
169 }
170
171 pub fn system_tokens_path(&self) -> PathBuf {
177 self.state_dir.join("tokens.json")
178 }
179
180 pub fn subscription_map_path(&self, room_id: &str) -> PathBuf {
182 crate::paths::broker_subscriptions_path(&self.state_dir, room_id)
183 }
184}
185
186impl Default for DaemonConfig {
187 fn default() -> Self {
188 Self {
189 socket_path: crate::paths::room_socket_path(),
190 data_dir: crate::paths::room_data_dir(),
191 state_dir: crate::paths::room_state_dir(),
192 ws_port: None,
193 grace_period_secs: 30,
194 }
195 }
196}
197
198pub(crate) type RoomMap = Arc<Mutex<HashMap<String, Arc<RoomState>>>>;
200
201pub struct DaemonState {
203 pub(crate) rooms: RoomMap,
204 pub(crate) config: DaemonConfig,
205 pub(crate) next_client_id: Arc<AtomicU64>,
207 pub(crate) shutdown: Arc<watch::Sender<bool>>,
209 pub(crate) system_token_map: TokenMap,
216 pub(crate) user_registry: Arc<tokio::sync::Mutex<UserRegistry>>,
223 pub(crate) connection_count: Arc<AtomicUsize>,
229}
230
231impl DaemonState {
232 pub fn new(config: DaemonConfig) -> Self {
234 let (shutdown_tx, _) = watch::channel(false);
235
236 let registry = load_or_migrate_registry(&config);
243
244 let token_snapshot = registry.token_snapshot();
247
248 Self {
249 rooms: Arc::new(Mutex::new(HashMap::new())),
250 config,
251 next_client_id: Arc::new(AtomicU64::new(0)),
252 shutdown: Arc::new(shutdown_tx),
253 system_token_map: Arc::new(Mutex::new(token_snapshot)),
254 user_registry: Arc::new(tokio::sync::Mutex::new(registry)),
255 connection_count: Arc::new(AtomicUsize::new(0)),
256 }
257 }
258
259 pub async fn create_room(&self, room_id: &str) -> Result<(), String> {
262 create_room_entry(
263 room_id,
264 None,
265 &self.rooms,
266 &self.config,
267 &self.system_token_map,
268 Some(self.user_registry.clone()),
269 )
270 .await
271 }
272
273 pub async fn create_room_with_config(
276 &self,
277 room_id: &str,
278 config: room_protocol::RoomConfig,
279 ) -> Result<(), String> {
280 create_room_entry(
281 room_id,
282 Some(config),
283 &self.rooms,
284 &self.config,
285 &self.system_token_map,
286 Some(self.user_registry.clone()),
287 )
288 .await
289 }
290
291 pub async fn get_room_config(&self, room_id: &str) -> Option<room_protocol::RoomConfig> {
293 self.rooms
294 .lock()
295 .await
296 .get(room_id)
297 .and_then(|s| s.config.clone())
298 }
299
300 pub async fn destroy_room(&self, room_id: &str) -> Result<(), String> {
304 let mut rooms = self.rooms.lock().await;
305 let state = rooms
306 .remove(room_id)
307 .ok_or_else(|| format!("room not found: {room_id}"))?;
308
309 let _ = state.shutdown.send(true);
311 Ok(())
312 }
313
314 pub async fn has_room(&self, room_id: &str) -> bool {
316 self.rooms.lock().await.contains_key(room_id)
317 }
318
319 pub fn shutdown_handle(&self) -> Arc<watch::Sender<bool>> {
321 self.shutdown.clone()
322 }
323
324 pub async fn list_rooms(&self) -> Vec<String> {
326 self.rooms.lock().await.keys().cloned().collect()
327 }
328
329 #[doc(hidden)]
332 pub async fn test_inject_token(
333 &self,
334 room_id: &str,
335 username: &str,
336 token: &str,
337 ) -> Result<(), String> {
338 let rooms = self.rooms.lock().await;
339 let room = rooms
340 .get(room_id)
341 .ok_or_else(|| format!("room not found: {room_id}"))?;
342 room.token_map
343 .lock()
344 .await
345 .insert(token.to_owned(), username.to_owned());
346 Ok(())
347 }
348
349 pub async fn run(&self) -> anyhow::Result<()> {
357 let pid_path = if self.config.socket_path == crate::paths::room_socket_path() {
361 match write_pid_file(&crate::paths::room_pid_path()) {
362 Ok(()) => Some(crate::paths::room_pid_path()),
363 Err(e) => {
364 eprintln!("[daemon] failed to write PID file: {e}");
365 None
366 }
367 }
368 } else {
369 None
370 };
371
372 if self.config.socket_path.exists() {
374 std::fs::remove_file(&self.config.socket_path)?;
375 }
376
377 let listener = UnixListener::bind(&self.config.socket_path)?;
378 eprintln!(
379 "[daemon] listening on {}",
380 self.config.socket_path.display()
381 );
382
383 let mut shutdown_rx = self.shutdown.subscribe();
384 let grace_duration = tokio::time::Duration::from_secs(self.config.grace_period_secs);
385
386 let (close_tx, mut close_rx) = tokio::sync::mpsc::channel::<()>(64);
388
389 let mut grace_sleep: Option<std::pin::Pin<Box<tokio::time::Sleep>>> = None;
391
392 if let Some(port) = self.config.ws_port {
394 let ws_state = DaemonWsState {
395 rooms: self.rooms.clone(),
396 next_client_id: self.next_client_id.clone(),
397 config: self.config.clone(),
398 system_token_map: self.system_token_map.clone(),
399 user_registry: self.user_registry.clone(),
400 };
401 let app = ws::create_daemon_router(ws_state);
402 let tcp = tokio::net::TcpListener::bind(("0.0.0.0", port)).await?;
403 eprintln!("[daemon] WebSocket/REST listening on port {port}");
404 tokio::spawn(async move {
405 if let Err(e) = axum::serve(tcp, app).await {
406 eprintln!("[daemon] WS server error: {e}");
407 }
408 });
409 }
410
411 let result = loop {
412 let grace_fut = async {
415 match grace_sleep.as_mut() {
416 Some(s) => {
417 s.await;
418 }
419 None => std::future::pending::<()>().await,
420 }
421 };
422
423 tokio::select! {
424 accept = listener.accept() => {
425 let (stream, _) = match accept {
426 Ok(a) => a,
427 Err(e) => break Err(e.into()),
428 };
429 grace_sleep = None;
431
432 let count = self.connection_count.clone();
433 count.fetch_add(1, Ordering::SeqCst);
434 let rooms = self.rooms.clone();
435 let next_id = self.next_client_id.clone();
436 let cfg = self.config.clone();
437 let sys_tokens = self.system_token_map.clone();
438 let registry = self.user_registry.clone();
439 let tx = close_tx.clone();
440
441 tokio::spawn(async move {
442 if let Err(e) = dispatch_connection(stream, &rooms, &next_id, &cfg, &sys_tokens, ®istry).await {
443 eprintln!("[daemon] connection error: {e:#}");
444 }
445 count.fetch_sub(1, Ordering::SeqCst);
446 let _ = tx.send(()).await;
448 });
449 }
450 Some(()) = close_rx.recv() => {
451 if self.connection_count.load(Ordering::SeqCst) == 0 {
453 eprintln!(
454 "[daemon] no connections — grace period {}s started",
455 self.config.grace_period_secs
456 );
457 grace_sleep =
458 Some(Box::pin(tokio::time::sleep(grace_duration)));
459 }
460 }
461 _ = grace_fut => {
462 eprintln!("[daemon] grace period expired, shutting down");
463 let _ = self.shutdown.send(true);
464 break Ok(());
467 }
468 _ = shutdown_rx.changed() => {
469 eprintln!("[daemon] shutdown requested, exiting");
470 if let Some(ref p) = pid_path {
471 remove_pid_file(p);
472 }
473 break Ok(());
474 }
475 }
476 };
477
478 let _ = std::fs::remove_file(&self.config.socket_path);
480 let _ = std::fs::remove_file(crate::paths::room_pid_path());
481 for room_id in self.list_rooms().await {
483 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
484 }
485
486 result
487 }
488}
489
490fn load_or_migrate_registry(config: &DaemonConfig) -> UserRegistry {
500 let users_path = config.state_dir.join("users.json");
501
502 let mut registry = if users_path.exists() {
503 UserRegistry::load(config.state_dir.clone()).unwrap_or_else(|e| {
505 eprintln!("[daemon] failed to load user registry: {e}; starting empty");
506 UserRegistry::new(config.state_dir.clone())
507 })
508 } else {
509 let tokens_path = config.system_tokens_path();
511 if tokens_path.exists() {
512 let legacy = super::auth::load_token_map(&tokens_path);
513 if !legacy.is_empty() {
514 eprintln!(
515 "[daemon] migrating {} token(s) from tokens.json to users.json",
516 legacy.len()
517 );
518 let mut reg = UserRegistry::new(config.state_dir.clone());
519 for (token, username) in &legacy {
520 if let Err(e) = reg.register_user_idempotent(username) {
522 eprintln!("[daemon] migration: register {username}: {e}");
523 continue;
524 }
525 let _ = reg.issue_token(username);
532 let _ = token; }
534 if let Err(e) = reg.save() {
535 eprintln!("[daemon] migration save failed: {e}");
536 }
537 reg
538 } else {
539 UserRegistry::new(config.state_dir.clone())
541 }
542 } else {
543 UserRegistry::new(config.state_dir.clone())
545 }
546 };
547
548 migrate_legacy_tmpdir_tokens(&mut registry);
552
553 registry
554}
555
556fn migrate_legacy_tmpdir_tokens(registry: &mut UserRegistry) {
566 let legacy_dir = crate::paths::legacy_token_dir();
567 migrate_legacy_tmpdir_tokens_from(&legacy_dir, registry);
568}
569
570fn migrate_legacy_tmpdir_tokens_from(legacy_dir: &std::path::Path, registry: &mut UserRegistry) {
574 let entries = match std::fs::read_dir(legacy_dir) {
575 Ok(e) => e,
576 Err(_) => return,
577 };
578 let mut count = 0usize;
579 for entry in entries.filter_map(|e| e.ok()) {
580 let path = entry.path();
581 let name = match path.file_name().and_then(|n| n.to_str()) {
582 Some(n) => n.to_owned(),
583 None => continue,
584 };
585 if !name.starts_with("room-") || !name.ends_with(".token") {
586 continue;
587 }
588 let data = match std::fs::read_to_string(&path) {
589 Ok(d) => d,
590 Err(_) => continue,
591 };
592 let v: serde_json::Value = match serde_json::from_str(data.trim()) {
593 Ok(v) => v,
594 Err(_) => continue,
595 };
596 let (username, token) = match (v["username"].as_str(), v["token"].as_str()) {
597 (Some(u), Some(t)) if !u.is_empty() && !t.is_empty() => (u.to_owned(), t.to_owned()),
598 _ => continue,
599 };
600 if let Err(e) = registry.register_user_idempotent(&username) {
601 eprintln!("[daemon] legacy token migration: register {username}: {e}");
602 continue;
603 }
604 match registry.import_token(&username, &token) {
605 Ok(()) => count += 1,
606 Err(e) => {
607 eprintln!("[daemon] legacy token migration: import token for {username}: {e}")
608 }
609 }
610 }
611 if count > 0 {
612 eprintln!(
613 "[daemon] imported {count} legacy token(s) from {}",
614 legacy_dir.display()
615 );
616 }
617}
618
619fn build_initial_subscriptions(
626 config: &room_protocol::RoomConfig,
627) -> HashMap<String, room_protocol::SubscriptionTier> {
628 let mut subs = HashMap::new();
629 if config.visibility == room_protocol::RoomVisibility::Dm {
630 for user in &config.invite_list {
631 subs.insert(user.clone(), room_protocol::SubscriptionTier::Full);
632 }
633 }
634 subs
635}
636
637pub(crate) async fn create_room_entry(
647 room_id: &str,
648 config: Option<room_protocol::RoomConfig>,
649 rooms: &RoomMap,
650 daemon_config: &DaemonConfig,
651 system_token_map: &TokenMap,
652 registry: Option<Arc<tokio::sync::Mutex<UserRegistry>>>,
653) -> Result<(), String> {
654 validate_room_id(room_id)?;
655 {
656 let map = rooms.lock().await;
657 if map.contains_key(room_id) {
658 return Err(format!("room already exists: {room_id}"));
659 }
660 }
661
662 let chat_path = daemon_config.chat_path(room_id);
663 let subscription_map_path = daemon_config.subscription_map_path(room_id);
664
665 let persisted_subs = super::commands::load_subscription_map(&subscription_map_path);
666 let merged_subs = if let Some(ref cfg) = config {
667 let mut initial = build_initial_subscriptions(cfg);
668 initial.extend(persisted_subs);
669 initial
670 } else {
671 persisted_subs
672 };
673
674 let state = RoomState::new(
677 room_id.to_owned(),
678 chat_path,
679 daemon_config.system_tokens_path(),
680 subscription_map_path,
681 Arc::clone(system_token_map),
682 Arc::new(Mutex::new(merged_subs)),
683 config,
684 )?;
685 if let Some(reg) = registry {
686 state.set_registry(reg);
687 }
688
689 rooms.lock().await.insert(room_id.to_owned(), state);
690
691 let meta_path = crate::paths::room_meta_path(room_id);
695 let chat_path_str = daemon_config.chat_path(room_id);
696 let meta_json = serde_json::json!({ "chat_path": chat_path_str });
697 let _ = std::fs::write(&meta_path, meta_json.to_string());
698
699 Ok(())
700}
701
702async fn handle_destroy(
713 room_id: &str,
714 reader: &mut tokio::io::BufReader<tokio::net::unix::OwnedReadHalf>,
715 write_half: &mut tokio::net::unix::OwnedWriteHalf,
716 rooms: &RoomMap,
717 user_registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
718) -> anyhow::Result<()> {
719 use tokio::io::AsyncWriteExt;
720
721 if room_id.is_empty() {
722 let err = serde_json::json!({
723 "type": "error",
724 "code": "invalid_room_id",
725 "message": "room ID is empty"
726 });
727 write_half.write_all(format!("{err}\n").as_bytes()).await?;
728 return Ok(());
729 }
730
731 let mut token_line = String::new();
733 super::read_line_limited(reader, &mut token_line).await?;
734 let token = token_line.trim();
735
736 if token.is_empty() {
737 let err = serde_json::json!({
738 "type": "error",
739 "code": "missing_token",
740 "message": "DESTROY requires a valid token on the second line"
741 });
742 write_half.write_all(format!("{err}\n").as_bytes()).await?;
743 return Ok(());
744 }
745
746 {
748 let reg = user_registry.lock().await;
749 if reg.validate_token(token).is_none() {
750 let err = serde_json::json!({
751 "type": "error",
752 "code": "invalid_token",
753 "message": "token is not valid"
754 });
755 write_half.write_all(format!("{err}\n").as_bytes()).await?;
756 return Ok(());
757 }
758 }
759
760 let state = {
762 let mut map = rooms.lock().await;
763 map.remove(room_id)
764 };
765
766 match state {
767 Some(s) => {
768 let _ = s.shutdown.send(true);
770 let ok = serde_json::json!({
771 "type": "room_destroyed",
772 "room": room_id
773 });
774 write_half.write_all(format!("{ok}\n").as_bytes()).await?;
775 }
776 None => {
777 let err = serde_json::json!({
778 "type": "error",
779 "code": "room_not_found",
780 "room": room_id
781 });
782 write_half.write_all(format!("{err}\n").as_bytes()).await?;
783 }
784 }
785
786 Ok(())
787}
788
789async fn handle_create(
796 room_id: &str,
797 reader: &mut tokio::io::BufReader<tokio::net::unix::OwnedReadHalf>,
798 write_half: &mut tokio::net::unix::OwnedWriteHalf,
799 rooms: &RoomMap,
800 daemon_config: &DaemonConfig,
801 system_token_map: &TokenMap,
802 user_registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
803) -> anyhow::Result<()> {
804 use tokio::io::AsyncWriteExt;
805
806 if let Err(e) = validate_room_id(room_id) {
808 let err = serde_json::json!({
809 "type": "error",
810 "code": "invalid_room_id",
811 "message": e
812 });
813 write_half.write_all(format!("{err}\n").as_bytes()).await?;
814 return Ok(());
815 }
816
817 {
819 let map = rooms.lock().await;
820 if map.contains_key(room_id) {
821 let err = serde_json::json!({
822 "type": "error",
823 "code": "room_exists",
824 "message": format!("room already exists: {room_id}")
825 });
826 write_half.write_all(format!("{err}\n").as_bytes()).await?;
827 return Ok(());
828 }
829 }
830
831 let mut config_line = String::new();
833 super::read_line_limited(reader, &mut config_line).await?;
834 let config_str = config_line.trim();
835
836 let (visibility_str, invite, token): (String, Vec<String>, Option<String>) =
837 if config_str.is_empty() {
838 ("public".into(), vec![], None)
839 } else {
840 let v: serde_json::Value = match serde_json::from_str(config_str) {
841 Ok(v) => v,
842 Err(e) => {
843 let err = serde_json::json!({
844 "type": "error",
845 "code": "invalid_config",
846 "message": format!("invalid config JSON: {e}")
847 });
848 write_half.write_all(format!("{err}\n").as_bytes()).await?;
849 return Ok(());
850 }
851 };
852 let vis = v["visibility"].as_str().unwrap_or("public").to_owned();
853 let inv = v["invite"]
854 .as_array()
855 .map(|arr| {
856 arr.iter()
857 .filter_map(|v| v.as_str().map(|s| s.to_owned()))
858 .collect()
859 })
860 .unwrap_or_default();
861 let tok = v["token"].as_str().map(|s| s.to_owned());
862 (vis, inv, tok)
863 };
864
865 let token_str = match token.as_deref() {
867 Some(t) if !t.is_empty() => t,
868 _ => {
869 let err = serde_json::json!({
870 "type": "error",
871 "code": "missing_token",
872 "message": "CREATE requires a valid token in the config JSON"
873 });
874 write_half.write_all(format!("{err}\n").as_bytes()).await?;
875 return Ok(());
876 }
877 };
878 {
879 let reg = user_registry.lock().await;
880 if reg.validate_token(token_str).is_none() {
881 let err = serde_json::json!({
882 "type": "error",
883 "code": "invalid_token",
884 "message": "token is not valid"
885 });
886 write_half.write_all(format!("{err}\n").as_bytes()).await?;
887 return Ok(());
888 }
889 }
890
891 let room_config = match visibility_str.as_str() {
893 "public" => room_protocol::RoomConfig {
894 visibility: room_protocol::RoomVisibility::Public,
895 max_members: None,
896 invite_list: invite.into_iter().collect(),
897 created_by: "system".to_owned(),
898 created_at: chrono::Utc::now().to_rfc3339(),
899 },
900 "private" => room_protocol::RoomConfig {
901 visibility: room_protocol::RoomVisibility::Private,
902 max_members: None,
903 invite_list: invite.into_iter().collect(),
904 created_by: "system".to_owned(),
905 created_at: chrono::Utc::now().to_rfc3339(),
906 },
907 "dm" => {
908 if invite.len() != 2 {
909 let err = serde_json::json!({
910 "type": "error",
911 "code": "invalid_config",
912 "message": "dm visibility requires exactly 2 users in invite list"
913 });
914 write_half.write_all(format!("{err}\n").as_bytes()).await?;
915 return Ok(());
916 }
917 room_protocol::RoomConfig::dm(&invite[0], &invite[1])
918 }
919 other => {
920 let err = serde_json::json!({
921 "type": "error",
922 "code": "invalid_config",
923 "message": format!("unknown visibility: {other}")
924 });
925 write_half.write_all(format!("{err}\n").as_bytes()).await?;
926 return Ok(());
927 }
928 };
929
930 if let Err(e) = create_room_entry(
932 room_id,
933 Some(room_config),
934 rooms,
935 daemon_config,
936 system_token_map,
937 Some(user_registry.clone()),
938 )
939 .await
940 {
941 let err = serde_json::json!({
942 "type": "error",
943 "code": "internal",
944 "message": e
945 });
946 write_half.write_all(format!("{err}\n").as_bytes()).await?;
947 return Ok(());
948 }
949
950 let ok = serde_json::json!({
951 "type": "room_created",
952 "room": room_id
953 });
954 write_half.write_all(format!("{ok}\n").as_bytes()).await?;
955 Ok(())
956}
957
958async fn handle_global_join(
963 username: &str,
964 write_half: &mut tokio::net::unix::OwnedWriteHalf,
965 registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
966) -> anyhow::Result<()> {
967 use tokio::io::AsyncWriteExt;
968
969 let mut reg = registry.lock().await;
970
971 let token = if reg.has_token_for_user(username) {
973 reg.token_snapshot()
975 .into_iter()
976 .find(|(_, u)| u == username)
977 .map(|(t, _)| t)
978 .expect("has_token_for_user was true but no token found")
979 } else {
980 reg.register_user_idempotent(username)
981 .map_err(|e| anyhow::anyhow!("registration failed: {e}"))?;
982 reg.issue_token(username)
983 .map_err(|e| anyhow::anyhow!("token issuance failed: {e}"))?
984 };
985
986 let resp = serde_json::json!({
987 "type": "token",
988 "token": token,
989 "username": username
990 });
991 write_half.write_all(format!("{resp}\n").as_bytes()).await?;
992 Ok(())
993}
994
995async fn dispatch_connection(
1001 stream: tokio::net::UnixStream,
1002 rooms: &RoomMap,
1003 next_client_id: &Arc<AtomicU64>,
1004 daemon_config: &DaemonConfig,
1005 system_token_map: &TokenMap,
1006 user_registry: &Arc<tokio::sync::Mutex<UserRegistry>>,
1007) -> anyhow::Result<()> {
1008 use tokio::io::{AsyncWriteExt, BufReader};
1009
1010 let (read_half, mut write_half) = stream.into_split();
1011 let mut reader = BufReader::new(read_half);
1012
1013 let mut first = String::new();
1014 super::read_line_limited(&mut reader, &mut first).await?;
1015 let first_line = first.trim();
1016
1017 if first_line.is_empty() {
1018 return Ok(());
1019 }
1020
1021 use super::handshake::{
1022 parse_client_handshake, parse_daemon_prefix, ClientHandshake, DaemonPrefix,
1023 };
1024 let (room_id, rest) = match parse_daemon_prefix(first_line) {
1025 DaemonPrefix::Destroy(room_id) => {
1026 return handle_destroy(&room_id, &mut reader, &mut write_half, rooms, user_registry)
1027 .await;
1028 }
1029 DaemonPrefix::Create(room_id) => {
1030 return handle_create(
1031 &room_id,
1032 &mut reader,
1033 &mut write_half,
1034 rooms,
1035 daemon_config,
1036 system_token_map,
1037 user_registry,
1038 )
1039 .await;
1040 }
1041 DaemonPrefix::Join(username) => {
1042 return handle_global_join(&username, &mut write_half, user_registry).await;
1043 }
1044 DaemonPrefix::Room { room_id, rest } => (room_id, rest),
1045 DaemonPrefix::Unknown => {
1046 let err = serde_json::json!({
1047 "type": "error",
1048 "code": "missing_room_prefix",
1049 "message": "daemon mode requires ROOM:<room_id>: or CREATE:<room_id> prefix"
1050 });
1051 write_half.write_all(format!("{err}\n").as_bytes()).await?;
1052 return Ok(());
1053 }
1054 };
1055
1056 let state = {
1058 let map = rooms.lock().await;
1059 map.get(room_id.as_str()).cloned()
1060 };
1061
1062 let state = match state {
1063 Some(s) => s,
1064 None => {
1065 let err = serde_json::json!({
1066 "type": "error",
1067 "code": "room_not_found",
1068 "room": room_id
1069 });
1070 write_half.write_all(format!("{err}\n").as_bytes()).await?;
1071 return Ok(());
1072 }
1073 };
1074
1075 let cid = next_client_id.fetch_add(1, Ordering::SeqCst) + 1;
1076
1077 let username = match parse_client_handshake(&rest) {
1079 ClientHandshake::Send(u) => {
1080 eprintln!(
1081 "[broker/daemon] DEPRECATED: SEND:{u} handshake used — \
1082 migrate to TOKEN:<uuid> (SEND: will be removed in a future version)"
1083 );
1084 return handle_oneshot_send(u, reader, write_half, &state).await;
1085 }
1086 ClientHandshake::Token(token) => {
1087 let resolved = match super::auth::validate_token(&token, &state.token_map).await {
1089 Some(u) => Some(u),
1090 None => {
1091 let reg = user_registry.lock().await;
1092 reg.validate_token(&token).map(|u| u.to_owned())
1093 }
1094 };
1095 return match resolved {
1096 Some(u) => handle_oneshot_send(u, reader, write_half, &state).await,
1097 None => {
1098 let err = serde_json::json!({"type":"error","code":"invalid_token"});
1099 write_half
1100 .write_all(format!("{err}\n").as_bytes())
1101 .await
1102 .map_err(Into::into)
1103 }
1104 };
1105 }
1106 ClientHandshake::Join(u) => {
1107 let result = super::auth::handle_oneshot_join_with_registry(
1108 u,
1109 write_half,
1110 user_registry,
1111 &state.token_map,
1112 &state.subscription_map,
1113 state.config.as_ref(),
1114 )
1115 .await;
1116 super::commands::persist_subscriptions(&state).await;
1118 return result;
1119 }
1120 ClientHandshake::Session(token) => {
1121 let resolved = match super::auth::validate_token(&token, &state.token_map).await {
1123 Some(u) => Some(u),
1124 None => {
1125 let reg = user_registry.lock().await;
1126 reg.validate_token(&token).map(|u| u.to_owned())
1127 }
1128 };
1129 match resolved {
1130 Some(u) => u,
1131 None => {
1132 let err = serde_json::json!({"type":"error","code":"invalid_token"});
1133 write_half.write_all(format!("{err}\n").as_bytes()).await?;
1134 return Ok(());
1135 }
1136 }
1137 }
1138 ClientHandshake::Interactive(u) => {
1139 eprintln!(
1140 "[broker/daemon] DEPRECATED: unauthenticated interactive join for '{u}' — \
1141 migrate to SESSION:<token>"
1142 );
1143 u
1144 }
1145 };
1146
1147 if username.is_empty() {
1149 return Ok(());
1150 }
1151
1152 if let Err(reason) = super::auth::check_join_permission(&username, state.config.as_ref()) {
1154 let err = serde_json::json!({
1155 "type": "error",
1156 "code": "join_denied",
1157 "message": reason,
1158 "username": username
1159 });
1160 write_half.write_all(format!("{err}\n").as_bytes()).await?;
1161 return Ok(());
1162 }
1163
1164 let (tx, _) = broadcast::channel::<String>(256);
1166 state
1167 .clients
1168 .lock()
1169 .await
1170 .insert(cid, (String::new(), tx.clone()));
1171
1172 let result =
1173 super::run_interactive_session(cid, &username, reader, write_half, tx, &state).await;
1174
1175 state.clients.lock().await.remove(&cid);
1176 result
1177}
1178
1179#[cfg(test)]
1182mod tests {
1183 use super::*;
1184
1185 #[test]
1188 fn write_pid_file_creates_file_with_current_pid() {
1189 let dir = tempfile::TempDir::new().unwrap();
1190 let path = dir.path().join("test.pid");
1191 write_pid_file(&path).unwrap();
1192 let content = std::fs::read_to_string(&path).unwrap();
1193 let pid: u32 = content.trim().parse().expect("PID should be a number");
1194 assert_eq!(pid, std::process::id());
1195 }
1196
1197 #[test]
1198 fn is_pid_alive_true_for_current_process() {
1199 let dir = tempfile::TempDir::new().unwrap();
1200 let path = dir.path().join("test.pid");
1201 write_pid_file(&path).unwrap();
1202 assert!(is_pid_alive(&path), "current process should be alive");
1203 }
1204
1205 #[test]
1206 fn is_pid_alive_false_for_missing_file() {
1207 let path = std::path::Path::new("/tmp/nonexistent-room-test-99999999.pid");
1208 assert!(!is_pid_alive(path));
1209 }
1210
1211 #[test]
1212 fn remove_pid_file_deletes_file() {
1213 let dir = tempfile::TempDir::new().unwrap();
1214 let path = dir.path().join("remove.pid");
1215 write_pid_file(&path).unwrap();
1216 assert!(path.exists());
1217 remove_pid_file(&path);
1218 assert!(!path.exists());
1219 }
1220
1221 #[test]
1222 fn remove_pid_file_noop_when_missing() {
1223 let path = std::path::Path::new("/tmp/gone-99999999.pid");
1225 remove_pid_file(path); }
1227
1228 async fn get_room(daemon: &DaemonState, room_id: &str) -> Arc<RoomState> {
1232 daemon
1233 .rooms
1234 .lock()
1235 .await
1236 .get(room_id)
1237 .cloned()
1238 .unwrap_or_else(|| panic!("room {room_id} not found"))
1239 }
1240
1241 #[tokio::test]
1242 async fn create_room_succeeds() {
1243 let daemon = DaemonState::new(DaemonConfig::default());
1244 assert!(daemon.create_room("test-room").await.is_ok());
1245 let state = get_room(&daemon, "test-room").await;
1246 assert_eq!(*state.room_id, "test-room");
1247 }
1248
1249 #[tokio::test]
1250 async fn create_duplicate_room_fails() {
1251 let daemon = DaemonState::new(DaemonConfig::default());
1252 daemon.create_room("dup").await.unwrap();
1253 let result = daemon.create_room("dup").await;
1254 assert!(result.is_err());
1255 assert!(result.unwrap_err().contains("already exists"));
1256 }
1257
1258 #[tokio::test]
1259 async fn has_room_returns_true_for_created() {
1260 let daemon = DaemonState::new(DaemonConfig::default());
1261 daemon.create_room("room-a").await.unwrap();
1262 assert!(daemon.has_room("room-a").await);
1263 assert!(!daemon.has_room("room-b").await);
1264 }
1265
1266 #[tokio::test]
1267 async fn destroy_room_removes_it() {
1268 let daemon = DaemonState::new(DaemonConfig::default());
1269 daemon.create_room("doomed").await.unwrap();
1270 assert!(daemon.destroy_room("doomed").await.is_ok());
1271 assert!(!daemon.has_room("doomed").await);
1272 }
1273
1274 #[tokio::test]
1275 async fn destroy_nonexistent_room_fails() {
1276 let daemon = DaemonState::new(DaemonConfig::default());
1277 let result = daemon.destroy_room("nope").await;
1278 assert!(result.is_err());
1279 assert!(result.unwrap_err().contains("not found"));
1280 }
1281
1282 #[tokio::test]
1283 async fn destroy_room_signals_shutdown() {
1284 let daemon = DaemonState::new(DaemonConfig::default());
1285 daemon.create_room("shutme").await.unwrap();
1286 let state = get_room(&daemon, "shutme").await;
1287 let rx = state.shutdown.subscribe();
1288 assert!(!*rx.borrow());
1289
1290 daemon.destroy_room("shutme").await.unwrap();
1291 assert!(*rx.borrow());
1293 }
1294
1295 #[tokio::test]
1296 async fn list_rooms_returns_all() {
1297 let daemon = DaemonState::new(DaemonConfig::default());
1298 daemon.create_room("alpha").await.unwrap();
1299 daemon.create_room("beta").await.unwrap();
1300 daemon.create_room("gamma").await.unwrap();
1301
1302 let mut rooms = daemon.list_rooms().await;
1303 rooms.sort();
1304 assert_eq!(rooms, vec!["alpha", "beta", "gamma"]);
1305 }
1306
1307 #[tokio::test]
1308 async fn list_rooms_empty_initially() {
1309 let daemon = DaemonState::new(DaemonConfig::default());
1310 assert!(daemon.list_rooms().await.is_empty());
1311 }
1312
1313 #[tokio::test]
1314 async fn create_room_initializes_plugins() {
1315 let daemon = DaemonState::new(DaemonConfig::default());
1316 daemon.create_room("plugtest").await.unwrap();
1317 let state = get_room(&daemon, "plugtest").await;
1318 assert!(state.plugin_registry.resolve("help").is_some());
1320 assert!(state.plugin_registry.resolve("stats").is_some());
1321 }
1322
1323 #[test]
1326 fn config_chat_path_format() {
1327 let config = DaemonConfig {
1328 data_dir: PathBuf::from("/var/room"),
1329 ..DaemonConfig::default()
1330 };
1331 assert_eq!(
1332 config.chat_path("myroom"),
1333 PathBuf::from("/var/room/myroom.chat")
1334 );
1335 }
1336
1337 #[test]
1338 fn config_default_socket_path() {
1339 let config = DaemonConfig::default();
1340 assert_eq!(config.socket_path, crate::paths::room_socket_path());
1341 }
1342
1343 #[tokio::test]
1346 async fn create_room_with_dm_config() {
1347 let daemon = DaemonState::new(DaemonConfig::default());
1348 let config = room_protocol::RoomConfig::dm("alice", "bob");
1349 assert!(daemon
1350 .create_room_with_config("dm-alice-bob", config)
1351 .await
1352 .is_ok());
1353
1354 let state = get_room(&daemon, "dm-alice-bob").await;
1355 let cfg = state.config.as_ref().unwrap();
1356 assert_eq!(cfg.visibility, room_protocol::RoomVisibility::Dm);
1357 assert_eq!(cfg.max_members, Some(2));
1358 assert!(cfg.invite_list.contains("alice"));
1359 assert!(cfg.invite_list.contains("bob"));
1360 }
1361
1362 #[tokio::test]
1363 async fn create_room_with_config_duplicate_fails() {
1364 let daemon = DaemonState::new(DaemonConfig::default());
1365 let config = room_protocol::RoomConfig::public("owner");
1366 daemon
1367 .create_room_with_config("dup", config.clone())
1368 .await
1369 .unwrap();
1370 assert!(daemon.create_room_with_config("dup", config).await.is_err());
1371 }
1372
1373 #[tokio::test]
1374 async fn get_room_config_returns_none_for_unconfigured() {
1375 let daemon = DaemonState::new(DaemonConfig::default());
1376 daemon.create_room("plain").await.unwrap();
1377 assert!(daemon.get_room_config("plain").await.is_none());
1378 }
1379
1380 #[tokio::test]
1381 async fn get_room_config_returns_config_when_present() {
1382 let daemon = DaemonState::new(DaemonConfig::default());
1383 let config = room_protocol::RoomConfig::dm("alice", "bob");
1384 daemon
1385 .create_room_with_config("dm-alice-bob", config)
1386 .await
1387 .unwrap();
1388 let cfg = daemon.get_room_config("dm-alice-bob").await.unwrap();
1389 assert_eq!(cfg.visibility, room_protocol::RoomVisibility::Dm);
1390 }
1391
1392 #[tokio::test]
1393 async fn dm_room_id_deterministic_and_lookup_works() {
1394 let daemon = DaemonState::new(DaemonConfig::default());
1395 let room_id = room_protocol::dm_room_id("bob", "alice").unwrap();
1396 assert_eq!(room_id, "dm-alice-bob");
1397
1398 let config = room_protocol::RoomConfig::dm("bob", "alice");
1399 daemon
1400 .create_room_with_config(&room_id, config)
1401 .await
1402 .unwrap();
1403 assert!(daemon.has_room("dm-alice-bob").await);
1404 assert_eq!(
1406 room_protocol::dm_room_id("alice", "bob").unwrap(),
1407 "dm-alice-bob"
1408 );
1409 }
1410
1411 #[test]
1414 fn valid_room_ids() {
1415 for id in [
1416 "lobby",
1417 "agent-room-2",
1418 "my_room",
1419 "Room.1",
1420 "dm-alice-bob",
1421 "a",
1422 &"x".repeat(MAX_ROOM_ID_LEN),
1423 ] {
1424 assert!(validate_room_id(id).is_ok(), "should accept: {id:?}");
1425 }
1426 }
1427
1428 #[test]
1429 fn empty_room_id_rejected() {
1430 let err = validate_room_id("").unwrap_err();
1431 assert!(err.contains("empty"), "{err}");
1432 }
1433
1434 #[test]
1435 fn room_id_too_long_rejected() {
1436 let long = "x".repeat(MAX_ROOM_ID_LEN + 1);
1437 let err = validate_room_id(&long).unwrap_err();
1438 assert!(err.contains("too long"), "{err}");
1439 }
1440
1441 #[test]
1442 fn dot_dot_traversal_rejected() {
1443 for id in ["..", "room/../etc", "..secret", "a..b"] {
1444 let err = validate_room_id(id).unwrap_err();
1445 assert!(err.contains(".."), "should reject {id:?}: {err}");
1446 }
1447 }
1448
1449 #[test]
1450 fn single_dot_rejected() {
1451 let err = validate_room_id(".").unwrap_err();
1452 assert!(err.contains(".."), "{err}");
1453 }
1454
1455 #[test]
1456 fn slash_rejected() {
1457 for id in ["room/sub", "/etc/passwd", "a/b/c"] {
1458 let err = validate_room_id(id).unwrap_err();
1459 assert!(err.contains("unsafe"), "should reject {id:?}: {err}");
1460 }
1461 }
1462
1463 #[test]
1464 fn backslash_rejected() {
1465 let err = validate_room_id("room\\sub").unwrap_err();
1466 assert!(err.contains("unsafe"), "{err}");
1467 }
1468
1469 #[test]
1470 fn null_byte_rejected() {
1471 let err = validate_room_id("room\0id").unwrap_err();
1472 assert!(err.contains("unsafe"), "{err}");
1473 }
1474
1475 #[test]
1476 fn whitespace_rejected() {
1477 for id in ["room name", "room\tid", "room\nid", " leading", "trailing "] {
1478 let err = validate_room_id(id).unwrap_err();
1479 assert!(err.contains("whitespace"), "should reject {id:?}: {err}");
1480 }
1481 }
1482
1483 #[test]
1484 fn other_unsafe_chars_rejected() {
1485 for ch in [':', '*', '?', '"', '<', '>', '|'] {
1486 let id = format!("room{ch}id");
1487 let err = validate_room_id(&id).unwrap_err();
1488 assert!(err.contains("unsafe"), "should reject {ch:?}: {err}");
1489 }
1490 }
1491
1492 #[tokio::test]
1493 async fn create_room_rejects_invalid_id() {
1494 let daemon = DaemonState::new(DaemonConfig::default());
1495 assert!(daemon.create_room("room/sub").await.is_err());
1496 assert!(daemon.create_room("..").await.is_err());
1497 assert!(daemon.create_room("").await.is_err());
1498 assert!(daemon.create_room("room name").await.is_err());
1499 }
1500
1501 #[tokio::test]
1502 async fn create_room_with_config_rejects_invalid_id() {
1503 let daemon = DaemonState::new(DaemonConfig::default());
1504 let config = room_protocol::RoomConfig::public("owner");
1505 assert!(daemon
1506 .create_room_with_config("../etc", config)
1507 .await
1508 .is_err());
1509 }
1510
1511 #[tokio::test]
1514 async fn dm_room_auto_subscribes_both_participants() {
1515 let daemon = DaemonState::new(DaemonConfig::default());
1516 let config = room_protocol::RoomConfig::dm("alice", "bob");
1517 daemon
1518 .create_room_with_config("dm-alice-bob", config)
1519 .await
1520 .unwrap();
1521
1522 let state = get_room(&daemon, "dm-alice-bob").await;
1523 let subs = state.subscription_map.lock().await;
1524 assert_eq!(subs.len(), 2);
1525 assert_eq!(
1526 subs.get("alice"),
1527 Some(&room_protocol::SubscriptionTier::Full)
1528 );
1529 assert_eq!(
1530 subs.get("bob"),
1531 Some(&room_protocol::SubscriptionTier::Full)
1532 );
1533 }
1534
1535 #[tokio::test]
1536 async fn public_room_starts_with_no_subscriptions() {
1537 let daemon = DaemonState::new(DaemonConfig::default());
1538 let config = room_protocol::RoomConfig::public("owner");
1539 daemon
1540 .create_room_with_config("lobby", config)
1541 .await
1542 .unwrap();
1543
1544 let state = get_room(&daemon, "lobby").await;
1545 let subs = state.subscription_map.lock().await;
1546 assert!(subs.is_empty());
1547 }
1548
1549 #[tokio::test]
1550 async fn unconfigured_room_starts_with_no_subscriptions() {
1551 let daemon = DaemonState::new(DaemonConfig::default());
1552 daemon.create_room("plain").await.unwrap();
1553
1554 let state = get_room(&daemon, "plain").await;
1555 let subs = state.subscription_map.lock().await;
1556 assert!(subs.is_empty());
1557 }
1558
1559 #[tokio::test]
1560 async fn dm_auto_subscribe_uses_full_tier() {
1561 let daemon = DaemonState::new(DaemonConfig::default());
1562 let config = room_protocol::RoomConfig::dm("carol", "dave");
1563 daemon
1564 .create_room_with_config("dm-carol-dave", config)
1565 .await
1566 .unwrap();
1567
1568 let state = get_room(&daemon, "dm-carol-dave").await;
1569 let subs = state.subscription_map.lock().await;
1570 for (_, tier) in subs.iter() {
1572 assert_eq!(*tier, room_protocol::SubscriptionTier::Full);
1573 }
1574 }
1575
1576 #[test]
1577 fn build_initial_subscriptions_dm_populates() {
1578 let config = room_protocol::RoomConfig::dm("alice", "bob");
1579 let subs = build_initial_subscriptions(&config);
1580 assert_eq!(subs.len(), 2);
1581 assert_eq!(subs["alice"], room_protocol::SubscriptionTier::Full);
1582 assert_eq!(subs["bob"], room_protocol::SubscriptionTier::Full);
1583 }
1584
1585 #[test]
1586 fn build_initial_subscriptions_public_empty() {
1587 let config = room_protocol::RoomConfig::public("owner");
1588 let subs = build_initial_subscriptions(&config);
1589 assert!(subs.is_empty());
1590 }
1591
1592 #[test]
1595 fn default_grace_period_is_30() {
1596 let config = DaemonConfig::default();
1597 assert_eq!(config.grace_period_secs, 30);
1598 }
1599
1600 #[test]
1601 fn custom_grace_period_preserved() {
1602 let config = DaemonConfig {
1603 grace_period_secs: 0,
1604 ..DaemonConfig::default()
1605 };
1606 assert_eq!(config.grace_period_secs, 0);
1607 }
1608
1609 #[tokio::test]
1612 async fn connection_count_starts_at_zero() {
1613 let daemon = DaemonState::new(DaemonConfig::default());
1614 assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 0);
1615 }
1616
1617 #[tokio::test]
1618 async fn connection_count_increments_and_decrements() {
1619 let count = Arc::new(AtomicUsize::new(0));
1620 count.fetch_add(1, Ordering::SeqCst);
1621 count.fetch_add(1, Ordering::SeqCst);
1622 assert_eq!(count.load(Ordering::SeqCst), 2);
1623 count.fetch_sub(1, Ordering::SeqCst);
1624 assert_eq!(count.load(Ordering::SeqCst), 1);
1625 count.fetch_sub(1, Ordering::SeqCst);
1626 assert_eq!(count.load(Ordering::SeqCst), 0);
1627 }
1628
1629 #[tokio::test]
1633 async fn daemon_exits_on_shutdown_signal() {
1634 let dir = tempfile::TempDir::new().unwrap();
1635 let socket = dir.path().join("test-grace.sock");
1636 std::fs::create_dir_all(dir.path().join("data")).unwrap();
1637 std::fs::create_dir_all(dir.path().join("state")).unwrap();
1638
1639 let config = DaemonConfig {
1640 socket_path: socket.clone(),
1641 data_dir: dir.path().join("data"),
1642 state_dir: dir.path().join("state"),
1643 ws_port: None,
1644 grace_period_secs: 0,
1645 };
1646 let daemon = Arc::new(DaemonState::new(config));
1647 let shutdown = daemon.shutdown_handle();
1648
1649 let daemon2 = Arc::clone(&daemon);
1650 let handle = tokio::spawn(async move { daemon2.run().await });
1651
1652 for _ in 0..100 {
1654 if tokio::net::UnixStream::connect(&socket).await.is_ok() {
1655 break;
1656 }
1657 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1658 }
1659 assert!(
1660 tokio::net::UnixStream::connect(&socket).await.is_ok(),
1661 "daemon socket not ready"
1662 );
1663
1664 let _ = shutdown.send(true);
1666 let result = tokio::time::timeout(tokio::time::Duration::from_secs(5), handle).await;
1667 assert!(result.is_ok(), "daemon did not exit within 5s");
1668 assert!(result.unwrap().unwrap().is_ok(), "run() returned error");
1669 }
1670
1671 #[tokio::test]
1675 async fn grace_period_cancelled_by_new_connection() {
1676 let dir = tempfile::TempDir::new().unwrap();
1677 let socket = dir.path().join("test-cancel-grace.sock");
1678
1679 let config = DaemonConfig {
1680 socket_path: socket.clone(),
1681 data_dir: dir.path().join("data"),
1682 state_dir: dir.path().join("state"),
1683 ws_port: None,
1684 grace_period_secs: 60, };
1686 let daemon = DaemonState::new(config);
1687
1688 daemon.connection_count.fetch_add(1, Ordering::SeqCst);
1690 assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 1);
1691 daemon.connection_count.fetch_sub(1, Ordering::SeqCst);
1692 assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 0);
1693
1694 daemon.connection_count.fetch_add(1, Ordering::SeqCst);
1696 assert_eq!(daemon.connection_count.load(Ordering::SeqCst), 1);
1697
1698 assert!(!*daemon.shutdown.borrow());
1700 }
1701
1702 fn write_legacy_token(dir: &std::path::Path, room_id: &str, username: &str, token: &str) {
1706 let name = format!("room-{room_id}-{username}.token");
1707 let data = serde_json::json!({"username": username, "token": token});
1708 std::fs::write(dir.join(name), format!("{data}\n")).unwrap();
1709 }
1710
1711 #[test]
1712 fn migrate_legacy_tmpdir_tokens_imports_token() {
1713 let token_dir = tempfile::TempDir::new().unwrap();
1714 let state_dir = tempfile::TempDir::new().unwrap();
1715 write_legacy_token(token_dir.path(), "lobby", "alice", "legacy-uuid-alice");
1716
1717 let mut registry = UserRegistry::new(state_dir.path().to_owned());
1718
1719 migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1725
1726 assert_eq!(registry.validate_token("legacy-uuid-alice"), Some("alice"));
1727 assert!(registry.get_user("alice").is_some());
1728 }
1729
1730 #[test]
1731 fn migrate_legacy_tmpdir_tokens_idempotent() {
1732 let token_dir = tempfile::TempDir::new().unwrap();
1733 let state_dir = tempfile::TempDir::new().unwrap();
1734 write_legacy_token(token_dir.path(), "lobby", "bob", "tok-bob");
1735
1736 let mut registry = UserRegistry::new(state_dir.path().to_owned());
1737 migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1738 migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1739
1740 assert_eq!(registry.validate_token("tok-bob"), Some("bob"));
1742 let snap = registry.token_snapshot();
1743 assert_eq!(snap.values().filter(|u| u.as_str() == "bob").count(), 1);
1744 }
1745
1746 #[test]
1747 fn migrate_legacy_tmpdir_tokens_skips_non_token_files() {
1748 let token_dir = tempfile::TempDir::new().unwrap();
1749 let state_dir = tempfile::TempDir::new().unwrap();
1750 std::fs::write(token_dir.path().join("roomd.sock"), "not a token").unwrap();
1751 std::fs::write(token_dir.path().join("something.json"), "{}").unwrap();
1752
1753 let mut registry = UserRegistry::new(state_dir.path().to_owned());
1754 migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1755
1756 assert!(registry.list_users().is_empty());
1757 }
1758
1759 #[test]
1760 fn migrate_legacy_tmpdir_tokens_skips_malformed_json() {
1761 let token_dir = tempfile::TempDir::new().unwrap();
1762 let state_dir = tempfile::TempDir::new().unwrap();
1763 std::fs::write(token_dir.path().join("room-x-bad.token"), "not-json{{{").unwrap();
1764
1765 let mut registry = UserRegistry::new(state_dir.path().to_owned());
1766 migrate_legacy_tmpdir_tokens_from(token_dir.path(), &mut registry);
1767
1768 assert!(registry.list_users().is_empty());
1769 }
1770}