1use std::path::{Path, PathBuf};
2
3use room_protocol::SubscriptionTier;
4
5use crate::{
6 broker::commands::load_subscription_map, history, message::Message, paths, query::QueryFilter,
7};
8
9use super::token::{read_cursor, username_from_token, write_cursor};
10
11fn load_user_tier(room_id: &str, username: &str) -> SubscriptionTier {
19 let state_dir = paths::room_state_dir();
20 let sub_path = paths::broker_subscriptions_path(&state_dir, room_id);
21 let map = load_subscription_map(&sub_path);
22 map.get(username).copied().unwrap_or(SubscriptionTier::Full)
23}
24
25fn apply_tier_filter(messages: &mut Vec<Message>, tier: SubscriptionTier, username: &str) {
31 match tier {
32 SubscriptionTier::Full => {}
33 SubscriptionTier::MentionsOnly => {
34 messages.retain(|m| m.mentions().iter().any(|mention| mention == username));
35 }
36 SubscriptionTier::Unsubscribed => {
37 messages.clear();
38 }
39 }
40}
41
42fn apply_per_room_tier_filter(messages: &mut Vec<Message>, room_ids: &[String], username: &str) {
48 use std::collections::HashMap;
49 let tiers: HashMap<&str, SubscriptionTier> = room_ids
50 .iter()
51 .map(|r| (r.as_str(), load_user_tier(r, username)))
52 .collect();
53
54 messages.retain(|m| {
55 let tier = tiers
56 .get(m.room())
57 .copied()
58 .unwrap_or(SubscriptionTier::Full);
59 match tier {
60 SubscriptionTier::Full => true,
61 SubscriptionTier::MentionsOnly => {
62 m.mentions().iter().any(|mention| mention == username)
63 }
64 SubscriptionTier::Unsubscribed => false,
65 }
66 });
67}
68
69#[derive(Debug, Clone)]
73pub struct QueryOptions {
74 pub new_only: bool,
77 pub wait: bool,
79 pub interval_secs: u64,
81 pub mentions_only: bool,
84 pub since_uuid: Option<String>,
87}
88
89pub async fn poll_messages(
104 chat_path: &Path,
105 cursor_path: &Path,
106 viewer: Option<&str>,
107 host: Option<&str>,
108 since: Option<&str>,
109) -> anyhow::Result<Vec<Message>> {
110 let effective_since: Option<String> = since
111 .map(|s| s.to_owned())
112 .or_else(|| read_cursor(cursor_path));
113
114 let messages = history::load(chat_path).await?;
115
116 let start = match &effective_since {
117 Some(id) => messages
118 .iter()
119 .position(|m| m.id() == id)
120 .map(|i| i + 1)
121 .unwrap_or(0),
122 None => 0,
123 };
124
125 let result: Vec<Message> = messages[start..]
126 .iter()
127 .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
128 .cloned()
129 .collect();
130
131 if let Some(last) = result.last() {
132 write_cursor(cursor_path, last.id())?;
133 }
134
135 Ok(result)
136}
137
138pub async fn pull_messages(
146 chat_path: &Path,
147 n: usize,
148 viewer: Option<&str>,
149 host: Option<&str>,
150) -> anyhow::Result<Vec<Message>> {
151 let clamped = n.min(200);
152 let all = history::tail(chat_path, clamped).await?;
153 let visible: Vec<Message> = all
154 .into_iter()
155 .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
156 .collect();
157 Ok(visible)
158}
159
160pub async fn cmd_pull(room_id: &str, token: &str, n: usize) -> anyhow::Result<()> {
165 let username = username_from_token(room_id, token)?;
166 let meta_path = paths::room_meta_path(room_id);
167 let chat_path = chat_path_from_meta(room_id, &meta_path);
168
169 let host = read_host_from_meta(&meta_path);
170 let mut messages = pull_messages(&chat_path, n, Some(&username), host.as_deref()).await?;
171 let tier = load_user_tier(room_id, &username);
172 apply_tier_filter(&mut messages, tier, &username);
173 for msg in &messages {
174 println!("{}", serde_json::to_string(msg)?);
175 }
176 Ok(())
177}
178
179pub async fn cmd_watch(room_id: &str, token: &str, interval_secs: u64) -> anyhow::Result<()> {
187 let username = username_from_token(room_id, token)?;
188 let meta_path = paths::room_meta_path(room_id);
189 let chat_path = chat_path_from_meta(room_id, &meta_path);
190 let cursor_path = paths::cursor_path(room_id, &username);
191 let host = read_host_from_meta(&meta_path);
192
193 loop {
194 let messages = poll_messages(
195 &chat_path,
196 &cursor_path,
197 Some(&username),
198 host.as_deref(),
199 None,
200 )
201 .await?;
202
203 let foreign: Vec<&Message> = messages
204 .iter()
205 .filter(|m| match m {
206 Message::Message { user, .. } => user != &username,
207 Message::DirectMessage { to, .. } => to == &username,
208 _ => false,
209 })
210 .collect();
211
212 if !foreign.is_empty() {
213 for msg in foreign {
214 println!("{}", serde_json::to_string(msg)?);
215 }
216 return Ok(());
217 }
218
219 tokio::time::sleep(tokio::time::Duration::from_secs(interval_secs)).await;
220 }
221}
222
223pub async fn cmd_poll(
229 room_id: &str,
230 token: &str,
231 since: Option<String>,
232 mentions_only: bool,
233) -> anyhow::Result<()> {
234 let username = username_from_token(room_id, token)?;
235 let meta_path = paths::room_meta_path(room_id);
236 let chat_path = chat_path_from_meta(room_id, &meta_path);
237 let cursor_path = paths::cursor_path(room_id, &username);
238 let host = read_host_from_meta(&meta_path);
239
240 let messages = poll_messages(
241 &chat_path,
242 &cursor_path,
243 Some(&username),
244 host.as_deref(),
245 since.as_deref(),
246 )
247 .await?;
248 for msg in &messages {
249 if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
250 continue;
251 }
252 println!("{}", serde_json::to_string(msg)?);
253 }
254 Ok(())
255}
256
257pub async fn poll_messages_multi(
263 rooms: &[(&str, &Path)],
264 username: &str,
265) -> anyhow::Result<Vec<Message>> {
266 let mut all_messages: Vec<Message> = Vec::new();
267
268 for &(room_id, chat_path) in rooms {
269 let cursor_path = paths::cursor_path(room_id, username);
270 let meta_path = paths::room_meta_path(room_id);
271 let host = read_host_from_meta(&meta_path);
272 let msgs = poll_messages(
273 chat_path,
274 &cursor_path,
275 Some(username),
276 host.as_deref(),
277 None,
278 )
279 .await?;
280 all_messages.extend(msgs);
281 }
282
283 all_messages.sort_by(|a, b| a.ts().cmp(b.ts()));
284 Ok(all_messages)
285}
286
287pub async fn cmd_poll_multi(
292 room_ids: &[String],
293 token: &str,
294 mentions_only: bool,
295) -> anyhow::Result<()> {
296 let username = resolve_username_from_rooms(room_ids, token)?;
298
299 let mut rooms: Vec<(&str, PathBuf)> = Vec::new();
301 for room_id in room_ids {
302 let meta_path = paths::room_meta_path(room_id);
303 let chat_path = chat_path_from_meta(room_id, &meta_path);
304 rooms.push((room_id.as_str(), chat_path));
305 }
306
307 let room_refs: Vec<(&str, &Path)> = rooms.iter().map(|(id, p)| (*id, p.as_path())).collect();
308 let messages = poll_messages_multi(&room_refs, &username).await?;
309 for msg in &messages {
310 if mentions_only && !msg.mentions().iter().any(|m| m == &username) {
311 continue;
312 }
313 println!("{}", serde_json::to_string(msg)?);
314 }
315 Ok(())
316}
317
318pub async fn cmd_query(
333 room_ids: &[String],
334 token: &str,
335 mut filter: QueryFilter,
336 opts: QueryOptions,
337) -> anyhow::Result<()> {
338 if room_ids.is_empty() {
339 anyhow::bail!("at least one room ID is required");
340 }
341
342 let username = resolve_username_from_rooms(room_ids, token)?;
343
344 if opts.mentions_only {
346 filter.mention_user = Some(username.clone());
347 }
348
349 if opts.wait || opts.new_only {
350 cmd_query_new(room_ids, &username, filter, opts).await
351 } else {
352 cmd_query_history(room_ids, &username, filter).await
353 }
354}
355
356async fn cmd_query_new(
358 room_ids: &[String],
359 username: &str,
360 filter: QueryFilter,
361 opts: QueryOptions,
362) -> anyhow::Result<()> {
363 loop {
364 let messages: Vec<Message> = if room_ids.len() == 1 {
365 let room_id = &room_ids[0];
366 let meta_path = paths::room_meta_path(room_id);
367 let chat_path = chat_path_from_meta(room_id, &meta_path);
368 let cursor_path = paths::cursor_path(room_id, username);
369 let host = read_host_from_meta(&meta_path);
370 poll_messages(
371 &chat_path,
372 &cursor_path,
373 Some(username),
374 host.as_deref(),
375 opts.since_uuid.as_deref(),
376 )
377 .await?
378 } else {
379 let mut rooms_info: Vec<(String, PathBuf)> = Vec::new();
380 for room_id in room_ids {
381 let meta_path = paths::room_meta_path(room_id);
382 let chat_path = chat_path_from_meta(room_id, &meta_path);
383 rooms_info.push((room_id.clone(), chat_path));
384 }
385 let room_refs: Vec<(&str, &Path)> = rooms_info
386 .iter()
387 .map(|(id, p)| (id.as_str(), p.as_path()))
388 .collect();
389 poll_messages_multi(&room_refs, username).await?
390 };
391
392 let mut filtered: Vec<Message> = messages
394 .into_iter()
395 .filter(|m| filter.matches(m, m.room()))
396 .collect();
397
398 if !filter.public_only {
399 apply_per_room_tier_filter(&mut filtered, room_ids, username);
400 }
401
402 apply_sort_and_limit(&mut filtered, &filter);
403
404 if opts.wait {
405 let foreign: Vec<&Message> = filtered
407 .iter()
408 .filter(|m| match m {
409 Message::Message { user, .. } => user != username,
410 Message::DirectMessage { to, .. } => to == username,
411 _ => false,
412 })
413 .collect();
414
415 if !foreign.is_empty() {
416 for msg in foreign {
417 println!("{}", serde_json::to_string(msg)?);
418 }
419 return Ok(());
420 }
421 } else {
422 for msg in &filtered {
423 println!("{}", serde_json::to_string(msg)?);
424 }
425 return Ok(());
426 }
427
428 tokio::time::sleep(tokio::time::Duration::from_secs(opts.interval_secs)).await;
429 }
430}
431
432async fn cmd_query_history(
434 room_ids: &[String],
435 username: &str,
436 filter: QueryFilter,
437) -> anyhow::Result<()> {
438 let mut all_messages: Vec<Message> = Vec::new();
439
440 for room_id in room_ids {
441 let meta_path = paths::room_meta_path(room_id);
442 let chat_path = chat_path_from_meta(room_id, &meta_path);
443 let messages = history::load(&chat_path).await?;
444 all_messages.extend(messages);
445 }
446
447 let mut filtered: Vec<Message> = all_messages
449 .into_iter()
450 .filter(|m| filter.matches(m, m.room()))
451 .filter(|m| match m {
452 Message::DirectMessage { user, to, .. } => user == username || to == username,
453 _ => true,
454 })
455 .collect();
456
457 if !filter.public_only {
458 apply_per_room_tier_filter(&mut filtered, room_ids, username);
459 }
460
461 apply_sort_and_limit(&mut filtered, &filter);
462
463 if filtered.is_empty() {
465 if let Some((ref target_room, target_seq)) = filter.target_id {
466 use room_protocol::format_message_id;
467 anyhow::bail!(
468 "message not found: {}",
469 format_message_id(target_room, target_seq)
470 );
471 }
472 }
473
474 for msg in &filtered {
475 println!("{}", serde_json::to_string(msg)?);
476 }
477 Ok(())
478}
479
480fn apply_sort_and_limit(messages: &mut Vec<Message>, filter: &QueryFilter) {
482 if filter.ascending {
483 messages.sort_by(|a, b| a.ts().cmp(b.ts()));
484 } else {
485 messages.sort_by(|a, b| b.ts().cmp(a.ts()));
486 }
487 if let Some(limit) = filter.limit {
488 messages.truncate(limit);
489 }
490}
491
492fn resolve_username_from_rooms(room_ids: &[String], token: &str) -> anyhow::Result<String> {
496 for room_id in room_ids {
497 if let Ok(username) = username_from_token(room_id, token) {
498 return Ok(username);
499 }
500 }
501 anyhow::bail!("token not recognised in any of the specified rooms — run: room join <username>")
502}
503
504pub(super) fn read_host_from_meta(meta_path: &Path) -> Option<String> {
509 if !meta_path.exists() {
510 return None;
511 }
512 let data = std::fs::read_to_string(meta_path).ok()?;
513 let v: serde_json::Value = serde_json::from_str(&data).ok()?;
514 v["host"].as_str().map(str::to_owned)
515}
516
517pub(super) fn chat_path_from_meta(room_id: &str, meta_path: &Path) -> PathBuf {
518 if meta_path.exists() {
519 if let Ok(data) = std::fs::read_to_string(meta_path) {
520 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&data) {
521 if let Some(p) = v["chat_path"].as_str() {
522 return PathBuf::from(p);
523 }
524 }
525 }
526 }
527 history::default_chat_path(room_id)
528}
529
530#[cfg(test)]
531mod tests {
532 use super::*;
533 use crate::message::make_message;
534 use tempfile::{NamedTempFile, TempDir};
535
536 #[tokio::test]
538 async fn poll_messages_no_cursor_returns_all() {
539 let chat = NamedTempFile::new().unwrap();
540 let cursor_dir = TempDir::new().unwrap();
541 let cursor = cursor_dir.path().join("cursor");
542
543 let msg = make_message("r", "alice", "hello");
544 crate::history::append(chat.path(), &msg).await.unwrap();
545
546 let result = poll_messages(chat.path(), &cursor, None, None, None)
547 .await
548 .unwrap();
549 assert_eq!(result.len(), 1);
550 assert_eq!(result[0].id(), msg.id());
551 }
552
553 #[tokio::test]
555 async fn poll_messages_advances_cursor() {
556 let chat = NamedTempFile::new().unwrap();
557 let cursor_dir = TempDir::new().unwrap();
558 let cursor = cursor_dir.path().join("cursor");
559
560 let msg = make_message("r", "alice", "hello");
561 crate::history::append(chat.path(), &msg).await.unwrap();
562
563 poll_messages(chat.path(), &cursor, None, None, None)
564 .await
565 .unwrap();
566
567 let second = poll_messages(chat.path(), &cursor, None, None, None)
568 .await
569 .unwrap();
570 assert!(
571 second.is_empty(),
572 "cursor should have advanced past the first message"
573 );
574 }
575
576 #[tokio::test]
578 async fn poll_messages_filters_dms_by_viewer() {
579 use crate::message::make_dm;
580 let chat = NamedTempFile::new().unwrap();
581 let cursor_dir = TempDir::new().unwrap();
582 let cursor = cursor_dir.path().join("cursor");
583
584 let dm_alice_bob = make_dm("r", "alice", "bob", "secret");
585 let dm_alice_carol = make_dm("r", "alice", "carol", "other secret");
586 crate::history::append(chat.path(), &dm_alice_bob)
587 .await
588 .unwrap();
589 crate::history::append(chat.path(), &dm_alice_carol)
590 .await
591 .unwrap();
592
593 let result = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
595 .await
596 .unwrap();
597 assert_eq!(result.len(), 1);
598 assert_eq!(result[0].id(), dm_alice_bob.id());
599 }
600
601 #[tokio::test]
604 async fn poll_messages_dm_to_viewer_is_not_consumed_silently() {
605 use crate::message::make_dm;
606 let chat = NamedTempFile::new().unwrap();
607 let cursor_dir = TempDir::new().unwrap();
608 let cursor = cursor_dir.path().join("cursor");
609
610 let dm = make_dm("r", "alice", "bob", "secret for bob");
612 let msg = make_message("r", "alice", "public hello");
613 crate::history::append(chat.path(), &dm).await.unwrap();
614 crate::history::append(chat.path(), &msg).await.unwrap();
615
616 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
618 .await
619 .unwrap();
620
621 let username = "bob";
622 let foreign: Vec<&Message> = messages
623 .iter()
624 .filter(|m| match m {
625 Message::Message { user, .. } => user != username,
626 Message::DirectMessage { to, .. } => to == username,
627 _ => false,
628 })
629 .collect();
630
631 assert_eq!(foreign.len(), 2, "watch should see DMs + foreign messages");
633 assert!(
634 foreign
635 .iter()
636 .any(|m| matches!(m, Message::DirectMessage { .. })),
637 "DM must not be silently consumed"
638 );
639 }
640
641 #[tokio::test]
643 async fn poll_messages_dm_from_viewer_excluded_from_watch() {
644 use crate::message::make_dm;
645 let chat = NamedTempFile::new().unwrap();
646 let cursor_dir = TempDir::new().unwrap();
647 let cursor = cursor_dir.path().join("cursor");
648
649 let dm = make_dm("r", "bob", "alice", "from bob");
651 crate::history::append(chat.path(), &dm).await.unwrap();
652
653 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
654 .await
655 .unwrap();
656
657 let username = "bob";
658 let foreign: Vec<&Message> = messages
659 .iter()
660 .filter(|m| match m {
661 Message::Message { user, .. } => user != username,
662 Message::DirectMessage { to, .. } => to == username,
663 _ => false,
664 })
665 .collect();
666
667 assert!(
668 foreign.is_empty(),
669 "DMs sent by the watcher should not wake watch"
670 );
671 }
672
673 #[tokio::test]
675 async fn poll_messages_host_sees_all_dms() {
676 use crate::message::make_dm;
677 let chat = NamedTempFile::new().unwrap();
678 let cursor_dir = TempDir::new().unwrap();
679 let cursor = cursor_dir.path().join("cursor");
680
681 let dm_alice_bob = make_dm("r", "alice", "bob", "private");
682 let dm_carol_dave = make_dm("r", "carol", "dave", "also private");
683 crate::history::append(chat.path(), &dm_alice_bob)
684 .await
685 .unwrap();
686 crate::history::append(chat.path(), &dm_carol_dave)
687 .await
688 .unwrap();
689
690 let result = poll_messages(chat.path(), &cursor, Some("eve"), Some("eve"), None)
692 .await
693 .unwrap();
694 assert_eq!(result.len(), 2, "host should see all DMs");
695 }
696
697 #[tokio::test]
699 async fn poll_messages_non_host_cannot_see_unrelated_dms() {
700 use crate::message::make_dm;
701 let chat = NamedTempFile::new().unwrap();
702 let cursor_dir = TempDir::new().unwrap();
703 let cursor = cursor_dir.path().join("cursor");
704
705 let dm = make_dm("r", "alice", "bob", "private");
706 crate::history::append(chat.path(), &dm).await.unwrap();
707
708 let result = poll_messages(chat.path(), &cursor, Some("carol"), None, None)
710 .await
711 .unwrap();
712 assert!(result.is_empty(), "non-host third party should not see DM");
713 }
714
715 #[tokio::test]
717 async fn pull_messages_host_sees_all_dms() {
718 use crate::message::make_dm;
719 let chat = NamedTempFile::new().unwrap();
720
721 let dm = make_dm("r", "alice", "bob", "secret");
722 crate::history::append(chat.path(), &dm).await.unwrap();
723
724 let result = pull_messages(chat.path(), 10, Some("eve"), Some("eve"))
725 .await
726 .unwrap();
727 assert_eq!(result.len(), 1, "host should see the DM via pull");
728 }
729
730 #[tokio::test]
734 async fn poll_multi_merges_by_timestamp() {
735 let chat_a = NamedTempFile::new().unwrap();
736 let chat_b = NamedTempFile::new().unwrap();
737
738 let rid_a = format!("test-merge-a-{}", std::process::id());
739 let rid_b = format!("test-merge-b-{}", std::process::id());
740
741 let msg_a1 = make_message(&rid_a, "alice", "a1");
743 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
744 let msg_b1 = make_message(&rid_b, "bob", "b1");
745 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
746 let msg_a2 = make_message(&rid_a, "alice", "a2");
747
748 crate::history::append(chat_a.path(), &msg_a1)
749 .await
750 .unwrap();
751 crate::history::append(chat_b.path(), &msg_b1)
752 .await
753 .unwrap();
754 crate::history::append(chat_a.path(), &msg_a2)
755 .await
756 .unwrap();
757
758 let rooms: Vec<(&str, &Path)> = vec![
759 (rid_a.as_str(), chat_a.path()),
760 (rid_b.as_str(), chat_b.path()),
761 ];
762
763 let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
764 assert_eq!(result.len(), 3);
765 assert!(result[0].ts() <= result[1].ts());
767 assert!(result[1].ts() <= result[2].ts());
768 assert_eq!(result[0].room(), &rid_a);
770
771 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
773 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
774 }
775
776 #[tokio::test]
778 async fn poll_multi_advances_per_room_cursors() {
779 let chat_a = NamedTempFile::new().unwrap();
780 let chat_b = NamedTempFile::new().unwrap();
781
782 let rid_a = format!("test-cursor-a-{}", std::process::id());
784 let rid_b = format!("test-cursor-b-{}", std::process::id());
785
786 let msg_a = make_message(&rid_a, "alice", "hello a");
787 let msg_b = make_message(&rid_b, "bob", "hello b");
788 crate::history::append(chat_a.path(), &msg_a).await.unwrap();
789 crate::history::append(chat_b.path(), &msg_b).await.unwrap();
790
791 let rooms: Vec<(&str, &Path)> = vec![
792 (rid_a.as_str(), chat_a.path()),
793 (rid_b.as_str(), chat_b.path()),
794 ];
795
796 let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
798 assert_eq!(result.len(), 2);
799
800 let result2 = poll_messages_multi(&rooms, "viewer").await.unwrap();
802 assert!(
803 result2.is_empty(),
804 "second multi-poll should return nothing"
805 );
806
807 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
809 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
810 }
811
812 #[tokio::test]
814 async fn poll_multi_one_empty_room() {
815 let chat_a = NamedTempFile::new().unwrap();
816 let chat_b = NamedTempFile::new().unwrap();
817
818 let rid_a = format!("test-empty-a-{}", std::process::id());
819 let rid_b = format!("test-empty-b-{}", std::process::id());
820
821 let msg = make_message(&rid_a, "alice", "only here");
822 crate::history::append(chat_a.path(), &msg).await.unwrap();
823 let rooms: Vec<(&str, &Path)> = vec![
826 (rid_a.as_str(), chat_a.path()),
827 (rid_b.as_str(), chat_b.path()),
828 ];
829
830 let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
831 assert_eq!(result.len(), 1);
832 assert_eq!(result[0].room(), &rid_a);
833
834 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "viewer"));
835 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "viewer"));
836 }
837
838 #[tokio::test]
840 async fn poll_multi_no_rooms() {
841 let rooms: Vec<(&str, &Path)> = vec![];
842 let result = poll_messages_multi(&rooms, "viewer").await.unwrap();
843 assert!(result.is_empty());
844 }
845
846 #[tokio::test]
848 async fn poll_multi_filters_dms_across_rooms() {
849 use crate::message::make_dm;
850 let chat_a = NamedTempFile::new().unwrap();
851 let chat_b = NamedTempFile::new().unwrap();
852
853 let rid_a = format!("test-dm-a-{}", std::process::id());
854 let rid_b = format!("test-dm-b-{}", std::process::id());
855
856 let dm_to_bob = make_dm(&rid_a, "alice", "bob", "secret for bob");
858 let dm_to_carol = make_dm(&rid_b, "alice", "carol", "secret for carol");
859 crate::history::append(chat_a.path(), &dm_to_bob)
860 .await
861 .unwrap();
862 crate::history::append(chat_b.path(), &dm_to_carol)
863 .await
864 .unwrap();
865
866 let rooms: Vec<(&str, &Path)> = vec![
867 (rid_a.as_str(), chat_a.path()),
868 (rid_b.as_str(), chat_b.path()),
869 ];
870
871 let result = poll_messages_multi(&rooms, "bob").await.unwrap();
873 assert_eq!(result.len(), 1);
874 assert_eq!(result[0].room(), &rid_a);
875
876 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_a, "bob"));
877 let _ = std::fs::remove_file(crate::paths::cursor_path(&rid_b, "bob"));
878 }
879
880 #[tokio::test]
884 async fn cmd_query_history_returns_all_newest_first() {
885 let chat = NamedTempFile::new().unwrap();
886 let cursor_dir = TempDir::new().unwrap();
887 let token_dir = TempDir::new().unwrap();
888
889 let room_id = format!("test-cqh-{}", std::process::id());
890 write_token_file(&token_dir, &room_id, "alice", "tok-alice");
891 write_meta_file(&room_id, chat.path());
892
893 for i in 0..3u32 {
894 crate::history::append(
895 chat.path(),
896 &make_message(&room_id, "alice", format!("{i}")),
897 )
898 .await
899 .unwrap();
900 }
901
902 let filter = QueryFilter {
903 rooms: vec![room_id.clone()],
904 ascending: false,
905 ..Default::default()
906 };
907 let opts = QueryOptions {
908 new_only: false,
909 wait: false,
910 interval_secs: 5,
911 mentions_only: false,
912 since_uuid: None,
913 };
914
915 let cursor_path = crate::paths::cursor_path(&room_id, "alice");
917 let _ = std::fs::remove_file(&cursor_path);
918
919 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-alice", filter, opts, &cursor_dir)
921 .await
922 .unwrap();
923
924 assert!(
926 !cursor_path.exists(),
927 "historical query must not write a cursor file"
928 );
929
930 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
931 let _ = std::fs::remove_file(&token_path(&room_id, "alice"));
932 }
933
934 #[tokio::test]
936 async fn cmd_query_new_advances_cursor() {
937 let chat = NamedTempFile::new().unwrap();
938 let cursor_dir = TempDir::new().unwrap();
939 let token_dir = TempDir::new().unwrap();
940
941 let room_id = format!("test-cqn-{}", std::process::id());
942 write_token_file(&token_dir, &room_id, "alice", "tok-cqn");
943 write_meta_file(&room_id, chat.path());
944
945 let msg = make_message(&room_id, "bob", "hello");
946 crate::history::append(chat.path(), &msg).await.unwrap();
947
948 let filter = QueryFilter {
949 rooms: vec![room_id.clone()],
950 ascending: true,
951 ..Default::default()
952 };
953 let opts = QueryOptions {
954 new_only: true,
955 wait: false,
956 interval_secs: 5,
957 mentions_only: false,
958 since_uuid: None,
959 };
960
961 let result = oneshot_cmd_query_to_vec(
963 &[room_id.clone()],
964 "tok-cqn",
965 filter.clone(),
966 opts.clone(),
967 &cursor_dir,
968 )
969 .await
970 .unwrap();
971 assert_eq!(result.len(), 1);
972
973 let result2 =
975 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqn", filter, opts, &cursor_dir)
976 .await
977 .unwrap();
978 assert!(
979 result2.is_empty(),
980 "second query should return nothing (cursor advanced)"
981 );
982
983 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
984 let _ = std::fs::remove_file(&token_path(&room_id, "alice"));
985 }
986
987 #[tokio::test]
989 async fn cmd_query_content_search_filters() {
990 let chat = NamedTempFile::new().unwrap();
991 let cursor_dir = TempDir::new().unwrap();
992 let token_dir = TempDir::new().unwrap();
993
994 let room_id = format!("test-cqs-{}", std::process::id());
995 write_token_file(&token_dir, &room_id, "alice", "tok-cqs");
996 write_meta_file(&room_id, chat.path());
997
998 crate::history::append(chat.path(), &make_message(&room_id, "bob", "hello world"))
999 .await
1000 .unwrap();
1001 crate::history::append(chat.path(), &make_message(&room_id, "bob", "goodbye"))
1002 .await
1003 .unwrap();
1004
1005 let filter = QueryFilter {
1006 rooms: vec![room_id.clone()],
1007 content_search: Some("hello".into()),
1008 ascending: true,
1009 ..Default::default()
1010 };
1011 let opts = QueryOptions {
1012 new_only: false,
1013 wait: false,
1014 interval_secs: 5,
1015 mentions_only: false,
1016 since_uuid: None,
1017 };
1018
1019 let result =
1020 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqs", filter, opts, &cursor_dir)
1021 .await
1022 .unwrap();
1023 assert_eq!(result.len(), 1);
1024 assert!(result[0].content().unwrap().contains("hello"));
1025
1026 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1027 let _ = std::fs::remove_file(&token_path(&room_id, "alice"));
1028 }
1029
1030 #[tokio::test]
1032 async fn cmd_query_user_filter() {
1033 let chat = NamedTempFile::new().unwrap();
1034 let cursor_dir = TempDir::new().unwrap();
1035 let token_dir = TempDir::new().unwrap();
1036
1037 let room_id = format!("test-cqu-{}", std::process::id());
1038 write_token_file(&token_dir, &room_id, "alice", "tok-cqu");
1039 write_meta_file(&room_id, chat.path());
1040
1041 crate::history::append(chat.path(), &make_message(&room_id, "alice", "from alice"))
1042 .await
1043 .unwrap();
1044 crate::history::append(chat.path(), &make_message(&room_id, "bob", "from bob"))
1045 .await
1046 .unwrap();
1047
1048 let filter = QueryFilter {
1049 rooms: vec![room_id.clone()],
1050 users: vec!["bob".into()],
1051 ascending: true,
1052 ..Default::default()
1053 };
1054 let opts = QueryOptions {
1055 new_only: false,
1056 wait: false,
1057 interval_secs: 5,
1058 mentions_only: false,
1059 since_uuid: None,
1060 };
1061
1062 let result =
1063 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cqu", filter, opts, &cursor_dir)
1064 .await
1065 .unwrap();
1066 assert_eq!(result.len(), 1);
1067 assert_eq!(result[0].user(), "bob");
1068
1069 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1070 let _ = std::fs::remove_file(&token_path(&room_id, "alice"));
1071 }
1072
1073 #[tokio::test]
1075 async fn cmd_query_limit() {
1076 let chat = NamedTempFile::new().unwrap();
1077 let cursor_dir = TempDir::new().unwrap();
1078 let token_dir = TempDir::new().unwrap();
1079
1080 let room_id = format!("test-cql-{}", std::process::id());
1081 write_token_file(&token_dir, &room_id, "alice", "tok-cql");
1082 write_meta_file(&room_id, chat.path());
1083
1084 for i in 0..5u32 {
1085 crate::history::append(
1086 chat.path(),
1087 &make_message(&room_id, "bob", format!("msg {i}")),
1088 )
1089 .await
1090 .unwrap();
1091 }
1092
1093 let filter = QueryFilter {
1094 rooms: vec![room_id.clone()],
1095 limit: Some(2),
1096 ascending: false,
1097 ..Default::default()
1098 };
1099 let opts = QueryOptions {
1100 new_only: false,
1101 wait: false,
1102 interval_secs: 5,
1103 mentions_only: false,
1104 since_uuid: None,
1105 };
1106
1107 let result =
1108 oneshot_cmd_query_to_vec(&[room_id.clone()], "tok-cql", filter, opts, &cursor_dir)
1109 .await
1110 .unwrap();
1111 assert_eq!(result.len(), 2, "limit should restrict to 2 messages");
1112
1113 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1114 let _ = std::fs::remove_file(&token_path(&room_id, "alice"));
1115 }
1116
1117 fn token_path(room_id: &str, username: &str) -> PathBuf {
1120 crate::paths::token_path(room_id, username)
1121 }
1122
1123 fn write_token_file(_dir: &TempDir, room_id: &str, username: &str, token: &str) {
1124 let path = token_path(room_id, username);
1125 if let Some(parent) = path.parent() {
1126 std::fs::create_dir_all(parent).unwrap();
1127 }
1128 let data = serde_json::json!({ "username": username, "token": token });
1129 std::fs::write(&path, format!("{data}\n")).unwrap();
1130 }
1131
1132 fn write_meta_file(room_id: &str, chat_path: &Path) {
1133 let meta_path = crate::paths::room_meta_path(room_id);
1134 if let Some(parent) = meta_path.parent() {
1135 std::fs::create_dir_all(parent).unwrap();
1136 }
1137 let meta = serde_json::json!({ "chat_path": chat_path.to_string_lossy() });
1138 std::fs::write(&meta_path, format!("{meta}\n")).unwrap();
1139 }
1140
1141 async fn oneshot_cmd_query_to_vec(
1147 room_ids: &[String],
1148 token: &str,
1149 filter: QueryFilter,
1150 opts: QueryOptions,
1151 _cursor_dir: &TempDir,
1152 ) -> anyhow::Result<Vec<Message>> {
1153 let cursor_before = room_ids
1155 .first()
1156 .map(|id| {
1157 super::super::token::username_from_token(id, token)
1159 .ok()
1160 .map(|u| {
1161 let p = crate::paths::cursor_path(id, &u);
1162 std::fs::read_to_string(&p).ok()
1163 })
1164 .flatten()
1165 })
1166 .flatten();
1167
1168 cmd_query(room_ids, token, filter.clone(), opts.clone()).await?;
1170
1171 let cursor_after = room_ids
1173 .first()
1174 .map(|id| {
1175 super::super::token::username_from_token(id, token)
1176 .ok()
1177 .map(|u| {
1178 let p = crate::paths::cursor_path(id, &u);
1179 std::fs::read_to_string(&p).ok()
1180 })
1181 .flatten()
1182 })
1183 .flatten();
1184
1185 if !opts.new_only && !opts.wait {
1189 let mut all: Vec<Message> = Vec::new();
1191 for room_id in room_ids {
1192 let meta_path = crate::paths::room_meta_path(room_id);
1193 let chat_path = chat_path_from_meta(room_id, &meta_path);
1194 let msgs = history::load(&chat_path).await?;
1195 all.extend(msgs);
1196 }
1197 let username = resolve_username_from_rooms(room_ids, token).unwrap_or_default();
1198 let mut result: Vec<Message> = all
1199 .into_iter()
1200 .filter(|m| filter.matches(m, m.room()))
1201 .filter(|m| match m {
1202 Message::DirectMessage { user, to, .. } => user == &username || to == &username,
1203 _ => true,
1204 })
1205 .collect();
1206 apply_sort_and_limit(&mut result, &filter);
1207 Ok(result)
1208 } else {
1209 let advanced = cursor_after != cursor_before;
1212 if advanced {
1213 let room_id = &room_ids[0];
1214 let meta_path = crate::paths::room_meta_path(room_id);
1215 let chat_path = chat_path_from_meta(room_id, &meta_path);
1216 let all = history::load(&chat_path).await?;
1217 let start = match &cursor_before {
1219 Some(id) => all
1220 .iter()
1221 .position(|m| m.id() == id.trim())
1222 .map(|i| i + 1)
1223 .unwrap_or(0),
1224 None => 0,
1225 };
1226 let filtered: Vec<Message> = all[start..]
1227 .iter()
1228 .filter(|m| filter.matches(m, m.room()))
1229 .cloned()
1230 .collect();
1231 Ok(filtered)
1232 } else {
1233 Ok(vec![])
1234 }
1235 }
1236 }
1237
1238 #[test]
1240 fn resolve_username_finds_token_in_second_room() {
1241 let result = resolve_username_from_rooms(&["nonexistent-room-xyz".to_owned()], "bad-token");
1244 assert!(result.is_err());
1245 assert!(result
1246 .unwrap_err()
1247 .to_string()
1248 .contains("token not recognised"));
1249 }
1250
1251 #[test]
1255 fn load_user_tier_missing_file_returns_full() {
1256 let tier = load_user_tier("nonexistent-room-tier-test", "alice");
1258 assert_eq!(tier, SubscriptionTier::Full);
1259 }
1260
1261 #[test]
1263 fn load_user_tier_returns_persisted_tier() {
1264 let state_dir = crate::paths::room_state_dir();
1265 let _ = std::fs::create_dir_all(&state_dir);
1266 let room_id = format!("test-tier-load-{}", std::process::id());
1267 let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
1268
1269 let mut map = std::collections::HashMap::new();
1270 map.insert("alice".to_string(), SubscriptionTier::MentionsOnly);
1271 map.insert("bob".to_string(), SubscriptionTier::Unsubscribed);
1272 let json = serde_json::to_string_pretty(&map).unwrap();
1273 std::fs::write(&sub_path, json).unwrap();
1274
1275 assert_eq!(
1276 load_user_tier(&room_id, "alice"),
1277 SubscriptionTier::MentionsOnly
1278 );
1279 assert_eq!(
1280 load_user_tier(&room_id, "bob"),
1281 SubscriptionTier::Unsubscribed
1282 );
1283 assert_eq!(load_user_tier(&room_id, "carol"), SubscriptionTier::Full);
1285
1286 let _ = std::fs::remove_file(&sub_path);
1287 }
1288
1289 #[test]
1291 fn apply_tier_filter_full_keeps_all() {
1292 let mut msgs = vec![
1293 make_message("r", "alice", "hello"),
1294 make_message("r", "bob", "world"),
1295 ];
1296 apply_tier_filter(&mut msgs, SubscriptionTier::Full, "carol");
1297 assert_eq!(msgs.len(), 2);
1298 }
1299
1300 #[test]
1302 fn apply_tier_filter_mentions_only_filters() {
1303 let mut msgs = vec![
1304 make_message("r", "alice", "hey @carol check this"),
1305 make_message("r", "bob", "unrelated message"),
1306 make_message("r", "dave", "also @carol"),
1307 ];
1308 apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "carol");
1309 assert_eq!(msgs.len(), 2);
1310 assert!(msgs[0].content().unwrap().contains("@carol"));
1311 assert!(msgs[1].content().unwrap().contains("@carol"));
1312 }
1313
1314 #[test]
1316 fn apply_tier_filter_unsubscribed_clears_all() {
1317 let mut msgs = vec![
1318 make_message("r", "alice", "hey @carol"),
1319 make_message("r", "bob", "world"),
1320 ];
1321 apply_tier_filter(&mut msgs, SubscriptionTier::Unsubscribed, "carol");
1322 assert!(msgs.is_empty());
1323 }
1324
1325 #[test]
1327 fn apply_tier_filter_mentions_only_no_mentions_returns_empty() {
1328 let mut msgs = vec![
1329 make_message("r", "alice", "hello"),
1330 make_message("r", "bob", "world"),
1331 ];
1332 apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "carol");
1333 assert!(msgs.is_empty());
1334 }
1335
1336 #[tokio::test]
1338 async fn cmd_query_public_bypasses_tier() {
1339 let chat = NamedTempFile::new().unwrap();
1340 let token_dir = TempDir::new().unwrap();
1341 let cursor_dir = TempDir::new().unwrap();
1342
1343 let room_id = format!("test-pub-tier-{}", std::process::id());
1344 write_token_file(&token_dir, &room_id, "alice", "tok-pub-tier");
1345 write_meta_file(&room_id, chat.path());
1346
1347 let state_dir = crate::paths::room_state_dir();
1349 let _ = std::fs::create_dir_all(&state_dir);
1350 let sub_path = crate::paths::broker_subscriptions_path(&state_dir, &room_id);
1351 let mut map = std::collections::HashMap::new();
1352 map.insert("alice".to_string(), SubscriptionTier::Unsubscribed);
1353 std::fs::write(&sub_path, serde_json::to_string(&map).unwrap()).unwrap();
1354
1355 crate::history::append(chat.path(), &make_message(&room_id, "bob", "visible"))
1357 .await
1358 .unwrap();
1359
1360 let filter = QueryFilter {
1362 rooms: vec![room_id.clone()],
1363 public_only: true,
1364 ascending: true,
1365 ..Default::default()
1366 };
1367 let opts = QueryOptions {
1368 new_only: false,
1369 wait: false,
1370 interval_secs: 5,
1371 mentions_only: false,
1372 since_uuid: None,
1373 };
1374
1375 let result = oneshot_cmd_query_to_vec(
1376 &[room_id.clone()],
1377 "tok-pub-tier",
1378 filter,
1379 opts,
1380 &cursor_dir,
1381 )
1382 .await
1383 .unwrap();
1384 assert_eq!(
1385 result.len(),
1386 1,
1387 "public flag should bypass Unsubscribed tier"
1388 );
1389
1390 let _ = std::fs::remove_file(&sub_path);
1391 let _ = std::fs::remove_file(crate::paths::room_meta_path(&room_id));
1392 let _ = std::fs::remove_file(&token_path(&room_id, "alice"));
1393 }
1394
1395 #[test]
1397 fn mentions_only_tier_sets_mention_user_on_filter() {
1398 let mut filter = QueryFilter::default();
1401 let tier = SubscriptionTier::MentionsOnly;
1402
1403 match tier {
1405 SubscriptionTier::MentionsOnly => {
1406 if filter.mention_user.is_none() {
1407 filter.mention_user = Some("alice".to_string());
1408 }
1409 }
1410 _ => {}
1411 }
1412
1413 assert_eq!(filter.mention_user, Some("alice".to_string()));
1414
1415 let mut msgs = vec![
1417 make_message("r", "bob", "hey @alice look"),
1418 make_message("r", "bob", "unrelated chatter"),
1419 ];
1420 apply_tier_filter(&mut msgs, SubscriptionTier::MentionsOnly, "alice");
1421 assert_eq!(msgs.len(), 1);
1422 assert!(msgs[0].content().unwrap().contains("@alice"));
1423 }
1424
1425 #[test]
1427 fn mentions_only_tier_preserves_existing_mention_user() {
1428 let mut filter = QueryFilter {
1429 mention_user: Some("bob".to_string()),
1430 ..Default::default()
1431 };
1432
1433 match SubscriptionTier::MentionsOnly {
1435 SubscriptionTier::MentionsOnly => {
1436 if filter.mention_user.is_none() {
1437 filter.mention_user = Some("alice".to_string());
1438 }
1439 }
1440 _ => {}
1441 }
1442
1443 assert_eq!(
1444 filter.mention_user,
1445 Some("bob".to_string()),
1446 "existing mention_user filter should be preserved"
1447 );
1448 }
1449
1450 #[test]
1453 fn per_room_tier_filter_full_keeps_all() {
1454 let mut msgs = vec![
1455 make_message("dev", "alice", "hello from dev"),
1456 make_message("lobby", "bob", "hello from lobby"),
1457 ];
1458 let rooms = vec![
1460 "nonexistent-perroom-full-1".to_string(),
1461 "nonexistent-perroom-full-2".to_string(),
1462 ];
1463 apply_per_room_tier_filter(&mut msgs, &rooms, "carol");
1464 assert_eq!(msgs.len(), 2);
1465 }
1466
1467 #[test]
1468 fn per_room_tier_filter_mixed_tiers() {
1469 let state_dir = crate::paths::room_state_dir();
1470 let _ = std::fs::create_dir_all(&state_dir);
1471
1472 let room_full = format!("perroom-mixed-full-{}", std::process::id());
1473 let room_unsub = format!("perroom-mixed-unsub-{}", std::process::id());
1474 let room_mentions = format!("perroom-mixed-ment-{}", std::process::id());
1475
1476 let sub_unsub = crate::paths::broker_subscriptions_path(&state_dir, &room_unsub);
1478 let mut map_unsub = std::collections::HashMap::new();
1479 map_unsub.insert("alice".to_string(), SubscriptionTier::Unsubscribed);
1480 std::fs::write(&sub_unsub, serde_json::to_string(&map_unsub).unwrap()).unwrap();
1481
1482 let sub_ment = crate::paths::broker_subscriptions_path(&state_dir, &room_mentions);
1483 let mut map_ment = std::collections::HashMap::new();
1484 map_ment.insert("alice".to_string(), SubscriptionTier::MentionsOnly);
1485 std::fs::write(&sub_ment, serde_json::to_string(&map_ment).unwrap()).unwrap();
1486
1487 let mut msgs = vec![
1488 make_message(&room_full, "bob", "visible in full room"),
1489 make_message(&room_unsub, "bob", "invisible — unsubscribed"),
1490 make_message(&room_mentions, "bob", "no mention — filtered"),
1491 make_message(&room_mentions, "bob", "hey @alice check this"),
1492 ];
1493
1494 let rooms = vec![room_full.clone(), room_unsub.clone(), room_mentions.clone()];
1495 apply_per_room_tier_filter(&mut msgs, &rooms, "alice");
1496
1497 assert_eq!(msgs.len(), 2);
1499 assert!(msgs[0].content().unwrap().contains("visible in full room"));
1500 assert!(msgs[1].content().unwrap().contains("@alice"));
1501
1502 let _ = std::fs::remove_file(&sub_unsub);
1504 let _ = std::fs::remove_file(&sub_ment);
1505 }
1506
1507 #[test]
1508 fn per_room_tier_filter_unknown_room_defaults_to_full() {
1509 let mut msgs = vec![make_message("mystery", "bob", "hello")];
1510 apply_per_room_tier_filter(&mut msgs, &["other".to_string()], "alice");
1512 assert_eq!(msgs.len(), 1);
1513 }
1514
1515 #[tokio::test]
1517 async fn pull_messages_returns_tail_without_cursor_change() {
1518 let chat = NamedTempFile::new().unwrap();
1519 let cursor_dir = TempDir::new().unwrap();
1520 let cursor = cursor_dir.path().join("cursor");
1521
1522 for i in 0..5u32 {
1523 crate::history::append(chat.path(), &make_message("r", "u", format!("msg {i}")))
1524 .await
1525 .unwrap();
1526 }
1527
1528 let pulled = pull_messages(chat.path(), 3, None, None).await.unwrap();
1529 assert_eq!(pulled.len(), 3);
1530
1531 let polled = poll_messages(chat.path(), &cursor, None, None, None)
1533 .await
1534 .unwrap();
1535 assert_eq!(polled.len(), 5);
1536 }
1537}