Skip to main content

room_cli/oneshot/
poll.rs

1use std::path::{Path, PathBuf};
2
3use crate::{history, message::Message};
4
5use super::token::{read_cursor, username_from_token, write_cursor};
6
7/// Return all messages from `chat_path` after the message with ID `since` (exclusive).
8///
9/// If `since` is `None`, the cursor file at `cursor_path` is checked for a previously
10/// stored position. A `None` cursor means all messages are returned.
11///
12/// `viewer` is the username of the caller. When `Some`, `DirectMessage` entries are
13/// filtered to only those where the viewer is the sender or the recipient. Pass `None`
14/// to skip DM filtering (e.g. in tests that don't involve DMs).
15///
16/// The cursor file is updated to the last returned message's ID after each successful call.
17pub async fn poll_messages(
18    chat_path: &Path,
19    cursor_path: &Path,
20    viewer: Option<&str>,
21    since: Option<&str>,
22) -> anyhow::Result<Vec<Message>> {
23    let effective_since: Option<String> = since
24        .map(|s| s.to_owned())
25        .or_else(|| read_cursor(cursor_path));
26
27    let messages = history::load(chat_path).await?;
28
29    let start = match &effective_since {
30        Some(id) => messages
31            .iter()
32            .position(|m| m.id() == id)
33            .map(|i| i + 1)
34            .unwrap_or(0),
35        None => 0,
36    };
37
38    let result: Vec<Message> = messages[start..]
39        .iter()
40        .filter(|m| match m {
41            Message::DirectMessage { user, to, .. } => viewer
42                .map(|v| v == user.as_str() || v == to.as_str())
43                .unwrap_or(true),
44            _ => true,
45        })
46        .cloned()
47        .collect();
48
49    if let Some(last) = result.last() {
50        write_cursor(cursor_path, last.id())?;
51    }
52
53    Ok(result)
54}
55
56/// Return the last `n` messages from history without updating the poll cursor.
57///
58/// DM entries are filtered so that `viewer` only sees messages where they are
59/// the sender or the recipient. Pass `None` to skip DM filtering.
60pub async fn pull_messages(
61    chat_path: &Path,
62    n: usize,
63    viewer: Option<&str>,
64) -> anyhow::Result<Vec<Message>> {
65    let clamped = n.min(200);
66    let all = history::tail(chat_path, clamped).await?;
67    let visible: Vec<Message> = all
68        .into_iter()
69        .filter(|m| match m {
70            Message::DirectMessage { user, to, .. } => viewer
71                .map(|v| v == user.as_str() || v == to.as_str())
72                .unwrap_or(true),
73            _ => true,
74        })
75        .collect();
76    Ok(visible)
77}
78
79/// One-shot pull subcommand: print the last N messages from history as NDJSON.
80///
81/// Reads from the chat file directly (no broker connection required).
82/// Does **not** update the poll cursor.
83pub async fn cmd_pull(room_id: &str, token: &str, n: usize) -> anyhow::Result<()> {
84    let username = username_from_token(room_id, token)?;
85    let meta_path = PathBuf::from(format!("/tmp/room-{room_id}.meta"));
86    let chat_path = chat_path_from_meta(room_id, &meta_path);
87
88    let messages = pull_messages(&chat_path, n, Some(&username)).await?;
89    for msg in &messages {
90        println!("{}", serde_json::to_string(msg)?);
91    }
92    Ok(())
93}
94
95/// Watch subcommand: poll in a loop until at least one foreign `Message` arrives.
96///
97/// Reads the caller's username from the session token file. Polls every
98/// `interval_secs` seconds, filtering out own messages and non-`Message` variants.
99/// Exits after printing the first batch of foreign messages as NDJSON.
100/// Shares the cursor file with `room poll` — the two subcommands never re-deliver
101/// the same message.
102pub async fn cmd_watch(room_id: &str, token: &str, interval_secs: u64) -> anyhow::Result<()> {
103    let username = username_from_token(room_id, token)?;
104    let meta_path = PathBuf::from(format!("/tmp/room-{room_id}.meta"));
105    let chat_path = chat_path_from_meta(room_id, &meta_path);
106    let cursor_path = PathBuf::from(format!("/tmp/room-{room_id}-{username}.cursor"));
107
108    loop {
109        let messages = poll_messages(&chat_path, &cursor_path, Some(&username), None).await?;
110
111        let foreign: Vec<&Message> = messages
112            .iter()
113            .filter(|m| match m {
114                Message::Message { user, .. } => user != &username,
115                Message::DirectMessage { to, .. } => to == &username,
116                _ => false,
117            })
118            .collect();
119
120        if !foreign.is_empty() {
121            for msg in foreign {
122                println!("{}", serde_json::to_string(msg)?);
123            }
124            return Ok(());
125        }
126
127        tokio::time::sleep(tokio::time::Duration::from_secs(interval_secs)).await;
128    }
129}
130
131/// One-shot poll subcommand: read messages since cursor, print as NDJSON, update cursor.
132///
133/// Reads the caller's username from the session token file.
134pub async fn cmd_poll(room_id: &str, token: &str, since: Option<String>) -> anyhow::Result<()> {
135    let username = username_from_token(room_id, token)?;
136    let meta_path = PathBuf::from(format!("/tmp/room-{room_id}.meta"));
137    let chat_path = chat_path_from_meta(room_id, &meta_path);
138    let cursor_path = PathBuf::from(format!("/tmp/room-{room_id}-{username}.cursor"));
139
140    let messages =
141        poll_messages(&chat_path, &cursor_path, Some(&username), since.as_deref()).await?;
142    for msg in &messages {
143        println!("{}", serde_json::to_string(msg)?);
144    }
145    Ok(())
146}
147
148pub(super) fn chat_path_from_meta(room_id: &str, meta_path: &Path) -> PathBuf {
149    if meta_path.exists() {
150        if let Ok(data) = std::fs::read_to_string(meta_path) {
151            if let Ok(v) = serde_json::from_str::<serde_json::Value>(&data) {
152                if let Some(p) = v["chat_path"].as_str() {
153                    return PathBuf::from(p);
154                }
155            }
156        }
157    }
158    history::default_chat_path(room_id)
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::{make_dm, make_message};
    use tempfile::{NamedTempFile, TempDir};

    /// Create an empty chat file and a cursor path inside a fresh temp directory.
    ///
    /// The returned guards must be kept alive for as long as the paths are used.
    fn scratch() -> (NamedTempFile, TempDir, PathBuf) {
        let chat = NamedTempFile::new().unwrap();
        let cursor_dir = TempDir::new().unwrap();
        let cursor = cursor_dir.path().join("cursor");
        (chat, cursor_dir, cursor)
    }

    /// Apply the same "foreign message" filter that `cmd_watch` uses to decide
    /// whether a poll result should wake the watcher.
    fn watch_wakeups<'a>(messages: &'a [Message], username: &str) -> Vec<&'a Message> {
        messages
            .iter()
            .filter(|m| match m {
                Message::Message { user, .. } => user != username,
                Message::DirectMessage { to, .. } => to == username,
                _ => false,
            })
            .collect()
    }

    /// With neither a stored cursor nor `since`, the whole history comes back.
    #[tokio::test]
    async fn poll_messages_no_cursor_returns_all() {
        let (chat, _cursor_dir, cursor) = scratch();

        let msg = make_message("r", "alice", "hello");
        crate::history::append(chat.path(), &msg).await.unwrap();

        let polled = poll_messages(chat.path(), &cursor, None, None)
            .await
            .unwrap();
        assert_eq!(polled.len(), 1);
        assert_eq!(polled[0].id(), msg.id());
    }

    /// A first poll moves the cursor, so an immediate second poll is empty.
    #[tokio::test]
    async fn poll_messages_advances_cursor() {
        let (chat, _cursor_dir, cursor) = scratch();

        let msg = make_message("r", "alice", "hello");
        crate::history::append(chat.path(), &msg).await.unwrap();

        poll_messages(chat.path(), &cursor, None, None)
            .await
            .unwrap();

        let second = poll_messages(chat.path(), &cursor, None, None)
            .await
            .unwrap();
        assert!(
            second.is_empty(),
            "cursor should have advanced past the first message"
        );
    }

    /// A viewer only receives DMs in which they are sender or recipient.
    #[tokio::test]
    async fn poll_messages_filters_dms_by_viewer() {
        let (chat, _cursor_dir, cursor) = scratch();

        let dm_alice_bob = make_dm("r", "alice", "bob", "secret");
        let dm_alice_carol = make_dm("r", "alice", "carol", "other secret");
        for dm in [&dm_alice_bob, &dm_alice_carol] {
            crate::history::append(chat.path(), dm).await.unwrap();
        }

        // bob is party to only the first DM
        let polled = poll_messages(chat.path(), &cursor, Some("bob"), None)
            .await
            .unwrap();
        assert_eq!(polled.len(), 1);
        assert_eq!(polled[0].id(), dm_alice_bob.id());
    }

    /// A DM addressed to the watcher passes the `cmd_watch` foreign filter
    /// rather than being swallowed by the poll.
    #[tokio::test]
    async fn poll_messages_dm_to_viewer_is_not_consumed_silently() {
        let (chat, _cursor_dir, cursor) = scratch();

        // alice writes bob a DM, then posts a public message
        let dm = make_dm("r", "alice", "bob", "secret for bob");
        let msg = make_message("r", "alice", "public hello");
        crate::history::append(chat.path(), &dm).await.unwrap();
        crate::history::append(chat.path(), &msg).await.unwrap();

        // Mirror cmd_watch: poll as bob, then apply the wake-up filter
        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None)
            .await
            .unwrap();
        let foreign = watch_wakeups(&messages, "bob");

        // Expect both the DM to bob and alice's broadcast
        assert_eq!(foreign.len(), 2, "watch should see DMs + foreign messages");
        assert!(
            foreign
                .iter()
                .any(|m| matches!(m, Message::DirectMessage { .. })),
            "DM must not be silently consumed"
        );
    }

    /// A DM authored by the watcher does not pass the foreign filter (no self-echo).
    #[tokio::test]
    async fn poll_messages_dm_from_viewer_excluded_from_watch() {
        let (chat, _cursor_dir, cursor) = scratch();

        // bob writes alice a DM
        let dm = make_dm("r", "bob", "alice", "from bob");
        crate::history::append(chat.path(), &dm).await.unwrap();

        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None)
            .await
            .unwrap();
        let foreign = watch_wakeups(&messages, "bob");

        assert!(
            foreign.is_empty(),
            "DMs sent by the watcher should not wake watch"
        );
    }

    /// `pull_messages` yields the last n entries and leaves the cursor alone.
    #[tokio::test]
    async fn pull_messages_returns_tail_without_cursor_change() {
        let (chat, _cursor_dir, cursor) = scratch();

        for i in 0..5u32 {
            let entry = make_message("r", "u", format!("msg {i}"));
            crate::history::append(chat.path(), &entry).await.unwrap();
        }

        let pulled = pull_messages(chat.path(), 3, None).await.unwrap();
        assert_eq!(pulled.len(), 3);

        // No cursor was written, so a poll still sees all 5 messages
        let polled = poll_messages(chat.path(), &cursor, None, None)
            .await
            .unwrap();
        assert_eq!(polled.len(), 5);
    }
}