1mod commands;
2mod filter_events;
3mod filter_tier;
4pub(super) mod meta;
5mod multi_room;
6
7use std::path::Path;
8
9use crate::{history, message::Message};
10
11use super::token::{read_cursor, write_cursor};
12
13pub use commands::{cmd_poll, cmd_poll_multi, cmd_pull, cmd_query, cmd_watch};
16pub use multi_room::poll_messages_multi;
17
/// Option bundle for a query-style invocation.
///
/// NOTE(review): nothing in this part of the file reads these fields, so the
/// per-field notes below are inferred from the names only and should be
/// confirmed against the consumer (presumably in `commands`).
#[derive(Clone, Debug)]
pub struct QueryOptions {
    /// Presumably restricts output to messages not seen before — TODO confirm.
    pub new_only: bool,
    /// Presumably blocks until something is available — TODO confirm.
    pub wait: bool,
    /// Presumably the polling period, in seconds, used while waiting — TODO confirm.
    pub interval_secs: u64,
    /// Presumably keeps only messages that mention the caller — TODO confirm.
    pub mentions_only: bool,
    /// Optional message id to resume after; `None` when not supplied.
    /// NOTE(review): exact resume semantics are defined by the consumer — confirm.
    pub since_uuid: Option<String>,
}
37
38pub async fn poll_messages(
53 chat_path: &Path,
54 cursor_path: &Path,
55 viewer: Option<&str>,
56 host: Option<&str>,
57 since: Option<&str>,
58) -> anyhow::Result<Vec<Message>> {
59 let effective_since: Option<String> = since
60 .map(|s| s.to_owned())
61 .or_else(|| read_cursor(cursor_path));
62
63 let messages = history::load(chat_path).await?;
64
65 let start = match &effective_since {
66 Some(id) => messages
67 .iter()
68 .position(|m| m.id() == id)
69 .map(|i| i + 1)
70 .unwrap_or(0),
71 None => 0,
72 };
73
74 let result: Vec<Message> = messages[start..]
75 .iter()
76 .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
77 .cloned()
78 .collect();
79
80 if let Some(last) = result.last() {
81 write_cursor(cursor_path, last.id())?;
82 }
83
84 Ok(result)
85}
86
87pub async fn pull_messages(
95 chat_path: &Path,
96 n: usize,
97 viewer: Option<&str>,
98 host: Option<&str>,
99) -> anyhow::Result<Vec<Message>> {
100 let clamped = n.min(200);
101 let all = history::tail(chat_path, clamped).await?;
102 let visible: Vec<Message> = all
103 .into_iter()
104 .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
105 .collect();
106 Ok(visible)
107}
108
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use tempfile::{NamedTempFile, TempDir};

    use super::*;
    use crate::message::{make_dm, make_message};
    use room_protocol::make_system;

    /// Scratch fixture: an empty chat file plus a cursor path inside a private
    /// temporary directory.
    struct Scratch {
        chat: NamedTempFile,
        /// Held only so the directory outlives `cursor`.
        _cursor_dir: TempDir,
        cursor: PathBuf,
    }

    impl Scratch {
        fn new() -> Self {
            let chat = NamedTempFile::new().unwrap();
            let cursor_dir = TempDir::new().unwrap();
            let cursor = cursor_dir.path().join("cursor");
            Self {
                chat,
                _cursor_dir: cursor_dir,
                cursor,
            }
        }

        /// Appends `msg` to the scratch chat file.
        async fn append(&self, msg: &Message) {
            crate::history::append(self.chat.path(), msg)
                .await
                .unwrap();
        }

        /// Runs `poll_messages` against the scratch files with no explicit `since`.
        async fn poll(&self, viewer: Option<&str>, host: Option<&str>) -> Vec<Message> {
            poll_messages(self.chat.path(), &self.cursor, viewer, host, None)
                .await
                .unwrap()
        }
    }

    /// Test-local copy of the predicate the assertions below attribute to
    /// `watch`: regular and system messages count when authored by someone
    /// other than `username`; DMs count only when addressed to `username`.
    ///
    /// NOTE(review): confirm this stays in sync with the filter the real watch
    /// command applies.
    fn watch_wakeups<'a>(messages: &'a [Message], username: &str) -> Vec<&'a Message> {
        messages
            .iter()
            .filter(|m| match m {
                Message::Message { user, .. } | Message::System { user, .. } => user != username,
                Message::DirectMessage { to, .. } => to == username,
                _ => false,
            })
            .collect()
    }

    /// Without a cursor file or `since`, every stored message is returned.
    #[tokio::test]
    async fn poll_messages_no_cursor_returns_all() {
        let room = Scratch::new();
        let msg = make_message("r", "alice", "hello");
        room.append(&msg).await;

        let result = room.poll(None, None).await;
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].id(), msg.id());
    }

    /// A second poll sees nothing because the first one moved the cursor.
    #[tokio::test]
    async fn poll_messages_advances_cursor() {
        let room = Scratch::new();
        let msg = make_message("r", "alice", "hello");
        room.append(&msg).await;

        room.poll(None, None).await;

        let second = room.poll(None, None).await;
        assert!(
            second.is_empty(),
            "cursor should have advanced past the first message"
        );
    }

    /// A viewer only receives DMs they take part in.
    #[tokio::test]
    async fn poll_messages_filters_dms_by_viewer() {
        let room = Scratch::new();
        let dm_alice_bob = make_dm("r", "alice", "bob", "secret");
        let dm_alice_carol = make_dm("r", "alice", "carol", "other secret");
        room.append(&dm_alice_bob).await;
        room.append(&dm_alice_carol).await;

        let result = room.poll(Some("bob"), None).await;
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].id(), dm_alice_bob.id());
    }

    /// A DM addressed to the viewer survives both polling and the watch predicate.
    #[tokio::test]
    async fn poll_messages_dm_to_viewer_is_not_consumed_silently() {
        let room = Scratch::new();
        let dm = make_dm("r", "alice", "bob", "secret for bob");
        let msg = make_message("r", "alice", "public hello");
        room.append(&dm).await;
        room.append(&msg).await;

        let messages = room.poll(Some("bob"), None).await;
        let foreign = watch_wakeups(&messages, "bob");

        assert_eq!(foreign.len(), 2, "watch should see DMs + foreign messages");
        assert!(
            foreign
                .iter()
                .any(|m| matches!(m, Message::DirectMessage { .. })),
            "DM must not be silently consumed"
        );
    }

    /// A DM the viewer sent themselves is rejected by the watch predicate.
    #[tokio::test]
    async fn poll_messages_dm_from_viewer_excluded_from_watch() {
        let room = Scratch::new();
        let dm = make_dm("r", "bob", "alice", "from bob");
        room.append(&dm).await;

        let messages = room.poll(Some("bob"), None).await;
        let foreign = watch_wakeups(&messages, "bob");

        assert!(
            foreign.is_empty(),
            "DMs sent by the watcher should not wake watch"
        );
    }

    /// System messages authored by someone else pass the watch predicate.
    #[tokio::test]
    async fn watch_filter_wakes_on_foreign_system_message() {
        let room = Scratch::new();
        let sys = make_system("r", "plugin:taskboard", "task tb-001 approved");
        room.append(&sys).await;

        let messages = room.poll(Some("bob"), None).await;
        let foreign = watch_wakeups(&messages, "bob");

        assert_eq!(
            foreign.len(),
            1,
            "system messages from other users should wake watch"
        );
        assert!(matches!(foreign[0], Message::System { .. }));
    }

    /// System messages authored by the viewer are rejected by the watch predicate.
    #[tokio::test]
    async fn watch_filter_ignores_own_system_message() {
        let room = Scratch::new();
        let sys = make_system("r", "bob", "bob subscribed (tier: full)");
        room.append(&sys).await;

        let messages = room.poll(Some("bob"), None).await;
        let foreign = watch_wakeups(&messages, "bob");

        assert!(
            foreign.is_empty(),
            "system messages from self should not wake watch"
        );
    }

    /// Mixed history: only the three messages not authored by the viewer remain.
    #[tokio::test]
    async fn watch_filter_mixed_message_types() {
        let room = Scratch::new();

        let msg = make_message("r", "alice", "hello");
        let sys = make_system("r", "plugin:taskboard", "task tb-001 claimed by alice");
        let own_sys = make_system("r", "bob", "bob subscribed (tier: full)");
        let dm = make_dm("r", "alice", "bob", "private note");
        let own_msg = make_message("r", "bob", "my own message");

        for m in [&msg, &sys, &own_sys, &dm, &own_msg] {
            room.append(m).await;
        }

        let messages = room.poll(Some("bob"), None).await;
        let foreign = watch_wakeups(&messages, "bob");

        assert_eq!(
            foreign.len(),
            3,
            "should see: foreign message + foreign system + DM to self"
        );
        assert!(
            foreign.iter().any(|m| matches!(m, Message::System { .. })),
            "system message must appear in watch results"
        );
        assert!(
            foreign.iter().any(|m| matches!(m, Message::Message { .. })),
            "regular foreign message must appear"
        );
        assert!(
            foreign
                .iter()
                .any(|m| matches!(m, Message::DirectMessage { .. })),
            "DM to self must appear"
        );
    }

    /// When viewer and host are the same user, DMs between other users are returned.
    #[tokio::test]
    async fn poll_messages_host_sees_all_dms() {
        let room = Scratch::new();
        let dm_alice_bob = make_dm("r", "alice", "bob", "private");
        let dm_carol_dave = make_dm("r", "carol", "dave", "also private");
        room.append(&dm_alice_bob).await;
        room.append(&dm_carol_dave).await;

        let result = room.poll(Some("eve"), Some("eve")).await;
        assert_eq!(result.len(), 2, "host should see all DMs");
    }

    /// A third party with no host set does not receive someone else's DM.
    #[tokio::test]
    async fn poll_messages_non_host_cannot_see_unrelated_dms() {
        let room = Scratch::new();
        let dm = make_dm("r", "alice", "bob", "private");
        room.append(&dm).await;

        let result = room.poll(Some("carol"), None).await;
        assert!(result.is_empty(), "non-host third party should not see DM");
    }

    /// `pull_messages` applies the same host visibility as polling.
    #[tokio::test]
    async fn pull_messages_host_sees_all_dms() {
        let room = Scratch::new();
        let dm = make_dm("r", "alice", "bob", "secret");
        room.append(&dm).await;

        let result = pull_messages(room.chat.path(), 10, Some("eve"), Some("eve"))
            .await
            .unwrap();
        assert_eq!(result.len(), 1, "host should see the DM via pull");
    }

    /// Pulling returns the tail and leaves the poll cursor untouched.
    #[tokio::test]
    async fn pull_messages_returns_tail_without_cursor_change() {
        let room = Scratch::new();

        for i in 0..5u32 {
            room.append(&make_message("r", "u", format!("msg {i}")))
                .await;
        }

        let pulled = pull_messages(room.chat.path(), 3, None, None)
            .await
            .unwrap();
        assert_eq!(pulled.len(), 3);

        // A fresh poll still starts from the beginning, so all five come back.
        let polled = room.poll(None, None).await;
        assert_eq!(polled.len(), 5);
    }
}