1mod commands;
2mod filter_events;
3mod filter_tier;
4pub(super) mod meta;
5mod multi_room;
6
7use std::path::Path;
8
9use crate::{history, message::Message};
10
11use super::token::{read_cursor, write_cursor};
12
13pub use commands::{cmd_poll, cmd_poll_multi, cmd_pull, cmd_query, cmd_watch};
16pub use multi_room::poll_messages_multi;
17
18#[derive(Debug, Clone)]
22pub struct QueryOptions {
23 pub new_only: bool,
26 pub wait: bool,
28 pub interval_secs: u64,
30 pub mentions_only: bool,
33 pub since_uuid: Option<String>,
36 pub timeout_secs: Option<u64>,
40}
41
42pub async fn poll_messages(
57 chat_path: &Path,
58 cursor_path: &Path,
59 viewer: Option<&str>,
60 host: Option<&str>,
61 since: Option<&str>,
62) -> anyhow::Result<Vec<Message>> {
63 let effective_since: Option<String> = since
64 .map(|s| s.to_owned())
65 .or_else(|| read_cursor(cursor_path));
66
67 let messages = history::load(chat_path).await?;
68
69 let start = match &effective_since {
70 Some(id) => messages
71 .iter()
72 .position(|m| m.id() == id)
73 .map(|i| i + 1)
74 .unwrap_or(0),
75 None => 0,
76 };
77
78 let result: Vec<Message> = messages[start..]
79 .iter()
80 .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
81 .cloned()
82 .collect();
83
84 if let Some(last) = result.last() {
85 write_cursor(cursor_path, last.id())?;
86 }
87
88 Ok(result)
89}
90
91pub async fn pull_messages(
99 chat_path: &Path,
100 n: usize,
101 viewer: Option<&str>,
102 host: Option<&str>,
103) -> anyhow::Result<Vec<Message>> {
104 let clamped = n.min(200);
105 let all = history::tail(chat_path, clamped).await?;
106 let visible: Vec<Message> = all
107 .into_iter()
108 .filter(|m| viewer.map(|v| m.is_visible_to(v, host)).unwrap_or(true))
109 .collect();
110 Ok(visible)
111}
112
113#[cfg(test)]
114mod tests {
115 use super::*;
116 use crate::message::make_message;
117 use tempfile::{NamedTempFile, TempDir};
118
119 #[tokio::test]
121 async fn poll_messages_no_cursor_returns_all() {
122 let chat = NamedTempFile::new().unwrap();
123 let cursor_dir = TempDir::new().unwrap();
124 let cursor = cursor_dir.path().join("cursor");
125
126 let msg = make_message("r", "alice", "hello");
127 crate::history::append(chat.path(), &msg).await.unwrap();
128
129 let result = poll_messages(chat.path(), &cursor, None, None, None)
130 .await
131 .unwrap();
132 assert_eq!(result.len(), 1);
133 assert_eq!(result[0].id(), msg.id());
134 }
135
136 #[tokio::test]
138 async fn poll_messages_advances_cursor() {
139 let chat = NamedTempFile::new().unwrap();
140 let cursor_dir = TempDir::new().unwrap();
141 let cursor = cursor_dir.path().join("cursor");
142
143 let msg = make_message("r", "alice", "hello");
144 crate::history::append(chat.path(), &msg).await.unwrap();
145
146 poll_messages(chat.path(), &cursor, None, None, None)
147 .await
148 .unwrap();
149
150 let second = poll_messages(chat.path(), &cursor, None, None, None)
151 .await
152 .unwrap();
153 assert!(
154 second.is_empty(),
155 "cursor should have advanced past the first message"
156 );
157 }
158
159 #[tokio::test]
161 async fn poll_messages_filters_dms_by_viewer() {
162 use crate::message::make_dm;
163 let chat = NamedTempFile::new().unwrap();
164 let cursor_dir = TempDir::new().unwrap();
165 let cursor = cursor_dir.path().join("cursor");
166
167 let dm_alice_bob = make_dm("r", "alice", "bob", "secret");
168 let dm_alice_carol = make_dm("r", "alice", "carol", "other secret");
169 crate::history::append(chat.path(), &dm_alice_bob)
170 .await
171 .unwrap();
172 crate::history::append(chat.path(), &dm_alice_carol)
173 .await
174 .unwrap();
175
176 let result = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
178 .await
179 .unwrap();
180 assert_eq!(result.len(), 1);
181 assert_eq!(result[0].id(), dm_alice_bob.id());
182 }
183
184 #[tokio::test]
187 async fn poll_messages_dm_to_viewer_is_not_consumed_silently() {
188 use crate::message::make_dm;
189 let chat = NamedTempFile::new().unwrap();
190 let cursor_dir = TempDir::new().unwrap();
191 let cursor = cursor_dir.path().join("cursor");
192
193 let dm = make_dm("r", "alice", "bob", "secret for bob");
195 let msg = make_message("r", "alice", "public hello");
196 crate::history::append(chat.path(), &dm).await.unwrap();
197 crate::history::append(chat.path(), &msg).await.unwrap();
198
199 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
201 .await
202 .unwrap();
203
204 let username = "bob";
205 let foreign: Vec<&Message> = messages
206 .iter()
207 .filter(|m| match m {
208 Message::Message { user, .. } | Message::System { user, .. } => user != username,
209 Message::DirectMessage { to, .. } => to == username,
210 _ => false,
211 })
212 .collect();
213
214 assert_eq!(foreign.len(), 2, "watch should see DMs + foreign messages");
216 assert!(
217 foreign
218 .iter()
219 .any(|m| matches!(m, Message::DirectMessage { .. })),
220 "DM must not be silently consumed"
221 );
222 }
223
224 #[tokio::test]
226 async fn poll_messages_dm_from_viewer_excluded_from_watch() {
227 use crate::message::make_dm;
228 let chat = NamedTempFile::new().unwrap();
229 let cursor_dir = TempDir::new().unwrap();
230 let cursor = cursor_dir.path().join("cursor");
231
232 let dm = make_dm("r", "bob", "alice", "from bob");
234 crate::history::append(chat.path(), &dm).await.unwrap();
235
236 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
237 .await
238 .unwrap();
239
240 let username = "bob";
241 let foreign: Vec<&Message> = messages
242 .iter()
243 .filter(|m| match m {
244 Message::Message { user, .. } | Message::System { user, .. } => user != username,
245 Message::DirectMessage { to, .. } => to == username,
246 _ => false,
247 })
248 .collect();
249
250 assert!(
251 foreign.is_empty(),
252 "DMs sent by the watcher should not wake watch"
253 );
254 }
255
256 #[tokio::test]
258 async fn watch_filter_wakes_on_foreign_system_message() {
259 use room_protocol::make_system;
260 let chat = NamedTempFile::new().unwrap();
261 let cursor_dir = TempDir::new().unwrap();
262 let cursor = cursor_dir.path().join("cursor");
263
264 let sys = make_system("r", "plugin:taskboard", "task tb-001 approved");
265 crate::history::append(chat.path(), &sys).await.unwrap();
266
267 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
268 .await
269 .unwrap();
270
271 let username = "bob";
272 let foreign: Vec<&Message> = messages
273 .iter()
274 .filter(|m| match m {
275 Message::Message { user, .. } | Message::System { user, .. } => user != username,
276 Message::DirectMessage { to, .. } => to == username,
277 _ => false,
278 })
279 .collect();
280
281 assert_eq!(
282 foreign.len(),
283 1,
284 "system messages from other users should wake watch"
285 );
286 assert!(matches!(foreign[0], Message::System { .. }));
287 }
288
289 #[tokio::test]
291 async fn watch_filter_ignores_own_system_message() {
292 use room_protocol::make_system;
293 let chat = NamedTempFile::new().unwrap();
294 let cursor_dir = TempDir::new().unwrap();
295 let cursor = cursor_dir.path().join("cursor");
296
297 let sys = make_system("r", "bob", "bob subscribed (tier: full)");
298 crate::history::append(chat.path(), &sys).await.unwrap();
299
300 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
301 .await
302 .unwrap();
303
304 let username = "bob";
305 let foreign: Vec<&Message> = messages
306 .iter()
307 .filter(|m| match m {
308 Message::Message { user, .. } | Message::System { user, .. } => user != username,
309 Message::DirectMessage { to, .. } => to == username,
310 _ => false,
311 })
312 .collect();
313
314 assert!(
315 foreign.is_empty(),
316 "system messages from self should not wake watch"
317 );
318 }
319
320 #[tokio::test]
322 async fn watch_filter_mixed_message_types() {
323 use crate::message::make_dm;
324 use room_protocol::make_system;
325 let chat = NamedTempFile::new().unwrap();
326 let cursor_dir = TempDir::new().unwrap();
327 let cursor = cursor_dir.path().join("cursor");
328
329 let msg = make_message("r", "alice", "hello");
331 let sys = make_system("r", "plugin:taskboard", "task tb-001 claimed by alice");
333 let own_sys = make_system("r", "bob", "bob subscribed (tier: full)");
335 let dm = make_dm("r", "alice", "bob", "private note");
337 let own_msg = make_message("r", "bob", "my own message");
339
340 for m in [&msg, &sys, &own_sys, &dm, &own_msg] {
341 crate::history::append(chat.path(), m).await.unwrap();
342 }
343
344 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None, None)
345 .await
346 .unwrap();
347
348 let username = "bob";
349 let foreign: Vec<&Message> = messages
350 .iter()
351 .filter(|m| match m {
352 Message::Message { user, .. } | Message::System { user, .. } => user != username,
353 Message::DirectMessage { to, .. } => to == username,
354 _ => false,
355 })
356 .collect();
357
358 assert_eq!(
359 foreign.len(),
360 3,
361 "should see: foreign message + foreign system + DM to self"
362 );
363 assert!(
364 foreign.iter().any(|m| matches!(m, Message::System { .. })),
365 "system message must appear in watch results"
366 );
367 assert!(
368 foreign.iter().any(|m| matches!(m, Message::Message { .. })),
369 "regular foreign message must appear"
370 );
371 assert!(
372 foreign
373 .iter()
374 .any(|m| matches!(m, Message::DirectMessage { .. })),
375 "DM to self must appear"
376 );
377 }
378
379 #[tokio::test]
381 async fn poll_messages_host_sees_all_dms() {
382 use crate::message::make_dm;
383 let chat = NamedTempFile::new().unwrap();
384 let cursor_dir = TempDir::new().unwrap();
385 let cursor = cursor_dir.path().join("cursor");
386
387 let dm_alice_bob = make_dm("r", "alice", "bob", "private");
388 let dm_carol_dave = make_dm("r", "carol", "dave", "also private");
389 crate::history::append(chat.path(), &dm_alice_bob)
390 .await
391 .unwrap();
392 crate::history::append(chat.path(), &dm_carol_dave)
393 .await
394 .unwrap();
395
396 let result = poll_messages(chat.path(), &cursor, Some("eve"), Some("eve"), None)
398 .await
399 .unwrap();
400 assert_eq!(result.len(), 2, "host should see all DMs");
401 }
402
403 #[tokio::test]
405 async fn poll_messages_non_host_cannot_see_unrelated_dms() {
406 use crate::message::make_dm;
407 let chat = NamedTempFile::new().unwrap();
408 let cursor_dir = TempDir::new().unwrap();
409 let cursor = cursor_dir.path().join("cursor");
410
411 let dm = make_dm("r", "alice", "bob", "private");
412 crate::history::append(chat.path(), &dm).await.unwrap();
413
414 let result = poll_messages(chat.path(), &cursor, Some("carol"), None, None)
416 .await
417 .unwrap();
418 assert!(result.is_empty(), "non-host third party should not see DM");
419 }
420
421 #[tokio::test]
423 async fn pull_messages_host_sees_all_dms() {
424 use crate::message::make_dm;
425 let chat = NamedTempFile::new().unwrap();
426
427 let dm = make_dm("r", "alice", "bob", "secret");
428 crate::history::append(chat.path(), &dm).await.unwrap();
429
430 let result = pull_messages(chat.path(), 10, Some("eve"), Some("eve"))
431 .await
432 .unwrap();
433 assert_eq!(result.len(), 1, "host should see the DM via pull");
434 }
435
436 #[tokio::test]
438 async fn pull_messages_returns_tail_without_cursor_change() {
439 let chat = NamedTempFile::new().unwrap();
440 let cursor_dir = TempDir::new().unwrap();
441 let cursor = cursor_dir.path().join("cursor");
442
443 for i in 0..5u32 {
444 crate::history::append(chat.path(), &make_message("r", "u", format!("msg {i}")))
445 .await
446 .unwrap();
447 }
448
449 let pulled = pull_messages(chat.path(), 3, None, None).await.unwrap();
450 assert_eq!(pulled.len(), 3);
451
452 let polled = poll_messages(chat.path(), &cursor, None, None, None)
454 .await
455 .unwrap();
456 assert_eq!(polled.len(), 5);
457 }
458}