Skip to main content

room_cli/oneshot/poll/
mod.rs

1mod commands;
2mod filter_events;
3mod filter_tier;
4pub(super) mod meta;
5mod multi_room;
6
7use std::path::Path;
8
9use crate::{history, message::Message};
10
11use super::token::{read_cursor, write_cursor};
12
13// ── Public re-exports ─────────────────────────────────────────────────────────
14
15pub use commands::{cmd_poll, cmd_poll_multi, cmd_pull, cmd_query, cmd_watch};
16pub use multi_room::poll_messages_multi;
17
18// ── Query engine types ─────────────────────────────────────────────────────────
19
/// Options for the `room query` subcommand (and its `poll`/`watch` aliases).
///
/// This is a plain options bag: every field is public and the struct carries no
/// behaviour of its own. How each flag is interpreted is decided by the command
/// code that consumes it.
#[derive(Debug, Clone)]
pub struct QueryOptions {
    /// Only return messages since the last poll cursor; the cursor is advanced
    /// after the results have been printed.
    pub new_only: bool,
    /// Block until at least one new foreign message arrives (implies `new_only`).
    pub wait: bool,
    /// Poll interval in seconds; only meaningful when `wait` is true.
    pub interval_secs: u64,
    /// If `true`, only return messages that @mention the calling user
    /// (the username is resolved from the token).
    pub mentions_only: bool,
    /// Override the cursor with this legacy message UUID. Used by the `--since`
    /// flag of the `poll` alias, which predates the `room:seq` cursor format.
    pub since_uuid: Option<String>,
    /// Maximum time to wait, in seconds. When set, the watch loop returns
    /// (possibly with no messages) once this much time has elapsed instead of
    /// blocking indefinitely. `None` means no upper bound.
    pub timeout_secs: Option<u64>,
}
41
42/// Return all messages from `chat_path` after the message with ID `since` (exclusive).
43///
44/// If `since` is `None`, the cursor file at `cursor_path` is checked for a previously
45/// stored position. A `None` cursor means all messages are returned.
46///
47/// `viewer` is the username of the caller. When `Some`, `DirectMessage` entries are
48/// filtered using [`Message::is_visible_to`], which grants access to the sender,
49/// recipient, and the room host. Pass `None` to skip DM filtering (e.g. in tests
50/// that don't involve DMs).
51///
52/// `host` is the room host username (typically the first user to join). When `Some`,
53/// the host can see all DMs regardless of sender/recipient.
54///
55/// The cursor file is updated to the last returned message's ID after each successful call.
56pub async fn poll_messages(
57    chat_path: &Path,
58    cursor_path: &Path,
59    viewer: Option<&str>,
60    host: Option<&str>,
61    since: Option<&str>,
62) -> anyhow::Result<Vec<Message>> {
63    let effective_since: Option<String> = since
64        .map(|s| s.to_owned())
65        .or_else(|| read_cursor(cursor_path));
66
67    let messages = history::load(chat_path).await?;
68
69    let start = match &effective_since {
70        Some(id) => messages
71            .iter()
72            .position(|m| m.id() == id)
73            .map(|i| i + 1)
74            .unwrap_or(0),
75        None => 0,
76    };
77
78    let result: Vec<Message> = messages[start..]
79        .iter()
80        .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
81        .cloned()
82        .collect();
83
84    if let Some(last) = result.last() {
85        write_cursor(cursor_path, last.id())?;
86    }
87
88    Ok(result)
89}
90
91/// Return the last `n` messages from history without updating the poll cursor.
92///
93/// DM entries are filtered using [`Message::is_visible_to`] so that `viewer` only
94/// sees messages they are party to (sender, recipient, or host). Pass `None` to
95/// skip DM filtering.
96///
97/// `host` is the room host username. When `Some`, the host can see all DMs.
98pub async fn pull_messages(
99    chat_path: &Path,
100    n: usize,
101    viewer: Option<&str>,
102    host: Option<&str>,
103) -> anyhow::Result<Vec<Message>> {
104    let clamped = n.min(200);
105    let all = history::tail(chat_path, clamped).await?;
106    let visible: Vec<Message> = all
107        .into_iter()
108        .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
109        .collect();
110    Ok(visible)
111}
112
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::*;
    use crate::message::{make_dm, make_message};
    use room_protocol::make_system;
    use tempfile::{NamedTempFile, TempDir};

    /// Per-test scratch space: an empty chat log plus a cursor path that does not
    /// exist yet, so the first poll starts without a stored cursor.
    struct Scratch {
        chat: NamedTempFile,
        cursor: PathBuf,
        /// Held only so the temp directory containing `cursor` outlives the test body.
        _cursor_dir: TempDir,
    }

    impl Scratch {
        fn new() -> Self {
            let cursor_dir = TempDir::new().unwrap();
            Self {
                chat: NamedTempFile::new().unwrap(),
                cursor: cursor_dir.path().join("cursor"),
                _cursor_dir: cursor_dir,
            }
        }

        /// Append one message to the chat log.
        async fn append(&self, msg: &Message) {
            crate::history::append(self.chat.path(), msg).await.unwrap();
        }

        /// Poll the chat log through this fixture's cursor with no explicit `since`.
        async fn poll_as(&self, viewer: Option<&str>, host: Option<&str>) -> Vec<Message> {
            poll_messages(self.chat.path(), &self.cursor, viewer, host, None)
                .await
                .unwrap()
        }
    }

    /// The "foreign message" filter these tests use to simulate what `cmd_watch`
    /// does after polling: keep regular and system messages from anyone other than
    /// `username`, keep DMs addressed *to* `username`, and drop everything else.
    fn watch_filter<'a>(messages: &'a [Message], username: &str) -> Vec<&'a Message> {
        messages
            .iter()
            .filter(|m| match m {
                Message::Message { user, .. } | Message::System { user, .. } => user != username,
                Message::DirectMessage { to, .. } => to == username,
                _ => false,
            })
            .collect()
    }

    /// poll_messages with no cursor and no since returns all messages.
    #[tokio::test]
    async fn poll_messages_no_cursor_returns_all() {
        let scratch = Scratch::new();

        let msg = make_message("r", "alice", "hello");
        scratch.append(&msg).await;

        let result = scratch.poll_as(None, None).await;
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].id(), msg.id());
    }

    /// poll_messages advances the cursor so a second call returns nothing.
    #[tokio::test]
    async fn poll_messages_advances_cursor() {
        let scratch = Scratch::new();

        let msg = make_message("r", "alice", "hello");
        scratch.append(&msg).await;

        scratch.poll_as(None, None).await;

        let second = scratch.poll_as(None, None).await;
        assert!(
            second.is_empty(),
            "cursor should have advanced past the first message"
        );
    }

    /// DM visibility: viewer only sees DMs they sent or received.
    #[tokio::test]
    async fn poll_messages_filters_dms_by_viewer() {
        let scratch = Scratch::new();

        let dm_alice_bob = make_dm("r", "alice", "bob", "secret");
        let dm_alice_carol = make_dm("r", "alice", "carol", "other secret");
        scratch.append(&dm_alice_bob).await;
        scratch.append(&dm_alice_carol).await;

        // bob sees only his DM
        let result = scratch.poll_as(Some("bob"), None).await;
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].id(), dm_alice_bob.id());
    }

    /// DMs addressed to the watcher are included in the foreign message filter
    /// used by cmd_watch, not silently consumed.
    #[tokio::test]
    async fn poll_messages_dm_to_viewer_is_not_consumed_silently() {
        let scratch = Scratch::new();

        // alice sends a DM to bob, and a broadcast message
        let dm = make_dm("r", "alice", "bob", "secret for bob");
        let msg = make_message("r", "alice", "public hello");
        scratch.append(&dm).await;
        scratch.append(&msg).await;

        // Simulate what cmd_watch does: poll, then filter for foreign messages + DMs
        let messages = scratch.poll_as(Some("bob"), None).await;
        let foreign = watch_filter(&messages, "bob");

        // Both the DM (addressed to bob) and the broadcast (from alice) should appear
        assert_eq!(foreign.len(), 2, "watch should see DMs + foreign messages");
        assert!(
            foreign
                .iter()
                .any(|m| matches!(m, Message::DirectMessage { .. })),
            "DM must not be silently consumed"
        );
    }

    /// DMs sent BY the watcher are excluded from the foreign filter (no self-echo).
    #[tokio::test]
    async fn poll_messages_dm_from_viewer_excluded_from_watch() {
        let scratch = Scratch::new();

        // bob sends a DM to alice
        let dm = make_dm("r", "bob", "alice", "from bob");
        scratch.append(&dm).await;

        let messages = scratch.poll_as(Some("bob"), None).await;
        let foreign = watch_filter(&messages, "bob");

        assert!(
            foreign.is_empty(),
            "DMs sent by the watcher should not wake watch"
        );
    }

    /// System messages from other users wake the watch filter (#452).
    #[tokio::test]
    async fn watch_filter_wakes_on_foreign_system_message() {
        let scratch = Scratch::new();

        let sys = make_system("r", "plugin:taskboard", "task tb-001 approved");
        scratch.append(&sys).await;

        let messages = scratch.poll_as(Some("bob"), None).await;
        let foreign = watch_filter(&messages, "bob");

        assert_eq!(
            foreign.len(),
            1,
            "system messages from other users should wake watch"
        );
        assert!(matches!(foreign[0], Message::System { .. }));
    }

    /// System messages from the watcher's own username do not wake watch.
    #[tokio::test]
    async fn watch_filter_ignores_own_system_message() {
        let scratch = Scratch::new();

        let sys = make_system("r", "bob", "bob subscribed (tier: full)");
        scratch.append(&sys).await;

        let messages = scratch.poll_as(Some("bob"), None).await;
        let foreign = watch_filter(&messages, "bob");

        assert!(
            foreign.is_empty(),
            "system messages from self should not wake watch"
        );
    }

    /// Watch filter handles a mix of messages, system events, and DMs correctly.
    #[tokio::test]
    async fn watch_filter_mixed_message_types() {
        let scratch = Scratch::new();

        // Foreign regular message
        let msg = make_message("r", "alice", "hello");
        // Foreign system message (plugin broadcast)
        let sys = make_system("r", "plugin:taskboard", "task tb-001 claimed by alice");
        // Own system message (should be filtered out)
        let own_sys = make_system("r", "bob", "bob subscribed (tier: full)");
        // DM addressed to watcher
        let dm = make_dm("r", "alice", "bob", "private note");
        // Own regular message (should be filtered out)
        let own_msg = make_message("r", "bob", "my own message");

        for m in [&msg, &sys, &own_sys, &dm, &own_msg] {
            scratch.append(m).await;
        }

        let messages = scratch.poll_as(Some("bob"), None).await;
        let foreign = watch_filter(&messages, "bob");

        assert_eq!(
            foreign.len(),
            3,
            "should see: foreign message + foreign system + DM to self"
        );
        assert!(
            foreign.iter().any(|m| matches!(m, Message::System { .. })),
            "system message must appear in watch results"
        );
        assert!(
            foreign.iter().any(|m| matches!(m, Message::Message { .. })),
            "regular foreign message must appear"
        );
        assert!(
            foreign
                .iter()
                .any(|m| matches!(m, Message::DirectMessage { .. })),
            "DM to self must appear"
        );
    }

    /// Host sees all DMs in poll regardless of sender/recipient.
    #[tokio::test]
    async fn poll_messages_host_sees_all_dms() {
        let scratch = Scratch::new();

        let dm_alice_bob = make_dm("r", "alice", "bob", "private");
        let dm_carol_dave = make_dm("r", "carol", "dave", "also private");
        scratch.append(&dm_alice_bob).await;
        scratch.append(&dm_carol_dave).await;

        // host "eve" can see both DMs
        let result = scratch.poll_as(Some("eve"), Some("eve")).await;
        assert_eq!(result.len(), 2, "host should see all DMs");
    }

    /// Non-host third party cannot see DMs they are not party to.
    #[tokio::test]
    async fn poll_messages_non_host_cannot_see_unrelated_dms() {
        let scratch = Scratch::new();

        let dm = make_dm("r", "alice", "bob", "private");
        scratch.append(&dm).await;

        // carol is not a party and is not host
        let result = scratch.poll_as(Some("carol"), None).await;
        assert!(result.is_empty(), "non-host third party should not see DM");
    }

    /// Host reads from pull_messages as well.
    #[tokio::test]
    async fn pull_messages_host_sees_all_dms() {
        let scratch = Scratch::new();

        let dm = make_dm("r", "alice", "bob", "secret");
        scratch.append(&dm).await;

        let result = pull_messages(scratch.chat.path(), 10, Some("eve"), Some("eve"))
            .await
            .unwrap();
        assert_eq!(result.len(), 1, "host should see the DM via pull");
    }

    /// pull_messages returns the last n entries without moving the cursor.
    #[tokio::test]
    async fn pull_messages_returns_tail_without_cursor_change() {
        let scratch = Scratch::new();

        for i in 0..5u32 {
            scratch
                .append(&make_message("r", "u", format!("msg {i}")))
                .await;
        }

        let pulled = pull_messages(scratch.chat.path(), 3, None, None)
            .await
            .unwrap();
        assert_eq!(pulled.len(), 3);

        // cursor untouched — poll still returns all 5
        let polled = scratch.poll_as(None, None).await;
        assert_eq!(polled.len(), 5);
    }
}