Skip to main content

room_cli/
client.rs

1use std::path::PathBuf;
2
3use tokio::{
4    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
5    net::UnixStream,
6};
7
8use crate::{message::Message, tui};
9
/// Connection settings for a single room client session.
///
/// A `Client` is consumed by [`Client::run`], which connects to the socket at
/// `socket_path`, sends `username` as the handshake line, and then drives
/// either the agent loop or the interactive TUI.
#[derive(Debug, Clone)]
pub struct Client {
    /// Filesystem path of the Unix domain socket to connect to.
    pub socket_path: PathBuf,
    /// Room identifier; forwarded to the TUI (the agent loop does not receive it).
    pub room_id: String,
    /// Name sent as the first line on the connection; the agent loop also uses
    /// it to recognise this client's own join event.
    pub username: String,
    /// When `true`, run the line-oriented agent loop instead of the TUI.
    pub agent_mode: bool,
    /// History line count forwarded to the agent loop or the TUI.
    pub history_lines: usize,
}
17
18impl Client {
19    pub async fn run(self) -> anyhow::Result<()> {
20        let stream = UnixStream::connect(&self.socket_path).await?;
21        let (read_half, mut write_half) = stream.into_split();
22
23        // Handshake: send username
24        write_half
25            .write_all(format!("{}\n", self.username).as_bytes())
26            .await?;
27
28        let reader = BufReader::new(read_half);
29
30        if self.agent_mode {
31            run_agent(reader, write_half, &self.username, self.history_lines).await
32        } else {
33            tui::run(
34                reader,
35                write_half,
36                &self.room_id,
37                &self.username,
38                self.history_lines,
39            )
40            .await
41        }
42    }
43}
44
45async fn run_agent(
46    mut reader: BufReader<tokio::net::unix::OwnedReadHalf>,
47    mut write_half: tokio::net::unix::OwnedWriteHalf,
48    username: &str,
49    history_lines: usize,
50) -> anyhow::Result<()> {
51    // Buffer messages until we see our own join (signals end of history replay),
52    // then print the last `history_lines` buffered messages and stream the rest.
53    let username_owned = username.to_owned();
54
55    let inbound = tokio::spawn(async move {
56        let mut history_buf: Vec<String> = Vec::new();
57        let mut history_done = false;
58        let mut line = String::new();
59
60        loop {
61            line.clear();
62            match reader.read_line(&mut line).await {
63                Ok(0) => break,
64                Ok(_) => {
65                    let trimmed = line.trim();
66                    if trimmed.is_empty() {
67                        continue;
68                    }
69                    if history_done {
70                        println!("{trimmed}");
71                    } else {
72                        // Look for our own join event to mark end of history
73                        let is_own_join = serde_json::from_str::<Message>(trimmed)
74                            .ok()
75                            .map(|m| {
76                                matches!(&m, Message::Join { user, .. } if user == &username_owned)
77                            })
78                            .unwrap_or(false);
79
80                        if is_own_join {
81                            // Flush last N history entries
82                            let start = history_buf.len().saturating_sub(history_lines);
83                            for h in &history_buf[start..] {
84                                println!("{h}");
85                            }
86                            history_done = true;
87                            println!("{trimmed}");
88                        } else {
89                            history_buf.push(trimmed.to_owned());
90                        }
91                    }
92                }
93                Err(e) => {
94                    eprintln!("[agent] read error: {e}");
95                    break;
96                }
97            }
98        }
99    });
100
101    let _outbound = tokio::spawn(async move {
102        let stdin = tokio::io::stdin();
103        let mut stdin_reader = BufReader::new(stdin);
104        let mut line = String::new();
105        loop {
106            line.clear();
107            match stdin_reader.read_line(&mut line).await {
108                Ok(0) => break,
109                Ok(_) => {
110                    let trimmed = line.trim();
111                    if trimmed.is_empty() {
112                        continue;
113                    }
114                    if write_half
115                        .write_all(format!("{trimmed}\n").as_bytes())
116                        .await
117                        .is_err()
118                    {
119                        break;
120                    }
121                }
122                Err(e) => {
123                    eprintln!("[agent] stdin error: {e}");
124                    break;
125                }
126            }
127        }
128    });
129
130    // Stay alive until the broker closes the connection (inbound EOF),
131    // even if stdin is already exhausted.  This lets agents receive responses
132    // to messages they sent before their stdin closed.
133    inbound.await.ok();
134    Ok(())
135}