Skip to main content

room_cli/oneshot/
poll.rs

1use std::path::{Path, PathBuf};
2
3use room_protocol::SubscriptionTier;
4
5use crate::{
6    broker::commands::load_subscription_map, history, message::Message, paths, query::QueryFilter,
7};
8
9use super::token::{read_cursor, username_from_token, write_cursor};
10
11// ── Subscription tier lookup ──────────────────────────────────────────────────
12
13/// Look up a user's subscription tier for a room from the persisted
14/// subscription map on disk.
15///
16/// Returns `Full` when the subscription file is missing, corrupt, or the user
17/// has no entry — unsubscribed users must have been explicitly recorded.
18fn load_user_tier(room_id: &str, username: &str) -> SubscriptionTier {
19    let state_dir = paths::room_state_dir();
20    let sub_path = paths::broker_subscriptions_path(&state_dir, room_id);
21    let map = load_subscription_map(&sub_path);
22    map.get(username).copied().unwrap_or(SubscriptionTier::Full)
23}
24
25/// Apply subscription-tier filtering to a message list in place.
26///
27/// - `Full` — no filtering (all messages pass).
28/// - `MentionsOnly` — keep only messages that @mention `username`.
29/// - `Unsubscribed` — remove all messages.
30fn apply_tier_filter(messages: &mut Vec<Message>, tier: SubscriptionTier, username: &str) {
31    match tier {
32        SubscriptionTier::Full => {}
33        SubscriptionTier::MentionsOnly => {
34            messages.retain(|m| m.mentions().iter().any(|mention| mention == username));
35        }
36        SubscriptionTier::Unsubscribed => {
37            messages.clear();
38        }
39    }
40}
41
42/// Apply per-room subscription-tier filtering to a message list in place.
43///
44/// Looks up the user's tier for each message's room and filters accordingly.
45/// Each room is checked independently — a user can be `Full` in one room and
46/// `Unsubscribed` in another.
47fn apply_per_room_tier_filter(messages: &mut Vec<Message>, room_ids: &[String], username: &str) {
48    use std::collections::HashMap;
49    let tiers: HashMap<&str, SubscriptionTier> = room_ids
50        .iter()
51        .map(|r| (r.as_str(), load_user_tier(r, username)))
52        .collect();
53
54    messages.retain(|m| {
55        let tier = tiers
56            .get(m.room())
57            .copied()
58            .unwrap_or(SubscriptionTier::Full);
59        match tier {
60            SubscriptionTier::Full => true,
61            SubscriptionTier::MentionsOnly => {
62                m.mentions().iter().any(|mention| mention == username)
63            }
64            SubscriptionTier::Unsubscribed => false,
65        }
66    });
67}
68
69// ── Query engine types ─────────────────────────────────────────────────────────
70
71/// Options for the `room query` subcommand (and `poll`/`watch` aliases).
72#[derive(Debug, Clone)]
73pub struct QueryOptions {
74    /// Only return messages since the last poll cursor; advances the cursor
75    /// after printing results.
76    pub new_only: bool,
77    /// Block until at least one new foreign message arrives (implies `new_only`).
78    pub wait: bool,
79    /// Poll interval in seconds when `wait` is true.
80    pub interval_secs: u64,
81    /// If `true`, only return messages that @mention the calling user
82    /// (username resolved from the token).
83    pub mentions_only: bool,
84    /// Override the cursor with this legacy message UUID (used by the `poll`
85    /// alias `--since` flag, which predates the `room:seq` format).
86    pub since_uuid: Option<String>,
87}
88
89/// Return all messages from `chat_path` after the message with ID `since` (exclusive).
90///
91/// If `since` is `None`, the cursor file at `cursor_path` is checked for a previously
92/// stored position. A `None` cursor means all messages are returned.
93///
94/// `viewer` is the username of the caller. When `Some`, `DirectMessage` entries are
95/// filtered using [`Message::is_visible_to`], which grants access to the sender,
96/// recipient, and the room host. Pass `None` to skip DM filtering (e.g. in tests
97/// that don't involve DMs).
98///
99/// `host` is the room host username (typically the first user to join). When `Some`,
100/// the host can see all DMs regardless of sender/recipient.
101///
102/// The cursor file is updated to the last returned message's ID after each successful call.
103pub async fn poll_messages(
104    chat_path: &Path,
105    cursor_path: &Path,
106    viewer: Option<&str>,
107    host: Option<&str>,
108    since: Option<&str>,
109) -> anyhow::Result<Vec<Message>> {
110    let effective_since: Option<String> = since
111        .map(|s| s.to_owned())
112        .or_else(|| read_cursor(cursor_path));
113
114    let messages = history::load(chat_path).await?;
115
116    let start = match &effective_since {
117        Some(id) => messages
118            .iter()
119            .position(|m| m.id() == id)
120            .map(|i| i + 1)
121            .unwrap_or(0),
122        None => 0,
123    };
124
125    let result: Vec<Message> = messages[start..]
126        .iter()
127        .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
128        .cloned()
129        .collect();
130
131    if let Some(last) = result.last() {
132        write_cursor(cursor_path, last.id())?;
133    }
134
135    Ok(result)
136}
137
138/// Return the last `n` messages from history without updating the poll cursor.
139///
140/// DM entries are filtered using [`Message::is_visible_to`] so that `viewer` only
141/// sees messages they are party to (sender, recipient, or host). Pass `None` to
142/// skip DM filtering.
143///
144/// `host` is the room host username. When `Some`, the host can see all DMs.
145pub async fn pull_messages(
146    chat_path: &Path,
147    n: usize,
148    viewer: Option<&str>,
149    host: Option<&str>,
150) -> anyhow::Result<Vec<Message>> {
151    let clamped = n.min(200);
152    let all = history::tail(chat_path, clamped).await?;
153    let visible: Vec<Message> = all
154        .into_iter()
155        .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
156        .collect();
157    Ok(visible)
158}
159
160/// One-shot pull subcommand: print the last N messages from history as NDJSON.
161///
162/// Reads from the chat file directly (no broker connection required).
163/// Does **not** update the poll cursor.
164pub async fn cmd_pull(room_id: &str, token: &str, n: usize) -> anyhow::Result<()> {
165    let username = username_from_token(token)?;
166    let meta_path = paths::room_meta_path(room_id);
167    let chat_path = chat_path_from_meta(room_id, &meta_path);
168
169    let host = read_host_from_meta(&meta_path);
170    let mut messages = pull_messages(&chat_path, n, Some(&username), host.as_deref()).await?;
171    let tier = load_user_tier(room_id, &username);
172    apply_tier_filter(&mut messages, tier, &username);
173    for msg in &messages {
174        println!("{}", serde_json::to_string(msg)?);
175    }
176    Ok(())
177}
178
179/// Watch subcommand: poll in a loop until at least one foreign message arrives.
180///
181/// Reads the caller's username from the session token file. Polls every
182/// `interval_secs` seconds, filtering out own messages. Wakes on `Message`,
183/// `System`, and `DirectMessage` variants. Exits after printing the first batch
184/// of foreign messages as NDJSON. Shares the cursor file with `room poll` — the
185/// two subcommands never re-deliver the same message.
186pub async fn cmd_watch(room_id: &str, token: &str, interval_secs: u64) -> anyhow::Result<()> {
187    let username = username_from_token(token)?;
188    let meta_path = paths::room_meta_path(room_id);
189    let chat_path = chat_path_from_meta(room_id, &meta_path);
190    let cursor_path = paths::cursor_path(room_id, &username);
191    let host = read_host_from_meta(&meta_path);
192
193    loop {
194        let messages = poll_messages(
195            &chat_path,
196            &cursor_path,
197            Some(&username),
198            host.as_deref(),
199            None,
200        )
201        .await?;
202
203        let foreign: Vec<&Message> = messages
204            .iter()
205            .filter(|m| match m {
206                Message::Message { user, .. } | Message::System { user, .. } => user != &username,
207                Message::DirectMessage { to, .. } => to == &username,
208                _ => false,
209            })
210            .collect();
211
212        if !foreign.is_empty() {
213            for msg in foreign {
214                println!("{}", serde_json::to_string(msg)?);
215            }
216            return Ok(());
217        }
218
219        tokio::time::sleep(tokio::time::Duration::from_secs(interval_secs)).await;
220    }
221}
222
223/// One-shot poll subcommand: read messages since cursor, print as NDJSON, update cursor.
224///
225/// Reads the caller's username from the session token file. When `mentions_only` is
226/// true, only messages that @mention the caller's username are printed (cursor still
227/// advances past all messages).
228pub async fn cmd_poll(
229    room_id: &str,
230    token: &str,
231    since: Option<String>,
232    mentions_only: bool,
233) -> anyhow::Result<()> {
234    let username = username_from_token(token)?;
235    let meta_path = paths::room_meta_path(room_id);
236    let chat_path = chat_path_from_meta(room_id, &meta_path);
237    let cursor_path = paths::cursor_path(room_id, &username);
238    let host = read_host_from_meta(&meta_path);
239
240    let messages = poll_messages(
241        &chat_path,
242        &cursor_path,
243        Some(&username),
244        host.as_deref(),
245        since.as_deref(),
246    )
247    .await?;
248    for msg in &messages {
249        if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
250            continue;
251        }
252        println!("{}", serde_json::to_string(msg)?);
253    }
254    Ok(())
255}
256
257/// Poll multiple rooms, returning messages merged by timestamp.
258///
259/// Each room uses its own cursor file under `~/.room/state/`.
260/// Messages are sorted by timestamp across all rooms. Each message already carries
261/// a `room` field, so the caller can distinguish sources.
262pub async fn poll_messages_multi(
263    rooms: &[(&str, &Path)],
264    username: &str,
265) -> anyhow::Result<Vec<Message>> {
266    let mut all_messages: Vec<Message> = Vec::new();
267
268    for &(room_id, chat_path) in rooms {
269        let cursor_path = paths::cursor_path(room_id, username);
270        let meta_path = paths::room_meta_path(room_id);
271        let host = read_host_from_meta(&meta_path);
272        let msgs = poll_messages(
273            chat_path,
274            &cursor_path,
275            Some(username),
276            host.as_deref(),
277            None,
278        )
279        .await?;
280        all_messages.extend(msgs);
281    }
282
283    all_messages.sort_by(|a, b| a.ts().cmp(b.ts()));
284    Ok(all_messages)
285}
286
287/// One-shot multi-room poll subcommand: poll multiple rooms, merge by timestamp, print NDJSON.
288///
289/// Resolves the username from the token by trying each room in order. Each room's cursor
290/// is updated independently.
291pub async fn cmd_poll_multi(
292    room_ids: &[String],
293    token: &str,
294    mentions_only: bool,
295) -> anyhow::Result<()> {
296    // Resolve username by trying the token against each room
297    let username = username_from_token(token)?;
298
299    // Resolve chat paths for all rooms
300    let mut rooms: Vec<(&str, PathBuf)> = Vec::new();
301    for room_id in room_ids {
302        let meta_path = paths::room_meta_path(room_id);
303        let chat_path = chat_path_from_meta(room_id, &meta_path);
304        rooms.push((room_id.as_str(), chat_path));
305    }
306
307    let room_refs: Vec<(&str, &Path)> = rooms.iter().map(|(id, p)| (*id, p.as_path())).collect();
308    let messages = poll_messages_multi(&room_refs, &username).await?;
309    for msg in &messages {
310        if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
311            continue;
312        }
313        println!("{}", serde_json::to_string(msg)?);
314    }
315    Ok(())
316}
317
318// ── cmd_query ─────────────────────────────────────────────────────────────────
319
320/// Unified query entry point for `room query` and the `poll`/`watch` aliases.
321///
322/// Three modes:
323/// - **Historical** (`new_only = false, wait = false`): reads full history,
324///   applies filter, no cursor update.
325/// - **New** (`new_only = true, wait = false`): reads since last cursor,
326///   applies filter, advances cursor.
327/// - **Wait** (`wait = true`): loops until at least one foreign message passes
328///   the filter, then prints and exits.
329///
330/// `room_ids` lists the rooms to read. `filter.rooms` may further restrict the
331/// output but does not affect which files are opened.
332pub async fn cmd_query(
333    room_ids: &[String],
334    token: &str,
335    mut filter: QueryFilter,
336    opts: QueryOptions,
337) -> anyhow::Result<()> {
338    if room_ids.is_empty() {
339        anyhow::bail!("at least one room ID is required");
340    }
341
342    let username = username_from_token(token)?;
343
344    // Resolve mention_user from caller if mentions_only is requested.
345    if opts.mentions_only {
346        filter.mention_user = Some(username.clone());
347    }
348
349    if opts.wait || opts.new_only {
350        cmd_query_new(room_ids, &username, filter, opts).await
351    } else {
352        cmd_query_history(room_ids, &username, filter).await
353    }
354}
355
356/// Cursor-based (new / wait) query mode.
357async fn cmd_query_new(
358    room_ids: &[String],
359    username: &str,
360    filter: QueryFilter,
361    opts: QueryOptions,
362) -> anyhow::Result<()> {
363    loop {
364        let messages: Vec<Message> = if room_ids.len() == 1 {
365            let room_id = &room_ids[0];
366            let meta_path = paths::room_meta_path(room_id);
367            let chat_path = chat_path_from_meta(room_id, &meta_path);
368            let cursor_path = paths::cursor_path(room_id, username);
369            let host = read_host_from_meta(&meta_path);
370            poll_messages(
371                &chat_path,
372                &cursor_path,
373                Some(username),
374                host.as_deref(),
375                opts.since_uuid.as_deref(),
376            )
377            .await?
378        } else {
379            let mut rooms_info: Vec<(String, PathBuf)> = Vec::new();
380            for room_id in room_ids {
381                let meta_path = paths::room_meta_path(room_id);
382                let chat_path = chat_path_from_meta(room_id, &meta_path);
383                rooms_info.push((room_id.clone(), chat_path));
384            }
385            let room_refs: Vec<(&str, &Path)> = rooms_info
386                .iter()
387                .map(|(id, p)| (id.as_str(), p.as_path()))
388                .collect();
389            poll_messages_multi(&room_refs, username).await?
390        };
391
392        // Apply query filter, per-room subscription tiers, then sort + limit.
393        let mut filtered: Vec<Message> = messages
394            .into_iter()
395            .filter(|m| filter.matches(m, m.room()))
396            .collect();
397
398        if !filter.public_only {
399            apply_per_room_tier_filter(&mut filtered, room_ids, username);
400        }
401
402        apply_sort_and_limit(&mut filtered, &filter);
403
404        if opts.wait {
405            // Only wake on foreign messages (includes system messages from plugins).
406            let foreign: Vec<&Message> = filtered
407                .iter()
408                .filter(|m| match m {
409                    Message::Message { user, .. } | Message::System { user, .. } => {
410                        user != username
411                    }
412                    Message::DirectMessage { to, .. } => to == username,
413                    _ => false,
414                })
415                .collect();
416
417            if !foreign.is_empty() {
418                for msg in foreign {
419                    println!("{}", serde_json::to_string(msg)?);
420                }
421                return Ok(());
422            }
423        } else {
424            for msg in &filtered {
425                println!("{}", serde_json::to_string(msg)?);
426            }
427            return Ok(());
428        }
429
430        tokio::time::sleep(tokio::time::Duration::from_secs(opts.interval_secs)).await;
431    }
432}
433
434/// Historical (no-cursor) query mode.
435async fn cmd_query_history(
436    room_ids: &[String],
437    username: &str,
438    filter: QueryFilter,
439) -> anyhow::Result<()> {
440    let mut all_messages: Vec<Message> = Vec::new();
441
442    for room_id in room_ids {
443        let meta_path = paths::room_meta_path(room_id);
444        let chat_path = chat_path_from_meta(room_id, &meta_path);
445        let messages = history::load(&chat_path).await?;
446        all_messages.extend(messages);
447    }
448
449    // DM privacy filter: viewer only sees their own DMs.
450    let mut filtered: Vec<Message> = all_messages
451        .into_iter()
452        .filter(|m| filter.matches(m, m.room()))
453        .filter(|m| match m {
454            Message::DirectMessage { user, to, .. } => user == username || to == username,
455            _ => true,
456        })
457        .collect();
458
459    if !filter.public_only {
460        apply_per_room_tier_filter(&mut filtered, room_ids, username);
461    }
462
463    apply_sort_and_limit(&mut filtered, &filter);
464
465    // If a specific target_id was requested and nothing was found, report an error.
466    if filtered.is_empty() {
467        if let Some((ref target_room, target_seq)) = filter.target_id {
468            use room_protocol::format_message_id;
469            anyhow::bail!(
470                "message not found: {}",
471                format_message_id(target_room, target_seq)
472            );
473        }
474    }
475
476    for msg in &filtered {
477        println!("{}", serde_json::to_string(msg)?);
478    }
479    Ok(())
480}
481
482/// Apply sort order and optional limit to a message list in place.
483fn apply_sort_and_limit(messages: &mut Vec<Message>, filter: &QueryFilter) {
484    if filter.ascending {
485        messages.sort_by(|a, b| a.ts().cmp(b.ts()));
486    } else {
487        messages.sort_by(|a, b| b.ts().cmp(a.ts()));
488    }
489    if let Some(limit) = filter.limit {
490        messages.truncate(limit);
491    }
492}
493
494/// Read the room host username from the meta file, if present.
495///
496/// Returns `None` if the meta file does not exist, cannot be parsed, or has no
497/// `"host"` field. Callers should treat `None` the same as no host information.
498pub(super) fn read_host_from_meta(meta_path: &Path) -> Option<String> {
499    if !meta_path.exists() {
500        return None;
501    }
502    let data = std::fs::read_to_string(meta_path).ok()?;
503    let v: serde_json::Value = serde_json::from_str(&data).ok()?;
504    v["host"].as_str().map(str::to_owned)
505}
506
507pub(super) fn chat_path_from_meta(room_id: &str, meta_path: &Path) -> PathBuf {
508    if meta_path.exists() {
509        if let Ok(data) = std::fs::read_to_string(meta_path) {
510            if let Ok(v) = serde_json::from_str::<serde_json::Value>(&data) {
511                if let Some(p) = v["chat_path"].as_str() {
512                    return PathBuf::from(p);
513                }
514            }
515        }
516    }
517    history::default_chat_path(room_id)
518}
519
520#[cfg(test)]
521mod tests {
522    use super::*;
523    use crate::message::make_message;
524    use tempfile::{NamedTempFile, TempDir};
525
526    /// poll_messages with no cursor and no since returns all messages.
527    #[tokio::test]
528    async fn poll_messages_no_cursor_returns_all() {
529        let chat = NamedTempFile::new().unwrap();
530        let cursor_dir = TempDir::new().unwrap();
531        let cursor = cursor_dir.path().join("cursor");
532
533        let msg = make_message("r", "alice", "hello");
534        crate::history::append(chat.path(), &msg).await.unwrap();
535
536        let result = poll_messages(chat.path(), &cursor, None, None, None)
537            .await
538            .unwrap();
539        assert_eq!(result.len(), 1);
540        assert_eq!(result[0].id(), msg.id());
541    }
542
543    /// poll_messages advances the cursor so a second call returns nothing.
544    #[tokio::test]
545    async fn poll_messages_advances_cursor() {
546        let chat = NamedTempFile::new().unwrap();
547        let cursor_dir = TempDir::new().unwrap();
548        let cursor = cursor_dir.path().join("cursor");
549
550        let msg = make_message("r", "alice", "hello");
551        crate::history::append(chat.path(), &msg).await.unwrap();
552
553        poll_messages(chat.path(), &cursor, None, None, None)
554            .await
555            .unwrap();
556
557        let second = poll_messages(chat.path(), &cursor, None, None, None)
558            .await
559            .unwrap();
560        assert!(
561            second.is_empty(),
562            "cursor should have advanced past the first message"
563        );
564    }
565
566    /// DM visibility: viewer only sees DMs they sent or received.
567    #[tokio::test]
568    async fn poll_messages_filters_dms_by_viewer() {
569        use crate::message::make_dm;
570        let chat = NamedTempFile::new().unwrap();
571        let cursor_dir = TempDir::new().unwrap();
572        let cursor = cursor_dir.path().join("cursor");
573
574        let dm_alice_bob = make_dm("r", "alice", "bob", "secret");
575        let dm_alice_carol = make_dm("r", "alice", "carol", "other secret");
576        crate::history::append(chat.path(), &dm_alice_bob)
577            .await
578            .unwrap();
579        crate::history::append(chat.path(), &dm_alice_carol)
580            .await
581            .unwrap();
582
583        // bob sees only his DM
584        let result = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
585            .await
586            .unwrap();
587        assert_eq!(result.len(), 1);
588        assert_eq!(result[0].id(), dm_alice_bob.id());
589    }
590
591    /// DMs addressed to the watcher are included in the foreign message filter
592    /// used by cmd_watch, not silently consumed.
593    #[tokio::test]
594    async fn poll_messages_dm_to_viewer_is_not_consumed_silently() {
595        use crate::message::make_dm;
596        let chat = NamedTempFile::new().unwrap();
597        let cursor_dir = TempDir::new().unwrap();
598        let cursor = cursor_dir.path().join("cursor");
599
600        // alice sends a DM to bob, and a broadcast message
601        let dm = make_dm("r", "alice", "bob", "secret for bob");
602        let msg = make_message("r", "alice", "public hello");
603        crate::history::append(chat.path(), &dm).await.unwrap();
604        crate::history::append(chat.path(), &msg).await.unwrap();
605
606        // Simulate what cmd_watch does: poll, then filter for foreign messages + DMs
607        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
608            .await
609            .unwrap();
610
611        let username = "bob";
612        let foreign: Vec<&Message> = messages
613            .iter()
614            .filter(|m| match m {
615                Message::Message { user, .. } | Message::System { user, .. } => user != username,
616                Message::DirectMessage { to, .. } => to == username,
617                _ => false,
618            })
619            .collect();
620
621        // Both the DM (addressed to bob) and the broadcast (from alice) should appear
622        assert_eq!(foreign.len(), 2, "watch should see DMs + foreign messages");
623        assert!(
624            foreign
625                .iter()
626                .any(|m| matches!(m, Message::DirectMessage { .. })),
627            "DM must not be silently consumed"
628        );
629    }
630
631    /// DMs sent BY the watcher are excluded from the foreign filter (no self-echo).
632    #[tokio::test]
633    async fn poll_messages_dm_from_viewer_excluded_from_watch() {
634        use crate::message::make_dm;
635        let chat = NamedTempFile::new().unwrap();
636        let cursor_dir = TempDir::new().unwrap();
637        let cursor = cursor_dir.path().join("cursor");
638
639        // bob sends a DM to alice
640        let dm = make_dm("r", "bob", "alice", "from bob");
641        crate::history::append(chat.path(), &dm).await.unwrap();
642
643        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
644            .await
645            .unwrap();
646
647        let username = "bob";
648        let foreign: Vec<&Message> = messages
649            .iter()
650            .filter(|m| match m {
651                Message::Message { user, .. } | Message::System { user, .. } => user != username,
652                Message::DirectMessage { to, .. } => to == username,
653                _ => false,
654            })
655            .collect();
656
657        assert!(
658            foreign.is_empty(),
659            "DMs sent by the watcher should not wake watch"
660        );
661    }
662
663    /// System messages from other users wake the watch filter (#452).
664    #[tokio::test]
665    async fn watch_filter_wakes_on_foreign_system_message() {
666        use room_protocol::make_system;
667        let chat = NamedTempFile::new().unwrap();
668        let cursor_dir = TempDir::new().unwrap();
669        let cursor = cursor_dir.path().join("cursor");
670
671        let sys = make_system("r", "plugin:taskboard", "task tb-001 approved");
672        crate::history::append(chat.path(), &sys).await.unwrap();
673
674        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
675            .await
676            .unwrap();
677
678        let username = "bob";
679        let foreign: Vec<&Message> = messages
680            .iter()
681            .filter(|m| match m {
682                Message::Message { user, .. } | Message::System { user, .. } => user != username,
683                Message::DirectMessage { to, .. } => to == username,
684                _ => false,
685            })
686            .collect();
687
688        assert_eq!(
689            foreign.len(),
690            1,
691            "system messages from other users should wake watch"
692        );
693        assert!(matches!(foreign[0], Message::System { .. }));
694    }
695
696    /// System messages from the watcher's own username do not wake watch.
697    #[tokio::test]
698    async fn watch_filter_ignores_own_system_message() {
699        use room_protocol::make_system;
700        let chat = NamedTempFile::new().unwrap();
701        let cursor_dir = TempDir::new().unwrap();
702        let cursor = cursor_dir.path().join("cursor");
703
704        let sys = make_system("r", "bob", "bob subscribed (tier: full)");
705        crate::history::append(chat.path(), &sys).await.unwrap();
706
707        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
708            .await
709            .unwrap();
710
711        let username = "bob";
712        let foreign: Vec<&Message> = messages
713            .iter()
714            .filter(|m| match m {
715                Message::Message { user, .. } | Message::System { user, .. } => user != username,
716                Message::DirectMessage { to, .. } => to == username,
717                _ => false,
718            })
719            .collect();
720
721        assert!(
722            foreign.is_empty(),
723            "system messages from self should not wake watch"
724        );
725    }
726
727    /// Watch filter handles a mix of messages, system events, and DMs correctly.
728    #[tokio::test]
729    async fn watch_filter_mixed_message_types() {
730        use crate::message::make_dm;
731        use room_protocol::make_system;
732        let chat = NamedTempFile::new().unwrap();
733        let cursor_dir = TempDir::new().unwrap();
734        let cursor = cursor_dir.path().join("cursor");
735
736        // Foreign regular message
737        let msg = make_message("r", "alice", "hello");
738        // Foreign system message (plugin broadcast)
739        let sys = make_system("r", "plugin:taskboard", "task tb-001 claimed by alice");
740        // Own system message (should be filtered out)
741        let own_sys = make_system("r", "bob", "bob subscribed (tier: full)");
742        // DM addressed to watcher
743        let dm = make_dm("r", "alice", "bob", "private note");
744        // Own regular message (should be filtered out)
745        let own_msg = make_message("r", "bob", "my own message");
746
747        for m in [&msg, &sys, &own_sys, &dm, &own_msg] {
748            crate::history::append(chat.path(), m).await.unwrap();
749        }
750
751        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
752            .await
753            .unwrap();
754
755        let username = "bob";
756        let foreign: Vec<&Message> = messages
757            .iter()
758            .filter(|m| match m {
759                Message::Message { user, .. } | Message::System { user, .. } => user != username,
760                Message::DirectMessage { to, .. } => to == username,
761                _ => false,
762            })
763            .collect();
764
765        assert_eq!(
766            foreign.len(),
767            3,
768            "should see: foreign message + foreign system + DM to self"
769        );
770        assert!(
771            foreign.iter().any(|m| matches!(m, Message::System { .. })),
772            "system message must appear in watch results"
773        );
774        assert!(
775            foreign.iter().any(|m| matches!(m, Message::Message { .. })),
776            "regular foreign message must appear"
777        );
778        assert!(
779            foreign
780                .iter()
781                .any(|m| matches!(m, Message::DirectMessage { .. })),
782            "DM to self must appear"
783        );
784    }
785
786    /// Host sees all DMs in poll regardless of sender/recipient.
787    #[tokio::test]
788    async fn poll_messages_host_sees_all_dms() {
789        use crate::message::make_dm;
790        let chat = NamedTempFile::new().unwrap();
791        let cursor_dir = TempDir::new().unwrap();
792        let cursor = cursor_dir.path().join("cursor");
793
794        let dm_alice_bob = make_dm("r", "alice", "bob", "private");
795        let dm_carol_dave = make_dm("r", "carol", "dave", "also private");
796        crate::history::append(chat.path(), &dm_alice_bob)
797            .await
798            .unwrap();
799        crate::history::append(chat.path(), &dm_carol_dave)
800            .await
801            .unwrap();
802
803        // host "eve" can see both DMs
804        let result = poll_messages(chat.path(), &cursor, Some("eve"), Some("eve"), None)
805            .await
806            .unwrap();
807        assert_eq!(result.len(), 2, "host should see all DMs");
808    }
809
810    /// Non-host third party cannot see DMs they are not party to.
811    #[tokio::test]
812    async fn poll_messages_non_host_cannot_see_unrelated_dms() {
813        use crate::message::make_dm;
814        let chat = NamedTempFile::new().unwrap();
815        let cursor_dir = TempDir::new().unwrap();
816        let cursor = cursor_dir.path().join("cursor");
817
818        let dm = make_dm("r", "alice", "bob", "private");
819        crate::history::append(chat.path(), &dm).await.unwrap();
820
821        // carol is not a party and is not host
822        let result = poll_messages(chat.path(), &cursor, Some("carol"), None, None)
823            .await
824            .unwrap();
825        assert!(result.is_empty(), "non-host third party should not see DM");
826    }
827
828    /// Host reads from pull_messages as well.
829    #[tokio::test]
830    async fn pull_messages_host_sees_all_dms() {
831        use crate::message::make_dm;
832        let chat = NamedTempFile::new().unwrap();
833
834        let dm = make_dm("r", "alice", "bob", "secret");
835        crate::history::append(chat.path(), &dm).await.unwrap();
836
837        let result = pull_messages(chat.path(), 10, Some("eve"), Some("eve"))
838            .await
839            .unwrap();
840        assert_eq!(result.len(), 1, "host should see the DM via pull");
841    }
842
843    // ── poll_messages_multi tests ──────────────────────────────────────────
844
845    /// Multi-room poll merges messages from two rooms sorted by timestamp.
846    #[tokio::test]
847    async fn poll_multi_merges_by_timestamp() {
848        let chat_a = NamedTempFile::new().unwrap();
849        let chat_b = NamedTempFile::new().unwrap();
850
851        let rid_a = format!("test-merge-a-{}", std::process::id());
852        let rid_b = format!("test-merge-b-{}", std::process::id());
853
854        // Append messages with interleaved timestamps
855        let msg_a1 = make_message(&rid_a, "alice", "a1");
856        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
857        let msg_b1 = make_message(&rid_b, "bob", "b1");
858        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
859        let msg_a2 = make_message(&rid_a, "alice", "a2");
860
861        crate::history::append(chat_a.path(), &msg_a1)
862            .await
863            .unwrap();
864        crate::history::append(chat_b.path(), &msg_b1)
865            .await
866            .unwrap();
867        crate::history::append(chat_a.path(), &msg_a2)
868            .await
869            .unwrap();
870
871        let rooms: Vec<(&str, &Path)> = vec![
872            (rid_a.as_str(), chat_a.path()),
873            (rid_b.as_str(), chat_b.path()),
874        ];
875
876        let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
877        assert_eq!(result.len(), 3);
878        // Verify timestamp ordering
879        assert!(result[0].ts() <= result[1].ts());
880        assert!(result[1].ts() <= result[2].ts());
881        // First message should be from room-a (earliest)
882        assert_eq!(result[0].room(), &rid_a);
883
884        // Clean up cursor files
885        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
886        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
887    }
888
889    /// Multi-room poll uses per-room cursors (second call returns nothing).
890    #[tokio::test]
891    async fn poll_multi_advances_per_room_cursors() {
892        let chat_a = NamedTempFile::new().unwrap();
893        let chat_b = NamedTempFile::new().unwrap();
894
895        // Use unique room IDs to avoid cursor file collisions with parallel tests
896        let rid_a = format!("test-cursor-a-{}", std::process::id());
897        let rid_b = format!("test-cursor-b-{}", std::process::id());
898
899        let msg_a = make_message(&rid_a, "alice", "hello a");
900        let msg_b = make_message(&rid_b, "bob", "hello b");
901        crate::history::append(chat_a.path(), &msg_a).await.unwrap();
902        crate::history::append(chat_b.path(), &msg_b).await.unwrap();
903
904        let rooms: Vec<(&str, &Path)> = vec![
905            (rid_a.as_str(), chat_a.path()),
906            (rid_b.as_str(), chat_b.path()),
907        ];
908
909        // First poll gets everything
910        let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
911        assert_eq!(result.len(), 2);
912
913        // Second poll gets nothing (cursors advanced)
914        let result2 = poll_messages_multi(&rooms, "viewer").await.unwrap();
915        assert!(
916            result2.is_empty(),
917            "second multi-poll should return nothing"
918        );
919
920        // Clean up cursor files
921        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
922        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
923    }
924
925    /// Multi-room poll with one empty room still returns messages from the other.
926    #[tokio::test]
927    async fn poll_multi_one_empty_room() {
928        let chat_a = NamedTempFile::new().unwrap();
929        let chat_b = NamedTempFile::new().unwrap();
930
931        let rid_a = format!("test-empty-a-{}", std::process::id());
932        let rid_b = format!("test-empty-b-{}", std::process::id());
933
934        let msg = make_message(&rid_a, "alice", "only here");
935        crate::history::append(chat_a.path(), &msg).await.unwrap();
936        // chat_b is empty
937
938        let rooms: Vec<(&str, &Path)> = vec![
939            (rid_a.as_str(), chat_a.path()),
940            (rid_b.as_str(), chat_b.path()),
941        ];
942
943        let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
944        assert_eq!(result.len(), 1);
945        assert_eq!(result[0].room(), &rid_a);
946
947        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
948        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
949    }
950
951    /// Multi-room poll with no rooms returns nothing.
952    #[tokio::test]
953    async fn poll_multi_no_rooms() {
954        let rooms: Vec<(&str, &Path)> = vec![];
955        let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
956        assert!(result.is_empty());
957    }
958
959    /// Multi-room poll filters DMs by viewer across rooms.
960    #[tokio::test]
961    async fn poll_multi_filters_dms_across_rooms() {
962        use crate::message::make_dm;
963        let chat_a = NamedTempFile::new().unwrap();
964        let chat_b = NamedTempFile::new().unwrap();
965
966        let rid_a = format!("test-dm-a-{}", std::process::id());
967        let rid_b = format!("test-dm-b-{}", std::process::id());
968
969        // DM to bob in room-a, DM to carol in room-b
970        let dm_to_bob = make_dm(&rid_a, "alice", "bob", "secret for bob");
971        let dm_to_carol = make_dm(&rid_b, "alice", "carol", "secret for carol");
972        crate::history::append(chat_a.path(), &dm_to_bob)
973            .await
974            .unwrap();
975        crate::history::append(chat_b.path(), &dm_to_carol)
976            .await
977            .unwrap();
978
979        let rooms: Vec<(&str, &Path)> = vec![
980            (rid_a.as_str(), chat_a.path()),
981            (rid_b.as_str(), chat_b.path()),
982        ];
983
984        // bob sees only room-a DM
985        let result = poll_messages_multi(&rooms, "bob").await.unwrap();
986        assert_eq!(result.len(), 1);
987        assert_eq!(result[0].room(), &rid_a);
988
989        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "bob"));
990        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "bob"));
991    }
992
993    // ── cmd_query unit tests ───────────────────────────────────────────────────
994
995    /// cmd_query in historical mode returns all messages (newest-first by default).
996    #[tokio::test]
997    async fn cmd_query_history_returns_all_newest_first() {
998        let chat = NamedTempFile::new().unwrap();
999        let cursor_dir = TempDir::new().unwrap();
1000        let token_dir = TempDir::new().unwrap();
1001
1002        let room_id = format!("test-cqh-{}", std::process::id());
1003        write_token_file(&token_dir, &room_id, "alice-cqh", "tok-cqh");
1004        write_meta_file(&room_id, chat.path());
1005
1006        for i in 0..3u32 {
1007            crate::history::append(
1008                chat.path(),
1009                &make_message(&room_id, "alice-cqh", format!("{i}")),
1010            )
1011            .await
1012            .unwrap();
1013        }
1014
1015        let filter = QueryFilter {
1016            rooms: vec![room_id.clone()],
1017            ascending: false,
1018            ..Default::default()
1019        };
1020        let opts = QueryOptions {
1021            new_only: false,
1022            wait: false,
1023            interval_secs: 5,
1024            mentions_only: false,
1025            since_uuid: None,
1026        };
1027
1028        // cursor should NOT advance (historical mode)
1029        let cursor_path = crate::paths::cursor_path(&room_id, "alice-cqh");
1030        let _ = std::fs::remove_file(&cursor_path);
1031
1032        // Run cmd_query — captures stdout indirectly by ensuring cursor unchanged.
1033        oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqh", filter, opts, &cursor_dir)
1034            .await
1035            .unwrap();
1036
1037        // Cursor must not have been written in historical mode.
1038        assert!(
1039            !cursor_path.exists(),
1040            "historical query must not write a cursor file"
1041        );
1042
1043        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1044        let _ = std::fs::remove_file(&global_token_path("alice-cqh"));
1045    }
1046
1047    /// cmd_query in --new mode advances the cursor.
1048    #[tokio::test]
1049    async fn cmd_query_new_advances_cursor() {
1050        let chat = NamedTempFile::new().unwrap();
1051        let cursor_dir = TempDir::new().unwrap();
1052        let token_dir = TempDir::new().unwrap();
1053
1054        let room_id = format!("test-cqn-{}", std::process::id());
1055        write_token_file(&token_dir, &room_id, "alice-cqn", "tok-cqn");
1056        write_meta_file(&room_id, chat.path());
1057
1058        let msg = make_message(&room_id, "bob", "hello");
1059        crate::history::append(chat.path(), &msg).await.unwrap();
1060
1061        let filter = QueryFilter {
1062            rooms: vec![room_id.clone()],
1063            ascending: true,
1064            ..Default::default()
1065        };
1066        let opts = QueryOptions {
1067            new_only: true,
1068            wait: false,
1069            interval_secs: 5,
1070            mentions_only: false,
1071            since_uuid: None,
1072        };
1073
1074        // First query — should return the message and write cursor.
1075        let result = oneshot_cmd_query_to_vec(
1076            &[room_id.clone()],
1077            "tok-cqn",
1078            filter.clone(),
1079            opts.clone(),
1080            &cursor_dir,
1081        )
1082        .await
1083        .unwrap();
1084        assert_eq!(result.len(), 1);
1085
1086        // Second query — cursor advanced, nothing new.
1087        let result2 =
1088            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqn", filter, opts, &cursor_dir)
1089                .await
1090                .unwrap();
1091        assert!(
1092            result2.is_empty(),
1093            "second query should return nothing (cursor advanced)"
1094        );
1095
1096        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1097        let _ = std::fs::remove_file(&global_token_path("alice-cqn"));
1098    }
1099
1100    /// cmd_query with content_search only returns matching messages.
1101    #[tokio::test]
1102    async fn cmd_query_content_search_filters() {
1103        let chat = NamedTempFile::new().unwrap();
1104        let cursor_dir = TempDir::new().unwrap();
1105        let token_dir = TempDir::new().unwrap();
1106
1107        let room_id = format!("test-cqs-{}", std::process::id());
1108        write_token_file(&token_dir, &room_id, "alice-cqs", "tok-cqs");
1109        write_meta_file(&room_id, chat.path());
1110
1111        crate::history::append(chat.path(), &make_message(&room_id, "bob", "hello world"))
1112            .await
1113            .unwrap();
1114        crate::history::append(chat.path(), &make_message(&room_id, "bob", "goodbye"))
1115            .await
1116            .unwrap();
1117
1118        let filter = QueryFilter {
1119            rooms: vec![room_id.clone()],
1120            content_search: Some("hello".into()),
1121            ascending: true,
1122            ..Default::default()
1123        };
1124        let opts = QueryOptions {
1125            new_only: false,
1126            wait: false,
1127            interval_secs: 5,
1128            mentions_only: false,
1129            since_uuid: None,
1130        };
1131
1132        let result =
1133            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqs", filter, opts, &cursor_dir)
1134                .await
1135                .unwrap();
1136        assert_eq!(result.len(), 1);
1137        assert!(result[0].content().unwrap().contains("hello"));
1138
1139        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1140        let _ = std::fs::remove_file(&global_token_path("alice-cqs"));
1141    }
1142
1143    /// cmd_query with user filter only returns messages from that user.
1144    #[tokio::test]
1145    async fn cmd_query_user_filter() {
1146        let chat = NamedTempFile::new().unwrap();
1147        let cursor_dir = TempDir::new().unwrap();
1148        let token_dir = TempDir::new().unwrap();
1149
1150        let room_id = format!("test-cqu-{}", std::process::id());
1151        write_token_file(&token_dir, &room_id, "alice-cqu", "tok-cqu");
1152        write_meta_file(&room_id, chat.path());
1153
1154        crate::history::append(chat.path(), &make_message(&room_id, "alice", "from alice"))
1155            .await
1156            .unwrap();
1157        crate::history::append(chat.path(), &make_message(&room_id, "bob", "from bob"))
1158            .await
1159            .unwrap();
1160
1161        let filter = QueryFilter {
1162            rooms: vec![room_id.clone()],
1163            users: vec!["bob".into()],
1164            ascending: true,
1165            ..Default::default()
1166        };
1167        let opts = QueryOptions {
1168            new_only: false,
1169            wait: false,
1170            interval_secs: 5,
1171            mentions_only: false,
1172            since_uuid: None,
1173        };
1174
1175        let result =
1176            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqu", filter, opts, &cursor_dir)
1177                .await
1178                .unwrap();
1179        assert_eq!(result.len(), 1);
1180        assert_eq!(result[0].user(), "bob");
1181
1182        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1183        let _ = std::fs::remove_file(&global_token_path("alice-cqu"));
1184    }
1185
1186    /// cmd_query with limit returns only N messages.
1187    #[tokio::test]
1188    async fn cmd_query_limit() {
1189        let chat = NamedTempFile::new().unwrap();
1190        let cursor_dir = TempDir::new().unwrap();
1191        let token_dir = TempDir::new().unwrap();
1192
1193        let room_id = format!("test-cql-{}", std::process::id());
1194        write_token_file(&token_dir, &room_id, "alice-cql", "tok-cql");
1195        write_meta_file(&room_id, chat.path());
1196
1197        for i in 0..5u32 {
1198            crate::history::append(
1199                chat.path(),
1200                &make_message(&room_id, "bob", format!("msg {i}")),
1201            )
1202            .await
1203            .unwrap();
1204        }
1205
1206        let filter = QueryFilter {
1207            rooms: vec![room_id.clone()],
1208            limit: Some(2),
1209            ascending: false,
1210            ..Default::default()
1211        };
1212        let opts = QueryOptions {
1213            new_only: false,
1214            wait: false,
1215            interval_secs: 5,
1216            mentions_only: false,
1217            since_uuid: None,
1218        };
1219
1220        let result =
1221            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cql", filter, opts, &cursor_dir)
1222                .await
1223                .unwrap();
1224        assert_eq!(result.len(), 2, "limit should restrict to 2 messages");
1225
1226        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1227        let _ = std::fs::remove_file(&global_token_path("alice-cql"));
1228    }
1229
1230    // ── Test helpers ──────────────────────────────────────────────────────────
1231
1232    fn global_token_path(username: &str) -> PathBuf {
1233        crate::paths::global_token_path(username)
1234    }
1235
1236    fn write_token_file(_dir: &TempDir, _room_id: &str, username: &str, token: &str) {
1237        let path = global_token_path(username);
1238        if let Some(parent) = path.parent() {
1239            std::fs::create_dir_all(parent).unwrap();
1240        }
1241        let data = serde_json::json!({ "username": username, "token": token });
1242        std::fs::write(&path, format!("{data}\n")).unwrap();
1243    }
1244
1245    fn write_meta_file(room_id: &str, chat_path: &Path) {
1246        let meta_path = crate::paths::room_meta_path(room_id);
1247        if let Some(parent) = meta_path.parent() {
1248            std::fs::create_dir_all(parent).unwrap();
1249        }
1250        let meta = serde_json::json!({ "chat_path": chat_path.to_string_lossy() });
1251        std::fs::write(&meta_path, format!("{meta}\n")).unwrap();
1252    }
1253
1254    /// Run cmd_query and collect returned messages.
1255    ///
1256    /// Since cmd_query writes to stdout, we wrap it to capture results by
1257    /// re-reading the chat file with the same filter in historical mode.
1258    /// For `new_only` tests we verify the cursor state instead.
1259    async fn oneshot_cmd_query_to_vec(
1260        room_ids: &[String],
1261        token: &str,
1262        filter: QueryFilter,
1263        opts: QueryOptions,
1264        _cursor_dir: &TempDir,
1265    ) -> anyhow::Result<Vec<Message>> {
1266        // Snapshot cursor before run.
1267        let cursor_before = room_ids
1268            .first()
1269            .map(|id| {
1270                // Resolve username by reading the token file for this room.
1271                super::super::token::username_from_token(token)
1272                    .ok()
1273                    .map(|u| {
1274                        let p = crate::paths::cursor_path(id, &u);
1275                        std::fs::read_to_string(&p).ok()
1276                    })
1277                    .flatten()
1278            })
1279            .flatten();
1280
1281        // Run cmd_query (side effect: may update cursor).
1282        cmd_query(room_ids, token, filter.clone(), opts.clone()).await?;
1283
1284        // Snapshot cursor after run.
1285        let cursor_after = room_ids
1286            .first()
1287            .map(|id| {
1288                super::super::token::username_from_token(token)
1289                    .ok()
1290                    .map(|u| {
1291                        let p = crate::paths::cursor_path(id, &u);
1292                        std::fs::read_to_string(&p).ok()
1293                    })
1294                    .flatten()
1295            })
1296            .flatten();
1297
1298        // Reconstruct what cmd_query would have returned.
1299        // For historical mode: re-run with same filter and collect messages.
1300        // For new_only mode: reload history and apply filter with the "before" cursor.
1301        if !opts.new_only && !opts.wait {
1302            // Historical: reload and reapply filter.
1303            let mut all: Vec<Message> = Vec::new();
1304            for room_id in room_ids {
1305                let meta_path = crate::paths::room_meta_path(room_id);
1306                let chat_path = chat_path_from_meta(room_id, &meta_path);
1307                let msgs = history::load(&chat_path).await?;
1308                all.extend(msgs);
1309            }
1310            let username = username_from_token(token).unwrap_or_default();
1311            let mut result: Vec<Message> = all
1312                .into_iter()
1313                .filter(|m| filter.matches(m, m.room()))
1314                .filter(|m| match m {
1315                    Message::DirectMessage { user, to, .. } => user == &username || to == &username,
1316                    _ => true,
1317                })
1318                .collect();
1319            apply_sort_and_limit(&mut result, &filter);
1320            Ok(result)
1321        } else {
1322            // new_only mode: reconstruct returned messages by replaying history
1323            // from cursor_before (before the call advanced it).
1324            let advanced = cursor_after != cursor_before;
1325            if advanced {
1326                let room_id = &room_ids[0];
1327                let meta_path = crate::paths::room_meta_path(room_id);
1328                let chat_path = chat_path_from_meta(room_id, &meta_path);
1329                let all = history::load(&chat_path).await?;
1330                // Find start from the pre-run cursor UUID.
1331                let start = match &cursor_before {
1332                    Some(id) => all
1333                        .iter()
1334                        .position(|m| m.id() == id.trim())
1335                        .map(|i| i + 1)
1336                        .unwrap_or(0),
1337                    None => 0,
1338                };
1339                let filtered: Vec<Message> = all[start..]
1340                    .iter()
1341                    .filter(|m| filter.matches(m, m.room()))
1342                    .cloned()
1343                    .collect();
1344                Ok(filtered)
1345            } else {
1346                Ok(vec![])
1347            }
1348        }
1349    }
1350
1351    /// username_from_token returns an error for an unknown token.
1352    #[test]
1353    fn unknown_token_returns_error() {
1354        let result = super::super::token::username_from_token("bad-token-nonexistent");
1355        assert!(result.is_err());
1356        assert!(result
1357            .unwrap_err()
1358            .to_string()
1359            .contains("token not recognised"));
1360    }
1361
1362    // ── subscription tier filtering tests ──────────────────────────────────
1363
1364    /// load_user_tier returns Full when no subscription file exists.
1365    #[test]
1366    fn load_user_tier_missing_file_returns_full() {
1367        // Use a room ID that will never have a subscription file on disk.
1368        let tier = load_user_tier("nonexistent-room-tier-test", "alice");
1369        assert_eq!(tier, SubscriptionTier::Full);
1370    }
1371
1372    /// load_user_tier returns the persisted tier when the file exists.
1373    #[test]
1374    fn load_user_tier_returns_persisted_tier() {
1375        let state_dir = crate::paths::room_state_dir();
1376        let _ = std::fs::create_dir_all(&state_dir);
1377        let room_id = format!("test-tier-load-{}", std::process::id());
1378        let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
1379
1380        let mut map = std::collections::HashMap::new();
1381        map.insert("alice".to_string(), SubscriptionTier::MentionsOnly);
1382        map.insert("bob".to_string(), SubscriptionTier::Unsubscribed);
1383        let json = serde_json::to_string_pretty(&map).unwrap();
1384        std::fs::write(&sub_path, json).unwrap();
1385
1386        assert_eq!(
1387            load_user_tier(&room_id, "alice"),
1388            SubscriptionTier::MentionsOnly
1389        );
1390        assert_eq!(
1391            load_user_tier(&room_id, "bob"),
1392            SubscriptionTier::Unsubscribed
1393        );
1394        // Unknown user defaults to Full.
1395        assert_eq!(load_user_tier(&room_id, "carol"), SubscriptionTier::Full);
1396
1397        let _ = std::fs::remove_file(&sub_path);
1398    }
1399
1400    /// apply_tier_filter with Full keeps all messages.
1401    #[test]
1402    fn apply_tier_filter_full_keeps_all() {
1403        let mut msgs = vec![
1404            make_message("r", "alice", "hello"),
1405            make_message("r", "bob", "world"),
1406        ];
1407        apply_tier_filter(&mut msgs, SubscriptionTier::Full, "carol");
1408        assert_eq!(msgs.len(), 2);
1409    }
1410
1411    /// apply_tier_filter with MentionsOnly keeps only @mentions.
1412    #[test]
1413    fn apply_tier_filter_mentions_only_filters() {
1414        let mut msgs = vec![
1415            make_message("r", "alice", "hey @carol check this"),
1416            make_message("r", "bob", "unrelated message"),
1417            make_message("r", "dave", "also @carol"),
1418        ];
1419        apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "carol");
1420        assert_eq!(msgs.len(), 2);
1421        assert!(msgs[0].content().unwrap().contains("@carol"));
1422        assert!(msgs[1].content().unwrap().contains("@carol"));
1423    }
1424
1425    /// apply_tier_filter with Unsubscribed clears all messages.
1426    #[test]
1427    fn apply_tier_filter_unsubscribed_clears_all() {
1428        let mut msgs = vec![
1429            make_message("r", "alice", "hey @carol"),
1430            make_message("r", "bob", "world"),
1431        ];
1432        apply_tier_filter(&mut msgs, SubscriptionTier::Unsubscribed, "carol");
1433        assert!(msgs.is_empty());
1434    }
1435
1436    /// apply_tier_filter with MentionsOnly and no mentions returns empty.
1437    #[test]
1438    fn apply_tier_filter_mentions_only_no_mentions_returns_empty() {
1439        let mut msgs = vec![
1440            make_message("r", "alice", "hello"),
1441            make_message("r", "bob", "world"),
1442        ];
1443        apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "carol");
1444        assert!(msgs.is_empty());
1445    }
1446
1447    /// cmd_query with Unsubscribed tier and public_only=true still returns messages.
1448    #[tokio::test]
1449    async fn cmd_query_public_bypasses_tier() {
1450        let chat = NamedTempFile::new().unwrap();
1451        let token_dir = TempDir::new().unwrap();
1452        let cursor_dir = TempDir::new().unwrap();
1453
1454        let room_id = format!("test-pub-tier-{}", std::process::id());
1455        write_token_file(&token_dir, &room_id, "alice-pub", "tok-pub-tier");
1456        write_meta_file(&room_id, chat.path());
1457
1458        // Write subscription map marking alice-pub as Unsubscribed.
1459        let state_dir = crate::paths::room_state_dir();
1460        let _ = std::fs::create_dir_all(&state_dir);
1461        let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
1462        let mut map = std::collections::HashMap::new();
1463        map.insert("alice-pub".to_string(), SubscriptionTier::Unsubscribed);
1464        std::fs::write(&sub_path, serde_json::to_string(&map).unwrap()).unwrap();
1465
1466        // Add a message.
1467        crate::history::append(chat.path(), &make_message(&room_id, "bob", "visible"))
1468            .await
1469            .unwrap();
1470
1471        // Query with public_only=true should bypass tier and return the message.
1472        let filter = QueryFilter {
1473            rooms: vec![room_id.clone()],
1474            public_only: true,
1475            ascending: true,
1476            ..Default::default()
1477        };
1478        let opts = QueryOptions {
1479            new_only: false,
1480            wait: false,
1481            interval_secs: 5,
1482            mentions_only: false,
1483            since_uuid: None,
1484        };
1485
1486        let result = oneshot_cmd_query_to_vec(
1487            &[room_id.clone()],
1488            "tok-pub-tier",
1489            filter,
1490            opts,
1491            &cursor_dir,
1492        )
1493        .await
1494        .unwrap();
1495        assert_eq!(
1496            result.len(),
1497            1,
1498            "public flag should bypass Unsubscribed tier"
1499        );
1500
1501        let _ = std::fs::remove_file(&sub_path);
1502        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1503        let _ = std::fs::remove_file(&global_token_path("alice-pub"));
1504    }
1505
1506    /// MentionsOnly tier sets mention_user filter, narrowing results to @mentions.
1507    #[test]
1508    fn mentions_only_tier_sets_mention_user_on_filter() {
1509        // Verify the tier logic: when tier is MentionsOnly and mention_user is
1510        // not already set, it should be set to the username.
1511        let mut filter = QueryFilter::default();
1512        let tier = SubscriptionTier::MentionsOnly;
1513
1514        // Simulate what cmd_query does.
1515        match tier {
1516            SubscriptionTier::MentionsOnly => {
1517                if filter.mention_user.is_none() {
1518                    filter.mention_user = Some("alice".to_string());
1519                }
1520            }
1521            _ => {}
1522        }
1523
1524        assert_eq!(filter.mention_user, Some("alice".to_string()));
1525
1526        // Now verify with apply_tier_filter that messages are correctly narrowed.
1527        let mut msgs = vec![
1528            make_message("r", "bob", "hey @alice look"),
1529            make_message("r", "bob", "unrelated chatter"),
1530        ];
1531        apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "alice");
1532        assert_eq!(msgs.len(), 1);
1533        assert!(msgs[0].content().unwrap().contains("@alice"));
1534    }
1535
1536    /// MentionsOnly tier does not override an existing mention_user filter.
1537    #[test]
1538    fn mentions_only_tier_preserves_existing_mention_user() {
1539        let mut filter = QueryFilter {
1540            mention_user: Some("bob".to_string()),
1541            ..Default::default()
1542        };
1543
1544        // MentionsOnly should not overwrite the existing filter.
1545        match SubscriptionTier::MentionsOnly {
1546            SubscriptionTier::MentionsOnly => {
1547                if filter.mention_user.is_none() {
1548                    filter.mention_user = Some("alice".to_string());
1549                }
1550            }
1551            _ => {}
1552        }
1553
1554        assert_eq!(
1555            filter.mention_user,
1556            Some("bob".to_string()),
1557            "existing mention_user filter should be preserved"
1558        );
1559    }
1560
1561    // ── per-room subscription tier filtering tests ─────────────────────────
1562
1563    #[test]
1564    fn per_room_tier_filter_full_keeps_all() {
1565        let mut msgs = vec![
1566            make_message("dev", "alice", "hello from dev"),
1567            make_message("lobby", "bob", "hello from lobby"),
1568        ];
1569        // No subscription files → defaults to Full for both rooms.
1570        let rooms = vec![
1571            "nonexistent-perroom-full-1".to_string(),
1572            "nonexistent-perroom-full-2".to_string(),
1573        ];
1574        apply_per_room_tier_filter(&mut msgs, &rooms, "carol");
1575        assert_eq!(msgs.len(), 2);
1576    }
1577
1578    #[test]
1579    fn per_room_tier_filter_mixed_tiers() {
1580        let state_dir = crate::paths::room_state_dir();
1581        let _ = std::fs::create_dir_all(&state_dir);
1582
1583        let room_full = format!("perroom-mixed-full-{}", std::process::id());
1584        let room_unsub = format!("perroom-mixed-unsub-{}", std::process::id());
1585        let room_mentions = format!("perroom-mixed-ment-{}", std::process::id());
1586
1587        // Write subscription maps: room_full=Full (default), room_unsub=Unsubscribed, room_mentions=MentionsOnly
1588        let sub_unsub = crate::paths::broker_subscriptions_path(&state_dir, &room_unsub);
1589        let mut map_unsub = std::collections::HashMap::new();
1590        map_unsub.insert("alice".to_string(), SubscriptionTier::Unsubscribed);
1591        std::fs::write(&sub_unsub, serde_json::to_string(&map_unsub).unwrap()).unwrap();
1592
1593        let sub_ment = crate::paths::broker_subscriptions_path(&state_dir, &room_mentions);
1594        let mut map_ment = std::collections::HashMap::new();
1595        map_ment.insert("alice".to_string(), SubscriptionTier::MentionsOnly);
1596        std::fs::write(&sub_ment, serde_json::to_string(&map_ment).unwrap()).unwrap();
1597
1598        let mut msgs = vec![
1599            make_message(&room_full, "bob", "visible in full room"),
1600            make_message(&room_unsub, "bob", "invisible — unsubscribed"),
1601            make_message(&room_mentions, "bob", "no mention — filtered"),
1602            make_message(&room_mentions, "bob", "hey @alice check this"),
1603        ];
1604
1605        let rooms = vec![room_full.clone(), room_unsub.clone(), room_mentions.clone()];
1606        apply_per_room_tier_filter(&mut msgs, &rooms, "alice");
1607
1608        // Only the Full room message and the MentionsOnly room @alice message survive.
1609        assert_eq!(msgs.len(), 2);
1610        assert!(msgs[0].content().unwrap().contains("visible in full room"));
1611        assert!(msgs[1].content().unwrap().contains("@alice"));
1612
1613        // Cleanup
1614        let _ = std::fs::remove_file(&sub_unsub);
1615        let _ = std::fs::remove_file(&sub_ment);
1616    }
1617
1618    #[test]
1619    fn per_room_tier_filter_unknown_room_defaults_to_full() {
1620        let mut msgs = vec![make_message("mystery", "bob", "hello")];
1621        // Room not in the room_ids list at all — tier defaults to Full.
1622        apply_per_room_tier_filter(&mut msgs, &["other".to_string()], "alice");
1623        assert_eq!(msgs.len(), 1);
1624    }
1625
1626    /// pull_messages returns the last n entries without moving the cursor.
1627    #[tokio::test]
1628    async fn pull_messages_returns_tail_without_cursor_change() {
1629        let chat = NamedTempFile::new().unwrap();
1630        let cursor_dir = TempDir::new().unwrap();
1631        let cursor = cursor_dir.path().join("cursor");
1632
1633        for i in 0..5u32 {
1634            crate::history::append(chat.path(), &make_message("r", "u", format!("msg {i}")))
1635                .await
1636                .unwrap();
1637        }
1638
1639        let pulled = pull_messages(chat.path(), 3, None, None).await.unwrap();
1640        assert_eq!(pulled.len(), 3);
1641
1642        // cursor untouched — poll still returns all 5
1643        let polled = poll_messages(chat.path(), &cursor, None, None, None)
1644            .await
1645            .unwrap();
1646        assert_eq!(polled.len(), 5);
1647    }
1648}