Skip to main content

room_cli/oneshot/poll/
mod.rs

1mod commands;
2mod filter_events;
3mod filter_tier;
4pub(super) mod meta;
5mod multi_room;
6
7use std::path::Path;
8
9use crate::{history, message::Message};
10
11use super::token::{read_cursor, write_cursor};
12
13// ── Public re-exports ─────────────────────────────────────────────────────────
14
15pub use commands::{cmd_poll, cmd_poll_multi, cmd_pull, cmd_query, cmd_watch};
16pub use multi_room::poll_messages_multi;
17
18// ── Query engine types ─────────────────────────────────────────────────────────
19
20/// Options for the `room query` subcommand (and `poll`/`watch` aliases).
21#[derive(Debug, Clone)]
22pub struct QueryOptions {
23    /// Only return messages since the last poll cursor; advances the cursor
24    /// after printing results.
25    pub new_only: bool,
26    /// Block until at least one new foreign message arrives (implies `new_only`).
27    pub wait: bool,
28    /// Poll interval in seconds when `wait` is true.
29    pub interval_secs: u64,
30    /// If `true`, only return messages that @mention the calling user
31    /// (username resolved from the token).
32    pub mentions_only: bool,
33    /// Override the cursor with this legacy message UUID (used by the `poll`
34    /// alias `--since` flag, which predates the `room:seq` format).
35    pub since_uuid: Option<String>,
36}
37
38/// Return all messages from `chat_path` after the message with ID `since` (exclusive).
39///
40/// If `since` is `None`, the cursor file at `cursor_path` is checked for a previously
41/// stored position. A `None` cursor means all messages are returned.
42///
43/// `viewer` is the username of the caller. When `Some`, `DirectMessage` entries are
44/// filtered using [`Message::is_visible_to`], which grants access to the sender,
45/// recipient, and the room host. Pass `None` to skip DM filtering (e.g. in tests
46/// that don't involve DMs).
47///
48/// `host` is the room host username (typically the first user to join). When `Some`,
49/// the host can see all DMs regardless of sender/recipient.
50///
51/// The cursor file is updated to the last returned message's ID after each successful call.
52pub async fn poll_messages(
53    chat_path: &Path,
54    cursor_path: &Path,
55    viewer: Option<&str>,
56    host: Option<&str>,
57    since: Option<&str>,
58) -> anyhow::Result<Vec<Message>> {
59    let effective_since: Option<String> = since
60        .map(|s| s.to_owned())
61        .or_else(|| read_cursor(cursor_path));
62
63    let messages = history::load(chat_path).await?;
64
65    let start = match &effective_since {
66        Some(id) => messages
67            .iter()
68            .position(|m| m.id() == id)
69            .map(|i| i + 1)
70            .unwrap_or(0),
71        None => 0,
72    };
73
74    let result: Vec<Message> = messages[start..]
75        .iter()
76        .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
77        .cloned()
78        .collect();
79
80    if let Some(last) = result.last() {
81        write_cursor(cursor_path, last.id())?;
82    }
83
84    Ok(result)
85}
86
87/// Return the last `n` messages from history without updating the poll cursor.
88///
89/// DM entries are filtered using [`Message::is_visible_to`] so that `viewer` only
90/// sees messages they are party to (sender, recipient, or host). Pass `None` to
91/// skip DM filtering.
92///
93/// `host` is the room host username. When `Some`, the host can see all DMs.
94pub async fn pull_messages(
95    chat_path: &Path,
96    n: usize,
97    viewer: Option<&str>,
98    host: Option<&str>,
99) -> anyhow::Result<Vec<Message>> {
100    let clamped = n.min(200);
101    let all = history::tail(chat_path, clamped).await?;
102    let visible: Vec<Message> = all
103        .into_iter()
104        .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
105        .collect();
106    Ok(visible)
107}
108
109#[cfg(test)]
110mod tests {
111    use super::*;
112    use crate::message::make_message;
113    use tempfile::{NamedTempFile, TempDir};
114
115    /// poll_messages with no cursor and no since returns all messages.
116    #[tokio::test]
117    async fn poll_messages_no_cursor_returns_all() {
118        let chat = NamedTempFile::new().unwrap();
119        let cursor_dir = TempDir::new().unwrap();
120        let cursor = cursor_dir.path().join("cursor");
121
122        let msg = make_message("r", "alice", "hello");
123        crate::history::append(chat.path(), &msg).await.unwrap();
124
125        let result = poll_messages(chat.path(), &cursor, None, None, None)
126            .await
127            .unwrap();
128        assert_eq!(result.len(), 1);
129        assert_eq!(result[0].id(), msg.id());
130    }
131
132    /// poll_messages advances the cursor so a second call returns nothing.
133    #[tokio::test]
134    async fn poll_messages_advances_cursor() {
135        let chat = NamedTempFile::new().unwrap();
136        let cursor_dir = TempDir::new().unwrap();
137        let cursor = cursor_dir.path().join("cursor");
138
139        let msg = make_message("r", "alice", "hello");
140        crate::history::append(chat.path(), &msg).await.unwrap();
141
142        poll_messages(chat.path(), &cursor, None, None, None)
143            .await
144            .unwrap();
145
146        let second = poll_messages(chat.path(), &cursor, None, None, None)
147            .await
148            .unwrap();
149        assert!(
150            second.is_empty(),
151            "cursor should have advanced past the first message"
152        );
153    }
154
155    /// DM visibility: viewer only sees DMs they sent or received.
156    #[tokio::test]
157    async fn poll_messages_filters_dms_by_viewer() {
158        use crate::message::make_dm;
159        let chat = NamedTempFile::new().unwrap();
160        let cursor_dir = TempDir::new().unwrap();
161        let cursor = cursor_dir.path().join("cursor");
162
163        let dm_alice_bob = make_dm("r", "alice", "bob", "secret");
164        let dm_alice_carol = make_dm("r", "alice", "carol", "other secret");
165        crate::history::append(chat.path(), &dm_alice_bob)
166            .await
167            .unwrap();
168        crate::history::append(chat.path(), &dm_alice_carol)
169            .await
170            .unwrap();
171
172        // bob sees only his DM
173        let result = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
174            .await
175            .unwrap();
176        assert_eq!(result.len(), 1);
177        assert_eq!(result[0].id(), dm_alice_bob.id());
178    }
179
180    /// DMs addressed to the watcher are included in the foreign message filter
181    /// used by cmd_watch, not silently consumed.
182    #[tokio::test]
183    async fn poll_messages_dm_to_viewer_is_not_consumed_silently() {
184        use crate::message::make_dm;
185        let chat = NamedTempFile::new().unwrap();
186        let cursor_dir = TempDir::new().unwrap();
187        let cursor = cursor_dir.path().join("cursor");
188
189        // alice sends a DM to bob, and a broadcast message
190        let dm = make_dm("r", "alice", "bob", "secret for bob");
191        let msg = make_message("r", "alice", "public hello");
192        crate::history::append(chat.path(), &dm).await.unwrap();
193        crate::history::append(chat.path(), &msg).await.unwrap();
194
195        // Simulate what cmd_watch does: poll, then filter for foreign messages + DMs
196        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
197            .await
198            .unwrap();
199
200        let username = "bob";
201        let foreign: Vec<&Message> = messages
202            .iter()
203            .filter(|m| match m {
204                Message::Message { user, .. } | Message::System { user, .. } => user != username,
205                Message::DirectMessage { to, .. } => to == username,
206                _ => false,
207            })
208            .collect();
209
210        // Both the DM (addressed to bob) and the broadcast (from alice) should appear
211        assert_eq!(foreign.len(), 2, "watch should see DMs + foreign messages");
212        assert!(
213            foreign
214                .iter()
215                .any(|m| matches!(m, Message::DirectMessage { .. })),
216            "DM must not be silently consumed"
217        );
218    }
219
220    /// DMs sent BY the watcher are excluded from the foreign filter (no self-echo).
221    #[tokio::test]
222    async fn poll_messages_dm_from_viewer_excluded_from_watch() {
223        use crate::message::make_dm;
224        let chat = NamedTempFile::new().unwrap();
225        let cursor_dir = TempDir::new().unwrap();
226        let cursor = cursor_dir.path().join("cursor");
227
228        // bob sends a DM to alice
229        let dm = make_dm("r", "bob", "alice", "from bob");
230        crate::history::append(chat.path(), &dm).await.unwrap();
231
232        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
233            .await
234            .unwrap();
235
236        let username = "bob";
237        let foreign: Vec<&Message> = messages
238            .iter()
239            .filter(|m| match m {
240                Message::Message { user, .. } | Message::System { user, .. } => user != username,
241                Message::DirectMessage { to, .. } => to == username,
242                _ => false,
243            })
244            .collect();
245
246        assert!(
247            foreign.is_empty(),
248            "DMs sent by the watcher should not wake watch"
249        );
250    }
251
252    /// System messages from other users wake the watch filter (#452).
253    #[tokio::test]
254    async fn watch_filter_wakes_on_foreign_system_message() {
255        use room_protocol::make_system;
256        let chat = NamedTempFile::new().unwrap();
257        let cursor_dir = TempDir::new().unwrap();
258        let cursor = cursor_dir.path().join("cursor");
259
260        let sys = make_system("r", "plugin:taskboard", "task tb-001 approved");
261        crate::history::append(chat.path(), &sys).await.unwrap();
262
263        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
264            .await
265            .unwrap();
266
267        let username = "bob";
268        let foreign: Vec<&Message> = messages
269            .iter()
270            .filter(|m| match m {
271                Message::Message { user, .. } | Message::System { user, .. } => user != username,
272                Message::DirectMessage { to, .. } => to == username,
273                _ => false,
274            })
275            .collect();
276
277        assert_eq!(
278            foreign.len(),
279            1,
280            "system messages from other users should wake watch"
281        );
282        assert!(matches!(foreign[0], Message::System { .. }));
283    }
284
285    /// System messages from the watcher's own username do not wake watch.
286    #[tokio::test]
287    async fn watch_filter_ignores_own_system_message() {
288        use room_protocol::make_system;
289        let chat = NamedTempFile::new().unwrap();
290        let cursor_dir = TempDir::new().unwrap();
291        let cursor = cursor_dir.path().join("cursor");
292
293        let sys = make_system("r", "bob", "bob subscribed (tier: full)");
294        crate::history::append(chat.path(), &sys).await.unwrap();
295
296        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
297            .await
298            .unwrap();
299
300        let username = "bob";
301        let foreign: Vec<&Message> = messages
302            .iter()
303            .filter(|m| match m {
304                Message::Message { user, .. } | Message::System { user, .. } => user != username,
305                Message::DirectMessage { to, .. } => to == username,
306                _ => false,
307            })
308            .collect();
309
310        assert!(
311            foreign.is_empty(),
312            "system messages from self should not wake watch"
313        );
314    }
315
316    /// Watch filter handles a mix of messages, system events, and DMs correctly.
317    #[tokio::test]
318    async fn watch_filter_mixed_message_types() {
319        use crate::message::make_dm;
320        use room_protocol::make_system;
321        let chat = NamedTempFile::new().unwrap();
322        let cursor_dir = TempDir::new().unwrap();
323        let cursor = cursor_dir.path().join("cursor");
324
325        // Foreign regular message
326        let msg = make_message("r", "alice", "hello");
327        // Foreign system message (plugin broadcast)
328        let sys = make_system("r", "plugin:taskboard", "task tb-001 claimed by alice");
329        // Own system message (should be filtered out)
330        let own_sys = make_system("r", "bob", "bob subscribed (tier: full)");
331        // DM addressed to watcher
332        let dm = make_dm("r", "alice", "bob", "private note");
333        // Own regular message (should be filtered out)
334        let own_msg = make_message("r", "bob", "my own message");
335
336        for m in [&msg, &sys, &own_sys, &dm, &own_msg] {
337            crate::history::append(chat.path(), m).await.unwrap();
338        }
339
340        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
341            .await
342            .unwrap();
343
344        let username = "bob";
345        let foreign: Vec<&Message> = messages
346            .iter()
347            .filter(|m| match m {
348                Message::Message { user, .. } | Message::System { user, .. } => user != username,
349                Message::DirectMessage { to, .. } => to == username,
350                _ => false,
351            })
352            .collect();
353
354        assert_eq!(
355            foreign.len(),
356            3,
357            "should see: foreign message + foreign system + DM to self"
358        );
359        assert!(
360            foreign.iter().any(|m| matches!(m, Message::System { .. })),
361            "system message must appear in watch results"
362        );
363        assert!(
364            foreign.iter().any(|m| matches!(m, Message::Message { .. })),
365            "regular foreign message must appear"
366        );
367        assert!(
368            foreign
369                .iter()
370                .any(|m| matches!(m, Message::DirectMessage { .. })),
371            "DM to self must appear"
372        );
373    }
374
375    /// Host sees all DMs in poll regardless of sender/recipient.
376    #[tokio::test]
377    async fn poll_messages_host_sees_all_dms() {
378        use crate::message::make_dm;
379        let chat = NamedTempFile::new().unwrap();
380        let cursor_dir = TempDir::new().unwrap();
381        let cursor = cursor_dir.path().join("cursor");
382
383        let dm_alice_bob = make_dm("r", "alice", "bob", "private");
384        let dm_carol_dave = make_dm("r", "carol", "dave", "also private");
385        crate::history::append(chat.path(), &dm_alice_bob)
386            .await
387            .unwrap();
388        crate::history::append(chat.path(), &dm_carol_dave)
389            .await
390            .unwrap();
391
392        // host "eve" can see both DMs
393        let result = poll_messages(chat.path(), &cursor, Some("eve"), Some("eve"), None)
394            .await
395            .unwrap();
396        assert_eq!(result.len(), 2, "host should see all DMs");
397    }
398
399    /// Non-host third party cannot see DMs they are not party to.
400    #[tokio::test]
401    async fn poll_messages_non_host_cannot_see_unrelated_dms() {
402        use crate::message::make_dm;
403        let chat = NamedTempFile::new().unwrap();
404        let cursor_dir = TempDir::new().unwrap();
405        let cursor = cursor_dir.path().join("cursor");
406
407        let dm = make_dm("r", "alice", "bob", "private");
408        crate::history::append(chat.path(), &dm).await.unwrap();
409
410        // carol is not a party and is not host
411        let result = poll_messages(chat.path(), &cursor, Some("carol"), None, None)
412            .await
413            .unwrap();
414        assert!(result.is_empty(), "non-host third party should not see DM");
415    }
416
417    /// Host reads from pull_messages as well.
418    #[tokio::test]
419    async fn pull_messages_host_sees_all_dms() {
420        use crate::message::make_dm;
421        let chat = NamedTempFile::new().unwrap();
422
423        let dm = make_dm("r", "alice", "bob", "secret");
424        crate::history::append(chat.path(), &dm).await.unwrap();
425
426        let result = pull_messages(chat.path(), 10, Some("eve"), Some("eve"))
427            .await
428            .unwrap();
429        assert_eq!(result.len(), 1, "host should see the DM via pull");
430    }
431
432    /// pull_messages returns the last n entries without moving the cursor.
433    #[tokio::test]
434    async fn pull_messages_returns_tail_without_cursor_change() {
435        let chat = NamedTempFile::new().unwrap();
436        let cursor_dir = TempDir::new().unwrap();
437        let cursor = cursor_dir.path().join("cursor");
438
439        for i in 0..5u32 {
440            crate::history::append(chat.path(), &make_message("r", "u", format!("msg {i}")))
441                .await
442                .unwrap();
443        }
444
445        let pulled = pull_messages(chat.path(), 3, None, None).await.unwrap();
446        assert_eq!(pulled.len(), 3);
447
448        // cursor untouched — poll still returns all 5
449        let polled = poll_messages(chat.path(), &cursor, None, None, None)
450            .await
451            .unwrap();
452        assert_eq!(polled.len(), 5);
453    }
454}