Skip to main content

room_cli/oneshot/
poll.rs

1use std::path::{Path, PathBuf};
2
3use room_protocol::SubscriptionTier;
4
5use crate::{
6    broker::commands::load_subscription_map, history, message::Message, paths, query::QueryFilter,
7};
8
9use super::token::{read_cursor, username_from_token, write_cursor};
10
11// ── Subscription tier lookup ──────────────────────────────────────────────────
12
13/// Look up a user's subscription tier for a room from the persisted
14/// subscription map on disk.
15///
16/// Returns `Full` when the subscription file is missing, corrupt, or the user
17/// has no entry — unsubscribed users must have been explicitly recorded.
18fn load_user_tier(room_id: &str, username: &str) -> SubscriptionTier {
19    let state_dir = paths::room_state_dir();
20    let sub_path = paths::broker_subscriptions_path(&state_dir, room_id);
21    let map = load_subscription_map(&sub_path);
22    map.get(username).copied().unwrap_or(SubscriptionTier::Full)
23}
24
25/// Apply subscription-tier filtering to a message list in place.
26///
27/// - `Full` — no filtering (all messages pass).
28/// - `MentionsOnly` — keep only messages that @mention `username`.
29/// - `Unsubscribed` — remove all messages.
30fn apply_tier_filter(messages: &mut Vec<Message>, tier: SubscriptionTier, username: &str) {
31    match tier {
32        SubscriptionTier::Full => {}
33        SubscriptionTier::MentionsOnly => {
34            messages.retain(|m| m.mentions().iter().any(|mention| mention == username));
35        }
36        SubscriptionTier::Unsubscribed => {
37            messages.clear();
38        }
39    }
40}
41
42/// Apply per-room subscription-tier filtering to a message list in place.
43///
44/// Looks up the user's tier for each message's room and filters accordingly.
45/// Each room is checked independently — a user can be `Full` in one room and
46/// `Unsubscribed` in another.
47fn apply_per_room_tier_filter(messages: &mut Vec<Message>, room_ids: &[String], username: &str) {
48    use std::collections::HashMap;
49    let tiers: HashMap<&str, SubscriptionTier> = room_ids
50        .iter()
51        .map(|r| (r.as_str(), load_user_tier(r, username)))
52        .collect();
53
54    messages.retain(|m| {
55        let tier = tiers
56            .get(m.room())
57            .copied()
58            .unwrap_or(SubscriptionTier::Full);
59        match tier {
60            SubscriptionTier::Full => true,
61            SubscriptionTier::MentionsOnly => {
62                m.mentions().iter().any(|mention| mention == username)
63            }
64            SubscriptionTier::Unsubscribed => false,
65        }
66    });
67}
68
69// ── Query engine types ─────────────────────────────────────────────────────────
70
/// Behavioural switches for the `room query` subcommand and its `poll` /
/// `watch` aliases.
#[derive(Debug, Clone)]
pub struct QueryOptions {
    /// Read only what arrived after the stored poll cursor, and move the
    /// cursor forward once the results have been produced.
    pub new_only: bool,
    /// Keep polling until a new foreign message shows up. Setting this
    /// implies `new_only`.
    pub wait: bool,
    /// Seconds to sleep between polls; only consulted when `wait` is set.
    pub interval_secs: u64,
    /// Restrict output to messages that @mention the caller, whose username
    /// is resolved from the token.
    pub mentions_only: bool,
    /// Legacy message UUID to start from instead of the stored cursor. Used
    /// by the `poll` alias's `--since` flag, which predates the `room:seq`
    /// ID format.
    pub since_uuid: Option<String>,
}
88
89/// Return all messages from `chat_path` after the message with ID `since` (exclusive).
90///
91/// If `since` is `None`, the cursor file at `cursor_path` is checked for a previously
92/// stored position. A `None` cursor means all messages are returned.
93///
94/// `viewer` is the username of the caller. When `Some`, `DirectMessage` entries are
95/// filtered using [`Message::is_visible_to`], which grants access to the sender,
96/// recipient, and the room host. Pass `None` to skip DM filtering (e.g. in tests
97/// that don't involve DMs).
98///
99/// `host` is the room host username (typically the first user to join). When `Some`,
100/// the host can see all DMs regardless of sender/recipient.
101///
102/// The cursor file is updated to the last returned message's ID after each successful call.
103pub async fn poll_messages(
104    chat_path: &Path,
105    cursor_path: &Path,
106    viewer: Option<&str>,
107    host: Option<&str>,
108    since: Option<&str>,
109) -> anyhow::Result<Vec<Message>> {
110    let effective_since: Option<String> = since
111        .map(|s| s.to_owned())
112        .or_else(|| read_cursor(cursor_path));
113
114    let messages = history::load(chat_path).await?;
115
116    let start = match &effective_since {
117        Some(id) => messages
118            .iter()
119            .position(|m| m.id() == id)
120            .map(|i| i + 1)
121            .unwrap_or(0),
122        None => 0,
123    };
124
125    let result: Vec<Message> = messages[start..]
126        .iter()
127        .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
128        .cloned()
129        .collect();
130
131    if let Some(last) = result.last() {
132        write_cursor(cursor_path, last.id())?;
133    }
134
135    Ok(result)
136}
137
138/// Return the last `n` messages from history without updating the poll cursor.
139///
140/// DM entries are filtered using [`Message::is_visible_to`] so that `viewer` only
141/// sees messages they are party to (sender, recipient, or host). Pass `None` to
142/// skip DM filtering.
143///
144/// `host` is the room host username. When `Some`, the host can see all DMs.
145pub async fn pull_messages(
146    chat_path: &Path,
147    n: usize,
148    viewer: Option<&str>,
149    host: Option<&str>,
150) -> anyhow::Result<Vec<Message>> {
151    let clamped = n.min(200);
152    let all = history::tail(chat_path, clamped).await?;
153    let visible: Vec<Message> = all
154        .into_iter()
155        .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
156        .collect();
157    Ok(visible)
158}
159
160/// One-shot pull subcommand: print the last N messages from history as NDJSON.
161///
162/// Reads from the chat file directly (no broker connection required).
163/// Does **not** update the poll cursor.
164pub async fn cmd_pull(room_id: &str, token: &str, n: usize) -> anyhow::Result<()> {
165    let username = username_from_token(room_id, token)?;
166    let meta_path = paths::room_meta_path(room_id);
167    let chat_path = chat_path_from_meta(room_id, &meta_path);
168
169    let host = read_host_from_meta(&meta_path);
170    let mut messages = pull_messages(&chat_path, n, Some(&username), host.as_deref()).await?;
171    let tier = load_user_tier(room_id, &username);
172    apply_tier_filter(&mut messages, tier, &username);
173    for msg in &messages {
174        println!("{}", serde_json::to_string(msg)?);
175    }
176    Ok(())
177}
178
179/// Watch subcommand: poll in a loop until at least one foreign `Message` arrives.
180///
181/// Reads the caller's username from the session token file. Polls every
182/// `interval_secs` seconds, filtering out own messages and non-`Message` variants.
183/// Exits after printing the first batch of foreign messages as NDJSON.
184/// Shares the cursor file with `room poll` — the two subcommands never re-deliver
185/// the same message.
186pub async fn cmd_watch(room_id: &str, token: &str, interval_secs: u64) -> anyhow::Result<()> {
187    let username = username_from_token(room_id, token)?;
188    let meta_path = paths::room_meta_path(room_id);
189    let chat_path = chat_path_from_meta(room_id, &meta_path);
190    let cursor_path = paths::cursor_path(room_id, &username);
191    let host = read_host_from_meta(&meta_path);
192
193    loop {
194        let messages = poll_messages(
195            &chat_path,
196            &cursor_path,
197            Some(&username),
198            host.as_deref(),
199            None,
200        )
201        .await?;
202
203        let foreign: Vec<&Message> = messages
204            .iter()
205            .filter(|m| match m {
206                Message::Message { user, .. } => user != &username,
207                Message::DirectMessage { to, .. } => to == &username,
208                _ => false,
209            })
210            .collect();
211
212        if !foreign.is_empty() {
213            for msg in foreign {
214                println!("{}", serde_json::to_string(msg)?);
215            }
216            return Ok(());
217        }
218
219        tokio::time::sleep(tokio::time::Duration::from_secs(interval_secs)).await;
220    }
221}
222
223/// One-shot poll subcommand: read messages since cursor, print as NDJSON, update cursor.
224///
225/// Reads the caller's username from the session token file. When `mentions_only` is
226/// true, only messages that @mention the caller's username are printed (cursor still
227/// advances past all messages).
228pub async fn cmd_poll(
229    room_id: &str,
230    token: &str,
231    since: Option<String>,
232    mentions_only: bool,
233) -> anyhow::Result<()> {
234    let username = username_from_token(room_id, token)?;
235    let meta_path = paths::room_meta_path(room_id);
236    let chat_path = chat_path_from_meta(room_id, &meta_path);
237    let cursor_path = paths::cursor_path(room_id, &username);
238    let host = read_host_from_meta(&meta_path);
239
240    let messages = poll_messages(
241        &chat_path,
242        &cursor_path,
243        Some(&username),
244        host.as_deref(),
245        since.as_deref(),
246    )
247    .await?;
248    for msg in &messages {
249        if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
250            continue;
251        }
252        println!("{}", serde_json::to_string(msg)?);
253    }
254    Ok(())
255}
256
257/// Poll multiple rooms, returning messages merged by timestamp.
258///
259/// Each room uses its own cursor file under `~/.room/state/`.
260/// Messages are sorted by timestamp across all rooms. Each message already carries
261/// a `room` field, so the caller can distinguish sources.
262pub async fn poll_messages_multi(
263    rooms: &[(&str, &Path)],
264    username: &str,
265) -> anyhow::Result<Vec<Message>> {
266    let mut all_messages: Vec<Message> = Vec::new();
267
268    for &(room_id, chat_path) in rooms {
269        let cursor_path = paths::cursor_path(room_id, username);
270        let meta_path = paths::room_meta_path(room_id);
271        let host = read_host_from_meta(&meta_path);
272        let msgs = poll_messages(
273            chat_path,
274            &cursor_path,
275            Some(username),
276            host.as_deref(),
277            None,
278        )
279        .await?;
280        all_messages.extend(msgs);
281    }
282
283    all_messages.sort_by(|a, b| a.ts().cmp(b.ts()));
284    Ok(all_messages)
285}
286
287/// One-shot multi-room poll subcommand: poll multiple rooms, merge by timestamp, print NDJSON.
288///
289/// Resolves the username from the token by trying each room in order. Each room's cursor
290/// is updated independently.
291pub async fn cmd_poll_multi(
292    room_ids: &[String],
293    token: &str,
294    mentions_only: bool,
295) -> anyhow::Result<()> {
296    // Resolve username by trying the token against each room
297    let username = resolve_username_from_rooms(room_ids, token)?;
298
299    // Resolve chat paths for all rooms
300    let mut rooms: Vec<(&str, PathBuf)> = Vec::new();
301    for room_id in room_ids {
302        let meta_path = paths::room_meta_path(room_id);
303        let chat_path = chat_path_from_meta(room_id, &meta_path);
304        rooms.push((room_id.as_str(), chat_path));
305    }
306
307    let room_refs: Vec<(&str, &Path)> = rooms.iter().map(|(id, p)| (*id, p.as_path())).collect();
308    let messages = poll_messages_multi(&room_refs, &username).await?;
309    for msg in &messages {
310        if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
311            continue;
312        }
313        println!("{}", serde_json::to_string(msg)?);
314    }
315    Ok(())
316}
317
318// ── cmd_query ─────────────────────────────────────────────────────────────────
319
320/// Unified query entry point for `room query` and the `poll`/`watch` aliases.
321///
322/// Three modes:
323/// - **Historical** (`new_only = false, wait = false`): reads full history,
324///   applies filter, no cursor update.
325/// - **New** (`new_only = true, wait = false`): reads since last cursor,
326///   applies filter, advances cursor.
327/// - **Wait** (`wait = true`): loops until at least one foreign message passes
328///   the filter, then prints and exits.
329///
330/// `room_ids` lists the rooms to read. `filter.rooms` may further restrict the
331/// output but does not affect which files are opened.
332pub async fn cmd_query(
333    room_ids: &[String],
334    token: &str,
335    mut filter: QueryFilter,
336    opts: QueryOptions,
337) -> anyhow::Result<()> {
338    if room_ids.is_empty() {
339        anyhow::bail!("at least one room ID is required");
340    }
341
342    let username = resolve_username_from_rooms(room_ids, token)?;
343
344    // Resolve mention_user from caller if mentions_only is requested.
345    if opts.mentions_only {
346        filter.mention_user = Some(username.clone());
347    }
348
349    if opts.wait || opts.new_only {
350        cmd_query_new(room_ids, &username, filter, opts).await
351    } else {
352        cmd_query_history(room_ids, &username, filter).await
353    }
354}
355
356/// Cursor-based (new / wait) query mode.
357async fn cmd_query_new(
358    room_ids: &[String],
359    username: &str,
360    filter: QueryFilter,
361    opts: QueryOptions,
362) -> anyhow::Result<()> {
363    loop {
364        let messages: Vec<Message> = if room_ids.len() == 1 {
365            let room_id = &room_ids[0];
366            let meta_path = paths::room_meta_path(room_id);
367            let chat_path = chat_path_from_meta(room_id, &meta_path);
368            let cursor_path = paths::cursor_path(room_id, username);
369            let host = read_host_from_meta(&meta_path);
370            poll_messages(
371                &chat_path,
372                &cursor_path,
373                Some(username),
374                host.as_deref(),
375                opts.since_uuid.as_deref(),
376            )
377            .await?
378        } else {
379            let mut rooms_info: Vec<(String, PathBuf)> = Vec::new();
380            for room_id in room_ids {
381                let meta_path = paths::room_meta_path(room_id);
382                let chat_path = chat_path_from_meta(room_id, &meta_path);
383                rooms_info.push((room_id.clone(), chat_path));
384            }
385            let room_refs: Vec<(&str, &Path)> = rooms_info
386                .iter()
387                .map(|(id, p)| (id.as_str(), p.as_path()))
388                .collect();
389            poll_messages_multi(&room_refs, username).await?
390        };
391
392        // Apply query filter, per-room subscription tiers, then sort + limit.
393        let mut filtered: Vec<Message> = messages
394            .into_iter()
395            .filter(|m| filter.matches(m, m.room()))
396            .collect();
397
398        if !filter.public_only {
399            apply_per_room_tier_filter(&mut filtered, room_ids, username);
400        }
401
402        apply_sort_and_limit(&mut filtered, &filter);
403
404        if opts.wait {
405            // Only wake on foreign messages.
406            let foreign: Vec<&Message> = filtered
407                .iter()
408                .filter(|m| match m {
409                    Message::Message { user, .. } => user != username,
410                    Message::DirectMessage { to, .. } => to == username,
411                    _ => false,
412                })
413                .collect();
414
415            if !foreign.is_empty() {
416                for msg in foreign {
417                    println!("{}", serde_json::to_string(msg)?);
418                }
419                return Ok(());
420            }
421        } else {
422            for msg in &filtered {
423                println!("{}", serde_json::to_string(msg)?);
424            }
425            return Ok(());
426        }
427
428        tokio::time::sleep(tokio::time::Duration::from_secs(opts.interval_secs)).await;
429    }
430}
431
432/// Historical (no-cursor) query mode.
433async fn cmd_query_history(
434    room_ids: &[String],
435    username: &str,
436    filter: QueryFilter,
437) -> anyhow::Result<()> {
438    let mut all_messages: Vec<Message> = Vec::new();
439
440    for room_id in room_ids {
441        let meta_path = paths::room_meta_path(room_id);
442        let chat_path = chat_path_from_meta(room_id, &meta_path);
443        let messages = history::load(&chat_path).await?;
444        all_messages.extend(messages);
445    }
446
447    // DM privacy filter: viewer only sees their own DMs.
448    let mut filtered: Vec<Message> = all_messages
449        .into_iter()
450        .filter(|m| filter.matches(m, m.room()))
451        .filter(|m| match m {
452            Message::DirectMessage { user, to, .. } => user == username || to == username,
453            _ => true,
454        })
455        .collect();
456
457    if !filter.public_only {
458        apply_per_room_tier_filter(&mut filtered, room_ids, username);
459    }
460
461    apply_sort_and_limit(&mut filtered, &filter);
462
463    // If a specific target_id was requested and nothing was found, report an error.
464    if filtered.is_empty() {
465        if let Some((ref target_room, target_seq)) = filter.target_id {
466            use room_protocol::format_message_id;
467            anyhow::bail!(
468                "message not found: {}",
469                format_message_id(target_room, target_seq)
470            );
471        }
472    }
473
474    for msg in &filtered {
475        println!("{}", serde_json::to_string(msg)?);
476    }
477    Ok(())
478}
479
480/// Apply sort order and optional limit to a message list in place.
481fn apply_sort_and_limit(messages: &mut Vec<Message>, filter: &QueryFilter) {
482    if filter.ascending {
483        messages.sort_by(|a, b| a.ts().cmp(b.ts()));
484    } else {
485        messages.sort_by(|a, b| b.ts().cmp(a.ts()));
486    }
487    if let Some(limit) = filter.limit {
488        messages.truncate(limit);
489    }
490}
491
492/// Try to resolve a username from a token by scanning token files for each room.
493///
494/// Returns the username from the first room where the token is found.
495fn resolve_username_from_rooms(room_ids: &[String], token: &str) -> anyhow::Result<String> {
496    for room_id in room_ids {
497        if let Ok(username) = username_from_token(room_id, token) {
498            return Ok(username);
499        }
500    }
501    anyhow::bail!("token not recognised in any of the specified rooms — run: room join <username>")
502}
503
504/// Read the room host username from the meta file, if present.
505///
506/// Returns `None` if the meta file does not exist, cannot be parsed, or has no
507/// `"host"` field. Callers should treat `None` the same as no host information.
508pub(super) fn read_host_from_meta(meta_path: &Path) -> Option<String> {
509    if !meta_path.exists() {
510        return None;
511    }
512    let data = std::fs::read_to_string(meta_path).ok()?;
513    let v: serde_json::Value = serde_json::from_str(&data).ok()?;
514    v["host"].as_str().map(str::to_owned)
515}
516
517pub(super) fn chat_path_from_meta(room_id: &str, meta_path: &Path) -> PathBuf {
518    if meta_path.exists() {
519        if let Ok(data) = std::fs::read_to_string(meta_path) {
520            if let Ok(v) = serde_json::from_str::<serde_json::Value>(&data) {
521                if let Some(p) = v["chat_path"].as_str() {
522                    return PathBuf::from(p);
523                }
524            }
525        }
526    }
527    history::default_chat_path(room_id)
528}
529
530#[cfg(test)]
531mod tests {
532    use super::*;
533    use crate::message::make_message;
534    use tempfile::{NamedTempFile, TempDir};
535
536    /// poll_messages with no cursor and no since returns all messages.
537    #[tokio::test]
538    async fn poll_messages_no_cursor_returns_all() {
539        let chat = NamedTempFile::new().unwrap();
540        let cursor_dir = TempDir::new().unwrap();
541        let cursor = cursor_dir.path().join("cursor");
542
543        let msg = make_message("r", "alice", "hello");
544        crate::history::append(chat.path(), &msg).await.unwrap();
545
546        let result = poll_messages(chat.path(), &cursor, None, None, None)
547            .await
548            .unwrap();
549        assert_eq!(result.len(), 1);
550        assert_eq!(result[0].id(), msg.id());
551    }
552
553    /// poll_messages advances the cursor so a second call returns nothing.
554    #[tokio::test]
555    async fn poll_messages_advances_cursor() {
556        let chat = NamedTempFile::new().unwrap();
557        let cursor_dir = TempDir::new().unwrap();
558        let cursor = cursor_dir.path().join("cursor");
559
560        let msg = make_message("r", "alice", "hello");
561        crate::history::append(chat.path(), &msg).await.unwrap();
562
563        poll_messages(chat.path(), &cursor, None, None, None)
564            .await
565            .unwrap();
566
567        let second = poll_messages(chat.path(), &cursor, None, None, None)
568            .await
569            .unwrap();
570        assert!(
571            second.is_empty(),
572            "cursor should have advanced past the first message"
573        );
574    }
575
576    /// DM visibility: viewer only sees DMs they sent or received.
577    #[tokio::test]
578    async fn poll_messages_filters_dms_by_viewer() {
579        use crate::message::make_dm;
580        let chat = NamedTempFile::new().unwrap();
581        let cursor_dir = TempDir::new().unwrap();
582        let cursor = cursor_dir.path().join("cursor");
583
584        let dm_alice_bob = make_dm("r", "alice", "bob", "secret");
585        let dm_alice_carol = make_dm("r", "alice", "carol", "other secret");
586        crate::history::append(chat.path(), &dm_alice_bob)
587            .await
588            .unwrap();
589        crate::history::append(chat.path(), &dm_alice_carol)
590            .await
591            .unwrap();
592
593        // bob sees only his DM
594        let result = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
595            .await
596            .unwrap();
597        assert_eq!(result.len(), 1);
598        assert_eq!(result[0].id(), dm_alice_bob.id());
599    }
600
601    /// DMs addressed to the watcher are included in the foreign message filter
602    /// used by cmd_watch, not silently consumed.
603    #[tokio::test]
604    async fn poll_messages_dm_to_viewer_is_not_consumed_silently() {
605        use crate::message::make_dm;
606        let chat = NamedTempFile::new().unwrap();
607        let cursor_dir = TempDir::new().unwrap();
608        let cursor = cursor_dir.path().join("cursor");
609
610        // alice sends a DM to bob, and a broadcast message
611        let dm = make_dm("r", "alice", "bob", "secret for bob");
612        let msg = make_message("r", "alice", "public hello");
613        crate::history::append(chat.path(), &dm).await.unwrap();
614        crate::history::append(chat.path(), &msg).await.unwrap();
615
616        // Simulate what cmd_watch does: poll, then filter for foreign messages + DMs
617        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
618            .await
619            .unwrap();
620
621        let username = "bob";
622        let foreign: Vec<&Message> = messages
623            .iter()
624            .filter(|m| match m {
625                Message::Message { user, .. } => user != username,
626                Message::DirectMessage { to, .. } => to == username,
627                _ => false,
628            })
629            .collect();
630
631        // Both the DM (addressed to bob) and the broadcast (from alice) should appear
632        assert_eq!(foreign.len(), 2, "watch should see DMs + foreign messages");
633        assert!(
634            foreign
635                .iter()
636                .any(|m| matches!(m, Message::DirectMessage { .. })),
637            "DM must not be silently consumed"
638        );
639    }
640
641    /// DMs sent BY the watcher are excluded from the foreign filter (no self-echo).
642    #[tokio::test]
643    async fn poll_messages_dm_from_viewer_excluded_from_watch() {
644        use crate::message::make_dm;
645        let chat = NamedTempFile::new().unwrap();
646        let cursor_dir = TempDir::new().unwrap();
647        let cursor = cursor_dir.path().join("cursor");
648
649        // bob sends a DM to alice
650        let dm = make_dm("r", "bob", "alice", "from bob");
651        crate::history::append(chat.path(), &dm).await.unwrap();
652
653        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
654            .await
655            .unwrap();
656
657        let username = "bob";
658        let foreign: Vec<&Message> = messages
659            .iter()
660            .filter(|m| match m {
661                Message::Message { user, .. } => user != username,
662                Message::DirectMessage { to, .. } => to == username,
663                _ => false,
664            })
665            .collect();
666
667        assert!(
668            foreign.is_empty(),
669            "DMs sent by the watcher should not wake watch"
670        );
671    }
672
673    /// Host sees all DMs in poll regardless of sender/recipient.
674    #[tokio::test]
675    async fn poll_messages_host_sees_all_dms() {
676        use crate::message::make_dm;
677        let chat = NamedTempFile::new().unwrap();
678        let cursor_dir = TempDir::new().unwrap();
679        let cursor = cursor_dir.path().join("cursor");
680
681        let dm_alice_bob = make_dm("r", "alice", "bob", "private");
682        let dm_carol_dave = make_dm("r", "carol", "dave", "also private");
683        crate::history::append(chat.path(), &dm_alice_bob)
684            .await
685            .unwrap();
686        crate::history::append(chat.path(), &dm_carol_dave)
687            .await
688            .unwrap();
689
690        // host "eve" can see both DMs
691        let result = poll_messages(chat.path(), &cursor, Some("eve"), Some("eve"), None)
692            .await
693            .unwrap();
694        assert_eq!(result.len(), 2, "host should see all DMs");
695    }
696
697    /// Non-host third party cannot see DMs they are not party to.
698    #[tokio::test]
699    async fn poll_messages_non_host_cannot_see_unrelated_dms() {
700        use crate::message::make_dm;
701        let chat = NamedTempFile::new().unwrap();
702        let cursor_dir = TempDir::new().unwrap();
703        let cursor = cursor_dir.path().join("cursor");
704
705        let dm = make_dm("r", "alice", "bob", "private");
706        crate::history::append(chat.path(), &dm).await.unwrap();
707
708        // carol is not a party and is not host
709        let result = poll_messages(chat.path(), &cursor, Some("carol"), None, None)
710            .await
711            .unwrap();
712        assert!(result.is_empty(), "non-host third party should not see DM");
713    }
714
715    /// Host reads from pull_messages as well.
716    #[tokio::test]
717    async fn pull_messages_host_sees_all_dms() {
718        use crate::message::make_dm;
719        let chat = NamedTempFile::new().unwrap();
720
721        let dm = make_dm("r", "alice", "bob", "secret");
722        crate::history::append(chat.path(), &dm).await.unwrap();
723
724        let result = pull_messages(chat.path(), 10, Some("eve"), Some("eve"))
725            .await
726            .unwrap();
727        assert_eq!(result.len(), 1, "host should see the DM via pull");
728    }
729
730    // ── poll_messages_multi tests ──────────────────────────────────────────
731
732    /// Multi-room poll merges messages from two rooms sorted by timestamp.
733    #[tokio::test]
734    async fn poll_multi_merges_by_timestamp() {
735        let chat_a = NamedTempFile::new().unwrap();
736        let chat_b = NamedTempFile::new().unwrap();
737
738        let rid_a = format!("test-merge-a-{}", std::process::id());
739        let rid_b = format!("test-merge-b-{}", std::process::id());
740
741        // Append messages with interleaved timestamps
742        let msg_a1 = make_message(&rid_a, "alice", "a1");
743        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
744        let msg_b1 = make_message(&rid_b, "bob", "b1");
745        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
746        let msg_a2 = make_message(&rid_a, "alice", "a2");
747
748        crate::history::append(chat_a.path(), &msg_a1)
749            .await
750            .unwrap();
751        crate::history::append(chat_b.path(), &msg_b1)
752            .await
753            .unwrap();
754        crate::history::append(chat_a.path(), &msg_a2)
755            .await
756            .unwrap();
757
758        let rooms: Vec<(&str, &Path)> = vec![
759            (rid_a.as_str(), chat_a.path()),
760            (rid_b.as_str(), chat_b.path()),
761        ];
762
763        let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
764        assert_eq!(result.len(), 3);
765        // Verify timestamp ordering
766        assert!(result[0].ts() <= result[1].ts());
767        assert!(result[1].ts() <= result[2].ts());
768        // First message should be from room-a (earliest)
769        assert_eq!(result[0].room(), &rid_a);
770
771        // Clean up cursor files
772        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
773        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
774    }
775
776    /// Multi-room poll uses per-room cursors (second call returns nothing).
777    #[tokio::test]
778    async fn poll_multi_advances_per_room_cursors() {
779        let chat_a = NamedTempFile::new().unwrap();
780        let chat_b = NamedTempFile::new().unwrap();
781
782        // Use unique room IDs to avoid cursor file collisions with parallel tests
783        let rid_a = format!("test-cursor-a-{}", std::process::id());
784        let rid_b = format!("test-cursor-b-{}", std::process::id());
785
786        let msg_a = make_message(&rid_a, "alice", "hello a");
787        let msg_b = make_message(&rid_b, "bob", "hello b");
788        crate::history::append(chat_a.path(), &msg_a).await.unwrap();
789        crate::history::append(chat_b.path(), &msg_b).await.unwrap();
790
791        let rooms: Vec<(&str, &Path)> = vec![
792            (rid_a.as_str(), chat_a.path()),
793            (rid_b.as_str(), chat_b.path()),
794        ];
795
796        // First poll gets everything
797        let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
798        assert_eq!(result.len(), 2);
799
800        // Second poll gets nothing (cursors advanced)
801        let result2 = poll_messages_multi(&rooms, "viewer").await.unwrap();
802        assert!(
803            result2.is_empty(),
804            "second multi-poll should return nothing"
805        );
806
807        // Clean up cursor files
808        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
809        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
810    }
811
812    /// Multi-room poll with one empty room still returns messages from the other.
813    #[tokio::test]
814    async fn poll_multi_one_empty_room() {
815        let chat_a = NamedTempFile::new().unwrap();
816        let chat_b = NamedTempFile::new().unwrap();
817
818        let rid_a = format!("test-empty-a-{}", std::process::id());
819        let rid_b = format!("test-empty-b-{}", std::process::id());
820
821        let msg = make_message(&rid_a, "alice", "only here");
822        crate::history::append(chat_a.path(), &msg).await.unwrap();
823        // chat_b is empty
824
825        let rooms: Vec<(&str, &Path)> = vec![
826            (rid_a.as_str(), chat_a.path()),
827            (rid_b.as_str(), chat_b.path()),
828        ];
829
830        let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
831        assert_eq!(result.len(), 1);
832        assert_eq!(result[0].room(), &rid_a);
833
834        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
835        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
836    }
837
838    /// Multi-room poll with no rooms returns nothing.
839    #[tokio::test]
840    async fn poll_multi_no_rooms() {
841        let rooms: Vec<(&str, &Path)> = vec![];
842        let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
843        assert!(result.is_empty());
844    }
845
846    /// Multi-room poll filters DMs by viewer across rooms.
847    #[tokio::test]
848    async fn poll_multi_filters_dms_across_rooms() {
849        use crate::message::make_dm;
850        let chat_a = NamedTempFile::new().unwrap();
851        let chat_b = NamedTempFile::new().unwrap();
852
853        let rid_a = format!("test-dm-a-{}", std::process::id());
854        let rid_b = format!("test-dm-b-{}", std::process::id());
855
856        // DM to bob in room-a, DM to carol in room-b
857        let dm_to_bob = make_dm(&rid_a, "alice", "bob", "secret for bob");
858        let dm_to_carol = make_dm(&rid_b, "alice", "carol", "secret for carol");
859        crate::history::append(chat_a.path(), &dm_to_bob)
860            .await
861            .unwrap();
862        crate::history::append(chat_b.path(), &dm_to_carol)
863            .await
864            .unwrap();
865
866        let rooms: Vec<(&str, &Path)> = vec![
867            (rid_a.as_str(), chat_a.path()),
868            (rid_b.as_str(), chat_b.path()),
869        ];
870
871        // bob sees only room-a DM
872        let result = poll_messages_multi(&rooms, "bob").await.unwrap();
873        assert_eq!(result.len(), 1);
874        assert_eq!(result[0].room(), &rid_a);
875
876        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "bob"));
877        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "bob"));
878    }
879
880    // ── cmd_query unit tests ───────────────────────────────────────────────────
881
882    /// cmd_query in historical mode returns all messages (newest-first by default).
883    #[tokio::test]
884    async fn cmd_query_history_returns_all_newest_first() {
885        let chat = NamedTempFile::new().unwrap();
886        let cursor_dir = TempDir::new().unwrap();
887        let token_dir = TempDir::new().unwrap();
888
889        let room_id = format!("test-cqh-{}", std::process::id());
890        write_token_file(&token_dir, &room_id, "alice", "tok-alice");
891        write_meta_file(&room_id, chat.path());
892
893        for i in 0..3u32 {
894            crate::history::append(
895                chat.path(),
896                &make_message(&room_id, "alice", format!("{i}")),
897            )
898            .await
899            .unwrap();
900        }
901
902        let filter = QueryFilter {
903            rooms: vec![room_id.clone()],
904            ascending: false,
905            ..Default::default()
906        };
907        let opts = QueryOptions {
908            new_only: false,
909            wait: false,
910            interval_secs: 5,
911            mentions_only: false,
912            since_uuid: None,
913        };
914
915        // cursor should NOT advance (historical mode)
916        let cursor_path = crate::paths::cursor_path(&room_id, "alice");
917        let _ = std::fs::remove_file(&cursor_path);
918
919        // Run cmd_query — captures stdout indirectly by ensuring cursor unchanged.
920        oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-alice", filter, opts, &cursor_dir)
921            .await
922            .unwrap();
923
924        // Cursor must not have been written in historical mode.
925        assert!(
926            !cursor_path.exists(),
927            "historical query must not write a cursor file"
928        );
929
930        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
931        let _ = std::fs::remove_file(&token_path(&room_id, "alice"));
932    }
933
934    /// cmd_query in --new mode advances the cursor.
935    #[tokio::test]
936    async fn cmd_query_new_advances_cursor() {
937        let chat = NamedTempFile::new().unwrap();
938        let cursor_dir = TempDir::new().unwrap();
939        let token_dir = TempDir::new().unwrap();
940
941        let room_id = format!("test-cqn-{}", std::process::id());
942        write_token_file(&token_dir, &room_id, "alice", "tok-cqn");
943        write_meta_file(&room_id, chat.path());
944
945        let msg = make_message(&room_id, "bob", "hello");
946        crate::history::append(chat.path(), &msg).await.unwrap();
947
948        let filter = QueryFilter {
949            rooms: vec![room_id.clone()],
950            ascending: true,
951            ..Default::default()
952        };
953        let opts = QueryOptions {
954            new_only: true,
955            wait: false,
956            interval_secs: 5,
957            mentions_only: false,
958            since_uuid: None,
959        };
960
961        // First query — should return the message and write cursor.
962        let result = oneshot_cmd_query_to_vec(
963            &[room_id.clone()],
964            "tok-cqn",
965            filter.clone(),
966            opts.clone(),
967            &cursor_dir,
968        )
969        .await
970        .unwrap();
971        assert_eq!(result.len(), 1);
972
973        // Second query — cursor advanced, nothing new.
974        let result2 =
975            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqn", filter, opts, &cursor_dir)
976                .await
977                .unwrap();
978        assert!(
979            result2.is_empty(),
980            "second query should return nothing (cursor advanced)"
981        );
982
983        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
984        let _ = std::fs::remove_file(&token_path(&room_id, "alice"));
985    }
986
987    /// cmd_query with content_search only returns matching messages.
988    #[tokio::test]
989    async fn cmd_query_content_search_filters() {
990        let chat = NamedTempFile::new().unwrap();
991        let cursor_dir = TempDir::new().unwrap();
992        let token_dir = TempDir::new().unwrap();
993
994        let room_id = format!("test-cqs-{}", std::process::id());
995        write_token_file(&token_dir, &room_id, "alice", "tok-cqs");
996        write_meta_file(&room_id, chat.path());
997
998        crate::history::append(chat.path(), &make_message(&room_id, "bob", "hello world"))
999            .await
1000            .unwrap();
1001        crate::history::append(chat.path(), &make_message(&room_id, "bob", "goodbye"))
1002            .await
1003            .unwrap();
1004
1005        let filter = QueryFilter {
1006            rooms: vec![room_id.clone()],
1007            content_search: Some("hello".into()),
1008            ascending: true,
1009            ..Default::default()
1010        };
1011        let opts = QueryOptions {
1012            new_only: false,
1013            wait: false,
1014            interval_secs: 5,
1015            mentions_only: false,
1016            since_uuid: None,
1017        };
1018
1019        let result =
1020            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqs", filter, opts, &cursor_dir)
1021                .await
1022                .unwrap();
1023        assert_eq!(result.len(), 1);
1024        assert!(result[0].content().unwrap().contains("hello"));
1025
1026        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1027        let _ = std::fs::remove_file(&token_path(&room_id, "alice"));
1028    }
1029
1030    /// cmd_query with user filter only returns messages from that user.
1031    #[tokio::test]
1032    async fn cmd_query_user_filter() {
1033        let chat = NamedTempFile::new().unwrap();
1034        let cursor_dir = TempDir::new().unwrap();
1035        let token_dir = TempDir::new().unwrap();
1036
1037        let room_id = format!("test-cqu-{}", std::process::id());
1038        write_token_file(&token_dir, &room_id, "alice", "tok-cqu");
1039        write_meta_file(&room_id, chat.path());
1040
1041        crate::history::append(chat.path(), &make_message(&room_id, "alice", "from alice"))
1042            .await
1043            .unwrap();
1044        crate::history::append(chat.path(), &make_message(&room_id, "bob", "from bob"))
1045            .await
1046            .unwrap();
1047
1048        let filter = QueryFilter {
1049            rooms: vec![room_id.clone()],
1050            users: vec!["bob".into()],
1051            ascending: true,
1052            ..Default::default()
1053        };
1054        let opts = QueryOptions {
1055            new_only: false,
1056            wait: false,
1057            interval_secs: 5,
1058            mentions_only: false,
1059            since_uuid: None,
1060        };
1061
1062        let result =
1063            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqu", filter, opts, &cursor_dir)
1064                .await
1065                .unwrap();
1066        assert_eq!(result.len(), 1);
1067        assert_eq!(result[0].user(), "bob");
1068
1069        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1070        let _ = std::fs::remove_file(&token_path(&room_id, "alice"));
1071    }
1072
1073    /// cmd_query with limit returns only N messages.
1074    #[tokio::test]
1075    async fn cmd_query_limit() {
1076        let chat = NamedTempFile::new().unwrap();
1077        let cursor_dir = TempDir::new().unwrap();
1078        let token_dir = TempDir::new().unwrap();
1079
1080        let room_id = format!("test-cql-{}", std::process::id());
1081        write_token_file(&token_dir, &room_id, "alice", "tok-cql");
1082        write_meta_file(&room_id, chat.path());
1083
1084        for i in 0..5u32 {
1085            crate::history::append(
1086                chat.path(),
1087                &make_message(&room_id, "bob", format!("msg {i}")),
1088            )
1089            .await
1090            .unwrap();
1091        }
1092
1093        let filter = QueryFilter {
1094            rooms: vec![room_id.clone()],
1095            limit: Some(2),
1096            ascending: false,
1097            ..Default::default()
1098        };
1099        let opts = QueryOptions {
1100            new_only: false,
1101            wait: false,
1102            interval_secs: 5,
1103            mentions_only: false,
1104            since_uuid: None,
1105        };
1106
1107        let result =
1108            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cql", filter, opts, &cursor_dir)
1109                .await
1110                .unwrap();
1111        assert_eq!(result.len(), 2, "limit should restrict to 2 messages");
1112
1113        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1114        let _ = std::fs::remove_file(&token_path(&room_id, "alice"));
1115    }
1116
1117    // ── Test helpers ──────────────────────────────────────────────────────────
1118
1119    fn token_path(room_id: &str, username: &str) -> PathBuf {
1120        crate::paths::token_path(room_id, username)
1121    }
1122
1123    fn write_token_file(_dir: &TempDir, room_id: &str, username: &str, token: &str) {
1124        let path = token_path(room_id, username);
1125        if let Some(parent) = path.parent() {
1126            std::fs::create_dir_all(parent).unwrap();
1127        }
1128        let data = serde_json::json!({ "username": username, "token": token });
1129        std::fs::write(&path, format!("{data}\n")).unwrap();
1130    }
1131
1132    fn write_meta_file(room_id: &str, chat_path: &Path) {
1133        let meta_path = crate::paths::room_meta_path(room_id);
1134        if let Some(parent) = meta_path.parent() {
1135            std::fs::create_dir_all(parent).unwrap();
1136        }
1137        let meta = serde_json::json!({ "chat_path": chat_path.to_string_lossy() });
1138        std::fs::write(&meta_path, format!("{meta}\n")).unwrap();
1139    }
1140
1141    /// Run cmd_query and collect returned messages.
1142    ///
1143    /// Since cmd_query writes to stdout, we wrap it to capture results by
1144    /// re-reading the chat file with the same filter in historical mode.
1145    /// For `new_only` tests we verify the cursor state instead.
1146    async fn oneshot_cmd_query_to_vec(
1147        room_ids: &[String],
1148        token: &str,
1149        filter: QueryFilter,
1150        opts: QueryOptions,
1151        _cursor_dir: &TempDir,
1152    ) -> anyhow::Result<Vec<Message>> {
1153        // Snapshot cursor before run.
1154        let cursor_before = room_ids
1155            .first()
1156            .map(|id| {
1157                // Resolve username by reading the token file for this room.
1158                super::super::token::username_from_token(id, token)
1159                    .ok()
1160                    .map(|u| {
1161                        let p = crate::paths::cursor_path(id, &u);
1162                        std::fs::read_to_string(&p).ok()
1163                    })
1164                    .flatten()
1165            })
1166            .flatten();
1167
1168        // Run cmd_query (side effect: may update cursor).
1169        cmd_query(room_ids, token, filter.clone(), opts.clone()).await?;
1170
1171        // Snapshot cursor after run.
1172        let cursor_after = room_ids
1173            .first()
1174            .map(|id| {
1175                super::super::token::username_from_token(id, token)
1176                    .ok()
1177                    .map(|u| {
1178                        let p = crate::paths::cursor_path(id, &u);
1179                        std::fs::read_to_string(&p).ok()
1180                    })
1181                    .flatten()
1182            })
1183            .flatten();
1184
1185        // Reconstruct what cmd_query would have returned.
1186        // For historical mode: re-run with same filter and collect messages.
1187        // For new_only mode: reload history and apply filter with the "before" cursor.
1188        if !opts.new_only && !opts.wait {
1189            // Historical: reload and reapply filter.
1190            let mut all: Vec<Message> = Vec::new();
1191            for room_id in room_ids {
1192                let meta_path = crate::paths::room_meta_path(room_id);
1193                let chat_path = chat_path_from_meta(room_id, &meta_path);
1194                let msgs = history::load(&chat_path).await?;
1195                all.extend(msgs);
1196            }
1197            let username = resolve_username_from_rooms(room_ids, token).unwrap_or_default();
1198            let mut result: Vec<Message> = all
1199                .into_iter()
1200                .filter(|m| filter.matches(m, m.room()))
1201                .filter(|m| match m {
1202                    Message::DirectMessage { user, to, .. } => user == &username || to == &username,
1203                    _ => true,
1204                })
1205                .collect();
1206            apply_sort_and_limit(&mut result, &filter);
1207            Ok(result)
1208        } else {
1209            // new_only mode: reconstruct returned messages by replaying history
1210            // from cursor_before (before the call advanced it).
1211            let advanced = cursor_after != cursor_before;
1212            if advanced {
1213                let room_id = &room_ids[0];
1214                let meta_path = crate::paths::room_meta_path(room_id);
1215                let chat_path = chat_path_from_meta(room_id, &meta_path);
1216                let all = history::load(&chat_path).await?;
1217                // Find start from the pre-run cursor UUID.
1218                let start = match &cursor_before {
1219                    Some(id) => all
1220                        .iter()
1221                        .position(|m| m.id() == id.trim())
1222                        .map(|i| i + 1)
1223                        .unwrap_or(0),
1224                    None => 0,
1225                };
1226                let filtered: Vec<Message> = all[start..]
1227                    .iter()
1228                    .filter(|m| filter.matches(m, m.room()))
1229                    .cloned()
1230                    .collect();
1231                Ok(filtered)
1232            } else {
1233                Ok(vec![])
1234            }
1235        }
1236    }
1237
1238    /// resolve_username_from_rooms finds username from the first matching room.
1239    #[test]
1240    fn resolve_username_finds_token_in_second_room() {
1241        // This test can't easily be hermetic since resolve_username_from_rooms
1242        // calls username_from_token which scans /tmp. We test the error case instead.
1243        let result = resolve_username_from_rooms(&["nonexistent-room-xyz".to_owned()], "bad-token");
1244        assert!(result.is_err());
1245        assert!(result
1246            .unwrap_err()
1247            .to_string()
1248            .contains("token not recognised"));
1249    }
1250
1251    // ── subscription tier filtering tests ──────────────────────────────────
1252
1253    /// load_user_tier returns Full when no subscription file exists.
1254    #[test]
1255    fn load_user_tier_missing_file_returns_full() {
1256        // Use a room ID that will never have a subscription file on disk.
1257        let tier = load_user_tier("nonexistent-room-tier-test", "alice");
1258        assert_eq!(tier, SubscriptionTier::Full);
1259    }
1260
1261    /// load_user_tier returns the persisted tier when the file exists.
1262    #[test]
1263    fn load_user_tier_returns_persisted_tier() {
1264        let state_dir = crate::paths::room_state_dir();
1265        let _ = std::fs::create_dir_all(&state_dir);
1266        let room_id = format!("test-tier-load-{}", std::process::id());
1267        let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
1268
1269        let mut map = std::collections::HashMap::new();
1270        map.insert("alice".to_string(), SubscriptionTier::MentionsOnly);
1271        map.insert("bob".to_string(), SubscriptionTier::Unsubscribed);
1272        let json = serde_json::to_string_pretty(&map).unwrap();
1273        std::fs::write(&sub_path, json).unwrap();
1274
1275        assert_eq!(
1276            load_user_tier(&room_id, "alice"),
1277            SubscriptionTier::MentionsOnly
1278        );
1279        assert_eq!(
1280            load_user_tier(&room_id, "bob"),
1281            SubscriptionTier::Unsubscribed
1282        );
1283        // Unknown user defaults to Full.
1284        assert_eq!(load_user_tier(&room_id, "carol"), SubscriptionTier::Full);
1285
1286        let _ = std::fs::remove_file(&sub_path);
1287    }
1288
1289    /// apply_tier_filter with Full keeps all messages.
1290    #[test]
1291    fn apply_tier_filter_full_keeps_all() {
1292        let mut msgs = vec![
1293            make_message("r", "alice", "hello"),
1294            make_message("r", "bob", "world"),
1295        ];
1296        apply_tier_filter(&mut msgs, SubscriptionTier::Full, "carol");
1297        assert_eq!(msgs.len(), 2);
1298    }
1299
1300    /// apply_tier_filter with MentionsOnly keeps only @mentions.
1301    #[test]
1302    fn apply_tier_filter_mentions_only_filters() {
1303        let mut msgs = vec![
1304            make_message("r", "alice", "hey @carol check this"),
1305            make_message("r", "bob", "unrelated message"),
1306            make_message("r", "dave", "also @carol"),
1307        ];
1308        apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "carol");
1309        assert_eq!(msgs.len(), 2);
1310        assert!(msgs[0].content().unwrap().contains("@carol"));
1311        assert!(msgs[1].content().unwrap().contains("@carol"));
1312    }
1313
1314    /// apply_tier_filter with Unsubscribed clears all messages.
1315    #[test]
1316    fn apply_tier_filter_unsubscribed_clears_all() {
1317        let mut msgs = vec![
1318            make_message("r", "alice", "hey @carol"),
1319            make_message("r", "bob", "world"),
1320        ];
1321        apply_tier_filter(&mut msgs, SubscriptionTier::Unsubscribed, "carol");
1322        assert!(msgs.is_empty());
1323    }
1324
1325    /// apply_tier_filter with MentionsOnly and no mentions returns empty.
1326    #[test]
1327    fn apply_tier_filter_mentions_only_no_mentions_returns_empty() {
1328        let mut msgs = vec![
1329            make_message("r", "alice", "hello"),
1330            make_message("r", "bob", "world"),
1331        ];
1332        apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "carol");
1333        assert!(msgs.is_empty());
1334    }
1335
1336    /// cmd_query with Unsubscribed tier and public_only=true still returns messages.
1337    #[tokio::test]
1338    async fn cmd_query_public_bypasses_tier() {
1339        let chat = NamedTempFile::new().unwrap();
1340        let token_dir = TempDir::new().unwrap();
1341        let cursor_dir = TempDir::new().unwrap();
1342
1343        let room_id = format!("test-pub-tier-{}", std::process::id());
1344        write_token_file(&token_dir, &room_id, "alice", "tok-pub-tier");
1345        write_meta_file(&room_id, chat.path());
1346
1347        // Write subscription map marking alice as Unsubscribed.
1348        let state_dir = crate::paths::room_state_dir();
1349        let _ = std::fs::create_dir_all(&state_dir);
1350        let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
1351        let mut map = std::collections::HashMap::new();
1352        map.insert("alice".to_string(), SubscriptionTier::Unsubscribed);
1353        std::fs::write(&sub_path, serde_json::to_string(&map).unwrap()).unwrap();
1354
1355        // Add a message.
1356        crate::history::append(chat.path(), &make_message(&room_id, "bob", "visible"))
1357            .await
1358            .unwrap();
1359
1360        // Query with public_only=true should bypass tier and return the message.
1361        let filter = QueryFilter {
1362            rooms: vec![room_id.clone()],
1363            public_only: true,
1364            ascending: true,
1365            ..Default::default()
1366        };
1367        let opts = QueryOptions {
1368            new_only: false,
1369            wait: false,
1370            interval_secs: 5,
1371            mentions_only: false,
1372            since_uuid: None,
1373        };
1374
1375        let result = oneshot_cmd_query_to_vec(
1376            &[room_id.clone()],
1377            "tok-pub-tier",
1378            filter,
1379            opts,
1380            &cursor_dir,
1381        )
1382        .await
1383        .unwrap();
1384        assert_eq!(
1385            result.len(),
1386            1,
1387            "public flag should bypass Unsubscribed tier"
1388        );
1389
1390        let _ = std::fs::remove_file(&sub_path);
1391        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1392        let _ = std::fs::remove_file(&token_path(&room_id, "alice"));
1393    }
1394
1395    /// MentionsOnly tier sets mention_user filter, narrowing results to @mentions.
1396    #[test]
1397    fn mentions_only_tier_sets_mention_user_on_filter() {
1398        // Verify the tier logic: when tier is MentionsOnly and mention_user is
1399        // not already set, it should be set to the username.
1400        let mut filter = QueryFilter::default();
1401        let tier = SubscriptionTier::MentionsOnly;
1402
1403        // Simulate what cmd_query does.
1404        match tier {
1405            SubscriptionTier::MentionsOnly => {
1406                if filter.mention_user.is_none() {
1407                    filter.mention_user = Some("alice".to_string());
1408                }
1409            }
1410            _ => {}
1411        }
1412
1413        assert_eq!(filter.mention_user, Some("alice".to_string()));
1414
1415        // Now verify with apply_tier_filter that messages are correctly narrowed.
1416        let mut msgs = vec![
1417            make_message("r", "bob", "hey @alice look"),
1418            make_message("r", "bob", "unrelated chatter"),
1419        ];
1420        apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "alice");
1421        assert_eq!(msgs.len(), 1);
1422        assert!(msgs[0].content().unwrap().contains("@alice"));
1423    }
1424
1425    /// MentionsOnly tier does not override an existing mention_user filter.
1426    #[test]
1427    fn mentions_only_tier_preserves_existing_mention_user() {
1428        let mut filter = QueryFilter {
1429            mention_user: Some("bob".to_string()),
1430            ..Default::default()
1431        };
1432
1433        // MentionsOnly should not overwrite the existing filter.
1434        match SubscriptionTier::MentionsOnly {
1435            SubscriptionTier::MentionsOnly => {
1436                if filter.mention_user.is_none() {
1437                    filter.mention_user = Some("alice".to_string());
1438                }
1439            }
1440            _ => {}
1441        }
1442
1443        assert_eq!(
1444            filter.mention_user,
1445            Some("bob".to_string()),
1446            "existing mention_user filter should be preserved"
1447        );
1448    }
1449
1450    // ── per-room subscription tier filtering tests ─────────────────────────
1451
1452    #[test]
1453    fn per_room_tier_filter_full_keeps_all() {
1454        let mut msgs = vec![
1455            make_message("dev", "alice", "hello from dev"),
1456            make_message("lobby", "bob", "hello from lobby"),
1457        ];
1458        // No subscription files → defaults to Full for both rooms.
1459        let rooms = vec![
1460            "nonexistent-perroom-full-1".to_string(),
1461            "nonexistent-perroom-full-2".to_string(),
1462        ];
1463        apply_per_room_tier_filter(&mut msgs, &rooms, "carol");
1464        assert_eq!(msgs.len(), 2);
1465    }
1466
1467    #[test]
1468    fn per_room_tier_filter_mixed_tiers() {
1469        let state_dir = crate::paths::room_state_dir();
1470        let _ = std::fs::create_dir_all(&state_dir);
1471
1472        let room_full = format!("perroom-mixed-full-{}", std::process::id());
1473        let room_unsub = format!("perroom-mixed-unsub-{}", std::process::id());
1474        let room_mentions = format!("perroom-mixed-ment-{}", std::process::id());
1475
1476        // Write subscription maps: room_full=Full (default), room_unsub=Unsubscribed, room_mentions=MentionsOnly
1477        let sub_unsub = crate::paths::broker_subscriptions_path(&state_dir, &room_unsub);
1478        let mut map_unsub = std::collections::HashMap::new();
1479        map_unsub.insert("alice".to_string(), SubscriptionTier::Unsubscribed);
1480        std::fs::write(&sub_unsub, serde_json::to_string(&map_unsub).unwrap()).unwrap();
1481
1482        let sub_ment = crate::paths::broker_subscriptions_path(&state_dir, &room_mentions);
1483        let mut map_ment = std::collections::HashMap::new();
1484        map_ment.insert("alice".to_string(), SubscriptionTier::MentionsOnly);
1485        std::fs::write(&sub_ment, serde_json::to_string(&map_ment).unwrap()).unwrap();
1486
1487        let mut msgs = vec![
1488            make_message(&room_full, "bob", "visible in full room"),
1489            make_message(&room_unsub, "bob", "invisible — unsubscribed"),
1490            make_message(&room_mentions, "bob", "no mention — filtered"),
1491            make_message(&room_mentions, "bob", "hey @alice check this"),
1492        ];
1493
1494        let rooms = vec![room_full.clone(), room_unsub.clone(), room_mentions.clone()];
1495        apply_per_room_tier_filter(&mut msgs, &rooms, "alice");
1496
1497        // Only the Full room message and the MentionsOnly room @alice message survive.
1498        assert_eq!(msgs.len(), 2);
1499        assert!(msgs[0].content().unwrap().contains("visible in full room"));
1500        assert!(msgs[1].content().unwrap().contains("@alice"));
1501
1502        // Cleanup
1503        let _ = std::fs::remove_file(&sub_unsub);
1504        let _ = std::fs::remove_file(&sub_ment);
1505    }
1506
1507    #[test]
1508    fn per_room_tier_filter_unknown_room_defaults_to_full() {
1509        let mut msgs = vec![make_message("mystery", "bob", "hello")];
1510        // Room not in the room_ids list at all — tier defaults to Full.
1511        apply_per_room_tier_filter(&mut msgs, &["other".to_string()], "alice");
1512        assert_eq!(msgs.len(), 1);
1513    }
1514
1515    /// pull_messages returns the last n entries without moving the cursor.
1516    #[tokio::test]
1517    async fn pull_messages_returns_tail_without_cursor_change() {
1518        let chat = NamedTempFile::new().unwrap();
1519        let cursor_dir = TempDir::new().unwrap();
1520        let cursor = cursor_dir.path().join("cursor");
1521
1522        for i in 0..5u32 {
1523            crate::history::append(chat.path(), &make_message("r", "u", format!("msg {i}")))
1524                .await
1525                .unwrap();
1526        }
1527
1528        let pulled = pull_messages(chat.path(), 3, None, None).await.unwrap();
1529        assert_eq!(pulled.len(), 3);
1530
1531        // cursor untouched — poll still returns all 5
1532        let polled = poll_messages(chat.path(), &cursor, None, None, None)
1533            .await
1534            .unwrap();
1535        assert_eq!(polled.len(), 5);
1536    }
1537}