Skip to main content

wire/
inbox_watch.rs

1//! Inbox tail-watcher — the event source for both `wire notify` (OS-level
2//! toasts) and the MCP `wire://inbox/<peer>` resources.
3//!
4//! Implementation choice: polling, not OS-level inotify/FSEvents. Reasons:
5//!   - Cross-platform with zero extra deps.
6//!   - The relay daemon already polls every N seconds, so end-to-end
7//!     latency is dominated by daemon poll, not inotify-vs-stat overhead.
8//!   - JSONL files grow append-only; `metadata().len()` is the only thing
9//!     we need to check. A `stat()` syscall is ~microseconds.
10//!
11//! Cursor strategy: per-consumer. `wire notify` persists its cursor to
12//! `$WIRE_HOME/state/wire/notify.cursor` so restarts don't re-emit history.
13//! The MCP server keeps cursors in-memory (each new MCP session starts from
14//! EOF — agents that want history can call wire_tail explicitly).
15//!
16//! Event shape: `InboxEvent` contains everything the notifier or agent UI
17//! needs to render a single-line toast: peer, kind, short body preview,
18//! verified flag, event_id, timestamp. The full event is also retained so
19//! resources/read can return it unmodified.
20
21use std::collections::HashMap;
22use std::path::{Path, PathBuf};
23
24use anyhow::{Context, Result};
25use serde::{Deserialize, Serialize};
26use serde_json::Value;
27
/// Maximum length, counted in `char`s rather than bytes, of the body preview
/// shown in an OS toast or chat hint. The untruncated body stays reachable
/// through `InboxEvent::raw`.
const BODY_PREVIEW_CHARS: usize = 120;
31
32/// One delivered event surfaced by a watcher.
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct InboxEvent {
35    pub peer: String,
36    pub event_id: String,
37    pub kind: String,
38    pub body_preview: String,
39    pub verified: bool,
40    pub timestamp: String,
41    /// Full signed event JSON for tools that want it (e.g. MCP resources/read).
42    pub raw: Value,
43}
44
45impl InboxEvent {
46    pub(crate) fn from_signed(peer: &str, signed: Value, verified: bool) -> Self {
47        let event_id = signed
48            .get("event_id")
49            .and_then(Value::as_str)
50            .unwrap_or("")
51            .to_string();
52        let kind = signed
53            .get("type")
54            .and_then(Value::as_str)
55            .map(str::to_string)
56            .unwrap_or_else(|| {
57                signed
58                    .get("kind")
59                    .map(|k| k.to_string())
60                    .unwrap_or_default()
61            });
62        let timestamp = signed
63            .get("timestamp")
64            .and_then(Value::as_str)
65            .unwrap_or("")
66            .to_string();
67        let body_raw = signed.get("body").cloned().unwrap_or(Value::Null);
68        let body_str = match &body_raw {
69            Value::String(s) => s.clone(),
70            other => serde_json::to_string(other).unwrap_or_default(),
71        };
72        let body_preview: String = body_str.chars().take(BODY_PREVIEW_CHARS).collect();
73        InboxEvent {
74            peer: peer.to_string(),
75            event_id,
76            kind,
77            body_preview,
78            verified,
79            timestamp,
80            raw: signed,
81        }
82    }
83}
84
/// Poll-based watcher over a directory of per-peer `<peer>.jsonl` inbox files.
///
/// One byte-offset cursor is kept per peer file. A call to `poll()` performs
/// exactly one sweep; the caller owns the loop and picks the interval
/// (sub-second for toast latency, longer for batch-style consumers).
pub struct InboxWatcher {
    /// Directory scanned for `*.jsonl` files on every sweep.
    inbox_dir: PathBuf,
    /// Peer name (file stem) mapped to the byte offset already consumed.
    cursors: HashMap<String, u64>,
}
94
95impl InboxWatcher {
96    /// Watcher with explicit inbox dir + cursor-from-file. Resumes from saved
97    /// per-peer cursors; new peer files emit from byte 0 the first time
98    /// they're seen, so the operator never misses an event between daemon
99    /// writes and notifier restart.
100    pub fn from_dir_and_cursor(inbox_dir: PathBuf, cursor_path: &Path) -> Result<Self> {
101        let cursors = if cursor_path.exists() {
102            let bytes = std::fs::read(cursor_path)
103                .with_context(|| format!("reading cursor file {cursor_path:?}"))?;
104            serde_json::from_slice(&bytes).unwrap_or_default()
105        } else {
106            HashMap::new()
107        };
108        Ok(Self { cursors, inbox_dir })
109    }
110
111    /// Watcher with explicit inbox dir, starting from EOF on every peer
112    /// file that exists at construction time. Used by MCP — agents that want
113    /// history call wire_tail. Peer files created AFTER construction emit
114    /// from byte 0 (they represent new conversations starting).
115    pub fn from_dir_head(inbox_dir: PathBuf) -> Result<Self> {
116        let mut cursors = HashMap::new();
117        if inbox_dir.exists() {
118            for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
119                let path = entry.path();
120                if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
121                    continue;
122                }
123                if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
124                    let len = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
125                    cursors.insert(stem.to_string(), len);
126                }
127            }
128        }
129        Ok(Self { cursors, inbox_dir })
130    }
131
132    /// Convenience: use the configured wire inbox dir + cursor at the given
133    /// path. Equivalent to `from_dir_and_cursor(config::inbox_dir()?, cursor_path)`.
134    pub fn from_cursor_file(cursor_path: &Path) -> Result<Self> {
135        Self::from_dir_and_cursor(crate::config::inbox_dir()?, cursor_path)
136    }
137
138    /// Convenience: configured inbox dir, fresh from EOF.
139    pub fn from_head() -> Result<Self> {
140        Self::from_dir_head(crate::config::inbox_dir()?)
141    }
142
143    /// Persist cursors to disk so a restart of `wire notify` doesn't re-emit
144    /// already-seen events. JSON shape: `{"peer1": 1234, "peer2": 5678}`.
145    pub fn save_cursors(&self, cursor_path: &Path) -> Result<()> {
146        if let Some(parent) = cursor_path.parent() {
147            std::fs::create_dir_all(parent).with_context(|| format!("creating {parent:?}"))?;
148        }
149        let bytes = serde_json::to_vec(&self.cursors)?;
150        std::fs::write(cursor_path, bytes)
151            .with_context(|| format!("writing cursor file {cursor_path:?}"))?;
152        Ok(())
153    }
154
155    /// Single poll sweep. Returns all new events across all peer inbox files
156    /// since the previous sweep. Events are re-verified against the current
157    /// trust state — `verified: false` events are still returned (caller
158    /// decides whether to notify), but the flag is honest.
159    pub fn poll(&mut self) -> Result<Vec<InboxEvent>> {
160        let mut out = Vec::new();
161        if !self.inbox_dir.exists() {
162            return Ok(out);
163        }
164
165        let trust = crate::config::read_trust().unwrap_or(Value::Null);
166
167        for entry in std::fs::read_dir(&self.inbox_dir)?.flatten() {
168            let path = entry.path();
169            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
170                continue;
171            }
172            let peer = match path.file_stem().and_then(|s| s.to_str()) {
173                Some(s) => s.to_string(),
174                None => continue,
175            };
176            let meta = match std::fs::metadata(&path) {
177                Ok(m) => m,
178                Err(_) => continue,
179            };
180            let cur_len = meta.len();
181            let start_at = *self.cursors.get(&peer).unwrap_or(&0);
182
183            if cur_len <= start_at {
184                self.cursors.insert(peer.clone(), start_at);
185                continue;
186            }
187
188            // Read the full file rather than seeking — peer JSONL files are
189            // small (one DM channel) and reading is simpler than mid-line
190            // recovery on a partial write. Cap at 8 MiB to avoid runaway
191            // memory on a misbehaving daemon.
192            const READ_CAP: u64 = 8 * 1024 * 1024;
193            let bytes = if cur_len <= READ_CAP {
194                std::fs::read(&path)?
195            } else {
196                // Skip what we've seen; only read the tail.
197                let mut f = std::fs::File::open(&path)?;
198                use std::io::{Read, Seek, SeekFrom};
199                f.seek(SeekFrom::Start(start_at))?;
200                let mut tail = Vec::new();
201                f.take(READ_CAP).read_to_end(&mut tail)?;
202                self.cursors
203                    .insert(peer.clone(), start_at + tail.len() as u64);
204                tail
205            };
206
207            // Slice from start_at if we're reading whole file.
208            let slice: &[u8] = if cur_len <= READ_CAP {
209                &bytes[start_at as usize..]
210            } else {
211                &bytes[..]
212            };
213
214            // Track the last fully-parsed byte offset so a partial trailing
215            // line (writer mid-flight) doesn't get prematurely consumed.
216            let mut consumed: u64 = start_at;
217            let mut cursor_in_slice: usize = 0;
218            while let Some(nl) = slice[cursor_in_slice..].iter().position(|&b| b == b'\n') {
219                let line = &slice[cursor_in_slice..cursor_in_slice + nl];
220                cursor_in_slice += nl + 1;
221                consumed += (nl + 1) as u64;
222                if line.is_empty() {
223                    continue;
224                }
225                let event: Value = match serde_json::from_slice(line) {
226                    Ok(v) => v,
227                    Err(_) => continue,
228                };
229                let verified = crate::signing::verify_message_v31(&event, &trust).is_ok();
230                out.push(InboxEvent::from_signed(&peer, event, verified));
231            }
232            self.cursors.insert(peer, consumed);
233        }
234        Ok(out)
235    }
236}
237
238#[cfg(test)]
239mod tests {
240    use super::*;
241    use std::io::Write;
242
243    fn fresh_home() -> PathBuf {
244        let pid = std::process::id();
245        let n = std::time::SystemTime::now()
246            .duration_since(std::time::UNIX_EPOCH)
247            .unwrap()
248            .subsec_nanos();
249        let path = std::env::temp_dir().join(format!("wire-watch-{pid}-{n}"));
250        let _ = std::fs::remove_dir_all(&path);
251        std::fs::create_dir_all(&path).unwrap();
252        path
253    }
254
255    // Tests use explicit inbox dirs so they don't race on WIRE_HOME env var
256    // (cargo runs unit tests in parallel — env mutation is process-global).
257    fn write_event(inbox_dir: &Path, peer: &str, kind: &str, body: &str) {
258        std::fs::create_dir_all(inbox_dir).unwrap();
259        let path = inbox_dir.join(format!("{peer}.jsonl"));
260        let mut f = std::fs::OpenOptions::new()
261            .create(true)
262            .append(true)
263            .open(&path)
264            .unwrap();
265        let event = serde_json::json!({
266            "event_id": format!("test-{}-{}", peer, body.len()),
267            "from": format!("did:wire:{peer}"),
268            "to": "did:wire:self",
269            "type": kind,
270            "kind": 1,
271            "timestamp": "2026-05-10T00:00:00Z",
272            "body": body,
273            "sig": "fake",
274        });
275        writeln!(f, "{}", serde_json::to_string(&event).unwrap()).unwrap();
276    }
277
278    #[test]
279    fn from_head_starts_at_eof_skips_history() {
280        let home = fresh_home();
281        let inbox = home.join("inbox");
282        write_event(&inbox, "paul", "decision", "old event");
283        let mut w = InboxWatcher::from_dir_head(inbox.clone()).unwrap();
284        assert!(w.poll().unwrap().is_empty(), "from_head must skip history");
285        write_event(&inbox, "paul", "decision", "new event");
286        let evs = w.poll().unwrap();
287        assert_eq!(evs.len(), 1);
288        assert_eq!(evs[0].peer, "paul");
289        assert_eq!(evs[0].kind, "decision");
290        assert!(evs[0].body_preview.contains("new event"));
291    }
292
293    #[test]
294    fn cursor_file_resumes_across_restarts() {
295        let home = fresh_home();
296        let inbox = home.join("inbox");
297        let cursor = home.join("notify.cursor");
298
299        write_event(&inbox, "paul", "decision", "first");
300        let mut w1 = InboxWatcher::from_dir_and_cursor(inbox.clone(), &cursor).unwrap();
301        let evs1 = w1.poll().unwrap();
302        assert_eq!(evs1.len(), 1);
303        w1.save_cursors(&cursor).unwrap();
304        drop(w1);
305
306        write_event(&inbox, "paul", "decision", "second");
307        let mut w2 = InboxWatcher::from_dir_and_cursor(inbox, &cursor).unwrap();
308        let evs2 = w2.poll().unwrap();
309        assert_eq!(evs2.len(), 1, "should see only the new event");
310        assert!(evs2[0].body_preview.contains("second"));
311    }
312
313    #[test]
314    fn body_preview_truncated_at_limit() {
315        let home = fresh_home();
316        let inbox = home.join("inbox");
317        let body = "x".repeat(500);
318        write_event(&inbox, "paul", "decision", &body);
319        let mut w = InboxWatcher::from_dir_and_cursor(inbox, &home.join("notify.cursor")).unwrap();
320        let evs = w.poll().unwrap();
321        assert_eq!(evs[0].body_preview.chars().count(), BODY_PREVIEW_CHARS);
322    }
323
324    #[test]
325    fn multi_peer_files_handled_independently() {
326        let home = fresh_home();
327        let inbox = home.join("inbox");
328        write_event(&inbox, "paul", "decision", "p1");
329        write_event(&inbox, "willard", "decision", "w1");
330        let mut w =
331            InboxWatcher::from_dir_and_cursor(inbox.clone(), &home.join("notify.cursor")).unwrap();
332        let evs = w.poll().unwrap();
333        assert_eq!(evs.len(), 2);
334        let peers: std::collections::HashSet<_> = evs.iter().map(|e| e.peer.clone()).collect();
335        assert!(peers.contains("paul"));
336        assert!(peers.contains("willard"));
337
338        // Add to one peer; only that one shows up
339        write_event(&inbox, "paul", "decision", "p2");
340        let evs2 = w.poll().unwrap();
341        assert_eq!(evs2.len(), 1);
342        assert_eq!(evs2[0].peer, "paul");
343        assert!(evs2[0].body_preview.contains("p2"));
344    }
345}