1use std::collections::HashMap;
22use std::path::{Path, PathBuf};
23
24use anyhow::{Context, Result};
25use serde::{Deserialize, Serialize};
26use serde_json::Value;
27
/// Maximum number of characters (`char`s, not bytes) of an event body that
/// `InboxEvent::from_signed` keeps in `InboxEvent::body_preview`.
const BODY_PREVIEW_CHARS: usize = 120;
31
/// Summary of one JSON event read from a peer's inbox file.
///
/// Built by `InboxEvent::from_signed`; string fields that are missing from
/// the source JSON (or are not JSON strings) are stored as empty strings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxEvent {
    /// Peer name supplied by the caller; `InboxWatcher::poll` passes the
    /// file stem of the `<peer>.jsonl` file the event was read from.
    pub peer: String,
    /// Value of the event's `"event_id"` string field.
    pub event_id: String,
    /// Value of the `"type"` string field; when that is absent, the JSON
    /// rendering of the `"kind"` field (empty if neither exists).
    pub kind: String,
    /// At most `BODY_PREVIEW_CHARS` leading characters of the `"body"` field
    /// (non-string bodies are rendered as JSON text first).
    pub body_preview: String,
    /// Verification flag supplied by the caller; `InboxWatcher::poll` sets it
    /// to whether `crate::signing::verify_message_v31` returned `Ok`.
    pub verified: bool,
    /// Value of the event's `"timestamp"` string field, stored verbatim.
    pub timestamp: String,
    /// The complete event as parsed from the inbox line.
    pub raw: Value,
}
44
45impl InboxEvent {
46 pub(crate) fn from_signed(peer: &str, signed: Value, verified: bool) -> Self {
47 let event_id = signed
48 .get("event_id")
49 .and_then(Value::as_str)
50 .unwrap_or("")
51 .to_string();
52 let kind = signed
53 .get("type")
54 .and_then(Value::as_str)
55 .map(str::to_string)
56 .unwrap_or_else(|| {
57 signed
58 .get("kind")
59 .map(|k| k.to_string())
60 .unwrap_or_default()
61 });
62 let timestamp = signed
63 .get("timestamp")
64 .and_then(Value::as_str)
65 .unwrap_or("")
66 .to_string();
67 let body_raw = signed.get("body").cloned().unwrap_or(Value::Null);
68 let body_str = match &body_raw {
69 Value::String(s) => s.clone(),
70 other => serde_json::to_string(other).unwrap_or_default(),
71 };
72 let body_preview: String = body_str.chars().take(BODY_PREVIEW_CHARS).collect();
73 InboxEvent {
74 peer: peer.to_string(),
75 event_id,
76 kind,
77 body_preview,
78 verified,
79 timestamp,
80 raw: signed,
81 }
82 }
83}
84
/// Incremental reader over a directory of per-peer `<peer>.jsonl` inbox files.
///
/// Remembers, per peer, how many bytes of that peer's file have already been
/// consumed so that each `poll` only returns newly appended events.
pub struct InboxWatcher {
    /// Byte offset already consumed in each peer's file, keyed by the file
    /// stem (the peer name).
    cursors: HashMap<String, u64>,
    /// Directory that is scanned for `*.jsonl` files.
    inbox_dir: PathBuf,
}
94
95impl InboxWatcher {
96 pub fn from_dir_and_cursor(inbox_dir: PathBuf, cursor_path: &Path) -> Result<Self> {
101 let cursors = if cursor_path.exists() {
102 let bytes = std::fs::read(cursor_path)
103 .with_context(|| format!("reading cursor file {cursor_path:?}"))?;
104 serde_json::from_slice(&bytes).unwrap_or_default()
105 } else {
106 HashMap::new()
107 };
108 Ok(Self { cursors, inbox_dir })
109 }
110
111 pub fn from_dir_head(inbox_dir: PathBuf) -> Result<Self> {
116 let mut cursors = HashMap::new();
117 if inbox_dir.exists() {
118 for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
119 let path = entry.path();
120 if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
121 continue;
122 }
123 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
124 let len = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
125 cursors.insert(stem.to_string(), len);
126 }
127 }
128 }
129 Ok(Self { cursors, inbox_dir })
130 }
131
132 pub fn from_cursor_file(cursor_path: &Path) -> Result<Self> {
135 Self::from_dir_and_cursor(crate::config::inbox_dir()?, cursor_path)
136 }
137
138 pub fn from_head() -> Result<Self> {
140 Self::from_dir_head(crate::config::inbox_dir()?)
141 }
142
143 pub fn save_cursors(&self, cursor_path: &Path) -> Result<()> {
146 if let Some(parent) = cursor_path.parent() {
147 std::fs::create_dir_all(parent).with_context(|| format!("creating {parent:?}"))?;
148 }
149 let bytes = serde_json::to_vec(&self.cursors)?;
150 std::fs::write(cursor_path, bytes)
151 .with_context(|| format!("writing cursor file {cursor_path:?}"))?;
152 Ok(())
153 }
154
155 pub fn poll(&mut self) -> Result<Vec<InboxEvent>> {
160 let mut out = Vec::new();
161 if !self.inbox_dir.exists() {
162 return Ok(out);
163 }
164
165 let trust = crate::config::read_trust().unwrap_or(Value::Null);
166
167 for entry in std::fs::read_dir(&self.inbox_dir)?.flatten() {
168 let path = entry.path();
169 if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
170 continue;
171 }
172 let peer = match path.file_stem().and_then(|s| s.to_str()) {
173 Some(s) => s.to_string(),
174 None => continue,
175 };
176 let meta = match std::fs::metadata(&path) {
177 Ok(m) => m,
178 Err(_) => continue,
179 };
180 let cur_len = meta.len();
181 let start_at = *self.cursors.get(&peer).unwrap_or(&0);
182
183 if cur_len <= start_at {
184 self.cursors.insert(peer.clone(), start_at);
185 continue;
186 }
187
188 const READ_CAP: u64 = 8 * 1024 * 1024;
193 let bytes = if cur_len <= READ_CAP {
194 std::fs::read(&path)?
195 } else {
196 let mut f = std::fs::File::open(&path)?;
198 use std::io::{Read, Seek, SeekFrom};
199 f.seek(SeekFrom::Start(start_at))?;
200 let mut tail = Vec::new();
201 f.take(READ_CAP).read_to_end(&mut tail)?;
202 self.cursors
203 .insert(peer.clone(), start_at + tail.len() as u64);
204 tail
205 };
206
207 let slice: &[u8] = if cur_len <= READ_CAP {
209 &bytes[start_at as usize..]
210 } else {
211 &bytes[..]
212 };
213
214 let mut consumed: u64 = start_at;
217 let mut cursor_in_slice: usize = 0;
218 while let Some(nl) = slice[cursor_in_slice..].iter().position(|&b| b == b'\n') {
219 let line = &slice[cursor_in_slice..cursor_in_slice + nl];
220 cursor_in_slice += nl + 1;
221 consumed += (nl + 1) as u64;
222 if line.is_empty() {
223 continue;
224 }
225 let event: Value = match serde_json::from_slice(line) {
226 Ok(v) => v,
227 Err(_) => continue,
228 };
229 let verified = crate::signing::verify_message_v31(&event, &trust).is_ok();
230 out.push(InboxEvent::from_signed(&peer, event, verified));
231 }
232 self.cursors.insert(peer, consumed);
233 }
234 Ok(out)
235 }
236}
237
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Creates an empty, uniquely named scratch directory under the system
    /// temp dir.
    ///
    /// Tests run on parallel threads of one process, so pid + clock alone is
    /// not unique on coarse clocks; a process-wide sequence number guarantees
    /// that one test never deletes another test's directory.
    fn fresh_home() -> PathBuf {
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);
        let pid = std::process::id();
        let n = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .subsec_nanos();
        let seq = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        let path = std::env::temp_dir().join(format!("wire-watch-{pid}-{n}-{seq}"));
        let _ = std::fs::remove_dir_all(&path);
        std::fs::create_dir_all(&path).unwrap();
        path
    }

    /// Appends one JSON event line to `<inbox_dir>/<peer>.jsonl`, creating the
    /// directory and file as needed. The signature is a placeholder.
    fn write_event(inbox_dir: &Path, peer: &str, kind: &str, body: &str) {
        std::fs::create_dir_all(inbox_dir).unwrap();
        let path = inbox_dir.join(format!("{peer}.jsonl"));
        let mut f = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .unwrap();
        let event = serde_json::json!({
            "event_id": format!("test-{}-{}", peer, body.len()),
            "from": format!("did:wire:{peer}"),
            "to": "did:wire:self",
            "type": kind,
            "kind": 1,
            "timestamp": "2026-05-10T00:00:00Z",
            "body": body,
            "sig": "fake",
        });
        writeln!(f, "{}", serde_json::to_string(&event).unwrap()).unwrap();
    }

    /// A head-positioned watcher ignores pre-existing lines and reports only
    /// what is appended afterwards.
    #[test]
    fn from_head_starts_at_eof_skips_history() {
        let home = fresh_home();
        let inbox = home.join("inbox");
        write_event(&inbox, "paul", "decision", "old event");
        let mut w = InboxWatcher::from_dir_head(inbox.clone()).unwrap();
        assert!(w.poll().unwrap().is_empty(), "from_head must skip history");
        write_event(&inbox, "paul", "decision", "new event");
        let evs = w.poll().unwrap();
        assert_eq!(evs.len(), 1);
        assert_eq!(evs[0].peer, "paul");
        assert_eq!(evs[0].kind, "decision");
        assert!(evs[0].body_preview.contains("new event"));
    }

    /// Cursors saved by one watcher let a second watcher resume after them.
    #[test]
    fn cursor_file_resumes_across_restarts() {
        let home = fresh_home();
        let inbox = home.join("inbox");
        let cursor = home.join("notify.cursor");

        write_event(&inbox, "paul", "decision", "first");
        let mut w1 = InboxWatcher::from_dir_and_cursor(inbox.clone(), &cursor).unwrap();
        let evs1 = w1.poll().unwrap();
        assert_eq!(evs1.len(), 1);
        w1.save_cursors(&cursor).unwrap();
        drop(w1);

        write_event(&inbox, "paul", "decision", "second");
        let mut w2 = InboxWatcher::from_dir_and_cursor(inbox, &cursor).unwrap();
        let evs2 = w2.poll().unwrap();
        assert_eq!(evs2.len(), 1, "should see only the new event");
        assert!(evs2[0].body_preview.contains("second"));
    }

    /// Bodies longer than the limit are cut to exactly `BODY_PREVIEW_CHARS`.
    #[test]
    fn body_preview_truncated_at_limit() {
        let home = fresh_home();
        let inbox = home.join("inbox");
        let body = "x".repeat(500);
        write_event(&inbox, "paul", "decision", &body);
        let mut w = InboxWatcher::from_dir_and_cursor(inbox, &home.join("notify.cursor")).unwrap();
        let evs = w.poll().unwrap();
        assert_eq!(evs[0].body_preview.chars().count(), BODY_PREVIEW_CHARS);
    }

    /// Each peer file has its own cursor: appending to one peer does not
    /// re-deliver another peer's events.
    #[test]
    fn multi_peer_files_handled_independently() {
        let home = fresh_home();
        let inbox = home.join("inbox");
        write_event(&inbox, "paul", "decision", "p1");
        write_event(&inbox, "willard", "decision", "w1");
        let mut w =
            InboxWatcher::from_dir_and_cursor(inbox.clone(), &home.join("notify.cursor")).unwrap();
        let evs = w.poll().unwrap();
        assert_eq!(evs.len(), 2);
        let peers: std::collections::HashSet<_> = evs.iter().map(|e| e.peer.clone()).collect();
        assert!(peers.contains("paul"));
        assert!(peers.contains("willard"));

        write_event(&inbox, "paul", "decision", "p2");
        let evs2 = w.poll().unwrap();
        assert_eq!(evs2.len(), 1);
        assert_eq!(evs2[0].peer, "paul");
        assert!(evs2[0].body_preview.contains("p2"));
    }
}