1use std::collections::HashMap;
22use std::path::{Path, PathBuf};
23
24use anyhow::{Context, Result};
25use serde::{Deserialize, Serialize};
26use serde_json::Value;
27
28const BODY_PREVIEW_CHARS: usize = 120;
31
32fn self_seed() -> Option<&'static [u8; 32]> {
35 static SEED: std::sync::OnceLock<Option<[u8; 32]>> = std::sync::OnceLock::new();
36 SEED.get_or_init(|| {
37 let v = crate::config::read_private_key().ok()?;
38 let s = v.get(..32)?;
39 let mut a = [0u8; 32];
40 a.copy_from_slice(s);
41 Some(a)
42 })
43 .as_ref()
44}
45
46fn decrypt_body_for_display(signed: &Value) -> Value {
51 self_seed()
52 .and_then(|seed| {
53 let trust = crate::config::read_trust().ok()?;
54 crate::enc::wire_x25519::open_event_body(signed, &trust, seed)
55 .ok()
56 .flatten()
57 })
58 .unwrap_or_else(|| Value::String("<encrypted: cannot read>".to_string()))
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct InboxEvent {
64 pub peer: String,
65 pub event_id: String,
66 pub kind: String,
67 pub body_preview: String,
68 pub verified: bool,
69 pub timestamp: String,
70 pub raw: Value,
72}
73
74impl InboxEvent {
75 pub(crate) fn from_signed(peer: &str, signed: Value, verified: bool) -> Self {
76 let event_id = signed
77 .get("event_id")
78 .and_then(Value::as_str)
79 .unwrap_or("")
80 .to_string();
81 let kind = signed
82 .get("type")
83 .and_then(Value::as_str)
84 .map(str::to_string)
85 .unwrap_or_else(|| {
86 signed
87 .get("kind")
88 .map(|k| k.to_string())
89 .unwrap_or_default()
90 });
91 let timestamp = signed
92 .get("timestamp")
93 .and_then(Value::as_str)
94 .unwrap_or("")
95 .to_string();
96 let body_raw = if signed.get("enc").and_then(Value::as_str)
102 == Some(crate::enc::wire_x25519::ENC_DISCRIMINATOR)
103 {
104 decrypt_body_for_display(&signed)
105 } else {
106 signed.get("body").cloned().unwrap_or(Value::Null)
107 };
108 let body_str = match &body_raw {
109 Value::String(s) => s.clone(),
110 other => serde_json::to_string(other).unwrap_or_default(),
111 };
112 let body_preview: String = body_str.chars().take(BODY_PREVIEW_CHARS).collect();
113 InboxEvent {
114 peer: peer.to_string(),
115 event_id,
116 kind,
117 body_preview,
118 verified,
119 timestamp,
120 raw: signed,
121 }
122 }
123}
124
125pub struct InboxWatcher {
131 cursors: HashMap<String, u64>,
132 inbox_dir: PathBuf,
133}
134
135impl InboxWatcher {
136 pub fn from_dir_and_cursor(inbox_dir: PathBuf, cursor_path: &Path) -> Result<Self> {
141 let cursors = if cursor_path.exists() {
142 let bytes = std::fs::read(cursor_path)
143 .with_context(|| format!("reading cursor file {cursor_path:?}"))?;
144 serde_json::from_slice(&bytes).unwrap_or_default()
145 } else {
146 HashMap::new()
147 };
148 Ok(Self { cursors, inbox_dir })
149 }
150
151 pub fn from_dir_head(inbox_dir: PathBuf) -> Result<Self> {
156 let mut cursors = HashMap::new();
157 if inbox_dir.exists() {
158 for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
159 let path = entry.path();
160 if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
161 continue;
162 }
163 if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
164 let len = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
165 cursors.insert(stem.to_string(), len);
166 }
167 }
168 }
169 Ok(Self { cursors, inbox_dir })
170 }
171
172 pub fn from_cursor_file(cursor_path: &Path) -> Result<Self> {
175 Self::from_dir_and_cursor(crate::config::inbox_dir()?, cursor_path)
176 }
177
178 pub fn from_head() -> Result<Self> {
180 Self::from_dir_head(crate::config::inbox_dir()?)
181 }
182
183 pub fn save_cursors(&self, cursor_path: &Path) -> Result<()> {
186 if let Some(parent) = cursor_path.parent() {
187 std::fs::create_dir_all(parent).with_context(|| format!("creating {parent:?}"))?;
188 }
189 let bytes = serde_json::to_vec(&self.cursors)?;
190 std::fs::write(cursor_path, bytes)
191 .with_context(|| format!("writing cursor file {cursor_path:?}"))?;
192 Ok(())
193 }
194
195 pub fn poll(&mut self) -> Result<Vec<InboxEvent>> {
200 let mut out = Vec::new();
201 if !self.inbox_dir.exists() {
202 return Ok(out);
203 }
204
205 let trust = crate::config::read_trust().unwrap_or(Value::Null);
206
207 for entry in std::fs::read_dir(&self.inbox_dir)?.flatten() {
208 let path = entry.path();
209 if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
210 continue;
211 }
212 let peer = match path.file_stem().and_then(|s| s.to_str()) {
213 Some(s) => s.to_string(),
214 None => continue,
215 };
216 let meta = match std::fs::metadata(&path) {
217 Ok(m) => m,
218 Err(_) => continue,
219 };
220 let cur_len = meta.len();
221 let start_at = *self.cursors.get(&peer).unwrap_or(&0);
222
223 if cur_len <= start_at {
224 self.cursors.insert(peer.clone(), start_at);
225 continue;
226 }
227
228 const READ_CAP: u64 = 8 * 1024 * 1024;
233 let bytes = if cur_len <= READ_CAP {
234 std::fs::read(&path)?
235 } else {
236 let mut f = std::fs::File::open(&path)?;
238 use std::io::{Read, Seek, SeekFrom};
239 f.seek(SeekFrom::Start(start_at))?;
240 let mut tail = Vec::new();
241 f.take(READ_CAP).read_to_end(&mut tail)?;
242 self.cursors
243 .insert(peer.clone(), start_at + tail.len() as u64);
244 tail
245 };
246
247 let slice: &[u8] = if cur_len <= READ_CAP {
249 &bytes[start_at as usize..]
250 } else {
251 &bytes[..]
252 };
253
254 let mut consumed: u64 = start_at;
257 let mut cursor_in_slice: usize = 0;
258 while let Some(nl) = slice[cursor_in_slice..].iter().position(|&b| b == b'\n') {
259 let line = &slice[cursor_in_slice..cursor_in_slice + nl];
260 cursor_in_slice += nl + 1;
261 consumed += (nl + 1) as u64;
262 if line.is_empty() {
263 continue;
264 }
265 let event: Value = match serde_json::from_slice(line) {
266 Ok(v) => v,
267 Err(_) => continue,
268 };
269 let verified = crate::signing::verify_message_v31(&event, &trust).is_ok();
270 out.push(InboxEvent::from_signed(&peer, event, verified));
271 }
272 self.cursors.insert(peer, consumed);
273 }
274 Ok(out)
275 }
276}
277
278#[cfg(test)]
279mod tests {
280 use super::*;
281 use std::io::Write;
282
283 fn fresh_home() -> PathBuf {
284 let pid = std::process::id();
285 let n = std::time::SystemTime::now()
286 .duration_since(std::time::UNIX_EPOCH)
287 .unwrap()
288 .subsec_nanos();
289 let path = std::env::temp_dir().join(format!("wire-watch-{pid}-{n}"));
290 let _ = std::fs::remove_dir_all(&path);
291 std::fs::create_dir_all(&path).unwrap();
292 path
293 }
294
295 fn write_event(inbox_dir: &Path, peer: &str, kind: &str, body: &str) {
298 std::fs::create_dir_all(inbox_dir).unwrap();
299 let path = inbox_dir.join(format!("{peer}.jsonl"));
300 let mut f = std::fs::OpenOptions::new()
301 .create(true)
302 .append(true)
303 .open(&path)
304 .unwrap();
305 let event = serde_json::json!({
306 "event_id": format!("test-{}-{}", peer, body.len()),
307 "from": format!("did:wire:{peer}"),
308 "to": "did:wire:self",
309 "type": kind,
310 "kind": 1,
311 "timestamp": "2026-05-10T00:00:00Z",
312 "body": body,
313 "sig": "fake",
314 });
315 writeln!(f, "{}", serde_json::to_string(&event).unwrap()).unwrap();
316 }
317
318 #[test]
319 fn from_head_starts_at_eof_skips_history() {
320 let home = fresh_home();
321 let inbox = home.join("inbox");
322 write_event(&inbox, "paul", "decision", "old event");
323 let mut w = InboxWatcher::from_dir_head(inbox.clone()).unwrap();
324 assert!(w.poll().unwrap().is_empty(), "from_head must skip history");
325 write_event(&inbox, "paul", "decision", "new event");
326 let evs = w.poll().unwrap();
327 assert_eq!(evs.len(), 1);
328 assert_eq!(evs[0].peer, "paul");
329 assert_eq!(evs[0].kind, "decision");
330 assert!(evs[0].body_preview.contains("new event"));
331 }
332
333 #[test]
334 fn cursor_file_resumes_across_restarts() {
335 let home = fresh_home();
336 let inbox = home.join("inbox");
337 let cursor = home.join("notify.cursor");
338
339 write_event(&inbox, "paul", "decision", "first");
340 let mut w1 = InboxWatcher::from_dir_and_cursor(inbox.clone(), &cursor).unwrap();
341 let evs1 = w1.poll().unwrap();
342 assert_eq!(evs1.len(), 1);
343 w1.save_cursors(&cursor).unwrap();
344 drop(w1);
345
346 write_event(&inbox, "paul", "decision", "second");
347 let mut w2 = InboxWatcher::from_dir_and_cursor(inbox, &cursor).unwrap();
348 let evs2 = w2.poll().unwrap();
349 assert_eq!(evs2.len(), 1, "should see only the new event");
350 assert!(evs2[0].body_preview.contains("second"));
351 }
352
353 #[test]
354 fn body_preview_truncated_at_limit() {
355 let home = fresh_home();
356 let inbox = home.join("inbox");
357 let body = "x".repeat(500);
358 write_event(&inbox, "paul", "decision", &body);
359 let mut w = InboxWatcher::from_dir_and_cursor(inbox, &home.join("notify.cursor")).unwrap();
360 let evs = w.poll().unwrap();
361 assert_eq!(evs[0].body_preview.chars().count(), BODY_PREVIEW_CHARS);
362 }
363
364 #[test]
365 fn multi_peer_files_handled_independently() {
366 let home = fresh_home();
367 let inbox = home.join("inbox");
368 write_event(&inbox, "paul", "decision", "p1");
369 write_event(&inbox, "willard", "decision", "w1");
370 let mut w =
371 InboxWatcher::from_dir_and_cursor(inbox.clone(), &home.join("notify.cursor")).unwrap();
372 let evs = w.poll().unwrap();
373 assert_eq!(evs.len(), 2);
374 let peers: std::collections::HashSet<_> = evs.iter().map(|e| e.peer.clone()).collect();
375 assert!(peers.contains("paul"));
376 assert!(peers.contains("willard"));
377
378 write_event(&inbox, "paul", "decision", "p2");
380 let evs2 = w.poll().unwrap();
381 assert_eq!(evs2.len(), 1);
382 assert_eq!(evs2[0].peer, "paul");
383 assert!(evs2[0].body_preview.contains("p2"));
384 }
385}