1use std::path::{Path, PathBuf};
2
3use room_protocol::{EventFilter, SubscriptionTier};
4
5use crate::{
6 broker::commands::{load_event_filter_map, load_subscription_map},
7 history,
8 message::Message,
9 paths,
10 query::QueryFilter,
11};
12
13use super::token::{read_cursor, username_from_token, write_cursor};
14
15fn load_user_tier(room_id: &str, username: &str) -> SubscriptionTier {
23 let state_dir = paths::room_state_dir();
24 let sub_path = paths::broker_subscriptions_path(&state_dir, room_id);
25 let map = load_subscription_map(&sub_path);
26 map.get(username).copied().unwrap_or(SubscriptionTier::Full)
27}
28
29fn apply_tier_filter(messages: &mut Vec<Message>, tier: SubscriptionTier, username: &str) {
35 match tier {
36 SubscriptionTier::Full => {}
37 SubscriptionTier::MentionsOnly => {
38 messages.retain(|m| m.mentions().iter().any(|mention| mention == username));
39 }
40 SubscriptionTier::Unsubscribed => {
41 messages.clear();
42 }
43 }
44}
45
46fn apply_per_room_tier_filter(messages: &mut Vec<Message>, room_ids: &[String], username: &str) {
52 use std::collections::HashMap;
53 let tiers: HashMap<&str, SubscriptionTier> = room_ids
54 .iter()
55 .map(|r| (r.as_str(), load_user_tier(r, username)))
56 .collect();
57
58 messages.retain(|m| {
59 let tier = tiers
60 .get(m.room())
61 .copied()
62 .unwrap_or(SubscriptionTier::Full);
63 match tier {
64 SubscriptionTier::Full => true,
65 SubscriptionTier::MentionsOnly => {
66 m.mentions().iter().any(|mention| mention == username)
67 }
68 SubscriptionTier::Unsubscribed => false,
69 }
70 });
71}
72
73fn load_user_event_filter(room_id: &str, username: &str) -> EventFilter {
81 let state_dir = paths::room_state_dir();
82 let ef_path = paths::broker_event_filters_path(&state_dir, room_id);
83 let map = load_event_filter_map(&ef_path);
84 map.get(username).cloned().unwrap_or(EventFilter::All)
85}
86
87fn apply_event_filter(messages: &mut Vec<Message>, filter: &EventFilter) {
93 if matches!(filter, EventFilter::All) {
94 return;
95 }
96 messages.retain(|m| match m {
97 Message::Event { event_type, .. } => filter.allows(event_type),
98 _ => true,
99 });
100}
101
102fn apply_per_room_event_filter(messages: &mut Vec<Message>, room_ids: &[String], username: &str) {
106 use std::collections::HashMap;
107 let filters: HashMap<&str, EventFilter> = room_ids
108 .iter()
109 .map(|r| (r.as_str(), load_user_event_filter(r, username)))
110 .collect();
111
112 messages.retain(|m| match m {
113 Message::Event {
114 room, event_type, ..
115 } => {
116 let filter = filters.get(room.as_str()).unwrap_or(&EventFilter::All);
117 filter.allows(event_type)
118 }
119 _ => true,
120 });
121}
122
123#[derive(Debug, Clone)]
127pub struct QueryOptions {
128 pub new_only: bool,
131 pub wait: bool,
133 pub interval_secs: u64,
135 pub mentions_only: bool,
138 pub since_uuid: Option<String>,
141}
142
143pub async fn poll_messages(
158 chat_path: &Path,
159 cursor_path: &Path,
160 viewer: Option<&str>,
161 host: Option<&str>,
162 since: Option<&str>,
163) -> anyhow::Result<Vec<Message>> {
164 let effective_since: Option<String> = since
165 .map(|s| s.to_owned())
166 .or_else(|| read_cursor(cursor_path));
167
168 let messages = history::load(chat_path).await?;
169
170 let start = match &effective_since {
171 Some(id) => messages
172 .iter()
173 .position(|m| m.id() == id)
174 .map(|i| i + 1)
175 .unwrap_or(0),
176 None => 0,
177 };
178
179 let result: Vec<Message> = messages[start..]
180 .iter()
181 .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
182 .cloned()
183 .collect();
184
185 if let Some(last) = result.last() {
186 write_cursor(cursor_path, last.id())?;
187 }
188
189 Ok(result)
190}
191
192pub async fn pull_messages(
200 chat_path: &Path,
201 n: usize,
202 viewer: Option<&str>,
203 host: Option<&str>,
204) -> anyhow::Result<Vec<Message>> {
205 let clamped = n.min(200);
206 let all = history::tail(chat_path, clamped).await?;
207 let visible: Vec<Message> = all
208 .into_iter()
209 .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
210 .collect();
211 Ok(visible)
212}
213
214pub async fn cmd_pull(room_id: &str, token: &str, n: usize) -> anyhow::Result<()> {
219 let username = username_from_token(token)?;
220 let meta_path = paths::room_meta_path(room_id);
221 let chat_path = chat_path_from_meta(room_id, &meta_path);
222
223 let host = read_host_from_meta(&meta_path);
224 let mut messages = pull_messages(&chat_path, n, Some(&username), host.as_deref()).await?;
225 let tier = load_user_tier(room_id, &username);
226 apply_tier_filter(&mut messages, tier, &username);
227 let ef = load_user_event_filter(room_id, &username);
228 apply_event_filter(&mut messages, &ef);
229 for msg in &messages {
230 println!("{}", serde_json::to_string(msg)?);
231 }
232 Ok(())
233}
234
235pub async fn cmd_watch(room_id: &str, token: &str, interval_secs: u64) -> anyhow::Result<()> {
243 let username = username_from_token(token)?;
244 let meta_path = paths::room_meta_path(room_id);
245 let chat_path = chat_path_from_meta(room_id, &meta_path);
246 let cursor_path = paths::cursor_path(room_id, &username);
247 let host = read_host_from_meta(&meta_path);
248
249 let ef = load_user_event_filter(room_id, &username);
250
251 loop {
252 let mut messages = poll_messages(
253 &chat_path,
254 &cursor_path,
255 Some(&username),
256 host.as_deref(),
257 None,
258 )
259 .await?;
260
261 apply_event_filter(&mut messages, &ef);
262
263 let foreign: Vec<&Message> = messages
264 .iter()
265 .filter(|m| match m {
266 Message::Message { user, .. } | Message::System { user, .. } => user != &username,
267 Message::DirectMessage { to, .. } => to == &username,
268 Message::Event { user, .. } => user != &username,
269 _ => false,
270 })
271 .collect();
272
273 if !foreign.is_empty() {
274 for msg in foreign {
275 println!("{}", serde_json::to_string(msg)?);
276 }
277 return Ok(());
278 }
279
280 tokio::time::sleep(tokio::time::Duration::from_secs(interval_secs)).await;
281 }
282}
283
284pub async fn cmd_poll(
290 room_id: &str,
291 token: &str,
292 since: Option<String>,
293 mentions_only: bool,
294) -> anyhow::Result<()> {
295 let username = username_from_token(token)?;
296 let meta_path = paths::room_meta_path(room_id);
297 let chat_path = chat_path_from_meta(room_id, &meta_path);
298 let cursor_path = paths::cursor_path(room_id, &username);
299 let host = read_host_from_meta(&meta_path);
300
301 let mut messages = poll_messages(
302 &chat_path,
303 &cursor_path,
304 Some(&username),
305 host.as_deref(),
306 since.as_deref(),
307 )
308 .await?;
309
310 let ef = load_user_event_filter(room_id, &username);
311 apply_event_filter(&mut messages, &ef);
312
313 for msg in &messages {
314 if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
315 continue;
316 }
317 println!("{}", serde_json::to_string(msg)?);
318 }
319 Ok(())
320}
321
322pub async fn poll_messages_multi(
328 rooms: &[(&str, &Path)],
329 username: &str,
330) -> anyhow::Result<Vec<Message>> {
331 let mut all_messages: Vec<Message> = Vec::new();
332
333 for &(room_id, chat_path) in rooms {
334 let cursor_path = paths::cursor_path(room_id, username);
335 let meta_path = paths::room_meta_path(room_id);
336 let host = read_host_from_meta(&meta_path);
337 let msgs = poll_messages(
338 chat_path,
339 &cursor_path,
340 Some(username),
341 host.as_deref(),
342 None,
343 )
344 .await?;
345 all_messages.extend(msgs);
346 }
347
348 all_messages.sort_by(|a, b| a.ts().cmp(b.ts()));
349 Ok(all_messages)
350}
351
352pub async fn cmd_poll_multi(
357 room_ids: &[String],
358 token: &str,
359 mentions_only: bool,
360) -> anyhow::Result<()> {
361 let username = username_from_token(token)?;
363
364 let mut rooms: Vec<(&str, PathBuf)> = Vec::new();
366 for room_id in room_ids {
367 let meta_path = paths::room_meta_path(room_id);
368 let chat_path = chat_path_from_meta(room_id, &meta_path);
369 rooms.push((room_id.as_str(), chat_path));
370 }
371
372 let room_refs: Vec<(&str, &Path)> = rooms.iter().map(|(id, p)| (*id, p.as_path())).collect();
373 let mut messages = poll_messages_multi(&room_refs, &username).await?;
374
375 let room_id_strings: Vec<String> = room_ids.iter().map(|s| s.to_string()).collect();
376 apply_per_room_event_filter(&mut messages, &room_id_strings, &username);
377
378 for msg in &messages {
379 if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
380 continue;
381 }
382 println!("{}", serde_json::to_string(msg)?);
383 }
384 Ok(())
385}
386
387pub async fn cmd_query(
402 room_ids: &[String],
403 token: &str,
404 mut filter: QueryFilter,
405 opts: QueryOptions,
406) -> anyhow::Result<()> {
407 if room_ids.is_empty() {
408 anyhow::bail!("at least one room ID is required");
409 }
410
411 let username = username_from_token(token)?;
412
413 if opts.mentions_only {
415 filter.mention_user = Some(username.clone());
416 }
417
418 if opts.wait || opts.new_only {
419 cmd_query_new(room_ids, &username, filter, opts).await
420 } else {
421 cmd_query_history(room_ids, &username, filter).await
422 }
423}
424
425async fn cmd_query_new(
427 room_ids: &[String],
428 username: &str,
429 filter: QueryFilter,
430 opts: QueryOptions,
431) -> anyhow::Result<()> {
432 loop {
433 let messages: Vec<Message> = if room_ids.len() == 1 {
434 let room_id = &room_ids[0];
435 let meta_path = paths::room_meta_path(room_id);
436 let chat_path = chat_path_from_meta(room_id, &meta_path);
437 let cursor_path = paths::cursor_path(room_id, username);
438 let host = read_host_from_meta(&meta_path);
439 poll_messages(
440 &chat_path,
441 &cursor_path,
442 Some(username),
443 host.as_deref(),
444 opts.since_uuid.as_deref(),
445 )
446 .await?
447 } else {
448 let mut rooms_info: Vec<(String, PathBuf)> = Vec::new();
449 for room_id in room_ids {
450 let meta_path = paths::room_meta_path(room_id);
451 let chat_path = chat_path_from_meta(room_id, &meta_path);
452 rooms_info.push((room_id.clone(), chat_path));
453 }
454 let room_refs: Vec<(&str, &Path)> = rooms_info
455 .iter()
456 .map(|(id, p)| (id.as_str(), p.as_path()))
457 .collect();
458 poll_messages_multi(&room_refs, username).await?
459 };
460
461 let mut filtered: Vec<Message> = messages
463 .into_iter()
464 .filter(|m| filter.matches(m, m.room()))
465 .collect();
466
467 if !filter.public_only {
468 apply_per_room_tier_filter(&mut filtered, room_ids, username);
469 apply_per_room_event_filter(&mut filtered, room_ids, username);
470 }
471
472 apply_sort_and_limit(&mut filtered, &filter);
473
474 if opts.wait {
475 let foreign: Vec<&Message> = filtered
477 .iter()
478 .filter(|m| match m {
479 Message::Message { user, .. } | Message::System { user, .. } => {
480 user != username
481 }
482 Message::DirectMessage { to, .. } => to == username,
483 _ => false,
484 })
485 .collect();
486
487 if !foreign.is_empty() {
488 for msg in foreign {
489 println!("{}", serde_json::to_string(msg)?);
490 }
491 return Ok(());
492 }
493 } else {
494 for msg in &filtered {
495 println!("{}", serde_json::to_string(msg)?);
496 }
497 return Ok(());
498 }
499
500 tokio::time::sleep(tokio::time::Duration::from_secs(opts.interval_secs)).await;
501 }
502}
503
504async fn cmd_query_history(
506 room_ids: &[String],
507 username: &str,
508 filter: QueryFilter,
509) -> anyhow::Result<()> {
510 let mut all_messages: Vec<Message> = Vec::new();
511
512 for room_id in room_ids {
513 let meta_path = paths::room_meta_path(room_id);
514 let chat_path = chat_path_from_meta(room_id, &meta_path);
515 let messages = history::load(&chat_path).await?;
516 all_messages.extend(messages);
517 }
518
519 let mut filtered: Vec<Message> = all_messages
521 .into_iter()
522 .filter(|m| filter.matches(m, m.room()))
523 .filter(|m| match m {
524 Message::DirectMessage { user, to, .. } => user == username || to == username,
525 _ => true,
526 })
527 .collect();
528
529 if !filter.public_only {
530 apply_per_room_tier_filter(&mut filtered, room_ids, username);
531 apply_per_room_event_filter(&mut filtered, room_ids, username);
532 }
533
534 apply_sort_and_limit(&mut filtered, &filter);
535
536 if filtered.is_empty() {
538 if let Some((ref target_room, target_seq)) = filter.target_id {
539 use room_protocol::format_message_id;
540 anyhow::bail!(
541 "message not found: {}",
542 format_message_id(target_room, target_seq)
543 );
544 }
545 }
546
547 for msg in &filtered {
548 println!("{}", serde_json::to_string(msg)?);
549 }
550 Ok(())
551}
552
553fn apply_sort_and_limit(messages: &mut Vec<Message>, filter: &QueryFilter) {
555 if filter.ascending {
556 messages.sort_by(|a, b| a.ts().cmp(b.ts()));
557 } else {
558 messages.sort_by(|a, b| b.ts().cmp(a.ts()));
559 }
560 if let Some(limit) = filter.limit {
561 messages.truncate(limit);
562 }
563}
564
565pub(super) fn read_host_from_meta(meta_path: &Path) -> Option<String> {
570 if !meta_path.exists() {
571 return None;
572 }
573 let data = std::fs::read_to_string(meta_path).ok()?;
574 let v: serde_json::Value = serde_json::from_str(&data).ok()?;
575 v["host"].as_str().map(str::to_owned)
576}
577
578pub(super) fn chat_path_from_meta(room_id: &str, meta_path: &Path) -> PathBuf {
579 if meta_path.exists() {
580 if let Ok(data) = std::fs::read_to_string(meta_path) {
581 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&data) {
582 if let Some(p) = v["chat_path"].as_str() {
583 return PathBuf::from(p);
584 }
585 }
586 }
587 }
588 history::default_chat_path(room_id)
589}
590
591#[cfg(test)]
592mod tests {
593 use super::*;
594 use crate::message::make_message;
595 use tempfile::{NamedTempFile, TempDir};
596
597 #[tokio::test]
599 async fn poll_messages_no_cursor_returns_all() {
600 let chat = NamedTempFile::new().unwrap();
601 let cursor_dir = TempDir::new().unwrap();
602 let cursor = cursor_dir.path().join("cursor");
603
604 let msg = make_message("r", "alice", "hello");
605 crate::history::append(chat.path(), &msg).await.unwrap();
606
607 let result = poll_messages(chat.path(), &cursor, None, None, None)
608 .await
609 .unwrap();
610 assert_eq!(result.len(), 1);
611 assert_eq!(result[0].id(), msg.id());
612 }
613
614 #[tokio::test]
616 async fn poll_messages_advances_cursor() {
617 let chat = NamedTempFile::new().unwrap();
618 let cursor_dir = TempDir::new().unwrap();
619 let cursor = cursor_dir.path().join("cursor");
620
621 let msg = make_message("r", "alice", "hello");
622 crate::history::append(chat.path(), &msg).await.unwrap();
623
624 poll_messages(chat.path(), &cursor, None, None, None)
625 .await
626 .unwrap();
627
628 let second = poll_messages(chat.path(), &cursor, None, None, None)
629 .await
630 .unwrap();
631 assert!(
632 second.is_empty(),
633 "cursor should have advanced past the first message"
634 );
635 }
636
637 #[tokio::test]
639 async fn poll_messages_filters_dms_by_viewer() {
640 use crate::message::make_dm;
641 let chat = NamedTempFile::new().unwrap();
642 let cursor_dir = TempDir::new().unwrap();
643 let cursor = cursor_dir.path().join("cursor");
644
645 let dm_alice_bob = make_dm("r", "alice", "bob", "secret");
646 let dm_alice_carol = make_dm("r", "alice", "carol", "other secret");
647 crate::history::append(chat.path(), &dm_alice_bob)
648 .await
649 .unwrap();
650 crate::history::append(chat.path(), &dm_alice_carol)
651 .await
652 .unwrap();
653
654 let result = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
656 .await
657 .unwrap();
658 assert_eq!(result.len(), 1);
659 assert_eq!(result[0].id(), dm_alice_bob.id());
660 }
661
662 #[tokio::test]
665 async fn poll_messages_dm_to_viewer_is_not_consumed_silently() {
666 use crate::message::make_dm;
667 let chat = NamedTempFile::new().unwrap();
668 let cursor_dir = TempDir::new().unwrap();
669 let cursor = cursor_dir.path().join("cursor");
670
671 let dm = make_dm("r", "alice", "bob", "secret for bob");
673 let msg = make_message("r", "alice", "public hello");
674 crate::history::append(chat.path(), &dm).await.unwrap();
675 crate::history::append(chat.path(), &msg).await.unwrap();
676
677 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
679 .await
680 .unwrap();
681
682 let username = "bob";
683 let foreign: Vec<&Message> = messages
684 .iter()
685 .filter(|m| match m {
686 Message::Message { user, .. } | Message::System { user, .. } => user != username,
687 Message::DirectMessage { to, .. } => to == username,
688 _ => false,
689 })
690 .collect();
691
692 assert_eq!(foreign.len(), 2, "watch should see DMs + foreign messages");
694 assert!(
695 foreign
696 .iter()
697 .any(|m| matches!(m, Message::DirectMessage { .. })),
698 "DM must not be silently consumed"
699 );
700 }
701
702 #[tokio::test]
704 async fn poll_messages_dm_from_viewer_excluded_from_watch() {
705 use crate::message::make_dm;
706 let chat = NamedTempFile::new().unwrap();
707 let cursor_dir = TempDir::new().unwrap();
708 let cursor = cursor_dir.path().join("cursor");
709
710 let dm = make_dm("r", "bob", "alice", "from bob");
712 crate::history::append(chat.path(), &dm).await.unwrap();
713
714 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
715 .await
716 .unwrap();
717
718 let username = "bob";
719 let foreign: Vec<&Message> = messages
720 .iter()
721 .filter(|m| match m {
722 Message::Message { user, .. } | Message::System { user, .. } => user != username,
723 Message::DirectMessage { to, .. } => to == username,
724 _ => false,
725 })
726 .collect();
727
728 assert!(
729 foreign.is_empty(),
730 "DMs sent by the watcher should not wake watch"
731 );
732 }
733
734 #[tokio::test]
736 async fn watch_filter_wakes_on_foreign_system_message() {
737 use room_protocol::make_system;
738 let chat = NamedTempFile::new().unwrap();
739 let cursor_dir = TempDir::new().unwrap();
740 let cursor = cursor_dir.path().join("cursor");
741
742 let sys = make_system("r", "plugin:taskboard", "task tb-001 approved");
743 crate::history::append(chat.path(), &sys).await.unwrap();
744
745 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
746 .await
747 .unwrap();
748
749 let username = "bob";
750 let foreign: Vec<&Message> = messages
751 .iter()
752 .filter(|m| match m {
753 Message::Message { user, .. } | Message::System { user, .. } => user != username,
754 Message::DirectMessage { to, .. } => to == username,
755 _ => false,
756 })
757 .collect();
758
759 assert_eq!(
760 foreign.len(),
761 1,
762 "system messages from other users should wake watch"
763 );
764 assert!(matches!(foreign[0], Message::System { .. }));
765 }
766
767 #[tokio::test]
769 async fn watch_filter_ignores_own_system_message() {
770 use room_protocol::make_system;
771 let chat = NamedTempFile::new().unwrap();
772 let cursor_dir = TempDir::new().unwrap();
773 let cursor = cursor_dir.path().join("cursor");
774
775 let sys = make_system("r", "bob", "bob subscribed (tier: full)");
776 crate::history::append(chat.path(), &sys).await.unwrap();
777
778 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
779 .await
780 .unwrap();
781
782 let username = "bob";
783 let foreign: Vec<&Message> = messages
784 .iter()
785 .filter(|m| match m {
786 Message::Message { user, .. } | Message::System { user, .. } => user != username,
787 Message::DirectMessage { to, .. } => to == username,
788 _ => false,
789 })
790 .collect();
791
792 assert!(
793 foreign.is_empty(),
794 "system messages from self should not wake watch"
795 );
796 }
797
798 #[tokio::test]
800 async fn watch_filter_mixed_message_types() {
801 use crate::message::make_dm;
802 use room_protocol::make_system;
803 let chat = NamedTempFile::new().unwrap();
804 let cursor_dir = TempDir::new().unwrap();
805 let cursor = cursor_dir.path().join("cursor");
806
807 let msg = make_message("r", "alice", "hello");
809 let sys = make_system("r", "plugin:taskboard", "task tb-001 claimed by alice");
811 let own_sys = make_system("r", "bob", "bob subscribed (tier: full)");
813 let dm = make_dm("r", "alice", "bob", "private note");
815 let own_msg = make_message("r", "bob", "my own message");
817
818 for m in [&msg, &sys, &own_sys, &dm, &own_msg] {
819 crate::history::append(chat.path(), m).await.unwrap();
820 }
821
822 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
823 .await
824 .unwrap();
825
826 let username = "bob";
827 let foreign: Vec<&Message> = messages
828 .iter()
829 .filter(|m| match m {
830 Message::Message { user, .. } | Message::System { user, .. } => user != username,
831 Message::DirectMessage { to, .. } => to == username,
832 _ => false,
833 })
834 .collect();
835
836 assert_eq!(
837 foreign.len(),
838 3,
839 "should see: foreign message + foreign system + DM to self"
840 );
841 assert!(
842 foreign.iter().any(|m| matches!(m, Message::System { .. })),
843 "system message must appear in watch results"
844 );
845 assert!(
846 foreign.iter().any(|m| matches!(m, Message::Message { .. })),
847 "regular foreign message must appear"
848 );
849 assert!(
850 foreign
851 .iter()
852 .any(|m| matches!(m, Message::DirectMessage { .. })),
853 "DM to self must appear"
854 );
855 }
856
857 #[tokio::test]
859 async fn poll_messages_host_sees_all_dms() {
860 use crate::message::make_dm;
861 let chat = NamedTempFile::new().unwrap();
862 let cursor_dir = TempDir::new().unwrap();
863 let cursor = cursor_dir.path().join("cursor");
864
865 let dm_alice_bob = make_dm("r", "alice", "bob", "private");
866 let dm_carol_dave = make_dm("r", "carol", "dave", "also private");
867 crate::history::append(chat.path(), &dm_alice_bob)
868 .await
869 .unwrap();
870 crate::history::append(chat.path(), &dm_carol_dave)
871 .await
872 .unwrap();
873
874 let result = poll_messages(chat.path(), &cursor, Some("eve"), Some("eve"), None)
876 .await
877 .unwrap();
878 assert_eq!(result.len(), 2, "host should see all DMs");
879 }
880
881 #[tokio::test]
883 async fn poll_messages_non_host_cannot_see_unrelated_dms() {
884 use crate::message::make_dm;
885 let chat = NamedTempFile::new().unwrap();
886 let cursor_dir = TempDir::new().unwrap();
887 let cursor = cursor_dir.path().join("cursor");
888
889 let dm = make_dm("r", "alice", "bob", "private");
890 crate::history::append(chat.path(), &dm).await.unwrap();
891
892 let result = poll_messages(chat.path(), &cursor, Some("carol"), None, None)
894 .await
895 .unwrap();
896 assert!(result.is_empty(), "non-host third party should not see DM");
897 }
898
899 #[tokio::test]
901 async fn pull_messages_host_sees_all_dms() {
902 use crate::message::make_dm;
903 let chat = NamedTempFile::new().unwrap();
904
905 let dm = make_dm("r", "alice", "bob", "secret");
906 crate::history::append(chat.path(), &dm).await.unwrap();
907
908 let result = pull_messages(chat.path(), 10, Some("eve"), Some("eve"))
909 .await
910 .unwrap();
911 assert_eq!(result.len(), 1, "host should see the DM via pull");
912 }
913
914 #[tokio::test]
918 async fn poll_multi_merges_by_timestamp() {
919 let chat_a = NamedTempFile::new().unwrap();
920 let chat_b = NamedTempFile::new().unwrap();
921
922 let rid_a = format!("test-merge-a-{}", std::process::id());
923 let rid_b = format!("test-merge-b-{}", std::process::id());
924
925 let msg_a1 = make_message(&rid_a, "alice", "a1");
927 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
928 let msg_b1 = make_message(&rid_b, "bob", "b1");
929 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
930 let msg_a2 = make_message(&rid_a, "alice", "a2");
931
932 crate::history::append(chat_a.path(), &msg_a1)
933 .await
934 .unwrap();
935 crate::history::append(chat_b.path(), &msg_b1)
936 .await
937 .unwrap();
938 crate::history::append(chat_a.path(), &msg_a2)
939 .await
940 .unwrap();
941
942 let rooms: Vec<(&str, &Path)> = vec![
943 (rid_a.as_str(), chat_a.path()),
944 (rid_b.as_str(), chat_b.path()),
945 ];
946
947 let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
948 assert_eq!(result.len(), 3);
949 assert!(result[0].ts() <= result[1].ts());
951 assert!(result[1].ts() <= result[2].ts());
952 assert_eq!(result[0].room(), &rid_a);
954
955 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
957 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
958 }
959
960 #[tokio::test]
962 async fn poll_multi_advances_per_room_cursors() {
963 let chat_a = NamedTempFile::new().unwrap();
964 let chat_b = NamedTempFile::new().unwrap();
965
966 let rid_a = format!("test-cursor-a-{}", std::process::id());
968 let rid_b = format!("test-cursor-b-{}", std::process::id());
969
970 let msg_a = make_message(&rid_a, "alice", "hello a");
971 let msg_b = make_message(&rid_b, "bob", "hello b");
972 crate::history::append(chat_a.path(), &msg_a).await.unwrap();
973 crate::history::append(chat_b.path(), &msg_b).await.unwrap();
974
975 let rooms: Vec<(&str, &Path)> = vec![
976 (rid_a.as_str(), chat_a.path()),
977 (rid_b.as_str(), chat_b.path()),
978 ];
979
980 let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
982 assert_eq!(result.len(), 2);
983
984 let result2 = poll_messages_multi(&rooms, "viewer").await.unwrap();
986 assert!(
987 result2.is_empty(),
988 "second multi-poll should return nothing"
989 );
990
991 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
993 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
994 }
995
996 #[tokio::test]
998 async fn poll_multi_one_empty_room() {
999 let chat_a = NamedTempFile::new().unwrap();
1000 let chat_b = NamedTempFile::new().unwrap();
1001
1002 let rid_a = format!("test-empty-a-{}", std::process::id());
1003 let rid_b = format!("test-empty-b-{}", std::process::id());
1004
1005 let msg = make_message(&rid_a, "alice", "only here");
1006 crate::history::append(chat_a.path(), &msg).await.unwrap();
1007 let rooms: Vec<(&str, &Path)> = vec![
1010 (rid_a.as_str(), chat_a.path()),
1011 (rid_b.as_str(), chat_b.path()),
1012 ];
1013
1014 let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
1015 assert_eq!(result.len(), 1);
1016 assert_eq!(result[0].room(), &rid_a);
1017
1018 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
1019 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
1020 }
1021
1022 #[tokio::test]
1024 async fn poll_multi_no_rooms() {
1025 let rooms: Vec<(&str, &Path)> = vec![];
1026 let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
1027 assert!(result.is_empty());
1028 }
1029
1030 #[tokio::test]
1032 async fn poll_multi_filters_dms_across_rooms() {
1033 use crate::message::make_dm;
1034 let chat_a = NamedTempFile::new().unwrap();
1035 let chat_b = NamedTempFile::new().unwrap();
1036
1037 let rid_a = format!("test-dm-a-{}", std::process::id());
1038 let rid_b = format!("test-dm-b-{}", std::process::id());
1039
1040 let dm_to_bob = make_dm(&rid_a, "alice", "bob", "secret for bob");
1042 let dm_to_carol = make_dm(&rid_b, "alice", "carol", "secret for carol");
1043 crate::history::append(chat_a.path(), &dm_to_bob)
1044 .await
1045 .unwrap();
1046 crate::history::append(chat_b.path(), &dm_to_carol)
1047 .await
1048 .unwrap();
1049
1050 let rooms: Vec<(&str, &Path)> = vec![
1051 (rid_a.as_str(), chat_a.path()),
1052 (rid_b.as_str(), chat_b.path()),
1053 ];
1054
1055 let result = poll_messages_multi(&rooms, "bob").await.unwrap();
1057 assert_eq!(result.len(), 1);
1058 assert_eq!(result[0].room(), &rid_a);
1059
1060 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "bob"));
1061 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "bob"));
1062 }
1063
1064 #[tokio::test]
1068 async fn cmd_query_history_returns_all_newest_first() {
1069 let chat = NamedTempFile::new().unwrap();
1070 let cursor_dir = TempDir::new().unwrap();
1071 let token_dir = TempDir::new().unwrap();
1072
1073 let room_id = format!("test-cqh-{}", std::process::id());
1074 write_token_file(&token_dir, &room_id, "alice-cqh", "tok-cqh");
1075 write_meta_file(&room_id, chat.path());
1076
1077 for i in 0..3u32 {
1078 crate::history::append(
1079 chat.path(),
1080 &make_message(&room_id, "alice-cqh", format!("{i}")),
1081 )
1082 .await
1083 .unwrap();
1084 }
1085
1086 let filter = QueryFilter {
1087 rooms: vec![room_id.clone()],
1088 ascending: false,
1089 ..Default::default()
1090 };
1091 let opts = QueryOptions {
1092 new_only: false,
1093 wait: false,
1094 interval_secs: 5,
1095 mentions_only: false,
1096 since_uuid: None,
1097 };
1098
1099 let cursor_path = crate::paths::cursor_path(&room_id, "alice-cqh");
1101 let _ = std::fs::remove_file(&cursor_path);
1102
1103 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqh", filter, opts, &cursor_dir)
1105 .await
1106 .unwrap();
1107
1108 assert!(
1110 !cursor_path.exists(),
1111 "historical query must not write a cursor file"
1112 );
1113
1114 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1115 let _ = std::fs::remove_file(&global_token_path("alice-cqh"));
1116 }
1117
1118 #[tokio::test]
1120 async fn cmd_query_new_advances_cursor() {
1121 let chat = NamedTempFile::new().unwrap();
1122 let cursor_dir = TempDir::new().unwrap();
1123 let token_dir = TempDir::new().unwrap();
1124
1125 let room_id = format!("test-cqn-{}", std::process::id());
1126 write_token_file(&token_dir, &room_id, "alice-cqn", "tok-cqn");
1127 write_meta_file(&room_id, chat.path());
1128
1129 let msg = make_message(&room_id, "bob", "hello");
1130 crate::history::append(chat.path(), &msg).await.unwrap();
1131
1132 let filter = QueryFilter {
1133 rooms: vec![room_id.clone()],
1134 ascending: true,
1135 ..Default::default()
1136 };
1137 let opts = QueryOptions {
1138 new_only: true,
1139 wait: false,
1140 interval_secs: 5,
1141 mentions_only: false,
1142 since_uuid: None,
1143 };
1144
1145 let result = oneshot_cmd_query_to_vec(
1147 &[room_id.clone()],
1148 "tok-cqn",
1149 filter.clone(),
1150 opts.clone(),
1151 &cursor_dir,
1152 )
1153 .await
1154 .unwrap();
1155 assert_eq!(result.len(), 1);
1156
1157 let result2 =
1159 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqn", filter, opts, &cursor_dir)
1160 .await
1161 .unwrap();
1162 assert!(
1163 result2.is_empty(),
1164 "second query should return nothing (cursor advanced)"
1165 );
1166
1167 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1168 let _ = std::fs::remove_file(&global_token_path("alice-cqn"));
1169 }
1170
1171 #[tokio::test]
1173 async fn cmd_query_content_search_filters() {
1174 let chat = NamedTempFile::new().unwrap();
1175 let cursor_dir = TempDir::new().unwrap();
1176 let token_dir = TempDir::new().unwrap();
1177
1178 let room_id = format!("test-cqs-{}", std::process::id());
1179 write_token_file(&token_dir, &room_id, "alice-cqs", "tok-cqs");
1180 write_meta_file(&room_id, chat.path());
1181
1182 crate::history::append(chat.path(), &make_message(&room_id, "bob", "hello world"))
1183 .await
1184 .unwrap();
1185 crate::history::append(chat.path(), &make_message(&room_id, "bob", "goodbye"))
1186 .await
1187 .unwrap();
1188
1189 let filter = QueryFilter {
1190 rooms: vec![room_id.clone()],
1191 content_search: Some("hello".into()),
1192 ascending: true,
1193 ..Default::default()
1194 };
1195 let opts = QueryOptions {
1196 new_only: false,
1197 wait: false,
1198 interval_secs: 5,
1199 mentions_only: false,
1200 since_uuid: None,
1201 };
1202
1203 let result =
1204 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqs", filter, opts, &cursor_dir)
1205 .await
1206 .unwrap();
1207 assert_eq!(result.len(), 1);
1208 assert!(result[0].content().unwrap().contains("hello"));
1209
1210 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1211 let _ = std::fs::remove_file(&global_token_path("alice-cqs"));
1212 }
1213
1214 #[tokio::test]
1216 async fn cmd_query_user_filter() {
1217 let chat = NamedTempFile::new().unwrap();
1218 let cursor_dir = TempDir::new().unwrap();
1219 let token_dir = TempDir::new().unwrap();
1220
1221 let room_id = format!("test-cqu-{}", std::process::id());
1222 write_token_file(&token_dir, &room_id, "alice-cqu", "tok-cqu");
1223 write_meta_file(&room_id, chat.path());
1224
1225 crate::history::append(chat.path(), &make_message(&room_id, "alice", "from alice"))
1226 .await
1227 .unwrap();
1228 crate::history::append(chat.path(), &make_message(&room_id, "bob", "from bob"))
1229 .await
1230 .unwrap();
1231
1232 let filter = QueryFilter {
1233 rooms: vec![room_id.clone()],
1234 users: vec!["bob".into()],
1235 ascending: true,
1236 ..Default::default()
1237 };
1238 let opts = QueryOptions {
1239 new_only: false,
1240 wait: false,
1241 interval_secs: 5,
1242 mentions_only: false,
1243 since_uuid: None,
1244 };
1245
1246 let result =
1247 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqu", filter, opts, &cursor_dir)
1248 .await
1249 .unwrap();
1250 assert_eq!(result.len(), 1);
1251 assert_eq!(result[0].user(), "bob");
1252
1253 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1254 let _ = std::fs::remove_file(&global_token_path("alice-cqu"));
1255 }
1256
1257 #[tokio::test]
1259 async fn cmd_query_limit() {
1260 let chat = NamedTempFile::new().unwrap();
1261 let cursor_dir = TempDir::new().unwrap();
1262 let token_dir = TempDir::new().unwrap();
1263
1264 let room_id = format!("test-cql-{}", std::process::id());
1265 write_token_file(&token_dir, &room_id, "alice-cql", "tok-cql");
1266 write_meta_file(&room_id, chat.path());
1267
1268 for i in 0..5u32 {
1269 crate::history::append(
1270 chat.path(),
1271 &make_message(&room_id, "bob", format!("msg {i}")),
1272 )
1273 .await
1274 .unwrap();
1275 }
1276
1277 let filter = QueryFilter {
1278 rooms: vec![room_id.clone()],
1279 limit: Some(2),
1280 ascending: false,
1281 ..Default::default()
1282 };
1283 let opts = QueryOptions {
1284 new_only: false,
1285 wait: false,
1286 interval_secs: 5,
1287 mentions_only: false,
1288 since_uuid: None,
1289 };
1290
1291 let result =
1292 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cql", filter, opts, &cursor_dir)
1293 .await
1294 .unwrap();
1295 assert_eq!(result.len(), 2, "limit should restrict to 2 messages");
1296
1297 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1298 let _ = std::fs::remove_file(&global_token_path("alice-cql"));
1299 }
1300
1301 fn global_token_path(username: &str) -> PathBuf {
1304 crate::paths::global_token_path(username)
1305 }
1306
1307 fn write_token_file(_dir: &TempDir, _room_id: &str, username: &str, token: &str) {
1308 let path = global_token_path(username);
1309 if let Some(parent) = path.parent() {
1310 std::fs::create_dir_all(parent).unwrap();
1311 }
1312 let data = serde_json::json!({ "username": username, "token": token });
1313 std::fs::write(&path, format!("{data}\n")).unwrap();
1314 }
1315
1316 fn write_meta_file(room_id: &str, chat_path: &Path) {
1317 let meta_path = crate::paths::room_meta_path(room_id);
1318 if let Some(parent) = meta_path.parent() {
1319 std::fs::create_dir_all(parent).unwrap();
1320 }
1321 let meta = serde_json::json!({ "chat_path": chat_path.to_string_lossy() });
1322 std::fs::write(&meta_path, format!("{meta}\n")).unwrap();
1323 }
1324
1325 async fn oneshot_cmd_query_to_vec(
1331 room_ids: &[String],
1332 token: &str,
1333 filter: QueryFilter,
1334 opts: QueryOptions,
1335 _cursor_dir: &TempDir,
1336 ) -> anyhow::Result<Vec<Message>> {
1337 let cursor_before = room_ids
1339 .first()
1340 .map(|id| {
1341 super::super::token::username_from_token(token)
1343 .ok()
1344 .map(|u| {
1345 let p = crate::paths::cursor_path(id, &u);
1346 std::fs::read_to_string(&p).ok()
1347 })
1348 .flatten()
1349 })
1350 .flatten();
1351
1352 cmd_query(room_ids, token, filter.clone(), opts.clone()).await?;
1354
1355 let cursor_after = room_ids
1357 .first()
1358 .map(|id| {
1359 super::super::token::username_from_token(token)
1360 .ok()
1361 .map(|u| {
1362 let p = crate::paths::cursor_path(id, &u);
1363 std::fs::read_to_string(&p).ok()
1364 })
1365 .flatten()
1366 })
1367 .flatten();
1368
1369 if !opts.new_only && !opts.wait {
1373 let mut all: Vec<Message> = Vec::new();
1375 for room_id in room_ids {
1376 let meta_path = crate::paths::room_meta_path(room_id);
1377 let chat_path = chat_path_from_meta(room_id, &meta_path);
1378 let msgs = history::load(&chat_path).await?;
1379 all.extend(msgs);
1380 }
1381 let username = username_from_token(token).unwrap_or_default();
1382 let mut result: Vec<Message> = all
1383 .into_iter()
1384 .filter(|m| filter.matches(m, m.room()))
1385 .filter(|m| match m {
1386 Message::DirectMessage { user, to, .. } => user == &username || to == &username,
1387 _ => true,
1388 })
1389 .collect();
1390 apply_sort_and_limit(&mut result, &filter);
1391 Ok(result)
1392 } else {
1393 let advanced = cursor_after != cursor_before;
1396 if advanced {
1397 let room_id = &room_ids[0];
1398 let meta_path = crate::paths::room_meta_path(room_id);
1399 let chat_path = chat_path_from_meta(room_id, &meta_path);
1400 let all = history::load(&chat_path).await?;
1401 let start = match &cursor_before {
1403 Some(id) => all
1404 .iter()
1405 .position(|m| m.id() == id.trim())
1406 .map(|i| i + 1)
1407 .unwrap_or(0),
1408 None => 0,
1409 };
1410 let filtered: Vec<Message> = all[start..]
1411 .iter()
1412 .filter(|m| filter.matches(m, m.room()))
1413 .cloned()
1414 .collect();
1415 Ok(filtered)
1416 } else {
1417 Ok(vec![])
1418 }
1419 }
1420 }
1421
1422 #[test]
1424 fn unknown_token_returns_error() {
1425 let result = super::super::token::username_from_token("bad-token-nonexistent");
1426 assert!(result.is_err());
1427 assert!(result
1428 .unwrap_err()
1429 .to_string()
1430 .contains("token not recognised"));
1431 }
1432
1433 #[test]
1437 fn load_user_tier_missing_file_returns_full() {
1438 let tier = load_user_tier("nonexistent-room-tier-test", "alice");
1440 assert_eq!(tier, SubscriptionTier::Full);
1441 }
1442
1443 #[test]
1445 fn load_user_tier_returns_persisted_tier() {
1446 let state_dir = crate::paths::room_state_dir();
1447 let _ = std::fs::create_dir_all(&state_dir);
1448 let room_id = format!("test-tier-load-{}", std::process::id());
1449 let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
1450
1451 let mut map = std::collections::HashMap::new();
1452 map.insert("alice".to_string(), SubscriptionTier::MentionsOnly);
1453 map.insert("bob".to_string(), SubscriptionTier::Unsubscribed);
1454 let json = serde_json::to_string_pretty(&map).unwrap();
1455 std::fs::write(&sub_path, json).unwrap();
1456
1457 assert_eq!(
1458 load_user_tier(&room_id, "alice"),
1459 SubscriptionTier::MentionsOnly
1460 );
1461 assert_eq!(
1462 load_user_tier(&room_id, "bob"),
1463 SubscriptionTier::Unsubscribed
1464 );
1465 assert_eq!(load_user_tier(&room_id, "carol"), SubscriptionTier::Full);
1467
1468 let _ = std::fs::remove_file(&sub_path);
1469 }
1470
1471 #[test]
1473 fn apply_tier_filter_full_keeps_all() {
1474 let mut msgs = vec![
1475 make_message("r", "alice", "hello"),
1476 make_message("r", "bob", "world"),
1477 ];
1478 apply_tier_filter(&mut msgs, SubscriptionTier::Full, "carol");
1479 assert_eq!(msgs.len(), 2);
1480 }
1481
1482 #[test]
1484 fn apply_tier_filter_mentions_only_filters() {
1485 let mut msgs = vec![
1486 make_message("r", "alice", "hey @carol check this"),
1487 make_message("r", "bob", "unrelated message"),
1488 make_message("r", "dave", "also @carol"),
1489 ];
1490 apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "carol");
1491 assert_eq!(msgs.len(), 2);
1492 assert!(msgs[0].content().unwrap().contains("@carol"));
1493 assert!(msgs[1].content().unwrap().contains("@carol"));
1494 }
1495
1496 #[test]
1498 fn apply_tier_filter_unsubscribed_clears_all() {
1499 let mut msgs = vec![
1500 make_message("r", "alice", "hey @carol"),
1501 make_message("r", "bob", "world"),
1502 ];
1503 apply_tier_filter(&mut msgs, SubscriptionTier::Unsubscribed, "carol");
1504 assert!(msgs.is_empty());
1505 }
1506
1507 #[test]
1509 fn apply_tier_filter_mentions_only_no_mentions_returns_empty() {
1510 let mut msgs = vec![
1511 make_message("r", "alice", "hello"),
1512 make_message("r", "bob", "world"),
1513 ];
1514 apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "carol");
1515 assert!(msgs.is_empty());
1516 }
1517
1518 #[tokio::test]
1520 async fn cmd_query_public_bypasses_tier() {
1521 let chat = NamedTempFile::new().unwrap();
1522 let token_dir = TempDir::new().unwrap();
1523 let cursor_dir = TempDir::new().unwrap();
1524
1525 let room_id = format!("test-pub-tier-{}", std::process::id());
1526 write_token_file(&token_dir, &room_id, "alice-pub", "tok-pub-tier");
1527 write_meta_file(&room_id, chat.path());
1528
1529 let state_dir = crate::paths::room_state_dir();
1531 let _ = std::fs::create_dir_all(&state_dir);
1532 let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
1533 let mut map = std::collections::HashMap::new();
1534 map.insert("alice-pub".to_string(), SubscriptionTier::Unsubscribed);
1535 std::fs::write(&sub_path, serde_json::to_string(&map).unwrap()).unwrap();
1536
1537 crate::history::append(chat.path(), &make_message(&room_id, "bob", "visible"))
1539 .await
1540 .unwrap();
1541
1542 let filter = QueryFilter {
1544 rooms: vec![room_id.clone()],
1545 public_only: true,
1546 ascending: true,
1547 ..Default::default()
1548 };
1549 let opts = QueryOptions {
1550 new_only: false,
1551 wait: false,
1552 interval_secs: 5,
1553 mentions_only: false,
1554 since_uuid: None,
1555 };
1556
1557 let result = oneshot_cmd_query_to_vec(
1558 &[room_id.clone()],
1559 "tok-pub-tier",
1560 filter,
1561 opts,
1562 &cursor_dir,
1563 )
1564 .await
1565 .unwrap();
1566 assert_eq!(
1567 result.len(),
1568 1,
1569 "public flag should bypass Unsubscribed tier"
1570 );
1571
1572 let _ = std::fs::remove_file(&sub_path);
1573 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1574 let _ = std::fs::remove_file(&global_token_path("alice-pub"));
1575 }
1576
1577 #[test]
1579 fn mentions_only_tier_sets_mention_user_on_filter() {
1580 let mut filter = QueryFilter::default();
1583 let tier = SubscriptionTier::MentionsOnly;
1584
1585 match tier {
1587 SubscriptionTier::MentionsOnly => {
1588 if filter.mention_user.is_none() {
1589 filter.mention_user = Some("alice".to_string());
1590 }
1591 }
1592 _ => {}
1593 }
1594
1595 assert_eq!(filter.mention_user, Some("alice".to_string()));
1596
1597 let mut msgs = vec![
1599 make_message("r", "bob", "hey @alice look"),
1600 make_message("r", "bob", "unrelated chatter"),
1601 ];
1602 apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "alice");
1603 assert_eq!(msgs.len(), 1);
1604 assert!(msgs[0].content().unwrap().contains("@alice"));
1605 }
1606
1607 #[test]
1609 fn mentions_only_tier_preserves_existing_mention_user() {
1610 let mut filter = QueryFilter {
1611 mention_user: Some("bob".to_string()),
1612 ..Default::default()
1613 };
1614
1615 match SubscriptionTier::MentionsOnly {
1617 SubscriptionTier::MentionsOnly => {
1618 if filter.mention_user.is_none() {
1619 filter.mention_user = Some("alice".to_string());
1620 }
1621 }
1622 _ => {}
1623 }
1624
1625 assert_eq!(
1626 filter.mention_user,
1627 Some("bob".to_string()),
1628 "existing mention_user filter should be preserved"
1629 );
1630 }
1631
1632 #[test]
1635 fn per_room_tier_filter_full_keeps_all() {
1636 let mut msgs = vec![
1637 make_message("dev", "alice", "hello from dev"),
1638 make_message("lobby", "bob", "hello from lobby"),
1639 ];
1640 let rooms = vec![
1642 "nonexistent-perroom-full-1".to_string(),
1643 "nonexistent-perroom-full-2".to_string(),
1644 ];
1645 apply_per_room_tier_filter(&mut msgs, &rooms, "carol");
1646 assert_eq!(msgs.len(), 2);
1647 }
1648
1649 #[test]
1650 fn per_room_tier_filter_mixed_tiers() {
1651 let state_dir = crate::paths::room_state_dir();
1652 let _ = std::fs::create_dir_all(&state_dir);
1653
1654 let room_full = format!("perroom-mixed-full-{}", std::process::id());
1655 let room_unsub = format!("perroom-mixed-unsub-{}", std::process::id());
1656 let room_mentions = format!("perroom-mixed-ment-{}", std::process::id());
1657
1658 let sub_unsub = crate::paths::broker_subscriptions_path(&state_dir, &room_unsub);
1660 let mut map_unsub = std::collections::HashMap::new();
1661 map_unsub.insert("alice".to_string(), SubscriptionTier::Unsubscribed);
1662 std::fs::write(&sub_unsub, serde_json::to_string(&map_unsub).unwrap()).unwrap();
1663
1664 let sub_ment = crate::paths::broker_subscriptions_path(&state_dir, &room_mentions);
1665 let mut map_ment = std::collections::HashMap::new();
1666 map_ment.insert("alice".to_string(), SubscriptionTier::MentionsOnly);
1667 std::fs::write(&sub_ment, serde_json::to_string(&map_ment).unwrap()).unwrap();
1668
1669 let mut msgs = vec![
1670 make_message(&room_full, "bob", "visible in full room"),
1671 make_message(&room_unsub, "bob", "invisible — unsubscribed"),
1672 make_message(&room_mentions, "bob", "no mention — filtered"),
1673 make_message(&room_mentions, "bob", "hey @alice check this"),
1674 ];
1675
1676 let rooms = vec![room_full.clone(), room_unsub.clone(), room_mentions.clone()];
1677 apply_per_room_tier_filter(&mut msgs, &rooms, "alice");
1678
1679 assert_eq!(msgs.len(), 2);
1681 assert!(msgs[0].content().unwrap().contains("visible in full room"));
1682 assert!(msgs[1].content().unwrap().contains("@alice"));
1683
1684 let _ = std::fs::remove_file(&sub_unsub);
1686 let _ = std::fs::remove_file(&sub_ment);
1687 }
1688
1689 #[test]
1690 fn per_room_tier_filter_unknown_room_defaults_to_full() {
1691 let mut msgs = vec![make_message("mystery", "bob", "hello")];
1692 apply_per_room_tier_filter(&mut msgs, &["other".to_string()], "alice");
1694 assert_eq!(msgs.len(), 1);
1695 }
1696
1697 fn make_event_msg(room: &str, event_type: room_protocol::EventType) -> Message {
1700 room_protocol::make_event(room, "bot", event_type, "event content", None)
1701 }
1702
1703 #[test]
1704 fn event_filter_all_keeps_everything() {
1705 let mut msgs = vec![
1706 make_message("r", "alice", "hello"),
1707 make_event_msg("r", room_protocol::EventType::TaskPosted),
1708 make_event_msg("r", room_protocol::EventType::TaskFinished),
1709 ];
1710 apply_event_filter(&mut msgs, &EventFilter::All);
1711 assert_eq!(msgs.len(), 3);
1712 }
1713
1714 #[test]
1715 fn event_filter_none_removes_only_events() {
1716 let mut msgs = vec![
1717 make_message("r", "alice", "hello"),
1718 make_event_msg("r", room_protocol::EventType::TaskPosted),
1719 make_event_msg("r", room_protocol::EventType::TaskFinished),
1720 ];
1721 apply_event_filter(&mut msgs, &EventFilter::None);
1722 assert_eq!(msgs.len(), 1);
1723 assert!(msgs[0].content().unwrap().contains("hello"));
1724 }
1725
1726 #[test]
1727 fn event_filter_only_keeps_matching_events() {
1728 let mut types = std::collections::BTreeSet::new();
1729 types.insert(room_protocol::EventType::TaskPosted);
1730 let filter = EventFilter::Only { types };
1731
1732 let mut msgs = vec![
1733 make_message("r", "alice", "hello"),
1734 make_event_msg("r", room_protocol::EventType::TaskPosted),
1735 make_event_msg("r", room_protocol::EventType::TaskFinished),
1736 ];
1737 apply_event_filter(&mut msgs, &filter);
1738 assert_eq!(msgs.len(), 2); }
1740
1741 #[test]
1742 fn event_filter_only_removes_non_matching_events() {
1743 let mut types = std::collections::BTreeSet::new();
1744 types.insert(room_protocol::EventType::TaskFinished);
1745 let filter = EventFilter::Only { types };
1746
1747 let mut msgs = vec![
1748 make_event_msg("r", room_protocol::EventType::TaskPosted),
1749 make_event_msg("r", room_protocol::EventType::TaskAssigned),
1750 make_event_msg("r", room_protocol::EventType::TaskFinished),
1751 ];
1752 apply_event_filter(&mut msgs, &filter);
1753 assert_eq!(msgs.len(), 1);
1754 }
1755
1756 #[test]
1757 fn event_filter_does_not_affect_non_event_messages() {
1758 let mut msgs = vec![
1759 make_message("r", "alice", "hello"),
1760 make_message("r", "bob", "world"),
1761 ];
1762 apply_event_filter(&mut msgs, &EventFilter::None);
1763 assert_eq!(msgs.len(), 2, "non-event messages should not be filtered");
1764 }
1765
1766 #[test]
1767 fn load_user_event_filter_missing_file_returns_all() {
1768 let ef = load_user_event_filter("nonexistent-room-ef-test", "alice");
1769 assert_eq!(ef, EventFilter::All);
1770 }
1771
1772 #[test]
1773 fn load_user_event_filter_returns_persisted() {
1774 let state_dir = crate::paths::room_state_dir();
1775 let _ = std::fs::create_dir_all(&state_dir);
1776 let room_id = format!("test-ef-load-{}", std::process::id());
1777 let ef_path = crate::paths::broker_event_filters_path(&state_dir, &room_id);
1778
1779 let mut map = std::collections::HashMap::new();
1780 map.insert("alice".to_string(), EventFilter::None);
1781 let mut types = std::collections::BTreeSet::new();
1782 types.insert(room_protocol::EventType::TaskPosted);
1783 map.insert("bob".to_string(), EventFilter::Only { types });
1784 let json = serde_json::to_string_pretty(&map).unwrap();
1785 std::fs::write(&ef_path, json).unwrap();
1786
1787 assert_eq!(load_user_event_filter(&room_id, "alice"), EventFilter::None);
1788 let bob_filter = load_user_event_filter(&room_id, "bob");
1789 match bob_filter {
1790 EventFilter::Only { types } => {
1791 assert!(types.contains(&room_protocol::EventType::TaskPosted));
1792 assert_eq!(types.len(), 1);
1793 }
1794 _ => panic!("expected Only filter for bob"),
1795 }
1796 assert_eq!(load_user_event_filter(&room_id, "carol"), EventFilter::All);
1798
1799 let _ = std::fs::remove_file(&ef_path);
1800 }
1801
1802 #[tokio::test]
1804 async fn pull_messages_returns_tail_without_cursor_change() {
1805 let chat = NamedTempFile::new().unwrap();
1806 let cursor_dir = TempDir::new().unwrap();
1807 let cursor = cursor_dir.path().join("cursor");
1808
1809 for i in 0..5u32 {
1810 crate::history::append(chat.path(), &make_message("r", "u", format!("msg {i}")))
1811 .await
1812 .unwrap();
1813 }
1814
1815 let pulled = pull_messages(chat.path(), 3, None, None).await.unwrap();
1816 assert_eq!(pulled.len(), 3);
1817
1818 let polled = poll_messages(chat.path(), &cursor, None, None, None)
1820 .await
1821 .unwrap();
1822 assert_eq!(polled.len(), 5);
1823 }
1824}