Skip to main content

room_cli/oneshot/
poll.rs

1use std::path::{Path, PathBuf};
2
3use room_protocol::{EventFilter, SubscriptionTier};
4
5use crate::{
6    broker::commands::{load_event_filter_map, load_subscription_map},
7    history,
8    message::Message,
9    paths,
10    query::QueryFilter,
11};
12
13use super::token::{read_cursor, username_from_token, write_cursor};
14
15// ── Subscription tier lookup ──────────────────────────────────────────────────
16
17/// Look up a user's subscription tier for a room from the persisted
18/// subscription map on disk.
19///
20/// Returns `Full` when the subscription file is missing, corrupt, or the user
21/// has no entry — unsubscribed users must have been explicitly recorded.
22fn load_user_tier(room_id: &str, username: &str) -> SubscriptionTier {
23    let state_dir = paths::room_state_dir();
24    let sub_path = paths::broker_subscriptions_path(&state_dir, room_id);
25    let map = load_subscription_map(&sub_path);
26    map.get(username).copied().unwrap_or(SubscriptionTier::Full)
27}
28
29/// Apply subscription-tier filtering to a message list in place.
30///
31/// - `Full` — no filtering (all messages pass).
32/// - `MentionsOnly` — keep only messages that @mention `username`.
33/// - `Unsubscribed` — remove all messages.
34fn apply_tier_filter(messages: &mut Vec<Message>, tier: SubscriptionTier, username: &str) {
35    match tier {
36        SubscriptionTier::Full => {}
37        SubscriptionTier::MentionsOnly => {
38            messages.retain(|m| m.mentions().iter().any(|mention| mention == username));
39        }
40        SubscriptionTier::Unsubscribed => {
41            messages.clear();
42        }
43    }
44}
45
46/// Apply per-room subscription-tier filtering to a message list in place.
47///
48/// Looks up the user's tier for each message's room and filters accordingly.
49/// Each room is checked independently — a user can be `Full` in one room and
50/// `Unsubscribed` in another.
51fn apply_per_room_tier_filter(messages: &mut Vec<Message>, room_ids: &[String], username: &str) {
52    use std::collections::HashMap;
53    let tiers: HashMap<&str, SubscriptionTier> = room_ids
54        .iter()
55        .map(|r| (r.as_str(), load_user_tier(r, username)))
56        .collect();
57
58    messages.retain(|m| {
59        let tier = tiers
60            .get(m.room())
61            .copied()
62            .unwrap_or(SubscriptionTier::Full);
63        match tier {
64            SubscriptionTier::Full => true,
65            SubscriptionTier::MentionsOnly => {
66                m.mentions().iter().any(|mention| mention == username)
67            }
68            SubscriptionTier::Unsubscribed => false,
69        }
70    });
71}
72
73// ── Event filter lookup ──────────────────────────────────────────────────────
74
75/// Look up a user's event filter for a room from the persisted event filter
76/// map on disk.
77///
78/// Returns `EventFilter::All` when the file is missing, corrupt, or the user
79/// has no entry — all events pass by default.
80fn load_user_event_filter(room_id: &str, username: &str) -> EventFilter {
81    let state_dir = paths::room_state_dir();
82    let ef_path = paths::broker_event_filters_path(&state_dir, room_id);
83    let map = load_event_filter_map(&ef_path);
84    map.get(username).cloned().unwrap_or(EventFilter::All)
85}
86
87/// Apply event-type filtering to a message list in place.
88///
89/// Only [`Message::Event`] messages are affected — all other message types
90/// pass through unfiltered. For Event messages, the `event_type` field is
91/// checked against the filter.
92fn apply_event_filter(messages: &mut Vec<Message>, filter: &EventFilter) {
93    if matches!(filter, EventFilter::All) {
94        return;
95    }
96    messages.retain(|m| match m {
97        Message::Event { event_type, .. } => filter.allows(event_type),
98        _ => true,
99    });
100}
101
102/// Apply per-room event-filter filtering to a message list in place.
103///
104/// Similar to [`apply_per_room_tier_filter`] but for event types.
105fn apply_per_room_event_filter(messages: &mut Vec<Message>, room_ids: &[String], username: &str) {
106    use std::collections::HashMap;
107    let filters: HashMap<&str, EventFilter> = room_ids
108        .iter()
109        .map(|r| (r.as_str(), load_user_event_filter(r, username)))
110        .collect();
111
112    messages.retain(|m| match m {
113        Message::Event {
114            room, event_type, ..
115        } => {
116            let filter = filters.get(room.as_str()).unwrap_or(&EventFilter::All);
117            filter.allows(event_type)
118        }
119        _ => true,
120    });
121}
122
123// ── Query engine types ─────────────────────────────────────────────────────────
124
/// Options for the `room query` subcommand (and `poll`/`watch` aliases).
///
/// These control *how* messages are fetched (cursor vs. full history, blocking
/// vs. one-shot); *which* messages are printed is governed by the accompanying
/// `QueryFilter`.
#[derive(Debug, Clone)]
pub struct QueryOptions {
    /// Only return messages since the last poll cursor; advances the cursor
    /// after printing results.
    pub new_only: bool,
    /// Block until at least one new foreign message arrives (implies `new_only`:
    /// `cmd_query` takes the cursor-based path whenever either flag is set).
    pub wait: bool,
    /// Poll interval in seconds when `wait` is true (the sleep between two
    /// consecutive polls).
    pub interval_secs: u64,
    /// If `true`, only return messages that @mention the calling user
    /// (username resolved from the token). `cmd_query` implements this by
    /// setting the filter's `mention_user` to the caller's username.
    pub mentions_only: bool,
    /// Override the cursor with this legacy message UUID (used by the `poll`
    /// alias `--since` flag, which predates the `room:seq` format).
    ///
    /// Only honoured when exactly one room is queried; the multi-room path
    /// always resumes from each room's stored cursor.
    pub since_uuid: Option<String>,
}
142
143/// Return all messages from `chat_path` after the message with ID `since` (exclusive).
144///
145/// If `since` is `None`, the cursor file at `cursor_path` is checked for a previously
146/// stored position. A `None` cursor means all messages are returned.
147///
148/// `viewer` is the username of the caller. When `Some`, `DirectMessage` entries are
149/// filtered using [`Message::is_visible_to`], which grants access to the sender,
150/// recipient, and the room host. Pass `None` to skip DM filtering (e.g. in tests
151/// that don't involve DMs).
152///
153/// `host` is the room host username (typically the first user to join). When `Some`,
154/// the host can see all DMs regardless of sender/recipient.
155///
156/// The cursor file is updated to the last returned message's ID after each successful call.
157pub async fn poll_messages(
158    chat_path: &Path,
159    cursor_path: &Path,
160    viewer: Option<&str>,
161    host: Option<&str>,
162    since: Option<&str>,
163) -> anyhow::Result<Vec<Message>> {
164    let effective_since: Option<String> = since
165        .map(|s| s.to_owned())
166        .or_else(|| read_cursor(cursor_path));
167
168    let messages = history::load(chat_path).await?;
169
170    let start = match &effective_since {
171        Some(id) => messages
172            .iter()
173            .position(|m| m.id() == id)
174            .map(|i| i + 1)
175            .unwrap_or(0),
176        None => 0,
177    };
178
179    let result: Vec<Message> = messages[start..]
180        .iter()
181        .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
182        .cloned()
183        .collect();
184
185    if let Some(last) = result.last() {
186        write_cursor(cursor_path, last.id())?;
187    }
188
189    Ok(result)
190}
191
192/// Return the last `n` messages from history without updating the poll cursor.
193///
194/// DM entries are filtered using [`Message::is_visible_to`] so that `viewer` only
195/// sees messages they are party to (sender, recipient, or host). Pass `None` to
196/// skip DM filtering.
197///
198/// `host` is the room host username. When `Some`, the host can see all DMs.
199pub async fn pull_messages(
200    chat_path: &Path,
201    n: usize,
202    viewer: Option<&str>,
203    host: Option<&str>,
204) -> anyhow::Result<Vec<Message>> {
205    let clamped = n.min(200);
206    let all = history::tail(chat_path, clamped).await?;
207    let visible: Vec<Message> = all
208        .into_iter()
209        .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
210        .collect();
211    Ok(visible)
212}
213
214/// One-shot pull subcommand: print the last N messages from history as NDJSON.
215///
216/// Reads from the chat file directly (no broker connection required).
217/// Does **not** update the poll cursor.
218pub async fn cmd_pull(room_id: &str, token: &str, n: usize) -> anyhow::Result<()> {
219    let username = username_from_token(token)?;
220    let meta_path = paths::room_meta_path(room_id);
221    let chat_path = chat_path_from_meta(room_id, &meta_path);
222
223    let host = read_host_from_meta(&meta_path);
224    let mut messages = pull_messages(&chat_path, n, Some(&username), host.as_deref()).await?;
225    let tier = load_user_tier(room_id, &username);
226    apply_tier_filter(&mut messages, tier, &username);
227    let ef = load_user_event_filter(room_id, &username);
228    apply_event_filter(&mut messages, &ef);
229    for msg in &messages {
230        println!("{}", serde_json::to_string(msg)?);
231    }
232    Ok(())
233}
234
235/// Watch subcommand: poll in a loop until at least one foreign message arrives.
236///
237/// Reads the caller's username from the session token file. Polls every
238/// `interval_secs` seconds, filtering out own messages. Wakes on `Message`,
239/// `System`, and `DirectMessage` variants. Exits after printing the first batch
240/// of foreign messages as NDJSON. Shares the cursor file with `room poll` — the
241/// two subcommands never re-deliver the same message.
242pub async fn cmd_watch(room_id: &str, token: &str, interval_secs: u64) -> anyhow::Result<()> {
243    let username = username_from_token(token)?;
244    let meta_path = paths::room_meta_path(room_id);
245    let chat_path = chat_path_from_meta(room_id, &meta_path);
246    let cursor_path = paths::cursor_path(room_id, &username);
247    let host = read_host_from_meta(&meta_path);
248
249    let ef = load_user_event_filter(room_id, &username);
250
251    loop {
252        let mut messages = poll_messages(
253            &chat_path,
254            &cursor_path,
255            Some(&username),
256            host.as_deref(),
257            None,
258        )
259        .await?;
260
261        apply_event_filter(&mut messages, &ef);
262
263        let foreign: Vec<&Message> = messages
264            .iter()
265            .filter(|m| match m {
266                Message::Message { user, .. } | Message::System { user, .. } => user != &username,
267                Message::DirectMessage { to, .. } => to == &username,
268                Message::Event { user, .. } => user != &username,
269                _ => false,
270            })
271            .collect();
272
273        if !foreign.is_empty() {
274            for msg in foreign {
275                println!("{}", serde_json::to_string(msg)?);
276            }
277            return Ok(());
278        }
279
280        tokio::time::sleep(tokio::time::Duration::from_secs(interval_secs)).await;
281    }
282}
283
284/// One-shot poll subcommand: read messages since cursor, print as NDJSON, update cursor.
285///
286/// Reads the caller's username from the session token file. When `mentions_only` is
287/// true, only messages that @mention the caller's username are printed (cursor still
288/// advances past all messages).
289pub async fn cmd_poll(
290    room_id: &str,
291    token: &str,
292    since: Option<String>,
293    mentions_only: bool,
294) -> anyhow::Result<()> {
295    let username = username_from_token(token)?;
296    let meta_path = paths::room_meta_path(room_id);
297    let chat_path = chat_path_from_meta(room_id, &meta_path);
298    let cursor_path = paths::cursor_path(room_id, &username);
299    let host = read_host_from_meta(&meta_path);
300
301    let mut messages = poll_messages(
302        &chat_path,
303        &cursor_path,
304        Some(&username),
305        host.as_deref(),
306        since.as_deref(),
307    )
308    .await?;
309
310    let ef = load_user_event_filter(room_id, &username);
311    apply_event_filter(&mut messages, &ef);
312
313    for msg in &messages {
314        if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
315            continue;
316        }
317        println!("{}", serde_json::to_string(msg)?);
318    }
319    Ok(())
320}
321
322/// Poll multiple rooms, returning messages merged by timestamp.
323///
324/// Each room uses its own cursor file under `~/.room/state/`.
325/// Messages are sorted by timestamp across all rooms. Each message already carries
326/// a `room` field, so the caller can distinguish sources.
327pub async fn poll_messages_multi(
328    rooms: &[(&str, &Path)],
329    username: &str,
330) -> anyhow::Result<Vec<Message>> {
331    let mut all_messages: Vec<Message> = Vec::new();
332
333    for &(room_id, chat_path) in rooms {
334        let cursor_path = paths::cursor_path(room_id, username);
335        let meta_path = paths::room_meta_path(room_id);
336        let host = read_host_from_meta(&meta_path);
337        let msgs = poll_messages(
338            chat_path,
339            &cursor_path,
340            Some(username),
341            host.as_deref(),
342            None,
343        )
344        .await?;
345        all_messages.extend(msgs);
346    }
347
348    all_messages.sort_by(|a, b| a.ts().cmp(b.ts()));
349    Ok(all_messages)
350}
351
352/// One-shot multi-room poll subcommand: poll multiple rooms, merge by timestamp, print NDJSON.
353///
354/// Resolves the username from the token by trying each room in order. Each room's cursor
355/// is updated independently.
356pub async fn cmd_poll_multi(
357    room_ids: &[String],
358    token: &str,
359    mentions_only: bool,
360) -> anyhow::Result<()> {
361    // Resolve username by trying the token against each room
362    let username = username_from_token(token)?;
363
364    // Resolve chat paths for all rooms
365    let mut rooms: Vec<(&str, PathBuf)> = Vec::new();
366    for room_id in room_ids {
367        let meta_path = paths::room_meta_path(room_id);
368        let chat_path = chat_path_from_meta(room_id, &meta_path);
369        rooms.push((room_id.as_str(), chat_path));
370    }
371
372    let room_refs: Vec<(&str, &Path)> = rooms.iter().map(|(id, p)| (*id, p.as_path())).collect();
373    let mut messages = poll_messages_multi(&room_refs, &username).await?;
374
375    let room_id_strings: Vec<String> = room_ids.iter().map(|s| s.to_string()).collect();
376    apply_per_room_event_filter(&mut messages, &room_id_strings, &username);
377
378    for msg in &messages {
379        if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
380            continue;
381        }
382        println!("{}", serde_json::to_string(msg)?);
383    }
384    Ok(())
385}
386
387// ── cmd_query ─────────────────────────────────────────────────────────────────
388
389/// Unified query entry point for `room query` and the `poll`/`watch` aliases.
390///
391/// Three modes:
392/// - **Historical** (`new_only = false, wait = false`): reads full history,
393///   applies filter, no cursor update.
394/// - **New** (`new_only = true, wait = false`): reads since last cursor,
395///   applies filter, advances cursor.
396/// - **Wait** (`wait = true`): loops until at least one foreign message passes
397///   the filter, then prints and exits.
398///
399/// `room_ids` lists the rooms to read. `filter.rooms` may further restrict the
400/// output but does not affect which files are opened.
401pub async fn cmd_query(
402    room_ids: &[String],
403    token: &str,
404    mut filter: QueryFilter,
405    opts: QueryOptions,
406) -> anyhow::Result<()> {
407    if room_ids.is_empty() {
408        anyhow::bail!("at least one room ID is required");
409    }
410
411    let username = username_from_token(token)?;
412
413    // Resolve mention_user from caller if mentions_only is requested.
414    if opts.mentions_only {
415        filter.mention_user = Some(username.clone());
416    }
417
418    if opts.wait || opts.new_only {
419        cmd_query_new(room_ids, &username, filter, opts).await
420    } else {
421        cmd_query_history(room_ids, &username, filter).await
422    }
423}
424
425/// Cursor-based (new / wait) query mode.
426async fn cmd_query_new(
427    room_ids: &[String],
428    username: &str,
429    filter: QueryFilter,
430    opts: QueryOptions,
431) -> anyhow::Result<()> {
432    loop {
433        let messages: Vec<Message> = if room_ids.len() == 1 {
434            let room_id = &room_ids[0];
435            let meta_path = paths::room_meta_path(room_id);
436            let chat_path = chat_path_from_meta(room_id, &meta_path);
437            let cursor_path = paths::cursor_path(room_id, username);
438            let host = read_host_from_meta(&meta_path);
439            poll_messages(
440                &chat_path,
441                &cursor_path,
442                Some(username),
443                host.as_deref(),
444                opts.since_uuid.as_deref(),
445            )
446            .await?
447        } else {
448            let mut rooms_info: Vec<(String, PathBuf)> = Vec::new();
449            for room_id in room_ids {
450                let meta_path = paths::room_meta_path(room_id);
451                let chat_path = chat_path_from_meta(room_id, &meta_path);
452                rooms_info.push((room_id.clone(), chat_path));
453            }
454            let room_refs: Vec<(&str, &Path)> = rooms_info
455                .iter()
456                .map(|(id, p)| (id.as_str(), p.as_path()))
457                .collect();
458            poll_messages_multi(&room_refs, username).await?
459        };
460
461        // Apply query filter, per-room subscription tiers, then sort + limit.
462        let mut filtered: Vec<Message> = messages
463            .into_iter()
464            .filter(|m| filter.matches(m, m.room()))
465            .collect();
466
467        if !filter.public_only {
468            apply_per_room_tier_filter(&mut filtered, room_ids, username);
469            apply_per_room_event_filter(&mut filtered, room_ids, username);
470        }
471
472        apply_sort_and_limit(&mut filtered, &filter);
473
474        if opts.wait {
475            // Only wake on foreign messages (includes system messages from plugins).
476            let foreign: Vec<&Message> = filtered
477                .iter()
478                .filter(|m| match m {
479                    Message::Message { user, .. } | Message::System { user, .. } => {
480                        user != username
481                    }
482                    Message::DirectMessage { to, .. } => to == username,
483                    _ => false,
484                })
485                .collect();
486
487            if !foreign.is_empty() {
488                for msg in foreign {
489                    println!("{}", serde_json::to_string(msg)?);
490                }
491                return Ok(());
492            }
493        } else {
494            for msg in &filtered {
495                println!("{}", serde_json::to_string(msg)?);
496            }
497            return Ok(());
498        }
499
500        tokio::time::sleep(tokio::time::Duration::from_secs(opts.interval_secs)).await;
501    }
502}
503
504/// Historical (no-cursor) query mode.
505async fn cmd_query_history(
506    room_ids: &[String],
507    username: &str,
508    filter: QueryFilter,
509) -> anyhow::Result<()> {
510    let mut all_messages: Vec<Message> = Vec::new();
511
512    for room_id in room_ids {
513        let meta_path = paths::room_meta_path(room_id);
514        let chat_path = chat_path_from_meta(room_id, &meta_path);
515        let messages = history::load(&chat_path).await?;
516        all_messages.extend(messages);
517    }
518
519    // DM privacy filter: viewer only sees their own DMs.
520    let mut filtered: Vec<Message> = all_messages
521        .into_iter()
522        .filter(|m| filter.matches(m, m.room()))
523        .filter(|m| match m {
524            Message::DirectMessage { user, to, .. } => user == username || to == username,
525            _ => true,
526        })
527        .collect();
528
529    if !filter.public_only {
530        apply_per_room_tier_filter(&mut filtered, room_ids, username);
531        apply_per_room_event_filter(&mut filtered, room_ids, username);
532    }
533
534    apply_sort_and_limit(&mut filtered, &filter);
535
536    // If a specific target_id was requested and nothing was found, report an error.
537    if filtered.is_empty() {
538        if let Some((ref target_room, target_seq)) = filter.target_id {
539            use room_protocol::format_message_id;
540            anyhow::bail!(
541                "message not found: {}",
542                format_message_id(target_room, target_seq)
543            );
544        }
545    }
546
547    for msg in &filtered {
548        println!("{}", serde_json::to_string(msg)?);
549    }
550    Ok(())
551}
552
553/// Apply sort order and optional limit to a message list in place.
554fn apply_sort_and_limit(messages: &mut Vec<Message>, filter: &QueryFilter) {
555    if filter.ascending {
556        messages.sort_by(|a, b| a.ts().cmp(b.ts()));
557    } else {
558        messages.sort_by(|a, b| b.ts().cmp(a.ts()));
559    }
560    if let Some(limit) = filter.limit {
561        messages.truncate(limit);
562    }
563}
564
565/// Read the room host username from the meta file, if present.
566///
567/// Returns `None` if the meta file does not exist, cannot be parsed, or has no
568/// `"host"` field. Callers should treat `None` the same as no host information.
569pub(super) fn read_host_from_meta(meta_path: &Path) -> Option<String> {
570    if !meta_path.exists() {
571        return None;
572    }
573    let data = std::fs::read_to_string(meta_path).ok()?;
574    let v: serde_json::Value = serde_json::from_str(&data).ok()?;
575    v["host"].as_str().map(str::to_owned)
576}
577
578pub(super) fn chat_path_from_meta(room_id: &str, meta_path: &Path) -> PathBuf {
579    if meta_path.exists() {
580        if let Ok(data) = std::fs::read_to_string(meta_path) {
581            if let Ok(v) = serde_json::from_str::<serde_json::Value>(&data) {
582                if let Some(p) = v["chat_path"].as_str() {
583                    return PathBuf::from(p);
584                }
585            }
586        }
587    }
588    history::default_chat_path(room_id)
589}
590
591#[cfg(test)]
592mod tests {
593    use super::*;
594    use crate::message::make_message;
595    use tempfile::{NamedTempFile, TempDir};
596
597    /// poll_messages with no cursor and no since returns all messages.
598    #[tokio::test]
599    async fn poll_messages_no_cursor_returns_all() {
600        let chat = NamedTempFile::new().unwrap();
601        let cursor_dir = TempDir::new().unwrap();
602        let cursor = cursor_dir.path().join("cursor");
603
604        let msg = make_message("r", "alice", "hello");
605        crate::history::append(chat.path(), &msg).await.unwrap();
606
607        let result = poll_messages(chat.path(), &cursor, None, None, None)
608            .await
609            .unwrap();
610        assert_eq!(result.len(), 1);
611        assert_eq!(result[0].id(), msg.id());
612    }
613
614    /// poll_messages advances the cursor so a second call returns nothing.
615    #[tokio::test]
616    async fn poll_messages_advances_cursor() {
617        let chat = NamedTempFile::new().unwrap();
618        let cursor_dir = TempDir::new().unwrap();
619        let cursor = cursor_dir.path().join("cursor");
620
621        let msg = make_message("r", "alice", "hello");
622        crate::history::append(chat.path(), &msg).await.unwrap();
623
624        poll_messages(chat.path(), &cursor, None, None, None)
625            .await
626            .unwrap();
627
628        let second = poll_messages(chat.path(), &cursor, None, None, None)
629            .await
630            .unwrap();
631        assert!(
632            second.is_empty(),
633            "cursor should have advanced past the first message"
634        );
635    }
636
637    /// DM visibility: viewer only sees DMs they sent or received.
638    #[tokio::test]
639    async fn poll_messages_filters_dms_by_viewer() {
640        use crate::message::make_dm;
641        let chat = NamedTempFile::new().unwrap();
642        let cursor_dir = TempDir::new().unwrap();
643        let cursor = cursor_dir.path().join("cursor");
644
645        let dm_alice_bob = make_dm("r", "alice", "bob", "secret");
646        let dm_alice_carol = make_dm("r", "alice", "carol", "other secret");
647        crate::history::append(chat.path(), &dm_alice_bob)
648            .await
649            .unwrap();
650        crate::history::append(chat.path(), &dm_alice_carol)
651            .await
652            .unwrap();
653
654        // bob sees only his DM
655        let result = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
656            .await
657            .unwrap();
658        assert_eq!(result.len(), 1);
659        assert_eq!(result[0].id(), dm_alice_bob.id());
660    }
661
662    /// DMs addressed to the watcher are included in the foreign message filter
663    /// used by cmd_watch, not silently consumed.
664    #[tokio::test]
665    async fn poll_messages_dm_to_viewer_is_not_consumed_silently() {
666        use crate::message::make_dm;
667        let chat = NamedTempFile::new().unwrap();
668        let cursor_dir = TempDir::new().unwrap();
669        let cursor = cursor_dir.path().join("cursor");
670
671        // alice sends a DM to bob, and a broadcast message
672        let dm = make_dm("r", "alice", "bob", "secret for bob");
673        let msg = make_message("r", "alice", "public hello");
674        crate::history::append(chat.path(), &dm).await.unwrap();
675        crate::history::append(chat.path(), &msg).await.unwrap();
676
677        // Simulate what cmd_watch does: poll, then filter for foreign messages + DMs
678        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
679            .await
680            .unwrap();
681
682        let username = "bob";
683        let foreign: Vec<&Message> = messages
684            .iter()
685            .filter(|m| match m {
686                Message::Message { user, .. } | Message::System { user, .. } => user != username,
687                Message::DirectMessage { to, .. } => to == username,
688                _ => false,
689            })
690            .collect();
691
692        // Both the DM (addressed to bob) and the broadcast (from alice) should appear
693        assert_eq!(foreign.len(), 2, "watch should see DMs + foreign messages");
694        assert!(
695            foreign
696                .iter()
697                .any(|m| matches!(m, Message::DirectMessage { .. })),
698            "DM must not be silently consumed"
699        );
700    }
701
702    /// DMs sent BY the watcher are excluded from the foreign filter (no self-echo).
703    #[tokio::test]
704    async fn poll_messages_dm_from_viewer_excluded_from_watch() {
705        use crate::message::make_dm;
706        let chat = NamedTempFile::new().unwrap();
707        let cursor_dir = TempDir::new().unwrap();
708        let cursor = cursor_dir.path().join("cursor");
709
710        // bob sends a DM to alice
711        let dm = make_dm("r", "bob", "alice", "from bob");
712        crate::history::append(chat.path(), &dm).await.unwrap();
713
714        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
715            .await
716            .unwrap();
717
718        let username = "bob";
719        let foreign: Vec<&Message> = messages
720            .iter()
721            .filter(|m| match m {
722                Message::Message { user, .. } | Message::System { user, .. } => user != username,
723                Message::DirectMessage { to, .. } => to == username,
724                _ => false,
725            })
726            .collect();
727
728        assert!(
729            foreign.is_empty(),
730            "DMs sent by the watcher should not wake watch"
731        );
732    }
733
734    /// System messages from other users wake the watch filter (#452).
735    #[tokio::test]
736    async fn watch_filter_wakes_on_foreign_system_message() {
737        use room_protocol::make_system;
738        let chat = NamedTempFile::new().unwrap();
739        let cursor_dir = TempDir::new().unwrap();
740        let cursor = cursor_dir.path().join("cursor");
741
742        let sys = make_system("r", "plugin:taskboard", "task tb-001 approved");
743        crate::history::append(chat.path(), &sys).await.unwrap();
744
745        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
746            .await
747            .unwrap();
748
749        let username = "bob";
750        let foreign: Vec<&Message> = messages
751            .iter()
752            .filter(|m| match m {
753                Message::Message { user, .. } | Message::System { user, .. } => user != username,
754                Message::DirectMessage { to, .. } => to == username,
755                _ => false,
756            })
757            .collect();
758
759        assert_eq!(
760            foreign.len(),
761            1,
762            "system messages from other users should wake watch"
763        );
764        assert!(matches!(foreign[0], Message::System { .. }));
765    }
766
767    /// System messages from the watcher's own username do not wake watch.
768    #[tokio::test]
769    async fn watch_filter_ignores_own_system_message() {
770        use room_protocol::make_system;
771        let chat = NamedTempFile::new().unwrap();
772        let cursor_dir = TempDir::new().unwrap();
773        let cursor = cursor_dir.path().join("cursor");
774
775        let sys = make_system("r", "bob", "bob subscribed (tier: full)");
776        crate::history::append(chat.path(), &sys).await.unwrap();
777
778        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
779            .await
780            .unwrap();
781
782        let username = "bob";
783        let foreign: Vec<&Message> = messages
784            .iter()
785            .filter(|m| match m {
786                Message::Message { user, .. } | Message::System { user, .. } => user != username,
787                Message::DirectMessage { to, .. } => to == username,
788                _ => false,
789            })
790            .collect();
791
792        assert!(
793            foreign.is_empty(),
794            "system messages from self should not wake watch"
795        );
796    }
797
798    /// Watch filter handles a mix of messages, system events, and DMs correctly.
799    #[tokio::test]
800    async fn watch_filter_mixed_message_types() {
801        use crate::message::make_dm;
802        use room_protocol::make_system;
803        let chat = NamedTempFile::new().unwrap();
804        let cursor_dir = TempDir::new().unwrap();
805        let cursor = cursor_dir.path().join("cursor");
806
807        // Foreign regular message
808        let msg = make_message("r", "alice", "hello");
809        // Foreign system message (plugin broadcast)
810        let sys = make_system("r", "plugin:taskboard", "task tb-001 claimed by alice");
811        // Own system message (should be filtered out)
812        let own_sys = make_system("r", "bob", "bob subscribed (tier: full)");
813        // DM addressed to watcher
814        let dm = make_dm("r", "alice", "bob", "private note");
815        // Own regular message (should be filtered out)
816        let own_msg = make_message("r", "bob", "my own message");
817
818        for m in [&msg, &sys, &own_sys, &dm, &own_msg] {
819            crate::history::append(chat.path(), m).await.unwrap();
820        }
821
822        let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
823            .await
824            .unwrap();
825
826        let username = "bob";
827        let foreign: Vec<&Message> = messages
828            .iter()
829            .filter(|m| match m {
830                Message::Message { user, .. } | Message::System { user, .. } => user != username,
831                Message::DirectMessage { to, .. } => to == username,
832                _ => false,
833            })
834            .collect();
835
836        assert_eq!(
837            foreign.len(),
838            3,
839            "should see: foreign message + foreign system + DM to self"
840        );
841        assert!(
842            foreign.iter().any(|m| matches!(m, Message::System { .. })),
843            "system message must appear in watch results"
844        );
845        assert!(
846            foreign.iter().any(|m| matches!(m, Message::Message { .. })),
847            "regular foreign message must appear"
848        );
849        assert!(
850            foreign
851                .iter()
852                .any(|m| matches!(m, Message::DirectMessage { .. })),
853            "DM to self must appear"
854        );
855    }
856
857    /// Host sees all DMs in poll regardless of sender/recipient.
858    #[tokio::test]
859    async fn poll_messages_host_sees_all_dms() {
860        use crate::message::make_dm;
861        let chat = NamedTempFile::new().unwrap();
862        let cursor_dir = TempDir::new().unwrap();
863        let cursor = cursor_dir.path().join("cursor");
864
865        let dm_alice_bob = make_dm("r", "alice", "bob", "private");
866        let dm_carol_dave = make_dm("r", "carol", "dave", "also private");
867        crate::history::append(chat.path(), &dm_alice_bob)
868            .await
869            .unwrap();
870        crate::history::append(chat.path(), &dm_carol_dave)
871            .await
872            .unwrap();
873
874        // host "eve" can see both DMs
875        let result = poll_messages(chat.path(), &cursor, Some("eve"), Some("eve"), None)
876            .await
877            .unwrap();
878        assert_eq!(result.len(), 2, "host should see all DMs");
879    }
880
881    /// Non-host third party cannot see DMs they are not party to.
882    #[tokio::test]
883    async fn poll_messages_non_host_cannot_see_unrelated_dms() {
884        use crate::message::make_dm;
885        let chat = NamedTempFile::new().unwrap();
886        let cursor_dir = TempDir::new().unwrap();
887        let cursor = cursor_dir.path().join("cursor");
888
889        let dm = make_dm("r", "alice", "bob", "private");
890        crate::history::append(chat.path(), &dm).await.unwrap();
891
892        // carol is not a party and is not host
893        let result = poll_messages(chat.path(), &cursor, Some("carol"), None, None)
894            .await
895            .unwrap();
896        assert!(result.is_empty(), "non-host third party should not see DM");
897    }
898
899    /// Host reads from pull_messages as well.
900    #[tokio::test]
901    async fn pull_messages_host_sees_all_dms() {
902        use crate::message::make_dm;
903        let chat = NamedTempFile::new().unwrap();
904
905        let dm = make_dm("r", "alice", "bob", "secret");
906        crate::history::append(chat.path(), &dm).await.unwrap();
907
908        let result = pull_messages(chat.path(), 10, Some("eve"), Some("eve"))
909            .await
910            .unwrap();
911        assert_eq!(result.len(), 1, "host should see the DM via pull");
912    }
913
914    // ── poll_messages_multi tests ──────────────────────────────────────────
915
916    /// Multi-room poll merges messages from two rooms sorted by timestamp.
917    #[tokio::test]
918    async fn poll_multi_merges_by_timestamp() {
919        let chat_a = NamedTempFile::new().unwrap();
920        let chat_b = NamedTempFile::new().unwrap();
921
922        let rid_a = format!("test-merge-a-{}", std::process::id());
923        let rid_b = format!("test-merge-b-{}", std::process::id());
924
925        // Append messages with interleaved timestamps
926        let msg_a1 = make_message(&rid_a, "alice", "a1");
927        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
928        let msg_b1 = make_message(&rid_b, "bob", "b1");
929        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
930        let msg_a2 = make_message(&rid_a, "alice", "a2");
931
932        crate::history::append(chat_a.path(), &msg_a1)
933            .await
934            .unwrap();
935        crate::history::append(chat_b.path(), &msg_b1)
936            .await
937            .unwrap();
938        crate::history::append(chat_a.path(), &msg_a2)
939            .await
940            .unwrap();
941
942        let rooms: Vec<(&str, &Path)> = vec![
943            (rid_a.as_str(), chat_a.path()),
944            (rid_b.as_str(), chat_b.path()),
945        ];
946
947        let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
948        assert_eq!(result.len(), 3);
949        // Verify timestamp ordering
950        assert!(result[0].ts() <= result[1].ts());
951        assert!(result[1].ts() <= result[2].ts());
952        // First message should be from room-a (earliest)
953        assert_eq!(result[0].room(), &rid_a);
954
955        // Clean up cursor files
956        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
957        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
958    }
959
960    /// Multi-room poll uses per-room cursors (second call returns nothing).
961    #[tokio::test]
962    async fn poll_multi_advances_per_room_cursors() {
963        let chat_a = NamedTempFile::new().unwrap();
964        let chat_b = NamedTempFile::new().unwrap();
965
966        // Use unique room IDs to avoid cursor file collisions with parallel tests
967        let rid_a = format!("test-cursor-a-{}", std::process::id());
968        let rid_b = format!("test-cursor-b-{}", std::process::id());
969
970        let msg_a = make_message(&rid_a, "alice", "hello a");
971        let msg_b = make_message(&rid_b, "bob", "hello b");
972        crate::history::append(chat_a.path(), &msg_a).await.unwrap();
973        crate::history::append(chat_b.path(), &msg_b).await.unwrap();
974
975        let rooms: Vec<(&str, &Path)> = vec![
976            (rid_a.as_str(), chat_a.path()),
977            (rid_b.as_str(), chat_b.path()),
978        ];
979
980        // First poll gets everything
981        let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
982        assert_eq!(result.len(), 2);
983
984        // Second poll gets nothing (cursors advanced)
985        let result2 = poll_messages_multi(&rooms, "viewer").await.unwrap();
986        assert!(
987            result2.is_empty(),
988            "second multi-poll should return nothing"
989        );
990
991        // Clean up cursor files
992        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
993        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
994    }
995
996    /// Multi-room poll with one empty room still returns messages from the other.
997    #[tokio::test]
998    async fn poll_multi_one_empty_room() {
999        let chat_a = NamedTempFile::new().unwrap();
1000        let chat_b = NamedTempFile::new().unwrap();
1001
1002        let rid_a = format!("test-empty-a-{}", std::process::id());
1003        let rid_b = format!("test-empty-b-{}", std::process::id());
1004
1005        let msg = make_message(&rid_a, "alice", "only here");
1006        crate::history::append(chat_a.path(), &msg).await.unwrap();
1007        // chat_b is empty
1008
1009        let rooms: Vec<(&str, &Path)> = vec![
1010            (rid_a.as_str(), chat_a.path()),
1011            (rid_b.as_str(), chat_b.path()),
1012        ];
1013
1014        let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
1015        assert_eq!(result.len(), 1);
1016        assert_eq!(result[0].room(), &rid_a);
1017
1018        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
1019        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
1020    }
1021
1022    /// Multi-room poll with no rooms returns nothing.
1023    #[tokio::test]
1024    async fn poll_multi_no_rooms() {
1025        let rooms: Vec<(&str, &Path)> = vec![];
1026        let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
1027        assert!(result.is_empty());
1028    }
1029
1030    /// Multi-room poll filters DMs by viewer across rooms.
1031    #[tokio::test]
1032    async fn poll_multi_filters_dms_across_rooms() {
1033        use crate::message::make_dm;
1034        let chat_a = NamedTempFile::new().unwrap();
1035        let chat_b = NamedTempFile::new().unwrap();
1036
1037        let rid_a = format!("test-dm-a-{}", std::process::id());
1038        let rid_b = format!("test-dm-b-{}", std::process::id());
1039
1040        // DM to bob in room-a, DM to carol in room-b
1041        let dm_to_bob = make_dm(&rid_a, "alice", "bob", "secret for bob");
1042        let dm_to_carol = make_dm(&rid_b, "alice", "carol", "secret for carol");
1043        crate::history::append(chat_a.path(), &dm_to_bob)
1044            .await
1045            .unwrap();
1046        crate::history::append(chat_b.path(), &dm_to_carol)
1047            .await
1048            .unwrap();
1049
1050        let rooms: Vec<(&str, &Path)> = vec![
1051            (rid_a.as_str(), chat_a.path()),
1052            (rid_b.as_str(), chat_b.path()),
1053        ];
1054
1055        // bob sees only room-a DM
1056        let result = poll_messages_multi(&rooms, "bob").await.unwrap();
1057        assert_eq!(result.len(), 1);
1058        assert_eq!(result[0].room(), &rid_a);
1059
1060        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "bob"));
1061        let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "bob"));
1062    }
1063
1064    // ── cmd_query unit tests ───────────────────────────────────────────────────
1065
1066    /// cmd_query in historical mode returns all messages (newest-first by default).
1067    #[tokio::test]
1068    async fn cmd_query_history_returns_all_newest_first() {
1069        let chat = NamedTempFile::new().unwrap();
1070        let cursor_dir = TempDir::new().unwrap();
1071        let token_dir = TempDir::new().unwrap();
1072
1073        let room_id = format!("test-cqh-{}", std::process::id());
1074        write_token_file(&token_dir, &room_id, "alice-cqh", "tok-cqh");
1075        write_meta_file(&room_id, chat.path());
1076
1077        for i in 0..3u32 {
1078            crate::history::append(
1079                chat.path(),
1080                &make_message(&room_id, "alice-cqh", format!("{i}")),
1081            )
1082            .await
1083            .unwrap();
1084        }
1085
1086        let filter = QueryFilter {
1087            rooms: vec![room_id.clone()],
1088            ascending: false,
1089            ..Default::default()
1090        };
1091        let opts = QueryOptions {
1092            new_only: false,
1093            wait: false,
1094            interval_secs: 5,
1095            mentions_only: false,
1096            since_uuid: None,
1097        };
1098
1099        // cursor should NOT advance (historical mode)
1100        let cursor_path = crate::paths::cursor_path(&room_id, "alice-cqh");
1101        let _ = std::fs::remove_file(&cursor_path);
1102
1103        // Run cmd_query — captures stdout indirectly by ensuring cursor unchanged.
1104        oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqh", filter, opts, &cursor_dir)
1105            .await
1106            .unwrap();
1107
1108        // Cursor must not have been written in historical mode.
1109        assert!(
1110            !cursor_path.exists(),
1111            "historical query must not write a cursor file"
1112        );
1113
1114        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1115        let _ = std::fs::remove_file(&global_token_path("alice-cqh"));
1116    }
1117
1118    /// cmd_query in --new mode advances the cursor.
1119    #[tokio::test]
1120    async fn cmd_query_new_advances_cursor() {
1121        let chat = NamedTempFile::new().unwrap();
1122        let cursor_dir = TempDir::new().unwrap();
1123        let token_dir = TempDir::new().unwrap();
1124
1125        let room_id = format!("test-cqn-{}", std::process::id());
1126        write_token_file(&token_dir, &room_id, "alice-cqn", "tok-cqn");
1127        write_meta_file(&room_id, chat.path());
1128
1129        let msg = make_message(&room_id, "bob", "hello");
1130        crate::history::append(chat.path(), &msg).await.unwrap();
1131
1132        let filter = QueryFilter {
1133            rooms: vec![room_id.clone()],
1134            ascending: true,
1135            ..Default::default()
1136        };
1137        let opts = QueryOptions {
1138            new_only: true,
1139            wait: false,
1140            interval_secs: 5,
1141            mentions_only: false,
1142            since_uuid: None,
1143        };
1144
1145        // First query — should return the message and write cursor.
1146        let result = oneshot_cmd_query_to_vec(
1147            &[room_id.clone()],
1148            "tok-cqn",
1149            filter.clone(),
1150            opts.clone(),
1151            &cursor_dir,
1152        )
1153        .await
1154        .unwrap();
1155        assert_eq!(result.len(), 1);
1156
1157        // Second query — cursor advanced, nothing new.
1158        let result2 =
1159            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqn", filter, opts, &cursor_dir)
1160                .await
1161                .unwrap();
1162        assert!(
1163            result2.is_empty(),
1164            "second query should return nothing (cursor advanced)"
1165        );
1166
1167        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1168        let _ = std::fs::remove_file(&global_token_path("alice-cqn"));
1169    }
1170
1171    /// cmd_query with content_search only returns matching messages.
1172    #[tokio::test]
1173    async fn cmd_query_content_search_filters() {
1174        let chat = NamedTempFile::new().unwrap();
1175        let cursor_dir = TempDir::new().unwrap();
1176        let token_dir = TempDir::new().unwrap();
1177
1178        let room_id = format!("test-cqs-{}", std::process::id());
1179        write_token_file(&token_dir, &room_id, "alice-cqs", "tok-cqs");
1180        write_meta_file(&room_id, chat.path());
1181
1182        crate::history::append(chat.path(), &make_message(&room_id, "bob", "hello world"))
1183            .await
1184            .unwrap();
1185        crate::history::append(chat.path(), &make_message(&room_id, "bob", "goodbye"))
1186            .await
1187            .unwrap();
1188
1189        let filter = QueryFilter {
1190            rooms: vec![room_id.clone()],
1191            content_search: Some("hello".into()),
1192            ascending: true,
1193            ..Default::default()
1194        };
1195        let opts = QueryOptions {
1196            new_only: false,
1197            wait: false,
1198            interval_secs: 5,
1199            mentions_only: false,
1200            since_uuid: None,
1201        };
1202
1203        let result =
1204            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqs", filter, opts, &cursor_dir)
1205                .await
1206                .unwrap();
1207        assert_eq!(result.len(), 1);
1208        assert!(result[0].content().unwrap().contains("hello"));
1209
1210        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1211        let _ = std::fs::remove_file(&global_token_path("alice-cqs"));
1212    }
1213
1214    /// cmd_query with user filter only returns messages from that user.
1215    #[tokio::test]
1216    async fn cmd_query_user_filter() {
1217        let chat = NamedTempFile::new().unwrap();
1218        let cursor_dir = TempDir::new().unwrap();
1219        let token_dir = TempDir::new().unwrap();
1220
1221        let room_id = format!("test-cqu-{}", std::process::id());
1222        write_token_file(&token_dir, &room_id, "alice-cqu", "tok-cqu");
1223        write_meta_file(&room_id, chat.path());
1224
1225        crate::history::append(chat.path(), &make_message(&room_id, "alice", "from alice"))
1226            .await
1227            .unwrap();
1228        crate::history::append(chat.path(), &make_message(&room_id, "bob", "from bob"))
1229            .await
1230            .unwrap();
1231
1232        let filter = QueryFilter {
1233            rooms: vec![room_id.clone()],
1234            users: vec!["bob".into()],
1235            ascending: true,
1236            ..Default::default()
1237        };
1238        let opts = QueryOptions {
1239            new_only: false,
1240            wait: false,
1241            interval_secs: 5,
1242            mentions_only: false,
1243            since_uuid: None,
1244        };
1245
1246        let result =
1247            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqu", filter, opts, &cursor_dir)
1248                .await
1249                .unwrap();
1250        assert_eq!(result.len(), 1);
1251        assert_eq!(result[0].user(), "bob");
1252
1253        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1254        let _ = std::fs::remove_file(&global_token_path("alice-cqu"));
1255    }
1256
1257    /// cmd_query with limit returns only N messages.
1258    #[tokio::test]
1259    async fn cmd_query_limit() {
1260        let chat = NamedTempFile::new().unwrap();
1261        let cursor_dir = TempDir::new().unwrap();
1262        let token_dir = TempDir::new().unwrap();
1263
1264        let room_id = format!("test-cql-{}", std::process::id());
1265        write_token_file(&token_dir, &room_id, "alice-cql", "tok-cql");
1266        write_meta_file(&room_id, chat.path());
1267
1268        for i in 0..5u32 {
1269            crate::history::append(
1270                chat.path(),
1271                &make_message(&room_id, "bob", format!("msg {i}")),
1272            )
1273            .await
1274            .unwrap();
1275        }
1276
1277        let filter = QueryFilter {
1278            rooms: vec![room_id.clone()],
1279            limit: Some(2),
1280            ascending: false,
1281            ..Default::default()
1282        };
1283        let opts = QueryOptions {
1284            new_only: false,
1285            wait: false,
1286            interval_secs: 5,
1287            mentions_only: false,
1288            since_uuid: None,
1289        };
1290
1291        let result =
1292            oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cql", filter, opts, &cursor_dir)
1293                .await
1294                .unwrap();
1295        assert_eq!(result.len(), 2, "limit should restrict to 2 messages");
1296
1297        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1298        let _ = std::fs::remove_file(&global_token_path("alice-cql"));
1299    }
1300
1301    // ── Test helpers ──────────────────────────────────────────────────────────
1302
1303    fn global_token_path(username: &str) -> PathBuf {
1304        crate::paths::global_token_path(username)
1305    }
1306
1307    fn write_token_file(_dir: &TempDir, _room_id: &str, username: &str, token: &str) {
1308        let path = global_token_path(username);
1309        if let Some(parent) = path.parent() {
1310            std::fs::create_dir_all(parent).unwrap();
1311        }
1312        let data = serde_json::json!({ "username": username, "token": token });
1313        std::fs::write(&path, format!("{data}\n")).unwrap();
1314    }
1315
1316    fn write_meta_file(room_id: &str, chat_path: &Path) {
1317        let meta_path = crate::paths::room_meta_path(room_id);
1318        if let Some(parent) = meta_path.parent() {
1319            std::fs::create_dir_all(parent).unwrap();
1320        }
1321        let meta = serde_json::json!({ "chat_path": chat_path.to_string_lossy() });
1322        std::fs::write(&meta_path, format!("{meta}\n")).unwrap();
1323    }
1324
1325    /// Run cmd_query and collect returned messages.
1326    ///
1327    /// Since cmd_query writes to stdout, we wrap it to capture results by
1328    /// re-reading the chat file with the same filter in historical mode.
1329    /// For `new_only` tests we verify the cursor state instead.
1330    async fn oneshot_cmd_query_to_vec(
1331        room_ids: &[String],
1332        token: &str,
1333        filter: QueryFilter,
1334        opts: QueryOptions,
1335        _cursor_dir: &TempDir,
1336    ) -> anyhow::Result<Vec<Message>> {
1337        // Snapshot cursor before run.
1338        let cursor_before = room_ids
1339            .first()
1340            .map(|id| {
1341                // Resolve username by reading the token file for this room.
1342                super::super::token::username_from_token(token)
1343                    .ok()
1344                    .map(|u| {
1345                        let p = crate::paths::cursor_path(id, &u);
1346                        std::fs::read_to_string(&p).ok()
1347                    })
1348                    .flatten()
1349            })
1350            .flatten();
1351
1352        // Run cmd_query (side effect: may update cursor).
1353        cmd_query(room_ids, token, filter.clone(), opts.clone()).await?;
1354
1355        // Snapshot cursor after run.
1356        let cursor_after = room_ids
1357            .first()
1358            .map(|id| {
1359                super::super::token::username_from_token(token)
1360                    .ok()
1361                    .map(|u| {
1362                        let p = crate::paths::cursor_path(id, &u);
1363                        std::fs::read_to_string(&p).ok()
1364                    })
1365                    .flatten()
1366            })
1367            .flatten();
1368
1369        // Reconstruct what cmd_query would have returned.
1370        // For historical mode: re-run with same filter and collect messages.
1371        // For new_only mode: reload history and apply filter with the "before" cursor.
1372        if !opts.new_only && !opts.wait {
1373            // Historical: reload and reapply filter.
1374            let mut all: Vec<Message> = Vec::new();
1375            for room_id in room_ids {
1376                let meta_path = crate::paths::room_meta_path(room_id);
1377                let chat_path = chat_path_from_meta(room_id, &meta_path);
1378                let msgs = history::load(&chat_path).await?;
1379                all.extend(msgs);
1380            }
1381            let username = username_from_token(token).unwrap_or_default();
1382            let mut result: Vec<Message> = all
1383                .into_iter()
1384                .filter(|m| filter.matches(m, m.room()))
1385                .filter(|m| match m {
1386                    Message::DirectMessage { user, to, .. } => user == &username || to == &username,
1387                    _ => true,
1388                })
1389                .collect();
1390            apply_sort_and_limit(&mut result, &filter);
1391            Ok(result)
1392        } else {
1393            // new_only mode: reconstruct returned messages by replaying history
1394            // from cursor_before (before the call advanced it).
1395            let advanced = cursor_after != cursor_before;
1396            if advanced {
1397                let room_id = &room_ids[0];
1398                let meta_path = crate::paths::room_meta_path(room_id);
1399                let chat_path = chat_path_from_meta(room_id, &meta_path);
1400                let all = history::load(&chat_path).await?;
1401                // Find start from the pre-run cursor UUID.
1402                let start = match &cursor_before {
1403                    Some(id) => all
1404                        .iter()
1405                        .position(|m| m.id() == id.trim())
1406                        .map(|i| i + 1)
1407                        .unwrap_or(0),
1408                    None => 0,
1409                };
1410                let filtered: Vec<Message> = all[start..]
1411                    .iter()
1412                    .filter(|m| filter.matches(m, m.room()))
1413                    .cloned()
1414                    .collect();
1415                Ok(filtered)
1416            } else {
1417                Ok(vec![])
1418            }
1419        }
1420    }
1421
1422    /// username_from_token returns an error for an unknown token.
1423    #[test]
1424    fn unknown_token_returns_error() {
1425        let result = super::super::token::username_from_token("bad-token-nonexistent");
1426        assert!(result.is_err());
1427        assert!(result
1428            .unwrap_err()
1429            .to_string()
1430            .contains("token not recognised"));
1431    }
1432
1433    // ── subscription tier filtering tests ──────────────────────────────────
1434
1435    /// load_user_tier returns Full when no subscription file exists.
1436    #[test]
1437    fn load_user_tier_missing_file_returns_full() {
1438        // Use a room ID that will never have a subscription file on disk.
1439        let tier = load_user_tier("nonexistent-room-tier-test", "alice");
1440        assert_eq!(tier, SubscriptionTier::Full);
1441    }
1442
1443    /// load_user_tier returns the persisted tier when the file exists.
1444    #[test]
1445    fn load_user_tier_returns_persisted_tier() {
1446        let state_dir = crate::paths::room_state_dir();
1447        let _ = std::fs::create_dir_all(&state_dir);
1448        let room_id = format!("test-tier-load-{}", std::process::id());
1449        let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
1450
1451        let mut map = std::collections::HashMap::new();
1452        map.insert("alice".to_string(), SubscriptionTier::MentionsOnly);
1453        map.insert("bob".to_string(), SubscriptionTier::Unsubscribed);
1454        let json = serde_json::to_string_pretty(&map).unwrap();
1455        std::fs::write(&sub_path, json).unwrap();
1456
1457        assert_eq!(
1458            load_user_tier(&room_id, "alice"),
1459            SubscriptionTier::MentionsOnly
1460        );
1461        assert_eq!(
1462            load_user_tier(&room_id, "bob"),
1463            SubscriptionTier::Unsubscribed
1464        );
1465        // Unknown user defaults to Full.
1466        assert_eq!(load_user_tier(&room_id, "carol"), SubscriptionTier::Full);
1467
1468        let _ = std::fs::remove_file(&sub_path);
1469    }
1470
1471    /// apply_tier_filter with Full keeps all messages.
1472    #[test]
1473    fn apply_tier_filter_full_keeps_all() {
1474        let mut msgs = vec![
1475            make_message("r", "alice", "hello"),
1476            make_message("r", "bob", "world"),
1477        ];
1478        apply_tier_filter(&mut msgs, SubscriptionTier::Full, "carol");
1479        assert_eq!(msgs.len(), 2);
1480    }
1481
1482    /// apply_tier_filter with MentionsOnly keeps only @mentions.
1483    #[test]
1484    fn apply_tier_filter_mentions_only_filters() {
1485        let mut msgs = vec![
1486            make_message("r", "alice", "hey @carol check this"),
1487            make_message("r", "bob", "unrelated message"),
1488            make_message("r", "dave", "also @carol"),
1489        ];
1490        apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "carol");
1491        assert_eq!(msgs.len(), 2);
1492        assert!(msgs[0].content().unwrap().contains("@carol"));
1493        assert!(msgs[1].content().unwrap().contains("@carol"));
1494    }
1495
1496    /// apply_tier_filter with Unsubscribed clears all messages.
1497    #[test]
1498    fn apply_tier_filter_unsubscribed_clears_all() {
1499        let mut msgs = vec![
1500            make_message("r", "alice", "hey @carol"),
1501            make_message("r", "bob", "world"),
1502        ];
1503        apply_tier_filter(&mut msgs, SubscriptionTier::Unsubscribed, "carol");
1504        assert!(msgs.is_empty());
1505    }
1506
1507    /// apply_tier_filter with MentionsOnly and no mentions returns empty.
1508    #[test]
1509    fn apply_tier_filter_mentions_only_no_mentions_returns_empty() {
1510        let mut msgs = vec![
1511            make_message("r", "alice", "hello"),
1512            make_message("r", "bob", "world"),
1513        ];
1514        apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "carol");
1515        assert!(msgs.is_empty());
1516    }
1517
1518    /// cmd_query with Unsubscribed tier and public_only=true still returns messages.
1519    #[tokio::test]
1520    async fn cmd_query_public_bypasses_tier() {
1521        let chat = NamedTempFile::new().unwrap();
1522        let token_dir = TempDir::new().unwrap();
1523        let cursor_dir = TempDir::new().unwrap();
1524
1525        let room_id = format!("test-pub-tier-{}", std::process::id());
1526        write_token_file(&token_dir, &room_id, "alice-pub", "tok-pub-tier");
1527        write_meta_file(&room_id, chat.path());
1528
1529        // Write subscription map marking alice-pub as Unsubscribed.
1530        let state_dir = crate::paths::room_state_dir();
1531        let _ = std::fs::create_dir_all(&state_dir);
1532        let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
1533        let mut map = std::collections::HashMap::new();
1534        map.insert("alice-pub".to_string(), SubscriptionTier::Unsubscribed);
1535        std::fs::write(&sub_path, serde_json::to_string(&map).unwrap()).unwrap();
1536
1537        // Add a message.
1538        crate::history::append(chat.path(), &make_message(&room_id, "bob", "visible"))
1539            .await
1540            .unwrap();
1541
1542        // Query with public_only=true should bypass tier and return the message.
1543        let filter = QueryFilter {
1544            rooms: vec![room_id.clone()],
1545            public_only: true,
1546            ascending: true,
1547            ..Default::default()
1548        };
1549        let opts = QueryOptions {
1550            new_only: false,
1551            wait: false,
1552            interval_secs: 5,
1553            mentions_only: false,
1554            since_uuid: None,
1555        };
1556
1557        let result = oneshot_cmd_query_to_vec(
1558            &[room_id.clone()],
1559            "tok-pub-tier",
1560            filter,
1561            opts,
1562            &cursor_dir,
1563        )
1564        .await
1565        .unwrap();
1566        assert_eq!(
1567            result.len(),
1568            1,
1569            "public flag should bypass Unsubscribed tier"
1570        );
1571
1572        let _ = std::fs::remove_file(&sub_path);
1573        let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1574        let _ = std::fs::remove_file(&global_token_path("alice-pub"));
1575    }
1576
1577    /// MentionsOnly tier sets mention_user filter, narrowing results to @mentions.
1578    #[test]
1579    fn mentions_only_tier_sets_mention_user_on_filter() {
1580        // Verify the tier logic: when tier is MentionsOnly and mention_user is
1581        // not already set, it should be set to the username.
1582        let mut filter = QueryFilter::default();
1583        let tier = SubscriptionTier::MentionsOnly;
1584
1585        // Simulate what cmd_query does.
1586        match tier {
1587            SubscriptionTier::MentionsOnly => {
1588                if filter.mention_user.is_none() {
1589                    filter.mention_user = Some("alice".to_string());
1590                }
1591            }
1592            _ => {}
1593        }
1594
1595        assert_eq!(filter.mention_user, Some("alice".to_string()));
1596
1597        // Now verify with apply_tier_filter that messages are correctly narrowed.
1598        let mut msgs = vec![
1599            make_message("r", "bob", "hey @alice look"),
1600            make_message("r", "bob", "unrelated chatter"),
1601        ];
1602        apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "alice");
1603        assert_eq!(msgs.len(), 1);
1604        assert!(msgs[0].content().unwrap().contains("@alice"));
1605    }
1606
1607    /// MentionsOnly tier does not override an existing mention_user filter.
1608    #[test]
1609    fn mentions_only_tier_preserves_existing_mention_user() {
1610        let mut filter = QueryFilter {
1611            mention_user: Some("bob".to_string()),
1612            ..Default::default()
1613        };
1614
1615        // MentionsOnly should not overwrite the existing filter.
1616        match SubscriptionTier::MentionsOnly {
1617            SubscriptionTier::MentionsOnly => {
1618                if filter.mention_user.is_none() {
1619                    filter.mention_user = Some("alice".to_string());
1620                }
1621            }
1622            _ => {}
1623        }
1624
1625        assert_eq!(
1626            filter.mention_user,
1627            Some("bob".to_string()),
1628            "existing mention_user filter should be preserved"
1629        );
1630    }
1631
1632    // ── per-room subscription tier filtering tests ─────────────────────────
1633
1634    #[test]
1635    fn per_room_tier_filter_full_keeps_all() {
1636        let mut msgs = vec![
1637            make_message("dev", "alice", "hello from dev"),
1638            make_message("lobby", "bob", "hello from lobby"),
1639        ];
1640        // No subscription files → defaults to Full for both rooms.
1641        let rooms = vec![
1642            "nonexistent-perroom-full-1".to_string(),
1643            "nonexistent-perroom-full-2".to_string(),
1644        ];
1645        apply_per_room_tier_filter(&mut msgs, &rooms, "carol");
1646        assert_eq!(msgs.len(), 2);
1647    }
1648
1649    #[test]
1650    fn per_room_tier_filter_mixed_tiers() {
1651        let state_dir = crate::paths::room_state_dir();
1652        let _ = std::fs::create_dir_all(&state_dir);
1653
1654        let room_full = format!("perroom-mixed-full-{}", std::process::id());
1655        let room_unsub = format!("perroom-mixed-unsub-{}", std::process::id());
1656        let room_mentions = format!("perroom-mixed-ment-{}", std::process::id());
1657
1658        // Write subscription maps: room_full=Full (default), room_unsub=Unsubscribed, room_mentions=MentionsOnly
1659        let sub_unsub = crate::paths::broker_subscriptions_path(&state_dir, &room_unsub);
1660        let mut map_unsub = std::collections::HashMap::new();
1661        map_unsub.insert("alice".to_string(), SubscriptionTier::Unsubscribed);
1662        std::fs::write(&sub_unsub, serde_json::to_string(&map_unsub).unwrap()).unwrap();
1663
1664        let sub_ment = crate::paths::broker_subscriptions_path(&state_dir, &room_mentions);
1665        let mut map_ment = std::collections::HashMap::new();
1666        map_ment.insert("alice".to_string(), SubscriptionTier::MentionsOnly);
1667        std::fs::write(&sub_ment, serde_json::to_string(&map_ment).unwrap()).unwrap();
1668
1669        let mut msgs = vec![
1670            make_message(&room_full, "bob", "visible in full room"),
1671            make_message(&room_unsub, "bob", "invisible — unsubscribed"),
1672            make_message(&room_mentions, "bob", "no mention — filtered"),
1673            make_message(&room_mentions, "bob", "hey @alice check this"),
1674        ];
1675
1676        let rooms = vec![room_full.clone(), room_unsub.clone(), room_mentions.clone()];
1677        apply_per_room_tier_filter(&mut msgs, &rooms, "alice");
1678
1679        // Only the Full room message and the MentionsOnly room @alice message survive.
1680        assert_eq!(msgs.len(), 2);
1681        assert!(msgs[0].content().unwrap().contains("visible in full room"));
1682        assert!(msgs[1].content().unwrap().contains("@alice"));
1683
1684        // Cleanup
1685        let _ = std::fs::remove_file(&sub_unsub);
1686        let _ = std::fs::remove_file(&sub_ment);
1687    }
1688
1689    #[test]
1690    fn per_room_tier_filter_unknown_room_defaults_to_full() {
1691        let mut msgs = vec![make_message("mystery", "bob", "hello")];
1692        // Room not in the room_ids list at all — tier defaults to Full.
1693        apply_per_room_tier_filter(&mut msgs, &["other".to_string()], "alice");
1694        assert_eq!(msgs.len(), 1);
1695    }
1696
1697    // ── event filter tests ────────────────────────────────────────────────────
1698
1699    fn make_event_msg(room: &str, event_type: room_protocol::EventType) -> Message {
1700        room_protocol::make_event(room, "bot", event_type, "event content", None)
1701    }
1702
1703    #[test]
1704    fn event_filter_all_keeps_everything() {
1705        let mut msgs = vec![
1706            make_message("r", "alice", "hello"),
1707            make_event_msg("r", room_protocol::EventType::TaskPosted),
1708            make_event_msg("r", room_protocol::EventType::TaskFinished),
1709        ];
1710        apply_event_filter(&mut msgs, &EventFilter::All);
1711        assert_eq!(msgs.len(), 3);
1712    }
1713
1714    #[test]
1715    fn event_filter_none_removes_only_events() {
1716        let mut msgs = vec![
1717            make_message("r", "alice", "hello"),
1718            make_event_msg("r", room_protocol::EventType::TaskPosted),
1719            make_event_msg("r", room_protocol::EventType::TaskFinished),
1720        ];
1721        apply_event_filter(&mut msgs, &EventFilter::None);
1722        assert_eq!(msgs.len(), 1);
1723        assert!(msgs[0].content().unwrap().contains("hello"));
1724    }
1725
1726    #[test]
1727    fn event_filter_only_keeps_matching_events() {
1728        let mut types = std::collections::BTreeSet::new();
1729        types.insert(room_protocol::EventType::TaskPosted);
1730        let filter = EventFilter::Only { types };
1731
1732        let mut msgs = vec![
1733            make_message("r", "alice", "hello"),
1734            make_event_msg("r", room_protocol::EventType::TaskPosted),
1735            make_event_msg("r", room_protocol::EventType::TaskFinished),
1736        ];
1737        apply_event_filter(&mut msgs, &filter);
1738        assert_eq!(msgs.len(), 2); // hello + task_posted
1739    }
1740
1741    #[test]
1742    fn event_filter_only_removes_non_matching_events() {
1743        let mut types = std::collections::BTreeSet::new();
1744        types.insert(room_protocol::EventType::TaskFinished);
1745        let filter = EventFilter::Only { types };
1746
1747        let mut msgs = vec![
1748            make_event_msg("r", room_protocol::EventType::TaskPosted),
1749            make_event_msg("r", room_protocol::EventType::TaskAssigned),
1750            make_event_msg("r", room_protocol::EventType::TaskFinished),
1751        ];
1752        apply_event_filter(&mut msgs, &filter);
1753        assert_eq!(msgs.len(), 1);
1754    }
1755
1756    #[test]
1757    fn event_filter_does_not_affect_non_event_messages() {
1758        let mut msgs = vec![
1759            make_message("r", "alice", "hello"),
1760            make_message("r", "bob", "world"),
1761        ];
1762        apply_event_filter(&mut msgs, &EventFilter::None);
1763        assert_eq!(msgs.len(), 2, "non-event messages should not be filtered");
1764    }
1765
1766    #[test]
1767    fn load_user_event_filter_missing_file_returns_all() {
1768        let ef = load_user_event_filter("nonexistent-room-ef-test", "alice");
1769        assert_eq!(ef, EventFilter::All);
1770    }
1771
1772    #[test]
1773    fn load_user_event_filter_returns_persisted() {
1774        let state_dir = crate::paths::room_state_dir();
1775        let _ = std::fs::create_dir_all(&state_dir);
1776        let room_id = format!("test-ef-load-{}", std::process::id());
1777        let ef_path = crate::paths::broker_event_filters_path(&state_dir, &room_id);
1778
1779        let mut map = std::collections::HashMap::new();
1780        map.insert("alice".to_string(), EventFilter::None);
1781        let mut types = std::collections::BTreeSet::new();
1782        types.insert(room_protocol::EventType::TaskPosted);
1783        map.insert("bob".to_string(), EventFilter::Only { types });
1784        let json = serde_json::to_string_pretty(&map).unwrap();
1785        std::fs::write(&ef_path, json).unwrap();
1786
1787        assert_eq!(load_user_event_filter(&room_id, "alice"), EventFilter::None);
1788        let bob_filter = load_user_event_filter(&room_id, "bob");
1789        match bob_filter {
1790            EventFilter::Only { types } => {
1791                assert!(types.contains(&room_protocol::EventType::TaskPosted));
1792                assert_eq!(types.len(), 1);
1793            }
1794            _ => panic!("expected Only filter for bob"),
1795        }
1796        // Unknown user defaults to All.
1797        assert_eq!(load_user_event_filter(&room_id, "carol"), EventFilter::All);
1798
1799        let _ = std::fs::remove_file(&ef_path);
1800    }
1801
1802    /// pull_messages returns the last n entries without moving the cursor.
1803    #[tokio::test]
1804    async fn pull_messages_returns_tail_without_cursor_change() {
1805        let chat = NamedTempFile::new().unwrap();
1806        let cursor_dir = TempDir::new().unwrap();
1807        let cursor = cursor_dir.path().join("cursor");
1808
1809        for i in 0..5u32 {
1810            crate::history::append(chat.path(), &make_message("r", "u", format!("msg {i}")))
1811                .await
1812                .unwrap();
1813        }
1814
1815        let pulled = pull_messages(chat.path(), 3, None, None).await.unwrap();
1816        assert_eq!(pulled.len(), 3);
1817
1818        // cursor untouched — poll still returns all 5
1819        let polled = poll_messages(chat.path(), &cursor, None, None, None)
1820            .await
1821            .unwrap();
1822        assert_eq!(polled.len(), 5);
1823    }
1824}