Skip to main content

room_cli/oneshot/poll/
commands.rs

1use std::path::{Path, PathBuf};
2
3use crate::{history, message::Message, paths, query::QueryFilter};
4
5use super::filter_events::{
6    apply_event_filter, apply_per_room_event_filter, load_user_event_filter,
7};
8use super::filter_tier::{apply_per_room_tier_filter, apply_tier_filter, load_user_tier};
9use super::meta::{chat_path_from_meta, read_host_from_meta};
10use super::multi_room::poll_messages_multi;
11use super::{poll_messages, QueryOptions};
12
13use crate::oneshot::token::username_from_token;
14
15/// One-shot pull subcommand: print the last N messages from history as NDJSON.
16///
17/// Reads from the chat file directly (no broker connection required).
18/// Does **not** update the poll cursor.
19pub async fn cmd_pull(room_id: &str, token: &str, n: usize) -> anyhow::Result<()> {
20    let username = username_from_token(token)?;
21    let meta_path = paths::room_meta_path(room_id);
22    let chat_path = chat_path_from_meta(room_id, &meta_path);
23
24    let host = read_host_from_meta(&meta_path);
25    let mut messages =
26        super::pull_messages(&chat_path, n, Some(&username), host.as_deref()).await?;
27    let tier = load_user_tier(room_id, &username);
28    apply_tier_filter(&mut messages, tier, &username);
29    let ef = load_user_event_filter(room_id, &username);
30    apply_event_filter(&mut messages, &ef);
31    for msg in &messages {
32        println!("{}", serde_json::to_string(msg)?);
33    }
34    Ok(())
35}
36
37/// Watch subcommand: poll in a loop until at least one foreign message arrives.
38///
39/// Reads the caller's username from the session token file. Polls every
40/// `interval_secs` seconds, filtering out own messages. Wakes on `Message`,
41/// `System`, and `DirectMessage` variants. Exits after printing the first batch
42/// of foreign messages as NDJSON. Shares the cursor file with `room poll` — the
43/// two subcommands never re-deliver the same message.
44pub async fn cmd_watch(room_id: &str, token: &str, interval_secs: u64) -> anyhow::Result<()> {
45    let username = username_from_token(token)?;
46    let meta_path = paths::room_meta_path(room_id);
47    let chat_path = chat_path_from_meta(room_id, &meta_path);
48    let cursor_path = paths::cursor_path(room_id, &username);
49    let host = read_host_from_meta(&meta_path);
50
51    let ef = load_user_event_filter(room_id, &username);
52
53    loop {
54        let mut messages = poll_messages(
55            &chat_path,
56            &cursor_path,
57            Some(&username),
58            host.as_deref(),
59            None,
60        )
61        .await?;
62
63        apply_event_filter(&mut messages, &ef);
64
65        let foreign: Vec<&Message> = messages
66            .iter()
67            .filter(|m| match m {
68                Message::Message { user, .. } | Message::System { user, .. } => user != &username,
69                Message::DirectMessage { to, .. } => to == &username,
70                Message::Event { user, .. } => user != &username,
71                _ => false,
72            })
73            .collect();
74
75        if !foreign.is_empty() {
76            for msg in foreign {
77                println!("{}", serde_json::to_string(msg)?);
78            }
79            return Ok(());
80        }
81
82        tokio::time::sleep(tokio::time::Duration::from_secs(interval_secs)).await;
83    }
84}
85
86/// One-shot poll subcommand: read messages since cursor, print as NDJSON, update cursor.
87///
88/// Reads the caller's username from the session token file. When `mentions_only` is
89/// true, only messages that @mention the caller's username are printed (cursor still
90/// advances past all messages).
91pub async fn cmd_poll(
92    room_id: &str,
93    token: &str,
94    since: Option<String>,
95    mentions_only: bool,
96) -> anyhow::Result<()> {
97    let username = username_from_token(token)?;
98    let meta_path = paths::room_meta_path(room_id);
99    let chat_path = chat_path_from_meta(room_id, &meta_path);
100    let cursor_path = paths::cursor_path(room_id, &username);
101    let host = read_host_from_meta(&meta_path);
102
103    let mut messages = poll_messages(
104        &chat_path,
105        &cursor_path,
106        Some(&username),
107        host.as_deref(),
108        since.as_deref(),
109    )
110    .await?;
111
112    let ef = load_user_event_filter(room_id, &username);
113    apply_event_filter(&mut messages, &ef);
114
115    for msg in &messages {
116        if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
117            continue;
118        }
119        println!("{}", serde_json::to_string(msg)?);
120    }
121    Ok(())
122}
123
124/// One-shot multi-room poll subcommand: poll multiple rooms, merge by timestamp, print NDJSON.
125///
126/// Resolves the username from the token by trying each room in order. Each room's cursor
127/// is updated independently.
128pub async fn cmd_poll_multi(
129    room_ids: &[String],
130    token: &str,
131    mentions_only: bool,
132) -> anyhow::Result<()> {
133    // Resolve username by trying the token against each room
134    let username = username_from_token(token)?;
135
136    // Resolve chat paths for all rooms
137    let mut rooms: Vec<(&str, PathBuf)> = Vec::new();
138    for room_id in room_ids {
139        let meta_path = paths::room_meta_path(room_id);
140        let chat_path = chat_path_from_meta(room_id, &meta_path);
141        rooms.push((room_id.as_str(), chat_path));
142    }
143
144    let room_refs: Vec<(&str, &Path)> = rooms.iter().map(|(id, p)| (*id, p.as_path())).collect();
145    let mut messages = poll_messages_multi(&room_refs, &username).await?;
146
147    let room_id_strings: Vec<String> = room_ids.iter().map(|s| s.to_string()).collect();
148    apply_per_room_event_filter(&mut messages, &room_id_strings, &username);
149
150    for msg in &messages {
151        if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
152            continue;
153        }
154        println!("{}", serde_json::to_string(msg)?);
155    }
156    Ok(())
157}
158
159// ── cmd_query ─────────────────────────────────────────────────────────────────
160
161/// Unified query entry point for `room query` and the `poll`/`watch` aliases.
162///
163/// Three modes:
164/// - **Historical** (`new_only = false, wait = false`): reads full history,
165///   applies filter, no cursor update.
166/// - **New** (`new_only = true, wait = false`): reads since last cursor,
167///   applies filter, advances cursor.
168/// - **Wait** (`wait = true`): loops until at least one foreign message passes
169///   the filter, then prints and exits.
170///
171/// `room_ids` lists the rooms to read. `filter.rooms` may further restrict the
172/// output but does not affect which files are opened.
173pub async fn cmd_query(
174    room_ids: &[String],
175    token: &str,
176    mut filter: QueryFilter,
177    opts: QueryOptions,
178) -> anyhow::Result<()> {
179    if room_ids.is_empty() {
180        anyhow::bail!("at least one room ID is required");
181    }
182
183    let username = username_from_token(token)?;
184
185    // Resolve mention_user from caller if mentions_only is requested.
186    if opts.mentions_only {
187        filter.mention_user = Some(username.clone());
188    }
189
190    if opts.wait || opts.new_only {
191        cmd_query_new(room_ids, &username, filter, opts).await
192    } else {
193        cmd_query_history(room_ids, &username, filter).await
194    }
195}
196
197/// Cursor-based (new / wait) query mode.
198async fn cmd_query_new(
199    room_ids: &[String],
200    username: &str,
201    filter: QueryFilter,
202    opts: QueryOptions,
203) -> anyhow::Result<()> {
204    let deadline = opts
205        .timeout_secs
206        .map(|s| tokio::time::Instant::now() + tokio::time::Duration::from_secs(s));
207
208    loop {
209        let messages: Vec<Message> = if room_ids.len() == 1 {
210            let room_id = &room_ids[0];
211            let meta_path = paths::room_meta_path(room_id);
212            let chat_path = chat_path_from_meta(room_id, &meta_path);
213            let cursor_path = paths::cursor_path(room_id, username);
214            let host = read_host_from_meta(&meta_path);
215            poll_messages(
216                &chat_path,
217                &cursor_path,
218                Some(username),
219                host.as_deref(),
220                opts.since_uuid.as_deref(),
221            )
222            .await?
223        } else {
224            let mut rooms_info: Vec<(String, PathBuf)> = Vec::new();
225            for room_id in room_ids {
226                let meta_path = paths::room_meta_path(room_id);
227                let chat_path = chat_path_from_meta(room_id, &meta_path);
228                rooms_info.push((room_id.clone(), chat_path));
229            }
230            let room_refs: Vec<(&str, &Path)> = rooms_info
231                .iter()
232                .map(|(id, p)| (id.as_str(), p.as_path()))
233                .collect();
234            poll_messages_multi(&room_refs, username).await?
235        };
236
237        // Apply query filter, per-room subscription tiers, then sort + limit.
238        let mut filtered: Vec<Message> = messages
239            .into_iter()
240            .filter(|m| filter.matches(m, m.room()))
241            .collect();
242
243        if !filter.public_only {
244            apply_per_room_tier_filter(&mut filtered, room_ids, username);
245            apply_per_room_event_filter(&mut filtered, room_ids, username);
246        }
247
248        apply_sort_and_limit(&mut filtered, &filter);
249
250        if opts.wait {
251            // Only wake on foreign messages (includes system messages from plugins).
252            let foreign: Vec<&Message> = filtered
253                .iter()
254                .filter(|m| match m {
255                    Message::Message { user, .. } | Message::System { user, .. } => {
256                        user != username
257                    }
258                    Message::DirectMessage { to, .. } => to == username,
259                    _ => false,
260                })
261                .collect();
262
263            if !foreign.is_empty() {
264                for msg in foreign {
265                    println!("{}", serde_json::to_string(msg)?);
266                }
267                return Ok(());
268            }
269
270            // Check timeout before sleeping again.
271            if let Some(dl) = deadline {
272                if tokio::time::Instant::now() >= dl {
273                    return Ok(());
274                }
275            }
276        } else {
277            for msg in &filtered {
278                println!("{}", serde_json::to_string(msg)?);
279            }
280            return Ok(());
281        }
282
283        tokio::time::sleep(tokio::time::Duration::from_secs(opts.interval_secs)).await;
284    }
285}
286
287/// Historical (no-cursor) query mode.
288async fn cmd_query_history(
289    room_ids: &[String],
290    username: &str,
291    filter: QueryFilter,
292) -> anyhow::Result<()> {
293    let mut all_messages: Vec<Message> = Vec::new();
294
295    for room_id in room_ids {
296        let meta_path = paths::room_meta_path(room_id);
297        let chat_path = chat_path_from_meta(room_id, &meta_path);
298        let messages = history::load(&chat_path).await?;
299        all_messages.extend(messages);
300    }
301
302    // DM privacy filter: viewer only sees their own DMs.
303    let mut filtered: Vec<Message> = all_messages
304        .into_iter()
305        .filter(|m| filter.matches(m, m.room()))
306        .filter(|m| match m {
307            Message::DirectMessage { user, to, .. } => user == username || to == username,
308            _ => true,
309        })
310        .collect();
311
312    if !filter.public_only {
313        apply_per_room_tier_filter(&mut filtered, room_ids, username);
314        apply_per_room_event_filter(&mut filtered, room_ids, username);
315    }
316
317    apply_sort_and_limit(&mut filtered, &filter);
318
319    // If a specific target_id was requested and nothing was found, report an error.
320    if filtered.is_empty() {
321        if let Some((ref target_room, target_seq)) = filter.target_id {
322            use room_protocol::format_message_id;
323            anyhow::bail!(
324                "message not found: {}",
325                format_message_id(target_room, target_seq)
326            );
327        }
328    }
329
330    for msg in &filtered {
331        println!("{}", serde_json::to_string(msg)?);
332    }
333    Ok(())
334}
335
336/// Apply sort order and optional limit to a message list in place.
337pub(super) fn apply_sort_and_limit(messages: &mut Vec<Message>, filter: &QueryFilter) {
338    if filter.ascending {
339        messages.sort_by(|a, b| a.ts().cmp(b.ts()));
340    } else {
341        messages.sort_by(|a, b| b.ts().cmp(a.ts()));
342    }
343    if let Some(limit) = filter.limit {
344        messages.truncate(limit);
345    }
346}
347
348#[cfg(test)]
349mod tests {
350    use super::*;
351    use crate::message::make_message;
352    use tempfile::{NamedTempFile, TempDir};
353
354    // ── Test helpers ──────────────────────────────────────────────────────────
355
    /// Test-local shorthand that forwards to `crate::paths::global_token_path`
    /// to obtain the token file path for `username`.
    fn global_token_path(username: &str) -> PathBuf {
        crate::paths::global_token_path(username)
    }
359
360    fn write_token_file(_dir: &TempDir, _room_id: &str, username: &str, token: &str) {
361        let path = global_token_path(username);
362        if let Some(parent) = path.parent() {
363            std::fs::create_dir_all(parent).unwrap();
364        }
365        let data = serde_json::json!({ "username": username, "token": token });
366        std::fs::write(&path, format!("{data}\n")).unwrap();
367    }
368
369    fn write_meta_file(room_id: &str, chat_path: &Path) {
370        let meta_path = crate::paths::room_meta_path(room_id);
371        if let Some(parent) = meta_path.parent() {
372            std::fs::create_dir_all(parent).unwrap();
373        }
374        let meta = serde_json::json!({ "chat_path": chat_path.to_string_lossy() });
375        std::fs::write(&meta_path, format!("{meta}\n")).unwrap();
376    }
377
378    /// Run cmd_query and collect returned messages.
379    ///
380    /// Since cmd_query writes to stdout, we wrap it to capture results by
381    /// re-reading the chat file with the same filter in historical mode.
382    /// For `new_only` tests we verify the cursor state instead.
383    async fn oneshot_cmd_query_to_vec(
384        room_ids: &[String],
385        token: &str,
386        filter: QueryFilter,
387        opts: QueryOptions,
388        _cursor_dir: &TempDir,
389    ) -> anyhow::Result<Vec<Message>> {
390        // Snapshot cursor before run.
391        let cursor_before = room_ids.first().and_then(|id| {
392            crate::oneshot::token::username_from_token(token)
393                .ok()
394                .and_then(|u| {
395                    let p = crate::paths::cursor_path(id, &u);
396                    std::fs::read_to_string(&p).ok()
397                })
398        });
399
400        // Run cmd_query (side effect: may update cursor).
401        cmd_query(room_ids, token, filter.clone(), opts.clone()).await?;
402
403        // Snapshot cursor after run.
404        let cursor_after = room_ids.first().and_then(|id| {
405            crate::oneshot::token::username_from_token(token)
406                .ok()
407                .and_then(|u| {
408                    let p = crate::paths::cursor_path(id, &u);
409                    std::fs::read_to_string(&p).ok()
410                })
411        });
412
413        // Reconstruct what cmd_query would have returned.
414        // For historical mode: re-run with same filter and collect messages.
415        // For new_only mode: reload history and apply filter with the "before" cursor.
416        if !opts.new_only && !opts.wait {
417            // Historical: reload and reapply filter.
418            let mut all: Vec<Message> = Vec::new();
419            for room_id in room_ids {
420                let meta_path = crate::paths::room_meta_path(room_id);
421                let chat_path = super::super::meta::chat_path_from_meta(room_id, &meta_path);
422                let msgs = history::load(&chat_path).await?;
423                all.extend(msgs);
424            }
425            let username = username_from_token(token).unwrap_or_default();
426            let mut result: Vec<Message> = all
427                .into_iter()
428                .filter(|m| filter.matches(m, m.room()))
429                .filter(|m| match m {
430                    Message::DirectMessage { user, to, .. } => user == &username || to == &username,
431                    _ => true,
432                })
433                .collect();
434            apply_sort_and_limit(&mut result, &filter);
435            Ok(result)
436        } else {
437            // new_only mode: reconstruct returned messages by replaying history
438            // from cursor_before (before the call advanced it).
439            let advanced = cursor_after != cursor_before;
440            if advanced {
441                let room_id = &room_ids[0];
442                let meta_path = crate::paths::room_meta_path(room_id);
443                let chat_path = super::super::meta::chat_path_from_meta(room_id, &meta_path);
444                let all = history::load(&chat_path).await?;
445                // Find start from the pre-run cursor UUID.
446                let start = match &cursor_before {
447                    Some(id) => all
448                        .iter()
449                        .position(|m| m.id() == id.trim())
450                        .map(|i| i + 1)
451                        .unwrap_or(0),
452                    None => 0,
453                };
454                let filtered: Vec<Message> = all[start..]
455                    .iter()
456                    .filter(|m| filter.matches(m, m.room()))
457                    .cloned()
458                    .collect();
459                Ok(filtered)
460            } else {
461                Ok(vec![])
462            }
463        }
464    }
465
466    /// username_from_token returns an error for an unknown token.
467    #[test]
468    fn unknown_token_returns_error() {
469        let result = crate::oneshot::token::username_from_token("bad-token-nonexistent");
470        assert!(result.is_err());
471        assert!(result
472            .unwrap_err()
473            .to_string()
474            .contains("token not recognised"));
475    }
476
477    // ── cmd_query unit tests ───────────────────────────────────────────────────
478
479    /// cmd_query in historical mode returns all messages (newest-first by default).
480    #[tokio::test]
481    async fn cmd_query_history_returns_all_newest_first() {
482        let chat = NamedTempFile::new().unwrap();
483        let cursor_dir = TempDir::new().unwrap();
484        let token_dir = TempDir::new().unwrap();
485
486        let room_id = format!("test-cqh-{}", std::process::id());
487        write_token_file(&token_dir, &room_id, "alice-cqh", "tok-cqh");
488        write_meta_file(&room_id, chat.path());
489
490        for i in 0..3u32 {
491            crate::history::append(
492                chat.path(),
493                &make_message(&room_id, "alice-cqh", format!("{i}")),
494            )
495            .await
496            .unwrap();
497        }
498
499        let filter = QueryFilter {
500            rooms: vec![room_id.clone()],
501            ascending: false,
502            ..Default::default()
503        };
504        let opts = QueryOptions {
505            new_only: false,
506            wait: false,
507            interval_secs: 5,
508            mentions_only: false,
509            since_uuid: None,
510            timeout_secs: None,
511        };
512
513        // cursor should NOT advance (historical mode)
514        let cursor_path = crate::paths::cursor_path(&room_id, "alice-cqh");
515        let _ = std::fs::remove_file(&cursor_path);
516
517        // Run cmd_query — captures stdout indirectly by ensuring cursor unchanged.
518        oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqh", filter, opts, &cursor_dir)
519            .await
520            .unwrap();
521
522        // Cursor must not have been written in historical mode.
523        assert!(
524            !cursor_path.exists(),
525            "historical query must not write a cursor file"
526        );
527
528        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
529        let _ = std::fs::remove_file(&global_token_path("alice-cqh"));
530    }
531
532    /// cmd_query in --new mode advances the cursor.
533    #[tokio::test]
534    async fn cmd_query_new_advances_cursor() {
535        let chat = NamedTempFile::new().unwrap();
536        let cursor_dir = TempDir::new().unwrap();
537        let token_dir = TempDir::new().unwrap();
538
539        let room_id = format!("test-cqn-{}", std::process::id());
540        write_token_file(&token_dir, &room_id, "alice-cqn", "tok-cqn");
541        write_meta_file(&room_id, chat.path());
542
543        let msg = make_message(&room_id, "bob", "hello");
544        crate::history::append(chat.path(), &msg).await.unwrap();
545
546        let filter = QueryFilter {
547            rooms: vec![room_id.clone()],
548            ascending: true,
549            ..Default::default()
550        };
551        let opts = QueryOptions {
552            new_only: true,
553            wait: false,
554            interval_secs: 5,
555            mentions_only: false,
556            since_uuid: None,
557            timeout_secs: None,
558        };
559
560        // First query — should return the message and write cursor.
561        let result = oneshot_cmd_query_to_vec(
562            &[room_id.clone()],
563            "tok-cqn",
564            filter.clone(),
565            opts.clone(),
566            &cursor_dir,
567        )
568        .await
569        .unwrap();
570        assert_eq!(result.len(), 1);
571
572        // Second query — cursor advanced, nothing new.
573        let result2 =
574            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqn", filter, opts, &cursor_dir)
575                .await
576                .unwrap();
577        assert!(
578            result2.is_empty(),
579            "second query should return nothing (cursor advanced)"
580        );
581
582        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
583        let _ = std::fs::remove_file(&global_token_path("alice-cqn"));
584    }
585
586    /// cmd_query with content_search only returns matching messages.
587    #[tokio::test]
588    async fn cmd_query_content_search_filters() {
589        let chat = NamedTempFile::new().unwrap();
590        let cursor_dir = TempDir::new().unwrap();
591        let token_dir = TempDir::new().unwrap();
592
593        let room_id = format!("test-cqs-{}", std::process::id());
594        write_token_file(&token_dir, &room_id, "alice-cqs", "tok-cqs");
595        write_meta_file(&room_id, chat.path());
596
597        crate::history::append(chat.path(), &make_message(&room_id, "bob", "hello world"))
598            .await
599            .unwrap();
600        crate::history::append(chat.path(), &make_message(&room_id, "bob", "goodbye"))
601            .await
602            .unwrap();
603
604        let filter = QueryFilter {
605            rooms: vec![room_id.clone()],
606            content_search: Some("hello".into()),
607            ascending: true,
608            ..Default::default()
609        };
610        let opts = QueryOptions {
611            new_only: false,
612            wait: false,
613            interval_secs: 5,
614            mentions_only: false,
615            since_uuid: None,
616            timeout_secs: None,
617        };
618
619        let result =
620            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqs", filter, opts, &cursor_dir)
621                .await
622                .unwrap();
623        assert_eq!(result.len(), 1);
624        assert!(result[0].content().unwrap().contains("hello"));
625
626        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
627        let _ = std::fs::remove_file(&global_token_path("alice-cqs"));
628    }
629
630    /// cmd_query with user filter only returns messages from that user.
631    #[tokio::test]
632    async fn cmd_query_user_filter() {
633        let chat = NamedTempFile::new().unwrap();
634        let cursor_dir = TempDir::new().unwrap();
635        let token_dir = TempDir::new().unwrap();
636
637        let room_id = format!("test-cqu-{}", std::process::id());
638        write_token_file(&token_dir, &room_id, "alice-cqu", "tok-cqu");
639        write_meta_file(&room_id, chat.path());
640
641        crate::history::append(chat.path(), &make_message(&room_id, "alice", "from alice"))
642            .await
643            .unwrap();
644        crate::history::append(chat.path(), &make_message(&room_id, "bob", "from bob"))
645            .await
646            .unwrap();
647
648        let filter = QueryFilter {
649            rooms: vec![room_id.clone()],
650            users: vec!["bob".into()],
651            ascending: true,
652            ..Default::default()
653        };
654        let opts = QueryOptions {
655            new_only: false,
656            wait: false,
657            interval_secs: 5,
658            mentions_only: false,
659            since_uuid: None,
660            timeout_secs: None,
661        };
662
663        let result =
664            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqu", filter, opts, &cursor_dir)
665                .await
666                .unwrap();
667        assert_eq!(result.len(), 1);
668        assert_eq!(result[0].user(), "bob");
669
670        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
671        let _ = std::fs::remove_file(&global_token_path("alice-cqu"));
672    }
673
674    /// cmd_query with limit returns only N messages.
675    #[tokio::test]
676    async fn cmd_query_limit() {
677        let chat = NamedTempFile::new().unwrap();
678        let cursor_dir = TempDir::new().unwrap();
679        let token_dir = TempDir::new().unwrap();
680
681        let room_id = format!("test-cql-{}", std::process::id());
682        write_token_file(&token_dir, &room_id, "alice-cql", "tok-cql");
683        write_meta_file(&room_id, chat.path());
684
685        for i in 0..5u32 {
686            crate::history::append(
687                chat.path(),
688                &make_message(&room_id, "bob", format!("msg {i}")),
689            )
690            .await
691            .unwrap();
692        }
693
694        let filter = QueryFilter {
695            rooms: vec![room_id.clone()],
696            limit: Some(2),
697            ascending: false,
698            ..Default::default()
699        };
700        let opts = QueryOptions {
701            new_only: false,
702            wait: false,
703            interval_secs: 5,
704            mentions_only: false,
705            since_uuid: None,
706            timeout_secs: None,
707        };
708
709        let result =
710            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cql", filter, opts, &cursor_dir)
711                .await
712                .unwrap();
713        assert_eq!(result.len(), 2, "limit should restrict to 2 messages");
714
715        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
716        let _ = std::fs::remove_file(&global_token_path("alice-cql"));
717    }
718
719    /// cmd_query with Unsubscribed tier and public_only=true still returns messages.
720    #[tokio::test]
721    async fn cmd_query_public_bypasses_tier() {
722        let chat = NamedTempFile::new().unwrap();
723        let token_dir = TempDir::new().unwrap();
724        let cursor_dir = TempDir::new().unwrap();
725
726        let room_id = format!("test-pub-tier-{}", std::process::id());
727        write_token_file(&token_dir, &room_id, "alice-pub", "tok-pub-tier");
728        write_meta_file(&room_id, chat.path());
729
730        // Write subscription map marking alice-pub as Unsubscribed.
731        let state_dir = crate::paths::room_state_dir();
732        let _ = std::fs::create_dir_all(&state_dir);
733        let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
734        let mut map = std::collections::HashMap::new();
735        map.insert(
736            "alice-pub".to_string(),
737            room_protocol::SubscriptionTier::Unsubscribed,
738        );
739        std::fs::write(&sub_path, serde_json::to_string(&map).unwrap()).unwrap();
740
741        // Add a message.
742        crate::history::append(chat.path(), &make_message(&room_id, "bob", "visible"))
743            .await
744            .unwrap();
745
746        // Query with public_only=true should bypass tier and return the message.
747        let filter = QueryFilter {
748            rooms: vec![room_id.clone()],
749            public_only: true,
750            ascending: true,
751            ..Default::default()
752        };
753        let opts = QueryOptions {
754            new_only: false,
755            wait: false,
756            interval_secs: 5,
757            mentions_only: false,
758            since_uuid: None,
759            timeout_secs: None,
760        };
761
762        let result = oneshot_cmd_query_to_vec(
763            &[room_id.clone()],
764            "tok-pub-tier",
765            filter,
766            opts,
767            &cursor_dir,
768        )
769        .await
770        .unwrap();
771        assert_eq!(
772            result.len(),
773            1,
774            "public flag should bypass Unsubscribed tier"
775        );
776
777        let _ = std::fs::remove_file(&sub_path);
778        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
779        let _ = std::fs::remove_file(&global_token_path("alice-pub"));
780    }
781
782    /// cmd_query with --wait and --timeout returns within the timeout even if no messages arrive.
783    #[tokio::test]
784    async fn cmd_query_wait_timeout_returns_empty() {
785        let chat = NamedTempFile::new().unwrap();
786        let cursor_dir = TempDir::new().unwrap();
787        let token_dir = TempDir::new().unwrap();
788
789        let room_id = format!("test-timeout-{}", std::process::id());
790        write_token_file(&token_dir, &room_id, "alice-timeout", "tok-timeout");
791        write_meta_file(&room_id, chat.path());
792
793        // Write one message from the caller so cursor advances past it.
794        crate::history::append(
795            chat.path(),
796            &make_message(&room_id, "alice-timeout", "my own msg"),
797        )
798        .await
799        .unwrap();
800
801        let filter = QueryFilter {
802            rooms: vec![room_id.clone()],
803            ascending: true,
804            ..Default::default()
805        };
806        let opts = QueryOptions {
807            new_only: true,
808            wait: true,
809            interval_secs: 1,
810            mentions_only: false,
811            since_uuid: None,
812            timeout_secs: Some(2),
813        };
814
815        // This should return within ~2 seconds (the timeout), not block forever.
816        // Call cmd_query directly — we only care that it returns within the deadline.
817        let start = std::time::Instant::now();
818        cmd_query(&[room_id.clone()], "tok-timeout", filter, opts)
819            .await
820            .unwrap();
821        let elapsed = start.elapsed();
822
823        assert!(
824            elapsed < std::time::Duration::from_secs(10),
825            "should return within timeout, not block forever (elapsed: {elapsed:?})"
826        );
827        assert!(
828            elapsed >= std::time::Duration::from_secs(1),
829            "should wait at least one interval before timing out (elapsed: {elapsed:?})"
830        );
831
832        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
833        let _ = std::fs::remove_file(&global_token_path("alice-timeout"));
834    }
835}