Skip to main content

room_cli/
client.rs

1use std::path::PathBuf;
2
3use tokio::{
4    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
5    net::UnixStream,
6};
7
8use crate::{
9    message::Message,
10    oneshot::transport::{join_session_target, resolve_socket_target, SocketTarget},
11    paths, tui,
12};
13
14pub struct Client {
15    pub socket_path: PathBuf,
16    pub room_id: String,
17    pub username: String,
18    pub agent_mode: bool,
19    pub history_lines: usize,
20    /// When set, the client is connecting through the daemon and must prefix
21    /// the interactive handshake with `ROOM:<room_id>:`.
22    pub daemon_mode: bool,
23}
24
25impl Client {
26    pub async fn run(self) -> anyhow::Result<()> {
27        // Ensure a token exists for this room/user so that subsequent oneshot
28        // commands (send, poll, watch) work without a manual `room join`.
29        self.ensure_token().await;
30
31        let stream = UnixStream::connect(&self.socket_path).await?;
32        let (read_half, mut write_half) = stream.into_split();
33
34        // Handshake: prefer SESSION:<token> for authenticated interactive join,
35        // falling back to bare username (deprecated) if no token file exists.
36        let session_part = match read_token_for_session(&self.room_id, &self.username) {
37            Some(token) => format!("SESSION:{token}"),
38            None => {
39                eprintln!(
40                    "[tui] no token file found — falling back to unauthenticated join \
41                     (run `room join {}` to fix)",
42                    self.username
43                );
44                self.username.clone()
45            }
46        };
47        let handshake = if self.daemon_mode {
48            format!("ROOM:{}:{session_part}\n", self.room_id)
49        } else {
50            format!("{session_part}\n")
51        };
52        write_half.write_all(handshake.as_bytes()).await?;
53
54        let reader = BufReader::new(read_half);
55
56        if self.agent_mode {
57            run_agent(reader, write_half, &self.username, self.history_lines).await
58        } else {
59            tui::run(
60                reader,
61                write_half,
62                &self.room_id,
63                &self.username,
64                self.history_lines,
65                self.socket_path.clone(),
66            )
67            .await
68        }
69    }
70
71    /// Ensure a valid session token exists for this room/user pair.
72    ///
73    /// Always attempts a `JOIN:` handshake to acquire a fresh token. This handles
74    /// broker restarts (which invalidate old tokens) transparently. If the join
75    /// succeeds, the new token is written to `~/.room/state/`. If it fails with
76    /// "username_taken", the existing token file is assumed valid (the user is
77    /// already registered with the broker). Other errors are logged but not
78    /// propagated — the interactive session can proceed regardless.
79    async fn ensure_token(&self) {
80        if let Err(e) = paths::ensure_room_dirs() {
81            eprintln!("[tui] cannot create ~/.room dirs: {e}");
82            return;
83        }
84
85        let target = if self.daemon_mode {
86            SocketTarget {
87                path: self.socket_path.clone(),
88                daemon_room: Some(self.room_id.clone()),
89            }
90        } else {
91            resolve_socket_target(&self.room_id, None)
92        };
93        match join_session_target(&target, &self.username).await {
94            Ok((returned_user, token)) => {
95                let token_data = serde_json::json!({"username": returned_user, "token": token});
96                let path = paths::token_path(&self.room_id, &returned_user);
97                if let Err(e) = std::fs::write(&path, format!("{token_data}\n")) {
98                    eprintln!("[tui] failed to write token file: {e}");
99                }
100            }
101            Err(e) => {
102                let msg = e.to_string();
103                if msg.contains("already in use") {
104                    // Username is registered — existing token should be valid.
105                    let token_path = paths::token_path(&self.room_id, &self.username);
106                    if !has_valid_token_file(&token_path) {
107                        eprintln!(
108                            "[tui] username registered but no token file found — \
109                             run `room join {} {}` to recover",
110                            self.room_id, self.username
111                        );
112                    }
113                } else if msg.contains("cannot connect") {
114                    // Broker not running — token will be acquired on next session.
115                } else {
116                    eprintln!("[tui] auto-join failed: {e}");
117                }
118            }
119        }
120    }
121}
122
123/// Read a session token for the given room/user.
124///
125/// Checks the global token file (`~/.room/state/room-<username>.token`) first,
126/// then falls back to the per-room token file (`~/.room/state/room-<room_id>-<username>.token`).
127/// Returns `Some(token_uuid)` if a valid token file is found, `None` otherwise.
128fn read_token_for_session(room_id: &str, username: &str) -> Option<String> {
129    let candidates = [
130        paths::global_token_path(username),
131        paths::token_path(room_id, username),
132    ];
133    for path in &candidates {
134        if let Some(token) = read_token_from_file(path) {
135            return Some(token);
136        }
137    }
138    None
139}
140
141/// Extract the `token` field from a JSON token file, or `None` on any failure.
142fn read_token_from_file(path: &std::path::Path) -> Option<String> {
143    let data = std::fs::read_to_string(path).ok()?;
144    let v: serde_json::Value = serde_json::from_str(data.trim()).ok()?;
145    v["token"].as_str().map(|s| s.to_owned())
146}
147
148/// Check whether a token file exists and contains valid JSON with a `token` field.
149fn has_valid_token_file(path: &std::path::Path) -> bool {
150    if !path.exists() {
151        return false;
152    }
153    let Ok(data) = std::fs::read_to_string(path) else {
154        return false;
155    };
156    let Ok(v) = serde_json::from_str::<serde_json::Value>(data.trim()) else {
157        return false;
158    };
159    v["token"].as_str().is_some()
160}
161
162/// Resolve the default username from the `$USER` environment variable.
163///
164/// Returns `None` when `$USER` is not set or is empty.
165pub fn default_username() -> Option<String> {
166    std::env::var("USER").ok().filter(|s| !s.is_empty())
167}
168
169#[cfg(test)]
170mod tests {
171    use super::*;
172    use tempfile::TempDir;
173
174    #[test]
175    fn has_valid_token_file_returns_false_for_missing_file() {
176        let dir = TempDir::new().unwrap();
177        let path = dir.path().join("nonexistent.token");
178        assert!(!has_valid_token_file(&path));
179    }
180
181    #[test]
182    fn has_valid_token_file_returns_true_for_valid_file() {
183        let dir = TempDir::new().unwrap();
184        let path = dir.path().join("test.token");
185        let data = serde_json::json!({"username": "alice", "token": "tok-123"});
186        std::fs::write(&path, format!("{data}\n")).unwrap();
187        assert!(has_valid_token_file(&path));
188    }
189
190    #[test]
191    fn has_valid_token_file_returns_false_for_corrupt_json() {
192        let dir = TempDir::new().unwrap();
193        let path = dir.path().join("corrupt.token");
194        std::fs::write(&path, "not valid json").unwrap();
195        assert!(!has_valid_token_file(&path));
196    }
197
198    #[test]
199    fn has_valid_token_file_returns_false_for_missing_token_field() {
200        let dir = TempDir::new().unwrap();
201        let path = dir.path().join("no-token.token");
202        let data = serde_json::json!({"username": "alice"});
203        std::fs::write(&path, format!("{data}\n")).unwrap();
204        assert!(!has_valid_token_file(&path));
205    }
206
207    #[test]
208    fn default_username_returns_user_env_var() {
209        // $USER should be set on macOS/Linux test environments.
210        let result = default_username();
211        assert!(
212            result.is_some(),
213            "$USER should be set in the test environment"
214        );
215        assert!(!result.unwrap().is_empty(), "$USER should not be empty");
216    }
217
218    /// has_valid_token_file returns false for empty file.
219    #[test]
220    fn has_valid_token_file_returns_false_for_empty_file() {
221        let dir = TempDir::new().unwrap();
222        let path = dir.path().join("empty.token");
223        std::fs::write(&path, "").unwrap();
224        assert!(!has_valid_token_file(&path));
225    }
226
227    /// has_valid_token_file returns false when token field is null.
228    #[test]
229    fn has_valid_token_file_returns_false_for_null_token() {
230        let dir = TempDir::new().unwrap();
231        let path = dir.path().join("null-token.token");
232        let data = serde_json::json!({"username": "alice", "token": null});
233        std::fs::write(&path, format!("{data}\n")).unwrap();
234        assert!(!has_valid_token_file(&path));
235    }
236
237    // ── read_token_from_file ─────────────────────────────────────────────
238
239    #[test]
240    fn read_token_from_file_returns_token_for_valid_file() {
241        let dir = TempDir::new().unwrap();
242        let path = dir.path().join("valid.token");
243        let data = serde_json::json!({"username": "alice", "token": "abc-123"});
244        std::fs::write(&path, format!("{data}\n")).unwrap();
245        assert_eq!(read_token_from_file(&path), Some("abc-123".to_owned()));
246    }
247
248    #[test]
249    fn read_token_from_file_returns_none_for_missing_file() {
250        let dir = TempDir::new().unwrap();
251        let path = dir.path().join("nope.token");
252        assert_eq!(read_token_from_file(&path), None);
253    }
254
255    #[test]
256    fn read_token_from_file_returns_none_for_corrupt_json() {
257        let dir = TempDir::new().unwrap();
258        let path = dir.path().join("corrupt.token");
259        std::fs::write(&path, "not json").unwrap();
260        assert_eq!(read_token_from_file(&path), None);
261    }
262
263    #[test]
264    fn read_token_from_file_returns_none_for_null_token() {
265        let dir = TempDir::new().unwrap();
266        let path = dir.path().join("null.token");
267        let data = serde_json::json!({"username": "alice", "token": null});
268        std::fs::write(&path, format!("{data}\n")).unwrap();
269        assert_eq!(read_token_from_file(&path), None);
270    }
271
272    #[test]
273    fn read_token_from_file_returns_none_for_missing_token_field() {
274        let dir = TempDir::new().unwrap();
275        let path = dir.path().join("no-field.token");
276        let data = serde_json::json!({"username": "alice"});
277        std::fs::write(&path, format!("{data}\n")).unwrap();
278        assert_eq!(read_token_from_file(&path), None);
279    }
280}
281
282async fn run_agent(
283    mut reader: BufReader<tokio::net::unix::OwnedReadHalf>,
284    mut write_half: tokio::net::unix::OwnedWriteHalf,
285    username: &str,
286    history_lines: usize,
287) -> anyhow::Result<()> {
288    // Buffer messages until we see our own join (signals end of history replay),
289    // then print the last `history_lines` buffered messages and stream the rest.
290    let username_owned = username.to_owned();
291
292    let inbound = tokio::spawn(async move {
293        let mut history_buf: Vec<String> = Vec::new();
294        let mut history_done = false;
295        let mut line = String::new();
296
297        loop {
298            line.clear();
299            match reader.read_line(&mut line).await {
300                Ok(0) => break,
301                Ok(_) => {
302                    let trimmed = line.trim();
303                    if trimmed.is_empty() {
304                        continue;
305                    }
306                    if history_done {
307                        println!("{trimmed}");
308                    } else {
309                        // Look for our own join event to mark end of history
310                        let is_own_join = serde_json::from_str::<Message>(trimmed)
311                            .ok()
312                            .map(|m| {
313                                matches!(&m, Message::Join { user, .. } if user == &username_owned)
314                            })
315                            .unwrap_or(false);
316
317                        if is_own_join {
318                            // Flush last N history entries
319                            let start = history_buf.len().saturating_sub(history_lines);
320                            for h in &history_buf[start..] {
321                                println!("{h}");
322                            }
323                            history_done = true;
324                            println!("{trimmed}");
325                        } else {
326                            history_buf.push(trimmed.to_owned());
327                        }
328                    }
329                }
330                Err(e) => {
331                    eprintln!("[agent] read error: {e}");
332                    break;
333                }
334            }
335        }
336    });
337
338    let _outbound = tokio::spawn(async move {
339        let stdin = tokio::io::stdin();
340        let mut stdin_reader = BufReader::new(stdin);
341        let mut line = String::new();
342        loop {
343            line.clear();
344            match stdin_reader.read_line(&mut line).await {
345                Ok(0) => break,
346                Ok(_) => {
347                    let trimmed = line.trim();
348                    if trimmed.is_empty() {
349                        continue;
350                    }
351                    if write_half
352                        .write_all(format!("{trimmed}\n").as_bytes())
353                        .await
354                        .is_err()
355                    {
356                        break;
357                    }
358                }
359                Err(e) => {
360                    eprintln!("[agent] stdin error: {e}");
361                    break;
362                }
363            }
364        }
365    });
366
367    // Stay alive until the broker closes the connection (inbound EOF),
368    // even if stdin is already exhausted.  This lets agents receive responses
369    // to messages they sent before their stdin closed.
370    inbound.await.ok();
371    Ok(())
372}