1use std::path::{Path, PathBuf};
2
3use crate::{history, message::Message};
4
5use super::token::{read_cursor, username_from_token, write_cursor};
6
7pub async fn poll_messages(
18 chat_path: &Path,
19 cursor_path: &Path,
20 viewer: Option<&str>,
21 since: Option<&str>,
22) -> anyhow::Result<Vec<Message>> {
23 let effective_since: Option<String> = since
24 .map(|s| s.to_owned())
25 .or_else(|| read_cursor(cursor_path));
26
27 let messages = history::load(chat_path).await?;
28
29 let start = match &effective_since {
30 Some(id) => messages
31 .iter()
32 .position(|m| m.id() == id)
33 .map(|i| i + 1)
34 .unwrap_or(0),
35 None => 0,
36 };
37
38 let result: Vec<Message> = messages[start..]
39 .iter()
40 .filter(|m| match m {
41 Message::DirectMessage { user, to, .. } => viewer
42 .map(|v| v == user.as_str() || v == to.as_str())
43 .unwrap_or(true),
44 _ => true,
45 })
46 .cloned()
47 .collect();
48
49 if let Some(last) = result.last() {
50 write_cursor(cursor_path, last.id())?;
51 }
52
53 Ok(result)
54}
55
56pub async fn pull_messages(
61 chat_path: &Path,
62 n: usize,
63 viewer: Option<&str>,
64) -> anyhow::Result<Vec<Message>> {
65 let clamped = n.min(200);
66 let all = history::tail(chat_path, clamped).await?;
67 let visible: Vec<Message> = all
68 .into_iter()
69 .filter(|m| match m {
70 Message::DirectMessage { user, to, .. } => viewer
71 .map(|v| v == user.as_str() || v == to.as_str())
72 .unwrap_or(true),
73 _ => true,
74 })
75 .collect();
76 Ok(visible)
77}
78
79pub async fn cmd_pull(room_id: &str, token: &str, n: usize) -> anyhow::Result<()> {
84 let username = username_from_token(room_id, token)?;
85 let meta_path = PathBuf::from(format!("/tmp/room-{room_id}.meta"));
86 let chat_path = chat_path_from_meta(room_id, &meta_path);
87
88 let messages = pull_messages(&chat_path, n, Some(&username)).await?;
89 for msg in &messages {
90 println!("{}", serde_json::to_string(msg)?);
91 }
92 Ok(())
93}
94
95pub async fn cmd_watch(room_id: &str, token: &str, interval_secs: u64) -> anyhow::Result<()> {
103 let username = username_from_token(room_id, token)?;
104 let meta_path = PathBuf::from(format!("/tmp/room-{room_id}.meta"));
105 let chat_path = chat_path_from_meta(room_id, &meta_path);
106 let cursor_path = PathBuf::from(format!("/tmp/room-{room_id}-{username}.cursor"));
107
108 loop {
109 let messages = poll_messages(&chat_path, &cursor_path, Some(&username), None).await?;
110
111 let foreign: Vec<&Message> = messages
112 .iter()
113 .filter(|m| match m {
114 Message::Message { user, .. } => user != &username,
115 Message::DirectMessage { to, .. } => to == &username,
116 _ => false,
117 })
118 .collect();
119
120 if !foreign.is_empty() {
121 for msg in foreign {
122 println!("{}", serde_json::to_string(msg)?);
123 }
124 return Ok(());
125 }
126
127 tokio::time::sleep(tokio::time::Duration::from_secs(interval_secs)).await;
128 }
129}
130
131pub async fn cmd_poll(room_id: &str, token: &str, since: Option<String>) -> anyhow::Result<()> {
135 let username = username_from_token(room_id, token)?;
136 let meta_path = PathBuf::from(format!("/tmp/room-{room_id}.meta"));
137 let chat_path = chat_path_from_meta(room_id, &meta_path);
138 let cursor_path = PathBuf::from(format!("/tmp/room-{room_id}-{username}.cursor"));
139
140 let messages =
141 poll_messages(&chat_path, &cursor_path, Some(&username), since.as_deref()).await?;
142 for msg in &messages {
143 println!("{}", serde_json::to_string(msg)?);
144 }
145 Ok(())
146}
147
148pub(super) fn chat_path_from_meta(room_id: &str, meta_path: &Path) -> PathBuf {
149 if meta_path.exists() {
150 if let Ok(data) = std::fs::read_to_string(meta_path) {
151 if let Ok(v) = serde_json::from_str::<serde_json::Value>(&data) {
152 if let Some(p) = v["chat_path"].as_str() {
153 return PathBuf::from(p);
154 }
155 }
156 }
157 }
158 history::default_chat_path(room_id)
159}
160
161#[cfg(test)]
162mod tests {
163 use super::*;
164 use crate::message::make_message;
165 use tempfile::{NamedTempFile, TempDir};
166
167 #[tokio::test]
169 async fn poll_messages_no_cursor_returns_all() {
170 let chat = NamedTempFile::new().unwrap();
171 let cursor_dir = TempDir::new().unwrap();
172 let cursor = cursor_dir.path().join("cursor");
173
174 let msg = make_message("r", "alice", "hello");
175 crate::history::append(chat.path(), &msg).await.unwrap();
176
177 let result = poll_messages(chat.path(), &cursor, None, None)
178 .await
179 .unwrap();
180 assert_eq!(result.len(), 1);
181 assert_eq!(result[0].id(), msg.id());
182 }
183
184 #[tokio::test]
186 async fn poll_messages_advances_cursor() {
187 let chat = NamedTempFile::new().unwrap();
188 let cursor_dir = TempDir::new().unwrap();
189 let cursor = cursor_dir.path().join("cursor");
190
191 let msg = make_message("r", "alice", "hello");
192 crate::history::append(chat.path(), &msg).await.unwrap();
193
194 poll_messages(chat.path(), &cursor, None, None)
195 .await
196 .unwrap();
197
198 let second = poll_messages(chat.path(), &cursor, None, None)
199 .await
200 .unwrap();
201 assert!(
202 second.is_empty(),
203 "cursor should have advanced past the first message"
204 );
205 }
206
207 #[tokio::test]
209 async fn poll_messages_filters_dms_by_viewer() {
210 use crate::message::make_dm;
211 let chat = NamedTempFile::new().unwrap();
212 let cursor_dir = TempDir::new().unwrap();
213 let cursor = cursor_dir.path().join("cursor");
214
215 let dm_alice_bob = make_dm("r", "alice", "bob", "secret");
216 let dm_alice_carol = make_dm("r", "alice", "carol", "other secret");
217 crate::history::append(chat.path(), &dm_alice_bob)
218 .await
219 .unwrap();
220 crate::history::append(chat.path(), &dm_alice_carol)
221 .await
222 .unwrap();
223
224 let result = poll_messages(chat.path(), &cursor, Some("bob"), None)
226 .await
227 .unwrap();
228 assert_eq!(result.len(), 1);
229 assert_eq!(result[0].id(), dm_alice_bob.id());
230 }
231
232 #[tokio::test]
235 async fn poll_messages_dm_to_viewer_is_not_consumed_silently() {
236 use crate::message::make_dm;
237 let chat = NamedTempFile::new().unwrap();
238 let cursor_dir = TempDir::new().unwrap();
239 let cursor = cursor_dir.path().join("cursor");
240
241 let dm = make_dm("r", "alice", "bob", "secret for bob");
243 let msg = make_message("r", "alice", "public hello");
244 crate::history::append(chat.path(), &dm).await.unwrap();
245 crate::history::append(chat.path(), &msg).await.unwrap();
246
247 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None)
249 .await
250 .unwrap();
251
252 let username = "bob";
253 let foreign: Vec<&Message> = messages
254 .iter()
255 .filter(|m| match m {
256 Message::Message { user, .. } => user != username,
257 Message::DirectMessage { to, .. } => to == username,
258 _ => false,
259 })
260 .collect();
261
262 assert_eq!(foreign.len(), 2, "watch should see DMs + foreign messages");
264 assert!(
265 foreign
266 .iter()
267 .any(|m| matches!(m, Message::DirectMessage { .. })),
268 "DM must not be silently consumed"
269 );
270 }
271
272 #[tokio::test]
274 async fn poll_messages_dm_from_viewer_excluded_from_watch() {
275 use crate::message::make_dm;
276 let chat = NamedTempFile::new().unwrap();
277 let cursor_dir = TempDir::new().unwrap();
278 let cursor = cursor_dir.path().join("cursor");
279
280 let dm = make_dm("r", "bob", "alice", "from bob");
282 crate::history::append(chat.path(), &dm).await.unwrap();
283
284 let messages = poll_messages(chat.path(), &cursor, Some("bob"), None)
285 .await
286 .unwrap();
287
288 let username = "bob";
289 let foreign: Vec<&Message> = messages
290 .iter()
291 .filter(|m| match m {
292 Message::Message { user, .. } => user != username,
293 Message::DirectMessage { to, .. } => to == username,
294 _ => false,
295 })
296 .collect();
297
298 assert!(
299 foreign.is_empty(),
300 "DMs sent by the watcher should not wake watch"
301 );
302 }
303
304 #[tokio::test]
306 async fn pull_messages_returns_tail_without_cursor_change() {
307 let chat = NamedTempFile::new().unwrap();
308 let cursor_dir = TempDir::new().unwrap();
309 let cursor = cursor_dir.path().join("cursor");
310
311 for i in 0..5u32 {
312 crate::history::append(chat.path(), &make_message("r", "u", format!("msg {i}")))
313 .await
314 .unwrap();
315 }
316
317 let pulled = pull_messages(chat.path(), 3, None).await.unwrap();
318 assert_eq!(pulled.len(), 3);
319
320 let polled = poll_messages(chat.path(), &cursor, None, None)
322 .await
323 .unwrap();
324 assert_eq!(polled.len(), 5);
325 }
326}