Skip to main content

room_cli/
client.rs

1use std::path::PathBuf;
2
3use tokio::{
4    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
5    net::UnixStream,
6};
7
8use crate::{
9    message::Message,
10    oneshot::transport::{join_session_target, resolve_socket_target, SocketTarget},
11    paths, tui,
12};
13
/// Connection settings for a single room session.
///
/// Consumed by [`Client::run`], which connects to `socket_path`, performs the
/// handshake, and then drives either the agent loop or the TUI.
#[derive(Debug, Clone)]
pub struct Client {
    /// Path of the Unix socket that `run` connects to.
    pub socket_path: PathBuf,
    /// Room identifier; included in the daemon handshake and used to locate
    /// the session token file.
    pub room_id: String,
    /// Name sent in the handshake and used when joining for a session token.
    pub username: String,
    /// When set, `run` uses the line-oriented agent loop instead of the TUI.
    pub agent_mode: bool,
    /// History line count handed to the agent loop or the TUI.
    pub history_lines: usize,
    /// When set, the client is connecting through the daemon and must prefix
    /// the interactive handshake with `ROOM:<room_id>:`.
    pub daemon_mode: bool,
}
24
25impl Client {
26    pub async fn run(self) -> anyhow::Result<()> {
27        // Ensure a token exists for this room/user so that subsequent oneshot
28        // commands (send, poll, watch) work without a manual `room join`.
29        self.ensure_token().await;
30
31        let stream = UnixStream::connect(&self.socket_path).await?;
32        let (read_half, mut write_half) = stream.into_split();
33
34        // Handshake: send username (with ROOM: prefix for daemon connections).
35        let handshake = if self.daemon_mode {
36            format!("ROOM:{}:{}\n", self.room_id, self.username)
37        } else {
38            format!("{}\n", self.username)
39        };
40        write_half.write_all(handshake.as_bytes()).await?;
41
42        let reader = BufReader::new(read_half);
43
44        if self.agent_mode {
45            run_agent(reader, write_half, &self.username, self.history_lines).await
46        } else {
47            tui::run(
48                reader,
49                write_half,
50                &self.room_id,
51                &self.username,
52                self.history_lines,
53                self.socket_path.clone(),
54            )
55            .await
56        }
57    }
58
59    /// Ensure a valid session token exists for this room/user pair.
60    ///
61    /// Always attempts a `JOIN:` handshake to acquire a fresh token. This handles
62    /// broker restarts (which invalidate old tokens) transparently. If the join
63    /// succeeds, the new token is written to `~/.room/state/`. If it fails with
64    /// "username_taken", the existing token file is assumed valid (the user is
65    /// already registered with the broker). Other errors are logged but not
66    /// propagated — the interactive session can proceed regardless.
67    async fn ensure_token(&self) {
68        if let Err(e) = paths::ensure_room_dirs() {
69            eprintln!("[tui] cannot create ~/.room dirs: {e}");
70            return;
71        }
72
73        let target = if self.daemon_mode {
74            SocketTarget {
75                path: self.socket_path.clone(),
76                daemon_room: Some(self.room_id.clone()),
77            }
78        } else {
79            resolve_socket_target(&self.room_id, None)
80        };
81        match join_session_target(&target, &self.username).await {
82            Ok((returned_user, token)) => {
83                let token_data = serde_json::json!({"username": returned_user, "token": token});
84                let path = paths::token_path(&self.room_id, &returned_user);
85                if let Err(e) = std::fs::write(&path, format!("{token_data}\n")) {
86                    eprintln!("[tui] failed to write token file: {e}");
87                }
88            }
89            Err(e) => {
90                let msg = e.to_string();
91                if msg.contains("already in use") {
92                    // Username is registered — existing token should be valid.
93                    let token_path = paths::token_path(&self.room_id, &self.username);
94                    if !has_valid_token_file(&token_path) {
95                        eprintln!(
96                            "[tui] username registered but no token file found — \
97                             run `room join {} {}` to recover",
98                            self.room_id, self.username
99                        );
100                    }
101                } else if msg.contains("cannot connect") {
102                    // Broker not running — token will be acquired on next session.
103                } else {
104                    eprintln!("[tui] auto-join failed: {e}");
105                }
106            }
107        }
108    }
109}
110
111/// Check whether a token file exists and contains valid JSON with a `token` field.
112fn has_valid_token_file(path: &std::path::Path) -> bool {
113    if !path.exists() {
114        return false;
115    }
116    let Ok(data) = std::fs::read_to_string(path) else {
117        return false;
118    };
119    let Ok(v) = serde_json::from_str::<serde_json::Value>(data.trim()) else {
120        return false;
121    };
122    v["token"].as_str().is_some()
123}
124
/// Look up the default username in the `$USER` environment variable.
///
/// Yields `None` when `$USER` is unset, is not valid Unicode, or is empty.
pub fn default_username() -> Option<String> {
    match std::env::var("USER") {
        Ok(name) if !name.is_empty() => Some(name),
        _ => None,
    }
}
131
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Create a scratch directory containing `name` with the given contents.
    ///
    /// The `TempDir` is returned alongside the path so the directory stays
    /// alive until the caller has finished asserting.
    fn token_file_with(name: &str, contents: &str) -> (TempDir, std::path::PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join(name);
        std::fs::write(&path, contents).unwrap();
        (dir, path)
    }

    /// A path that was never created is not a valid token file.
    #[test]
    fn has_valid_token_file_returns_false_for_missing_file() {
        let dir = TempDir::new().unwrap();
        let missing = dir.path().join("nonexistent.token");
        assert!(!has_valid_token_file(&missing));
    }

    /// A JSON object with a string `token` field is accepted.
    #[test]
    fn has_valid_token_file_returns_true_for_valid_file() {
        let json = serde_json::json!({"username": "alice", "token": "tok-123"});
        let (_dir, path) = token_file_with("test.token", &format!("{json}\n"));
        assert!(has_valid_token_file(&path));
    }

    /// Content that does not parse as JSON is rejected.
    #[test]
    fn has_valid_token_file_returns_false_for_corrupt_json() {
        let (_dir, path) = token_file_with("corrupt.token", "not valid json");
        assert!(!has_valid_token_file(&path));
    }

    /// Valid JSON without a `token` key is rejected.
    #[test]
    fn has_valid_token_file_returns_false_for_missing_token_field() {
        let json = serde_json::json!({"username": "alice"});
        let (_dir, path) = token_file_with("no-token.token", &format!("{json}\n"));
        assert!(!has_valid_token_file(&path));
    }

    /// Relies on the environment: `$USER` is expected to be set on
    /// macOS/Linux test machines.
    #[test]
    fn default_username_returns_user_env_var() {
        let result = default_username();
        assert!(
            result.is_some(),
            "$USER should be set in the test environment"
        );
        assert!(!result.unwrap().is_empty(), "$USER should not be empty");
    }

    /// An empty file is rejected.
    #[test]
    fn has_valid_token_file_returns_false_for_empty_file() {
        let (_dir, path) = token_file_with("empty.token", "");
        assert!(!has_valid_token_file(&path));
    }

    /// A `token` field that is JSON `null` is rejected.
    #[test]
    fn has_valid_token_file_returns_false_for_null_token() {
        let json = serde_json::json!({"username": "alice", "token": null});
        let (_dir, path) = token_file_with("null-token.token", &format!("{json}\n"));
        assert!(!has_valid_token_file(&path));
    }
}
200
201async fn run_agent(
202    mut reader: BufReader<tokio::net::unix::OwnedReadHalf>,
203    mut write_half: tokio::net::unix::OwnedWriteHalf,
204    username: &str,
205    history_lines: usize,
206) -> anyhow::Result<()> {
207    // Buffer messages until we see our own join (signals end of history replay),
208    // then print the last `history_lines` buffered messages and stream the rest.
209    let username_owned = username.to_owned();
210
211    let inbound = tokio::spawn(async move {
212        let mut history_buf: Vec<String> = Vec::new();
213        let mut history_done = false;
214        let mut line = String::new();
215
216        loop {
217            line.clear();
218            match reader.read_line(&mut line).await {
219                Ok(0) => break,
220                Ok(_) => {
221                    let trimmed = line.trim();
222                    if trimmed.is_empty() {
223                        continue;
224                    }
225                    if history_done {
226                        println!("{trimmed}");
227                    } else {
228                        // Look for our own join event to mark end of history
229                        let is_own_join = serde_json::from_str::<Message>(trimmed)
230                            .ok()
231                            .map(|m| {
232                                matches!(&m, Message::Join { user, .. } if user == &username_owned)
233                            })
234                            .unwrap_or(false);
235
236                        if is_own_join {
237                            // Flush last N history entries
238                            let start = history_buf.len().saturating_sub(history_lines);
239                            for h in &history_buf[start..] {
240                                println!("{h}");
241                            }
242                            history_done = true;
243                            println!("{trimmed}");
244                        } else {
245                            history_buf.push(trimmed.to_owned());
246                        }
247                    }
248                }
249                Err(e) => {
250                    eprintln!("[agent] read error: {e}");
251                    break;
252                }
253            }
254        }
255    });
256
257    let _outbound = tokio::spawn(async move {
258        let stdin = tokio::io::stdin();
259        let mut stdin_reader = BufReader::new(stdin);
260        let mut line = String::new();
261        loop {
262            line.clear();
263            match stdin_reader.read_line(&mut line).await {
264                Ok(0) => break,
265                Ok(_) => {
266                    let trimmed = line.trim();
267                    if trimmed.is_empty() {
268                        continue;
269                    }
270                    if write_half
271                        .write_all(format!("{trimmed}\n").as_bytes())
272                        .await
273                        .is_err()
274                    {
275                        break;
276                    }
277                }
278                Err(e) => {
279                    eprintln!("[agent] stdin error: {e}");
280                    break;
281                }
282            }
283        }
284    });
285
286    // Stay alive until the broker closes the connection (inbound EOF),
287    // even if stdin is already exhausted.  This lets agents receive responses
288    // to messages they sent before their stdin closed.
289    inbound.await.ok();
290    Ok(())
291}