1use std::path::{Path, PathBuf};
2
3use room_protocol::SubscriptionTier;
4
5use crate::{
6 broker::commands::load_subscription_map, history, message::Message, paths, query::QueryFilter,
7};
8
9use super::token::{read_cursor, username_from_token, write_cursor};
10
11fn load_user_tier(room_id: &str, username: &str) -> SubscriptionTier {
19 let state_dir = paths::room_state_dir();
20 let sub_path = paths::broker_subscriptions_path(&state_dir, room_id);
21 let map = load_subscription_map(&sub_path);
22 map.get(username).copied().unwrap_or(SubscriptionTier::Full)
23}
24
25fn apply_tier_filter(messages: &mut Vec<Message>, tier: SubscriptionTier, username: &str) {
31 match tier {
32 SubscriptionTier::Full => {}
33 SubscriptionTier::MentionsOnly => {
34 messages.retain(|m| m.mentions().iter().any(|mention| mention == username));
35 }
36 SubscriptionTier::Unsubscribed => {
37 messages.clear();
38 }
39 }
40}
41
42fn apply_per_room_tier_filter(messages: &mut Vec<Message>, room_ids: &[String], username: &str) {
48 use std::collections::HashMap;
49 let tiers: HashMap<&str, SubscriptionTier> = room_ids
50 .iter()
51 .map(|r| (r.as_str(), load_user_tier(r, username)))
52 .collect();
53
54 messages.retain(|m| {
55 let tier = tiers
56 .get(m.room())
57 .copied()
58 .unwrap_or(SubscriptionTier::Full);
59 match tier {
60 SubscriptionTier::Full => true,
61 SubscriptionTier::MentionsOnly => {
62 m.mentions().iter().any(|mention| mention == username)
63 }
64 SubscriptionTier::Unsubscribed => false,
65 }
66 });
67}
68
69#[derive(Debug, Clone)]
73pub struct QueryOptions {
74 pub new_only: bool,
77 pub wait: bool,
79 pub interval_secs: u64,
81 pub mentions_only: bool,
84 pub since_uuid: Option<String>,
87}
88
89pub async fn poll_messages(
104 chat_path: &Path,
105 cursor_path: &Path,
106 viewer: Option<&str>,
107 host: Option<&str>,
108 since: Option<&str>,
109) -> anyhow::Result<Vec<Message>> {
110 let effective_since: Option<String> = since
111 .map(|s| s.to_owned())
112 .or_else(|| read_cursor(cursor_path));
113
114 let messages = history::load(chat_path).await?;
115
116 let start = match &effective_since {
117 Some(id) => messages
118 .iter()
119 .position(|m| m.id() == id)
120 .map(|i| i + 1)
121 .unwrap_or(0),
122 None => 0,
123 };
124
125 let result: Vec<Message> = messages[start..]
126 .iter()
127 .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
128 .cloned()
129 .collect();
130
131 if let Some(last) = result.last() {
132 write_cursor(cursor_path, last.id())?;
133 }
134
135 Ok(result)
136}
137
138pub async fn pull_messages(
146 chat_path: &Path,
147 n: usize,
148 viewer: Option<&str>,
149 host: Option<&str>,
150) -> anyhow::Result<Vec<Message>> {
151 let clamped = n.min(200);
152 let all = history::tail(chat_path, clamped).await?;
153 let visible: Vec<Message> = all
154 .into_iter()
155 .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
156 .collect();
157 Ok(visible)
158}
159
160pub async fn cmd_pull(room_id: &str, token: &str, n: usize) -> anyhow::Result<()> {
165 let username = username_from_token(token)?;
166 let meta_path = paths::room_meta_path(room_id);
167 let chat_path = chat_path_from_meta(room_id, &meta_path);
168
169 let host = read_host_from_meta(&meta_path);
170 let mut messages = pull_messages(&chat_path, n, Some(&username), host.as_deref()).await?;
171 let tier = load_user_tier(room_id, &username);
172 apply_tier_filter(&mut messages, tier, &username);
173 for msg in &messages {
174 println!("{}", serde_json::to_string(msg)?);
175 }
176 Ok(())
177}
178
179pub async fn cmd_watch(room_id: &str, token: &str, interval_secs: u64) -> anyhow::Result<()> {
187 let username = username_from_token(token)?;
188 let meta_path = paths::room_meta_path(room_id);
189 let chat_path = chat_path_from_meta(room_id, &meta_path);
190 let cursor_path = paths::cursor_path(room_id, &username);
191 let host = read_host_from_meta(&meta_path);
192
193 loop {
194 let messages = poll_messages(
195 &chat_path,
196 &cursor_path,
197 Some(&username),
198 host.as_deref(),
199 None,
200 )
201 .await?;
202
203 let foreign: Vec<&Message> = messages
204 .iter()
205 .filter(|m| match m {
206 Message::Message { user, .. } | Message::System { user, .. } => user != &username,
207 Message::DirectMessage { to, .. } => to == &username,
208 _ => false,
209 })
210 .collect();
211
212 if !foreign.is_empty() {
213 for msg in foreign {
214 println!("{}", serde_json::to_string(msg)?);
215 }
216 return Ok(());
217 }
218
219 tokio::time::sleep(tokio::time::Duration::from_secs(interval_secs)).await;
220 }
221}
222
223pub async fn cmd_poll(
229 room_id: &str,
230 token: &str,
231 since: Option<String>,
232 mentions_only: bool,
233) -> anyhow::Result<()> {
234 let username = username_from_token(token)?;
235 let meta_path = paths::room_meta_path(room_id);
236 let chat_path = chat_path_from_meta(room_id, &meta_path);
237 let cursor_path = paths::cursor_path(room_id, &username);
238 let host = read_host_from_meta(&meta_path);
239
240 let messages = poll_messages(
241 &chat_path,
242 &cursor_path,
243 Some(&username),
244 host.as_deref(),
245 since.as_deref(),
246 )
247 .await?;
248 for msg in &messages {
249 if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
250 continue;
251 }
252 println!("{}", serde_json::to_string(msg)?);
253 }
254 Ok(())
255}
256
257pub async fn poll_messages_multi(
263 rooms: &[(&str, &Path)],
264 username: &str,
265) -> anyhow::Result<Vec<Message>> {
266 let mut all_messages: Vec<Message> = Vec::new();
267
268 for &(room_id, chat_path) in rooms {
269 let cursor_path = paths::cursor_path(room_id, username);
270 let meta_path = paths::room_meta_path(room_id);
271 let host = read_host_from_meta(&meta_path);
272 let msgs = poll_messages(
273 chat_path,
274 &cursor_path,
275 Some(username),
276 host.as_deref(),
277 None,
278 )
279 .await?;
280 all_messages.extend(msgs);
281 }
282
283 all_messages.sort_by(|a, b| a.ts().cmp(b.ts()));
284 Ok(all_messages)
285}
286
287pub async fn cmd_poll_multi(
292 room_ids: &[String],
293 token: &str,
294 mentions_only: bool,
295) -> anyhow::Result<()> {
296 let username = username_from_token(token)?;
298
299 let mut rooms: Vec<(&str, PathBuf)> = Vec::new();
301 for room_id in room_ids {
302 let meta_path = paths::room_meta_path(room_id);
303 let chat_path = chat_path_from_meta(room_id, &meta_path);
304 rooms.push((room_id.as_str(), chat_path));
305 }
306
307 let room_refs: Vec<(&str, &Path)> = rooms.iter().map(|(id, p)| (*id, p.as_path())).collect();
308 let messages = poll_messages_multi(&room_refs, &username).await?;
309 for msg in &messages {
310 if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
311 continue;
312 }
313 println!("{}", serde_json::to_string(msg)?);
314 }
315 Ok(())
316}
317
318pub async fn cmd_query(
333 room_ids: &[String],
334 token: &str,
335 mut filter: QueryFilter,
336 opts: QueryOptions,
337) -> anyhow::Result<()> {
338 if room_ids.is_empty() {
339 anyhow::bail!("at least one room ID is required");
340 }
341
342 let username = username_from_token(token)?;
343
344 if opts.mentions_only {
346 filter.mention_user = Some(username.clone());
347 }
348
349 if opts.wait || opts.new_only {
350 cmd_query_new(room_ids, &username, filter, opts).await
351 } else {
352 cmd_query_history(room_ids, &username, filter).await
353 }
354}
355
356async fn cmd_query_new(
358 room_ids: &[String],
359 username: &str,
360 filter: QueryFilter,
361 opts: QueryOptions,
362) -> anyhow::Result<()> {
363 loop {
364 let messages: Vec<Message> = if room_ids.len() == 1 {
365 let room_id = &room_ids[0];
366 let meta_path = paths::room_meta_path(room_id);
367 let chat_path = chat_path_from_meta(room_id, &meta_path);
368 let cursor_path = paths::cursor_path(room_id, username);
369 let host = read_host_from_meta(&meta_path);
370 poll_messages(
371 &chat_path,
372 &cursor_path,
373 Some(username),
374 host.as_deref(),
375 opts.since_uuid.as_deref(),
376 )
377 .await?
378 } else {
379 let mut rooms_info: Vec<(String, PathBuf)> = Vec::new();
380 for room_id in room_ids {
381 let meta_path = paths::room_meta_path(room_id);
382 let chat_path = chat_path_from_meta(room_id, &meta_path);
383 rooms_info.push((room_id.clone(), chat_path));
384 }
385 let room_refs: Vec<(&str, &Path)> = rooms_info
386 .iter()
387 .map(|(id, p)| (id.as_str(), p.as_path()))
388 .collect();
389 poll_messages_multi(&room_refs, username).await?
390 };
391
392 let mut filtered: Vec<Message> = messages
394 .into_iter()
395 .filter(|m| filter.matches(m, m.room()))
396 .collect();
397
398 if !filter.public_only {
399 apply_per_room_tier_filter(&mut filtered, room_ids, username);
400 }
401
402 apply_sort_and_limit(&mut filtered, &filter);
403
404 if opts.wait {
405 let foreign: Vec<&Message> = filtered
407 .iter()
408 .filter(|m| match m {
409 Message::Message { user, .. } | Message::System { user, .. } => {
410 user != username
411 }
412 Message::DirectMessage { to, .. } => to == username,
413 _ => false,
414 })
415 .collect();
416
417 if !foreign.is_empty() {
418 for msg in foreign {
419 println!("{}", serde_json::to_string(msg)?);
420 }
421 return Ok(());
422 }
423 } else {
424 for msg in &filtered {
425 println!("{}", serde_json::to_string(msg)?);
426 }
427 return Ok(());
428 }
429
430 tokio::time::sleep(tokio::time::Duration::from_secs(opts.interval_secs)).await;
431 }
432}
433
434async fn cmd_query_history(
436 room_ids: &[String],
437 username: &str,
438 filter: QueryFilter,
439) -> anyhow::Result<()> {
440 let mut all_messages: Vec<Message> = Vec::new();
441
442 for room_id in room_ids {
443 let meta_path = paths::room_meta_path(room_id);
444 let chat_path = chat_path_from_meta(room_id, &meta_path);
445 let messages = history::load(&chat_path).await?;
446 all_messages.extend(messages);
447 }
448
449 let mut filtered: Vec<Message> = all_messages
451 .into_iter()
452 .filter(|m| filter.matches(m, m.room()))
453 .filter(|m| match m {
454 Message::DirectMessage { user, to, .. } => user == username || to == username,
455 _ => true,
456 })
457 .collect();
458
459 if !filter.public_only {
460 apply_per_room_tier_filter(&mut filtered, room_ids, username);
461 }
462
463 apply_sort_and_limit(&mut filtered, &filter);
464
465 if filtered.is_empty() {
467 if let Some((ref target_room, target_seq)) = filter.target_id {
468 use room_protocol::format_message_id;
469 anyhow::bail!(
470 "message not found: {}",
471 format_message_id(target_room, target_seq)
472 );
473 }
474 }
475
476 for msg in &filtered {
477 println!("{}", serde_json::to_string(msg)?);
478 }
479 Ok(())
480}
481
482fn apply_sort_and_limit(messages: &mut Vec<Message>, filter: &QueryFilter) {
484 if filter.ascending {
485 messages.sort_by(|a, b| a.ts().cmp(b.ts()));
486 } else {
487 messages.sort_by(|a, b| b.ts().cmp(a.ts()));
488 }
489 if let Some(limit) = filter.limit {
490 messages.truncate(limit);
491 }
492}
493
494pub(super) fn read_host_from_meta(meta_path: &Path) -> Option<String> {
499 if !meta_path.exists() {
500 return None;
501 }
502 let data = std::fs::read_to_string(meta_path).ok()?;
503 let v: serde_json::Value = serde_json::from_str(&data).ok()?;
504 v["host"].as_str().map(str::to_owned)
505}
506
507pub(super) fn chat_path_from_meta(room_id: &str, meta_path: &Path) -> PathBuf {
508 if meta_path.exists() {
509 if let Ok(data) = std::fs::read_to_string(meta_path) {
510 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&data) {
511 if let Some(p) = v["chat_path"].as_str() {
512 return PathBuf::from(p);
513 }
514 }
515 }
516 }
517 history::default_chat_path(room_id)
518}
519
520#[cfg(test)]
521mod tests {
522 use super::*;
523 use crate::message::make_message;
524 use tempfile::{NamedTempFile, TempDir};
525
526 #[tokio::test]
528 async fn poll_messages_no_cursor_returns_all() {
529 let chat = NamedTempFile::new().unwrap();
530 let cursor_dir = TempDir::new().unwrap();
531 let cursor = cursor_dir.path().join("cursor");
532
533 let msg = make_message("r", "alice", "hello");
534 crate::history::append(chat.path(), &msg).await.unwrap();
535
536 let result = poll_messages(chat.path(), &cursor, None, None, None)
537 .await
538 .unwrap();
539 assert_eq!(result.len(), 1);
540 assert_eq!(result[0].id(), msg.id());
541 }
542
543 #[tokio::test]
545 async fn poll_messages_advances_cursor() {
546 let chat = NamedTempFile::new().unwrap();
547 let cursor_dir = TempDir::new().unwrap();
548 let cursor = cursor_dir.path().join("cursor");
549
550 let msg = make_message("r", "alice", "hello");
551 crate::history::append(chat.path(), &msg).await.unwrap();
552
553 poll_messages(chat.path(), &cursor, None, None, None)
554 .await
555 .unwrap();
556
557 let second = poll_messages(chat.path(), &cursor, None, None, None)
558 .await
559 .unwrap();
560 assert!(
561 second.is_empty(),
562 "cursor should have advanced past the first message"
563 );
564 }
565
566 #[tokio::test]
568 async fn poll_messages_filters_dms_by_viewer() {
569 use crate::message::make_dm;
570 let chat = NamedTempFile::new().unwrap();
571 let cursor_dir = TempDir::new().unwrap();
572 let cursor = cursor_dir.path().join("cursor");
573
574 let dm_alice_bob = make_dm("r", "alice", "bob", "secret");
575 let dm_alice_carol = make_dm("r", "alice", "carol", "other secret");
576 crate::history::append(chat.path(), &dm_alice_bob)
577 .await
578 .unwrap();
579 crate::history::append(chat.path(), &dm_alice_carol)
580 .await
581 .unwrap();
582
583 let result = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
585 .await
586 .unwrap();
587 assert_eq!(result.len(), 1);
588 assert_eq!(result[0].id(), dm_alice_bob.id());
589 }
590
591 #[tokio::test]
594 async fn poll_messages_dm_to_viewer_is_not_consumed_silently() {
595 use crate::message::make_dm;
596 let chat = NamedTempFile::new().unwrap();
597 let cursor_dir = TempDir::new().unwrap();
598 let cursor = cursor_dir.path().join("cursor");
599
600 let dm = make_dm("r", "alice", "bob", "secret for bob");
602 let msg = make_message("r", "alice", "public hello");
603 crate::history::append(chat.path(), &dm).await.unwrap();
604 crate::history::append(chat.path(), &msg).await.unwrap();
605
606 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
608 .await
609 .unwrap();
610
611 let username = "bob";
612 let foreign: Vec<&Message> = messages
613 .iter()
614 .filter(|m| match m {
615 Message::Message { user, .. } | Message::System { user, .. } => user != username,
616 Message::DirectMessage { to, .. } => to == username,
617 _ => false,
618 })
619 .collect();
620
621 assert_eq!(foreign.len(), 2, "watch should see DMs + foreign messages");
623 assert!(
624 foreign
625 .iter()
626 .any(|m| matches!(m, Message::DirectMessage { .. })),
627 "DM must not be silently consumed"
628 );
629 }
630
631 #[tokio::test]
633 async fn poll_messages_dm_from_viewer_excluded_from_watch() {
634 use crate::message::make_dm;
635 let chat = NamedTempFile::new().unwrap();
636 let cursor_dir = TempDir::new().unwrap();
637 let cursor = cursor_dir.path().join("cursor");
638
639 let dm = make_dm("r", "bob", "alice", "from bob");
641 crate::history::append(chat.path(), &dm).await.unwrap();
642
643 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
644 .await
645 .unwrap();
646
647 let username = "bob";
648 let foreign: Vec<&Message> = messages
649 .iter()
650 .filter(|m| match m {
651 Message::Message { user, .. } | Message::System { user, .. } => user != username,
652 Message::DirectMessage { to, .. } => to == username,
653 _ => false,
654 })
655 .collect();
656
657 assert!(
658 foreign.is_empty(),
659 "DMs sent by the watcher should not wake watch"
660 );
661 }
662
663 #[tokio::test]
665 async fn watch_filter_wakes_on_foreign_system_message() {
666 use room_protocol::make_system;
667 let chat = NamedTempFile::new().unwrap();
668 let cursor_dir = TempDir::new().unwrap();
669 let cursor = cursor_dir.path().join("cursor");
670
671 let sys = make_system("r", "plugin:taskboard", "task tb-001 approved");
672 crate::history::append(chat.path(), &sys).await.unwrap();
673
674 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
675 .await
676 .unwrap();
677
678 let username = "bob";
679 let foreign: Vec<&Message> = messages
680 .iter()
681 .filter(|m| match m {
682 Message::Message { user, .. } | Message::System { user, .. } => user != username,
683 Message::DirectMessage { to, .. } => to == username,
684 _ => false,
685 })
686 .collect();
687
688 assert_eq!(
689 foreign.len(),
690 1,
691 "system messages from other users should wake watch"
692 );
693 assert!(matches!(foreign[0], Message::System { .. }));
694 }
695
696 #[tokio::test]
698 async fn watch_filter_ignores_own_system_message() {
699 use room_protocol::make_system;
700 let chat = NamedTempFile::new().unwrap();
701 let cursor_dir = TempDir::new().unwrap();
702 let cursor = cursor_dir.path().join("cursor");
703
704 let sys = make_system("r", "bob", "bob subscribed (tier: full)");
705 crate::history::append(chat.path(), &sys).await.unwrap();
706
707 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
708 .await
709 .unwrap();
710
711 let username = "bob";
712 let foreign: Vec<&Message> = messages
713 .iter()
714 .filter(|m| match m {
715 Message::Message { user, .. } | Message::System { user, .. } => user != username,
716 Message::DirectMessage { to, .. } => to == username,
717 _ => false,
718 })
719 .collect();
720
721 assert!(
722 foreign.is_empty(),
723 "system messages from self should not wake watch"
724 );
725 }
726
727 #[tokio::test]
729 async fn watch_filter_mixed_message_types() {
730 use crate::message::make_dm;
731 use room_protocol::make_system;
732 let chat = NamedTempFile::new().unwrap();
733 let cursor_dir = TempDir::new().unwrap();
734 let cursor = cursor_dir.path().join("cursor");
735
736 let msg = make_message("r", "alice", "hello");
738 let sys = make_system("r", "plugin:taskboard", "task tb-001 claimed by alice");
740 let own_sys = make_system("r", "bob", "bob subscribed (tier: full)");
742 let dm = make_dm("r", "alice", "bob", "private note");
744 let own_msg = make_message("r", "bob", "my own message");
746
747 for m in [&msg, &sys, &own_sys, &dm, &own_msg] {
748 crate::history::append(chat.path(), m).await.unwrap();
749 }
750
751 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
752 .await
753 .unwrap();
754
755 let username = "bob";
756 let foreign: Vec<&Message> = messages
757 .iter()
758 .filter(|m| match m {
759 Message::Message { user, .. } | Message::System { user, .. } => user != username,
760 Message::DirectMessage { to, .. } => to == username,
761 _ => false,
762 })
763 .collect();
764
765 assert_eq!(
766 foreign.len(),
767 3,
768 "should see: foreign message + foreign system + DM to self"
769 );
770 assert!(
771 foreign.iter().any(|m| matches!(m, Message::System { .. })),
772 "system message must appear in watch results"
773 );
774 assert!(
775 foreign.iter().any(|m| matches!(m, Message::Message { .. })),
776 "regular foreign message must appear"
777 );
778 assert!(
779 foreign
780 .iter()
781 .any(|m| matches!(m, Message::DirectMessage { .. })),
782 "DM to self must appear"
783 );
784 }
785
786 #[tokio::test]
788 async fn poll_messages_host_sees_all_dms() {
789 use crate::message::make_dm;
790 let chat = NamedTempFile::new().unwrap();
791 let cursor_dir = TempDir::new().unwrap();
792 let cursor = cursor_dir.path().join("cursor");
793
794 let dm_alice_bob = make_dm("r", "alice", "bob", "private");
795 let dm_carol_dave = make_dm("r", "carol", "dave", "also private");
796 crate::history::append(chat.path(), &dm_alice_bob)
797 .await
798 .unwrap();
799 crate::history::append(chat.path(), &dm_carol_dave)
800 .await
801 .unwrap();
802
803 let result = poll_messages(chat.path(), &cursor, Some("eve"), Some("eve"), None)
805 .await
806 .unwrap();
807 assert_eq!(result.len(), 2, "host should see all DMs");
808 }
809
810 #[tokio::test]
812 async fn poll_messages_non_host_cannot_see_unrelated_dms() {
813 use crate::message::make_dm;
814 let chat = NamedTempFile::new().unwrap();
815 let cursor_dir = TempDir::new().unwrap();
816 let cursor = cursor_dir.path().join("cursor");
817
818 let dm = make_dm("r", "alice", "bob", "private");
819 crate::history::append(chat.path(), &dm).await.unwrap();
820
821 let result = poll_messages(chat.path(), &cursor, Some("carol"), None, None)
823 .await
824 .unwrap();
825 assert!(result.is_empty(), "non-host third party should not see DM");
826 }
827
828 #[tokio::test]
830 async fn pull_messages_host_sees_all_dms() {
831 use crate::message::make_dm;
832 let chat = NamedTempFile::new().unwrap();
833
834 let dm = make_dm("r", "alice", "bob", "secret");
835 crate::history::append(chat.path(), &dm).await.unwrap();
836
837 let result = pull_messages(chat.path(), 10, Some("eve"), Some("eve"))
838 .await
839 .unwrap();
840 assert_eq!(result.len(), 1, "host should see the DM via pull");
841 }
842
843 #[tokio::test]
847 async fn poll_multi_merges_by_timestamp() {
848 let chat_a = NamedTempFile::new().unwrap();
849 let chat_b = NamedTempFile::new().unwrap();
850
851 let rid_a = format!("test-merge-a-{}", std::process::id());
852 let rid_b = format!("test-merge-b-{}", std::process::id());
853
854 let msg_a1 = make_message(&rid_a, "alice", "a1");
856 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
857 let msg_b1 = make_message(&rid_b, "bob", "b1");
858 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
859 let msg_a2 = make_message(&rid_a, "alice", "a2");
860
861 crate::history::append(chat_a.path(), &msg_a1)
862 .await
863 .unwrap();
864 crate::history::append(chat_b.path(), &msg_b1)
865 .await
866 .unwrap();
867 crate::history::append(chat_a.path(), &msg_a2)
868 .await
869 .unwrap();
870
871 let rooms: Vec<(&str, &Path)> = vec![
872 (rid_a.as_str(), chat_a.path()),
873 (rid_b.as_str(), chat_b.path()),
874 ];
875
876 let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
877 assert_eq!(result.len(), 3);
878 assert!(result[0].ts() <= result[1].ts());
880 assert!(result[1].ts() <= result[2].ts());
881 assert_eq!(result[0].room(), &rid_a);
883
884 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
886 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
887 }
888
889 #[tokio::test]
891 async fn poll_multi_advances_per_room_cursors() {
892 let chat_a = NamedTempFile::new().unwrap();
893 let chat_b = NamedTempFile::new().unwrap();
894
895 let rid_a = format!("test-cursor-a-{}", std::process::id());
897 let rid_b = format!("test-cursor-b-{}", std::process::id());
898
899 let msg_a = make_message(&rid_a, "alice", "hello a");
900 let msg_b = make_message(&rid_b, "bob", "hello b");
901 crate::history::append(chat_a.path(), &msg_a).await.unwrap();
902 crate::history::append(chat_b.path(), &msg_b).await.unwrap();
903
904 let rooms: Vec<(&str, &Path)> = vec![
905 (rid_a.as_str(), chat_a.path()),
906 (rid_b.as_str(), chat_b.path()),
907 ];
908
909 let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
911 assert_eq!(result.len(), 2);
912
913 let result2 = poll_messages_multi(&rooms, "viewer").await.unwrap();
915 assert!(
916 result2.is_empty(),
917 "second multi-poll should return nothing"
918 );
919
920 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
922 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
923 }
924
925 #[tokio::test]
927 async fn poll_multi_one_empty_room() {
928 let chat_a = NamedTempFile::new().unwrap();
929 let chat_b = NamedTempFile::new().unwrap();
930
931 let rid_a = format!("test-empty-a-{}", std::process::id());
932 let rid_b = format!("test-empty-b-{}", std::process::id());
933
934 let msg = make_message(&rid_a, "alice", "only here");
935 crate::history::append(chat_a.path(), &msg).await.unwrap();
936 let rooms: Vec<(&str, &Path)> = vec![
939 (rid_a.as_str(), chat_a.path()),
940 (rid_b.as_str(), chat_b.path()),
941 ];
942
943 let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
944 assert_eq!(result.len(), 1);
945 assert_eq!(result[0].room(), &rid_a);
946
947 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
948 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
949 }
950
951 #[tokio::test]
953 async fn poll_multi_no_rooms() {
954 let rooms: Vec<(&str, &Path)> = vec![];
955 let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
956 assert!(result.is_empty());
957 }
958
959 #[tokio::test]
961 async fn poll_multi_filters_dms_across_rooms() {
962 use crate::message::make_dm;
963 let chat_a = NamedTempFile::new().unwrap();
964 let chat_b = NamedTempFile::new().unwrap();
965
966 let rid_a = format!("test-dm-a-{}", std::process::id());
967 let rid_b = format!("test-dm-b-{}", std::process::id());
968
969 let dm_to_bob = make_dm(&rid_a, "alice", "bob", "secret for bob");
971 let dm_to_carol = make_dm(&rid_b, "alice", "carol", "secret for carol");
972 crate::history::append(chat_a.path(), &dm_to_bob)
973 .await
974 .unwrap();
975 crate::history::append(chat_b.path(), &dm_to_carol)
976 .await
977 .unwrap();
978
979 let rooms: Vec<(&str, &Path)> = vec![
980 (rid_a.as_str(), chat_a.path()),
981 (rid_b.as_str(), chat_b.path()),
982 ];
983
984 let result = poll_messages_multi(&rooms, "bob").await.unwrap();
986 assert_eq!(result.len(), 1);
987 assert_eq!(result[0].room(), &rid_a);
988
989 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "bob"));
990 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "bob"));
991 }
992
993 #[tokio::test]
997 async fn cmd_query_history_returns_all_newest_first() {
998 let chat = NamedTempFile::new().unwrap();
999 let cursor_dir = TempDir::new().unwrap();
1000 let token_dir = TempDir::new().unwrap();
1001
1002 let room_id = format!("test-cqh-{}", std::process::id());
1003 write_token_file(&token_dir, &room_id, "alice-cqh", "tok-cqh");
1004 write_meta_file(&room_id, chat.path());
1005
1006 for i in 0..3u32 {
1007 crate::history::append(
1008 chat.path(),
1009 &make_message(&room_id, "alice-cqh", format!("{i}")),
1010 )
1011 .await
1012 .unwrap();
1013 }
1014
1015 let filter = QueryFilter {
1016 rooms: vec![room_id.clone()],
1017 ascending: false,
1018 ..Default::default()
1019 };
1020 let opts = QueryOptions {
1021 new_only: false,
1022 wait: false,
1023 interval_secs: 5,
1024 mentions_only: false,
1025 since_uuid: None,
1026 };
1027
1028 let cursor_path = crate::paths::cursor_path(&room_id, "alice-cqh");
1030 let _ = std::fs::remove_file(&cursor_path);
1031
1032 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqh", filter, opts, &cursor_dir)
1034 .await
1035 .unwrap();
1036
1037 assert!(
1039 !cursor_path.exists(),
1040 "historical query must not write a cursor file"
1041 );
1042
1043 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1044 let _ = std::fs::remove_file(&global_token_path("alice-cqh"));
1045 }
1046
1047 #[tokio::test]
1049 async fn cmd_query_new_advances_cursor() {
1050 let chat = NamedTempFile::new().unwrap();
1051 let cursor_dir = TempDir::new().unwrap();
1052 let token_dir = TempDir::new().unwrap();
1053
1054 let room_id = format!("test-cqn-{}", std::process::id());
1055 write_token_file(&token_dir, &room_id, "alice-cqn", "tok-cqn");
1056 write_meta_file(&room_id, chat.path());
1057
1058 let msg = make_message(&room_id, "bob", "hello");
1059 crate::history::append(chat.path(), &msg).await.unwrap();
1060
1061 let filter = QueryFilter {
1062 rooms: vec![room_id.clone()],
1063 ascending: true,
1064 ..Default::default()
1065 };
1066 let opts = QueryOptions {
1067 new_only: true,
1068 wait: false,
1069 interval_secs: 5,
1070 mentions_only: false,
1071 since_uuid: None,
1072 };
1073
1074 let result = oneshot_cmd_query_to_vec(
1076 &[room_id.clone()],
1077 "tok-cqn",
1078 filter.clone(),
1079 opts.clone(),
1080 &cursor_dir,
1081 )
1082 .await
1083 .unwrap();
1084 assert_eq!(result.len(), 1);
1085
1086 let result2 =
1088 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqn", filter, opts, &cursor_dir)
1089 .await
1090 .unwrap();
1091 assert!(
1092 result2.is_empty(),
1093 "second query should return nothing (cursor advanced)"
1094 );
1095
1096 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1097 let _ = std::fs::remove_file(&global_token_path("alice-cqn"));
1098 }
1099
1100 #[tokio::test]
1102 async fn cmd_query_content_search_filters() {
1103 let chat = NamedTempFile::new().unwrap();
1104 let cursor_dir = TempDir::new().unwrap();
1105 let token_dir = TempDir::new().unwrap();
1106
1107 let room_id = format!("test-cqs-{}", std::process::id());
1108 write_token_file(&token_dir, &room_id, "alice-cqs", "tok-cqs");
1109 write_meta_file(&room_id, chat.path());
1110
1111 crate::history::append(chat.path(), &make_message(&room_id, "bob", "hello world"))
1112 .await
1113 .unwrap();
1114 crate::history::append(chat.path(), &make_message(&room_id, "bob", "goodbye"))
1115 .await
1116 .unwrap();
1117
1118 let filter = QueryFilter {
1119 rooms: vec![room_id.clone()],
1120 content_search: Some("hello".into()),
1121 ascending: true,
1122 ..Default::default()
1123 };
1124 let opts = QueryOptions {
1125 new_only: false,
1126 wait: false,
1127 interval_secs: 5,
1128 mentions_only: false,
1129 since_uuid: None,
1130 };
1131
1132 let result =
1133 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqs", filter, opts, &cursor_dir)
1134 .await
1135 .unwrap();
1136 assert_eq!(result.len(), 1);
1137 assert!(result[0].content().unwrap().contains("hello"));
1138
1139 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1140 let _ = std::fs::remove_file(&global_token_path("alice-cqs"));
1141 }
1142
1143 #[tokio::test]
1145 async fn cmd_query_user_filter() {
1146 let chat = NamedTempFile::new().unwrap();
1147 let cursor_dir = TempDir::new().unwrap();
1148 let token_dir = TempDir::new().unwrap();
1149
1150 let room_id = format!("test-cqu-{}", std::process::id());
1151 write_token_file(&token_dir, &room_id, "alice-cqu", "tok-cqu");
1152 write_meta_file(&room_id, chat.path());
1153
1154 crate::history::append(chat.path(), &make_message(&room_id, "alice", "from alice"))
1155 .await
1156 .unwrap();
1157 crate::history::append(chat.path(), &make_message(&room_id, "bob", "from bob"))
1158 .await
1159 .unwrap();
1160
1161 let filter = QueryFilter {
1162 rooms: vec![room_id.clone()],
1163 users: vec!["bob".into()],
1164 ascending: true,
1165 ..Default::default()
1166 };
1167 let opts = QueryOptions {
1168 new_only: false,
1169 wait: false,
1170 interval_secs: 5,
1171 mentions_only: false,
1172 since_uuid: None,
1173 };
1174
1175 let result =
1176 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqu", filter, opts, &cursor_dir)
1177 .await
1178 .unwrap();
1179 assert_eq!(result.len(), 1);
1180 assert_eq!(result[0].user(), "bob");
1181
1182 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1183 let _ = std::fs::remove_file(&global_token_path("alice-cqu"));
1184 }
1185
1186 #[tokio::test]
1188 async fn cmd_query_limit() {
1189 let chat = NamedTempFile::new().unwrap();
1190 let cursor_dir = TempDir::new().unwrap();
1191 let token_dir = TempDir::new().unwrap();
1192
1193 let room_id = format!("test-cql-{}", std::process::id());
1194 write_token_file(&token_dir, &room_id, "alice-cql", "tok-cql");
1195 write_meta_file(&room_id, chat.path());
1196
1197 for i in 0..5u32 {
1198 crate::history::append(
1199 chat.path(),
1200 &make_message(&room_id, "bob", format!("msg {i}")),
1201 )
1202 .await
1203 .unwrap();
1204 }
1205
1206 let filter = QueryFilter {
1207 rooms: vec![room_id.clone()],
1208 limit: Some(2),
1209 ascending: false,
1210 ..Default::default()
1211 };
1212 let opts = QueryOptions {
1213 new_only: false,
1214 wait: false,
1215 interval_secs: 5,
1216 mentions_only: false,
1217 since_uuid: None,
1218 };
1219
1220 let result =
1221 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cql", filter, opts, &cursor_dir)
1222 .await
1223 .unwrap();
1224 assert_eq!(result.len(), 2, "limit should restrict to 2 messages");
1225
1226 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1227 let _ = std::fs::remove_file(&global_token_path("alice-cql"));
1228 }
1229
1230 fn global_token_path(username: &str) -> PathBuf {
1233 crate::paths::global_token_path(username)
1234 }
1235
1236 fn write_token_file(_dir: &TempDir, _room_id: &str, username: &str, token: &str) {
1237 let path = global_token_path(username);
1238 if let Some(parent) = path.parent() {
1239 std::fs::create_dir_all(parent).unwrap();
1240 }
1241 let data = serde_json::json!({ "username": username, "token": token });
1242 std::fs::write(&path, format!("{data}\n")).unwrap();
1243 }
1244
1245 fn write_meta_file(room_id: &str, chat_path: &Path) {
1246 let meta_path = crate::paths::room_meta_path(room_id);
1247 if let Some(parent) = meta_path.parent() {
1248 std::fs::create_dir_all(parent).unwrap();
1249 }
1250 let meta = serde_json::json!({ "chat_path": chat_path.to_string_lossy() });
1251 std::fs::write(&meta_path, format!("{meta}\n")).unwrap();
1252 }
1253
1254 async fn oneshot_cmd_query_to_vec(
1260 room_ids: &[String],
1261 token: &str,
1262 filter: QueryFilter,
1263 opts: QueryOptions,
1264 _cursor_dir: &TempDir,
1265 ) -> anyhow::Result<Vec<Message>> {
1266 let cursor_before = room_ids
1268 .first()
1269 .map(|id| {
1270 super::super::token::username_from_token(token)
1272 .ok()
1273 .map(|u| {
1274 let p = crate::paths::cursor_path(id, &u);
1275 std::fs::read_to_string(&p).ok()
1276 })
1277 .flatten()
1278 })
1279 .flatten();
1280
1281 cmd_query(room_ids, token, filter.clone(), opts.clone()).await?;
1283
1284 let cursor_after = room_ids
1286 .first()
1287 .map(|id| {
1288 super::super::token::username_from_token(token)
1289 .ok()
1290 .map(|u| {
1291 let p = crate::paths::cursor_path(id, &u);
1292 std::fs::read_to_string(&p).ok()
1293 })
1294 .flatten()
1295 })
1296 .flatten();
1297
1298 if !opts.new_only && !opts.wait {
1302 let mut all: Vec<Message> = Vec::new();
1304 for room_id in room_ids {
1305 let meta_path = crate::paths::room_meta_path(room_id);
1306 let chat_path = chat_path_from_meta(room_id, &meta_path);
1307 let msgs = history::load(&chat_path).await?;
1308 all.extend(msgs);
1309 }
1310 let username = username_from_token(token).unwrap_or_default();
1311 let mut result: Vec<Message> = all
1312 .into_iter()
1313 .filter(|m| filter.matches(m, m.room()))
1314 .filter(|m| match m {
1315 Message::DirectMessage { user, to, .. } => user == &username || to == &username,
1316 _ => true,
1317 })
1318 .collect();
1319 apply_sort_and_limit(&mut result, &filter);
1320 Ok(result)
1321 } else {
1322 let advanced = cursor_after != cursor_before;
1325 if advanced {
1326 let room_id = &room_ids[0];
1327 let meta_path = crate::paths::room_meta_path(room_id);
1328 let chat_path = chat_path_from_meta(room_id, &meta_path);
1329 let all = history::load(&chat_path).await?;
1330 let start = match &cursor_before {
1332 Some(id) => all
1333 .iter()
1334 .position(|m| m.id() == id.trim())
1335 .map(|i| i + 1)
1336 .unwrap_or(0),
1337 None => 0,
1338 };
1339 let filtered: Vec<Message> = all[start..]
1340 .iter()
1341 .filter(|m| filter.matches(m, m.room()))
1342 .cloned()
1343 .collect();
1344 Ok(filtered)
1345 } else {
1346 Ok(vec![])
1347 }
1348 }
1349 }
1350
1351 #[test]
1353 fn unknown_token_returns_error() {
1354 let result = super::super::token::username_from_token("bad-token-nonexistent");
1355 assert!(result.is_err());
1356 assert!(result
1357 .unwrap_err()
1358 .to_string()
1359 .contains("token not recognised"));
1360 }
1361
1362 #[test]
1366 fn load_user_tier_missing_file_returns_full() {
1367 let tier = load_user_tier("nonexistent-room-tier-test", "alice");
1369 assert_eq!(tier, SubscriptionTier::Full);
1370 }
1371
1372 #[test]
1374 fn load_user_tier_returns_persisted_tier() {
1375 let state_dir = crate::paths::room_state_dir();
1376 let _ = std::fs::create_dir_all(&state_dir);
1377 let room_id = format!("test-tier-load-{}", std::process::id());
1378 let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
1379
1380 let mut map = std::collections::HashMap::new();
1381 map.insert("alice".to_string(), SubscriptionTier::MentionsOnly);
1382 map.insert("bob".to_string(), SubscriptionTier::Unsubscribed);
1383 let json = serde_json::to_string_pretty(&map).unwrap();
1384 std::fs::write(&sub_path, json).unwrap();
1385
1386 assert_eq!(
1387 load_user_tier(&room_id, "alice"),
1388 SubscriptionTier::MentionsOnly
1389 );
1390 assert_eq!(
1391 load_user_tier(&room_id, "bob"),
1392 SubscriptionTier::Unsubscribed
1393 );
1394 assert_eq!(load_user_tier(&room_id, "carol"), SubscriptionTier::Full);
1396
1397 let _ = std::fs::remove_file(&sub_path);
1398 }
1399
1400 #[test]
1402 fn apply_tier_filter_full_keeps_all() {
1403 let mut msgs = vec![
1404 make_message("r", "alice", "hello"),
1405 make_message("r", "bob", "world"),
1406 ];
1407 apply_tier_filter(&mut msgs, SubscriptionTier::Full, "carol");
1408 assert_eq!(msgs.len(), 2);
1409 }
1410
1411 #[test]
1413 fn apply_tier_filter_mentions_only_filters() {
1414 let mut msgs = vec![
1415 make_message("r", "alice", "hey @carol check this"),
1416 make_message("r", "bob", "unrelated message"),
1417 make_message("r", "dave", "also @carol"),
1418 ];
1419 apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "carol");
1420 assert_eq!(msgs.len(), 2);
1421 assert!(msgs[0].content().unwrap().contains("@carol"));
1422 assert!(msgs[1].content().unwrap().contains("@carol"));
1423 }
1424
1425 #[test]
1427 fn apply_tier_filter_unsubscribed_clears_all() {
1428 let mut msgs = vec![
1429 make_message("r", "alice", "hey @carol"),
1430 make_message("r", "bob", "world"),
1431 ];
1432 apply_tier_filter(&mut msgs, SubscriptionTier::Unsubscribed, "carol");
1433 assert!(msgs.is_empty());
1434 }
1435
1436 #[test]
1438 fn apply_tier_filter_mentions_only_no_mentions_returns_empty() {
1439 let mut msgs = vec![
1440 make_message("r", "alice", "hello"),
1441 make_message("r", "bob", "world"),
1442 ];
1443 apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "carol");
1444 assert!(msgs.is_empty());
1445 }
1446
1447 #[tokio::test]
1449 async fn cmd_query_public_bypasses_tier() {
1450 let chat = NamedTempFile::new().unwrap();
1451 let token_dir = TempDir::new().unwrap();
1452 let cursor_dir = TempDir::new().unwrap();
1453
1454 let room_id = format!("test-pub-tier-{}", std::process::id());
1455 write_token_file(&token_dir, &room_id, "alice-pub", "tok-pub-tier");
1456 write_meta_file(&room_id, chat.path());
1457
1458 let state_dir = crate::paths::room_state_dir();
1460 let _ = std::fs::create_dir_all(&state_dir);
1461 let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
1462 let mut map = std::collections::HashMap::new();
1463 map.insert("alice-pub".to_string(), SubscriptionTier::Unsubscribed);
1464 std::fs::write(&sub_path, serde_json::to_string(&map).unwrap()).unwrap();
1465
1466 crate::history::append(chat.path(), &make_message(&room_id, "bob", "visible"))
1468 .await
1469 .unwrap();
1470
1471 let filter = QueryFilter {
1473 rooms: vec![room_id.clone()],
1474 public_only: true,
1475 ascending: true,
1476 ..Default::default()
1477 };
1478 let opts = QueryOptions {
1479 new_only: false,
1480 wait: false,
1481 interval_secs: 5,
1482 mentions_only: false,
1483 since_uuid: None,
1484 };
1485
1486 let result = oneshot_cmd_query_to_vec(
1487 &[room_id.clone()],
1488 "tok-pub-tier",
1489 filter,
1490 opts,
1491 &cursor_dir,
1492 )
1493 .await
1494 .unwrap();
1495 assert_eq!(
1496 result.len(),
1497 1,
1498 "public flag should bypass Unsubscribed tier"
1499 );
1500
1501 let _ = std::fs::remove_file(&sub_path);
1502 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1503 let _ = std::fs::remove_file(&global_token_path("alice-pub"));
1504 }
1505
1506 #[test]
1508 fn mentions_only_tier_sets_mention_user_on_filter() {
1509 let mut filter = QueryFilter::default();
1512 let tier = SubscriptionTier::MentionsOnly;
1513
1514 match tier {
1516 SubscriptionTier::MentionsOnly => {
1517 if filter.mention_user.is_none() {
1518 filter.mention_user = Some("alice".to_string());
1519 }
1520 }
1521 _ => {}
1522 }
1523
1524 assert_eq!(filter.mention_user, Some("alice".to_string()));
1525
1526 let mut msgs = vec![
1528 make_message("r", "bob", "hey @alice look"),
1529 make_message("r", "bob", "unrelated chatter"),
1530 ];
1531 apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "alice");
1532 assert_eq!(msgs.len(), 1);
1533 assert!(msgs[0].content().unwrap().contains("@alice"));
1534 }
1535
1536 #[test]
1538 fn mentions_only_tier_preserves_existing_mention_user() {
1539 let mut filter = QueryFilter {
1540 mention_user: Some("bob".to_string()),
1541 ..Default::default()
1542 };
1543
1544 match SubscriptionTier::MentionsOnly {
1546 SubscriptionTier::MentionsOnly => {
1547 if filter.mention_user.is_none() {
1548 filter.mention_user = Some("alice".to_string());
1549 }
1550 }
1551 _ => {}
1552 }
1553
1554 assert_eq!(
1555 filter.mention_user,
1556 Some("bob".to_string()),
1557 "existing mention_user filter should be preserved"
1558 );
1559 }
1560
1561 #[test]
1564 fn per_room_tier_filter_full_keeps_all() {
1565 let mut msgs = vec![
1566 make_message("dev", "alice", "hello from dev"),
1567 make_message("lobby", "bob", "hello from lobby"),
1568 ];
1569 let rooms = vec![
1571 "nonexistent-perroom-full-1".to_string(),
1572 "nonexistent-perroom-full-2".to_string(),
1573 ];
1574 apply_per_room_tier_filter(&mut msgs, &rooms, "carol");
1575 assert_eq!(msgs.len(), 2);
1576 }
1577
1578 #[test]
1579 fn per_room_tier_filter_mixed_tiers() {
1580 let state_dir = crate::paths::room_state_dir();
1581 let _ = std::fs::create_dir_all(&state_dir);
1582
1583 let room_full = format!("perroom-mixed-full-{}", std::process::id());
1584 let room_unsub = format!("perroom-mixed-unsub-{}", std::process::id());
1585 let room_mentions = format!("perroom-mixed-ment-{}", std::process::id());
1586
1587 let sub_unsub = crate::paths::broker_subscriptions_path(&state_dir, &room_unsub);
1589 let mut map_unsub = std::collections::HashMap::new();
1590 map_unsub.insert("alice".to_string(), SubscriptionTier::Unsubscribed);
1591 std::fs::write(&sub_unsub, serde_json::to_string(&map_unsub).unwrap()).unwrap();
1592
1593 let sub_ment = crate::paths::broker_subscriptions_path(&state_dir, &room_mentions);
1594 let mut map_ment = std::collections::HashMap::new();
1595 map_ment.insert("alice".to_string(), SubscriptionTier::MentionsOnly);
1596 std::fs::write(&sub_ment, serde_json::to_string(&map_ment).unwrap()).unwrap();
1597
1598 let mut msgs = vec![
1599 make_message(&room_full, "bob", "visible in full room"),
1600 make_message(&room_unsub, "bob", "invisible — unsubscribed"),
1601 make_message(&room_mentions, "bob", "no mention — filtered"),
1602 make_message(&room_mentions, "bob", "hey @alice check this"),
1603 ];
1604
1605 let rooms = vec![room_full.clone(), room_unsub.clone(), room_mentions.clone()];
1606 apply_per_room_tier_filter(&mut msgs, &rooms, "alice");
1607
1608 assert_eq!(msgs.len(), 2);
1610 assert!(msgs[0].content().unwrap().contains("visible in full room"));
1611 assert!(msgs[1].content().unwrap().contains("@alice"));
1612
1613 let _ = std::fs::remove_file(&sub_unsub);
1615 let _ = std::fs::remove_file(&sub_ment);
1616 }
1617
1618 #[test]
1619 fn per_room_tier_filter_unknown_room_defaults_to_full() {
1620 let mut msgs = vec![make_message("mystery", "bob", "hello")];
1621 apply_per_room_tier_filter(&mut msgs, &["other".to_string()], "alice");
1623 assert_eq!(msgs.len(), 1);
1624 }
1625
1626 #[tokio::test]
1628 async fn pull_messages_returns_tail_without_cursor_change() {
1629 let chat = NamedTempFile::new().unwrap();
1630 let cursor_dir = TempDir::new().unwrap();
1631 let cursor = cursor_dir.path().join("cursor");
1632
1633 for i in 0..5u32 {
1634 crate::history::append(chat.path(), &make_message("r", "u", format!("msg {i}")))
1635 .await
1636 .unwrap();
1637 }
1638
1639 let pulled = pull_messages(chat.path(), 3, None, None).await.unwrap();
1640 assert_eq!(pulled.len(), 3);
1641
1642 let polled = poll_messages(chat.path(), &cursor, None, None, None)
1644 .await
1645 .unwrap();
1646 assert_eq!(polled.len(), 5);
1647 }
1648}