Skip to main content

wire/
inbox_watch.rs

1//! Inbox tail-watcher — the event source for both `wire notify` (OS-level
2//! toasts) and the MCP `wire://inbox/<peer>` resources.
3//!
4//! Implementation choice: polling, not OS-level inotify/FSEvents. Reasons:
5//!   - Cross-platform with zero extra deps.
6//!   - The relay daemon already polls every N seconds, so end-to-end
7//!     latency is dominated by daemon poll, not inotify-vs-stat overhead.
8//!   - JSONL files grow append-only; `metadata().len()` is the only thing
9//!     we need to check. A `stat()` syscall is ~microseconds.
10//!
11//! Cursor strategy: per-consumer. `wire notify` persists its cursor to
12//! `$WIRE_HOME/state/wire/notify.cursor` so restarts don't re-emit history.
13//! The MCP server keeps cursors in-memory (each new MCP session starts from
14//! EOF — agents that want history can call wire_tail explicitly).
15//!
16//! Event shape: `InboxEvent` contains everything the notifier or agent UI
17//! needs to render a single-line toast: peer, kind, short body preview,
18//! verified flag, event_id, timestamp. The full event is also retained so
19//! resources/read can return it unmodified.
20
21use std::collections::HashMap;
22use std::path::{Path, PathBuf};
23
24use anyhow::{Context, Result};
25use serde::{Deserialize, Serialize};
26use serde_json::Value;
27
/// Maximum length of `InboxEvent::body_preview`, counted in characters
/// (`char`s, not bytes), for the OS toast / chat hint. The untruncated event
/// is still available via `InboxEvent::raw`.
const BODY_PREVIEW_CHARS: usize = 120;
31
32/// Our Ed25519 seed, loaded once (immutable per process). `None` if this home
33/// is uninitialized — then enc-bearing events simply render the sentinel.
34fn self_seed() -> Option<&'static [u8; 32]> {
35    static SEED: std::sync::OnceLock<Option<[u8; 32]>> = std::sync::OnceLock::new();
36    SEED.get_or_init(|| {
37        let v = crate::config::read_private_key().ok()?;
38        let s = v.get(..32)?;
39        let mut a = [0u8; 32];
40        a.copy_from_slice(s);
41        Some(a)
42    })
43    .as_ref()
44}
45
46/// Decrypt an enc-bearing event's body for DISPLAY (D1). Trust is read fresh so
47/// a newly-pinned peer's messages decrypt without a process restart. Returns a
48/// sentinel `Value` on any failure — never errors out the inbox render. The
49/// persisted event is untouched (ciphertext at rest).
50fn decrypt_body_for_display(signed: &Value) -> Value {
51    self_seed()
52        .and_then(|seed| {
53            let trust = crate::config::read_trust().ok()?;
54            crate::enc::wire_x25519::open_event_body(signed, &trust, seed)
55                .ok()
56                .flatten()
57        })
58        .unwrap_or_else(|| Value::String("<encrypted: cannot read>".to_string()))
59}
60
/// One delivered event surfaced by a watcher.
///
/// Carries the pre-extracted fields a notifier needs for a single-line toast
/// plus the complete signed event. Constructed by `InboxEvent::from_signed`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxEvent {
    /// Peer the event is attributed to. `InboxWatcher::poll` passes the stem
    /// of the `<peer>.jsonl` file the line was read from.
    pub peer: String,
    /// The event's `event_id` field; empty when absent or not a string.
    pub event_id: String,
    /// The event's `type` string; falls back to the JSON rendering of its
    /// `kind` field, or empty when neither is present.
    pub kind: String,
    /// First `BODY_PREVIEW_CHARS` characters of the body (decrypted for
    /// display when the event is enc-bearing).
    pub body_preview: String,
    /// Verification verdict supplied by whoever built the event; `poll` sets
    /// it from `crate::signing::verify_message_v31` against the trust state
    /// read at the start of the sweep.
    pub verified: bool,
    /// The event's `timestamp` field; empty when absent or not a string.
    pub timestamp: String,
    /// Full signed event JSON for tools that want it (e.g. MCP resources/read).
    pub raw: Value,
}
73
74impl InboxEvent {
75    pub(crate) fn from_signed(peer: &str, signed: Value, verified: bool) -> Self {
76        let event_id = signed
77            .get("event_id")
78            .and_then(Value::as_str)
79            .unwrap_or("")
80            .to_string();
81        let kind = signed
82            .get("type")
83            .and_then(Value::as_str)
84            .map(str::to_string)
85            .unwrap_or_else(|| {
86                signed
87                    .get("kind")
88                    .map(|k| k.to_string())
89                    .unwrap_or_default()
90            });
91        let timestamp = signed
92            .get("timestamp")
93            .and_then(Value::as_str)
94            .unwrap_or("")
95            .to_string();
96        // D1 (RFC-006): decrypt for DISPLAY only — the persisted `raw` event
97        // (and at-rest inbox JSONL) stays the verbatim ciphertext, preserving
98        // event_id/signature integrity. `open_event_body` re-verifies the
99        // signature before decrypting (structural gate) and binds from/to from
100        // the event. Only enc-bearing events pay the trust read.
101        let body_raw = if signed.get("enc").and_then(Value::as_str)
102            == Some(crate::enc::wire_x25519::ENC_DISCRIMINATOR)
103        {
104            decrypt_body_for_display(&signed)
105        } else {
106            signed.get("body").cloned().unwrap_or(Value::Null)
107        };
108        let body_str = match &body_raw {
109            Value::String(s) => s.clone(),
110            other => serde_json::to_string(other).unwrap_or_default(),
111        };
112        let body_preview: String = body_str.chars().take(BODY_PREVIEW_CHARS).collect();
113        InboxEvent {
114            peer: peer.to_string(),
115            event_id,
116            kind,
117            body_preview,
118            verified,
119            timestamp,
120            raw: signed,
121        }
122    }
123}
124
/// Polling watcher for the inbox directory.
///
/// Tracks one cursor per `<peer>.jsonl` file. `poll()` is a single sweep —
/// callers wrap in their own loop with whatever interval makes sense (sub-
/// second for OS toast latency, longer for batchy use cases).
pub struct InboxWatcher {
    /// Byte offset already consumed in each peer file, keyed by the file stem
    /// (the `<peer>` part of `<peer>.jsonl`). Peers without an entry are read
    /// from byte 0.
    cursors: HashMap<String, u64>,
    /// Directory scanned for `*.jsonl` peer files.
    inbox_dir: PathBuf,
}
134
135impl InboxWatcher {
136    /// Watcher with explicit inbox dir + cursor-from-file. Resumes from saved
137    /// per-peer cursors; new peer files emit from byte 0 the first time
138    /// they're seen, so the operator never misses an event between daemon
139    /// writes and notifier restart.
140    pub fn from_dir_and_cursor(inbox_dir: PathBuf, cursor_path: &Path) -> Result<Self> {
141        let cursors = if cursor_path.exists() {
142            let bytes = std::fs::read(cursor_path)
143                .with_context(|| format!("reading cursor file {cursor_path:?}"))?;
144            serde_json::from_slice(&bytes).unwrap_or_default()
145        } else {
146            HashMap::new()
147        };
148        Ok(Self { cursors, inbox_dir })
149    }
150
151    /// Watcher with explicit inbox dir, starting from EOF on every peer
152    /// file that exists at construction time. Used by MCP — agents that want
153    /// history call wire_tail. Peer files created AFTER construction emit
154    /// from byte 0 (they represent new conversations starting).
155    pub fn from_dir_head(inbox_dir: PathBuf) -> Result<Self> {
156        let mut cursors = HashMap::new();
157        if inbox_dir.exists() {
158            for entry in std::fs::read_dir(&inbox_dir)?.flatten() {
159                let path = entry.path();
160                if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
161                    continue;
162                }
163                if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
164                    let len = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
165                    cursors.insert(stem.to_string(), len);
166                }
167            }
168        }
169        Ok(Self { cursors, inbox_dir })
170    }
171
172    /// Convenience: use the configured wire inbox dir + cursor at the given
173    /// path. Equivalent to `from_dir_and_cursor(config::inbox_dir()?, cursor_path)`.
174    pub fn from_cursor_file(cursor_path: &Path) -> Result<Self> {
175        Self::from_dir_and_cursor(crate::config::inbox_dir()?, cursor_path)
176    }
177
178    /// Convenience: configured inbox dir, fresh from EOF.
179    pub fn from_head() -> Result<Self> {
180        Self::from_dir_head(crate::config::inbox_dir()?)
181    }
182
183    /// Persist cursors to disk so a restart of `wire notify` doesn't re-emit
184    /// already-seen events. JSON shape: `{"peer1": 1234, "peer2": 5678}`.
185    pub fn save_cursors(&self, cursor_path: &Path) -> Result<()> {
186        if let Some(parent) = cursor_path.parent() {
187            std::fs::create_dir_all(parent).with_context(|| format!("creating {parent:?}"))?;
188        }
189        let bytes = serde_json::to_vec(&self.cursors)?;
190        std::fs::write(cursor_path, bytes)
191            .with_context(|| format!("writing cursor file {cursor_path:?}"))?;
192        Ok(())
193    }
194
195    /// Single poll sweep. Returns all new events across all peer inbox files
196    /// since the previous sweep. Events are re-verified against the current
197    /// trust state — `verified: false` events are still returned (caller
198    /// decides whether to notify), but the flag is honest.
199    pub fn poll(&mut self) -> Result<Vec<InboxEvent>> {
200        let mut out = Vec::new();
201        if !self.inbox_dir.exists() {
202            return Ok(out);
203        }
204
205        let trust = crate::config::read_trust().unwrap_or(Value::Null);
206
207        for entry in std::fs::read_dir(&self.inbox_dir)?.flatten() {
208            let path = entry.path();
209            if path.extension().and_then(|x| x.to_str()) != Some("jsonl") {
210                continue;
211            }
212            let peer = match path.file_stem().and_then(|s| s.to_str()) {
213                Some(s) => s.to_string(),
214                None => continue,
215            };
216            let meta = match std::fs::metadata(&path) {
217                Ok(m) => m,
218                Err(_) => continue,
219            };
220            let cur_len = meta.len();
221            let start_at = *self.cursors.get(&peer).unwrap_or(&0);
222
223            if cur_len <= start_at {
224                self.cursors.insert(peer.clone(), start_at);
225                continue;
226            }
227
228            // Read the full file rather than seeking — peer JSONL files are
229            // small (one DM channel) and reading is simpler than mid-line
230            // recovery on a partial write. Cap at 8 MiB to avoid runaway
231            // memory on a misbehaving daemon.
232            const READ_CAP: u64 = 8 * 1024 * 1024;
233            let bytes = if cur_len <= READ_CAP {
234                std::fs::read(&path)?
235            } else {
236                // Skip what we've seen; only read the tail.
237                let mut f = std::fs::File::open(&path)?;
238                use std::io::{Read, Seek, SeekFrom};
239                f.seek(SeekFrom::Start(start_at))?;
240                let mut tail = Vec::new();
241                f.take(READ_CAP).read_to_end(&mut tail)?;
242                self.cursors
243                    .insert(peer.clone(), start_at + tail.len() as u64);
244                tail
245            };
246
247            // Slice from start_at if we're reading whole file.
248            let slice: &[u8] = if cur_len <= READ_CAP {
249                &bytes[start_at as usize..]
250            } else {
251                &bytes[..]
252            };
253
254            // Track the last fully-parsed byte offset so a partial trailing
255            // line (writer mid-flight) doesn't get prematurely consumed.
256            let mut consumed: u64 = start_at;
257            let mut cursor_in_slice: usize = 0;
258            while let Some(nl) = slice[cursor_in_slice..].iter().position(|&b| b == b'\n') {
259                let line = &slice[cursor_in_slice..cursor_in_slice + nl];
260                cursor_in_slice += nl + 1;
261                consumed += (nl + 1) as u64;
262                if line.is_empty() {
263                    continue;
264                }
265                let event: Value = match serde_json::from_slice(line) {
266                    Ok(v) => v,
267                    Err(_) => continue,
268                };
269                let verified = crate::signing::verify_message_v31(&event, &trust).is_ok();
270                out.push(InboxEvent::from_signed(&peer, event, verified));
271            }
272            self.cursors.insert(peer, consumed);
273        }
274        Ok(out)
275    }
276}
277
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Creates an empty scratch directory under the system temp dir.
    ///
    /// cargo runs unit tests as parallel threads of ONE process, so the pid
    /// is shared and a coarse clock can hand two tests the same nanosecond
    /// reading. The process-wide sequence number guarantees that two calls
    /// never share a path (and therefore never wipe each other's fixtures).
    fn fresh_home() -> PathBuf {
        static NEXT_ID: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
        let seq = NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let pid = std::process::id();
        let n = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .subsec_nanos();
        let path = std::env::temp_dir().join(format!("wire-watch-{pid}-{n}-{seq}"));
        let _ = std::fs::remove_dir_all(&path);
        std::fs::create_dir_all(&path).unwrap();
        path
    }

    // Tests use explicit inbox dirs so they don't race on WIRE_HOME env var
    // (cargo runs unit tests in parallel — env mutation is process-global).
    /// Appends one newline-terminated event line to `<inbox_dir>/<peer>.jsonl`,
    /// creating the directory and file as needed. The signature is fake.
    fn write_event(inbox_dir: &Path, peer: &str, kind: &str, body: &str) {
        std::fs::create_dir_all(inbox_dir).unwrap();
        let path = inbox_dir.join(format!("{peer}.jsonl"));
        let mut f = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .unwrap();
        let event = serde_json::json!({
            "event_id": format!("test-{}-{}", peer, body.len()),
            "from": format!("did:wire:{peer}"),
            "to": "did:wire:self",
            "type": kind,
            "kind": 1,
            "timestamp": "2026-05-10T00:00:00Z",
            "body": body,
            "sig": "fake",
        });
        writeln!(f, "{}", serde_json::to_string(&event).unwrap()).unwrap();
    }

    #[test]
    fn from_head_starts_at_eof_skips_history() {
        let home = fresh_home();
        let inbox = home.join("inbox");
        write_event(&inbox, "paul", "decision", "old event");
        let mut w = InboxWatcher::from_dir_head(inbox.clone()).unwrap();
        assert!(w.poll().unwrap().is_empty(), "from_head must skip history");
        write_event(&inbox, "paul", "decision", "new event");
        let evs = w.poll().unwrap();
        assert_eq!(evs.len(), 1);
        assert_eq!(evs[0].peer, "paul");
        assert_eq!(evs[0].kind, "decision");
        assert!(evs[0].body_preview.contains("new event"));
    }

    #[test]
    fn cursor_file_resumes_across_restarts() {
        let home = fresh_home();
        let inbox = home.join("inbox");
        let cursor = home.join("notify.cursor");

        write_event(&inbox, "paul", "decision", "first");
        let mut w1 = InboxWatcher::from_dir_and_cursor(inbox.clone(), &cursor).unwrap();
        let evs1 = w1.poll().unwrap();
        assert_eq!(evs1.len(), 1);
        w1.save_cursors(&cursor).unwrap();
        drop(w1);

        write_event(&inbox, "paul", "decision", "second");
        let mut w2 = InboxWatcher::from_dir_and_cursor(inbox, &cursor).unwrap();
        let evs2 = w2.poll().unwrap();
        assert_eq!(evs2.len(), 1, "should see only the new event");
        assert!(evs2[0].body_preview.contains("second"));
    }

    #[test]
    fn body_preview_truncated_at_limit() {
        let home = fresh_home();
        let inbox = home.join("inbox");
        let body = "x".repeat(500);
        write_event(&inbox, "paul", "decision", &body);
        let mut w = InboxWatcher::from_dir_and_cursor(inbox, &home.join("notify.cursor")).unwrap();
        let evs = w.poll().unwrap();
        assert_eq!(evs[0].body_preview.chars().count(), BODY_PREVIEW_CHARS);
    }

    #[test]
    fn multi_peer_files_handled_independently() {
        let home = fresh_home();
        let inbox = home.join("inbox");
        write_event(&inbox, "paul", "decision", "p1");
        write_event(&inbox, "willard", "decision", "w1");
        let mut w =
            InboxWatcher::from_dir_and_cursor(inbox.clone(), &home.join("notify.cursor")).unwrap();
        let evs = w.poll().unwrap();
        assert_eq!(evs.len(), 2);
        let peers: std::collections::HashSet<_> = evs.iter().map(|e| e.peer.clone()).collect();
        assert!(peers.contains("paul"));
        assert!(peers.contains("willard"));

        // Add to one peer; only that one shows up
        write_event(&inbox, "paul", "decision", "p2");
        let evs2 = w.poll().unwrap();
        assert_eq!(evs2.len(), 1);
        assert_eq!(evs2[0].peer, "paul");
        assert!(evs2[0].body_preview.contains("p2"));
    }

    #[test]
    fn partial_trailing_line_waits_for_newline() {
        let home = fresh_home();
        let inbox = home.join("inbox");
        std::fs::create_dir_all(&inbox).unwrap();
        let mut f = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(inbox.join("paul.jsonl"))
            .unwrap();
        let event = serde_json::json!({
            "event_id": "test-partial",
            "from": "did:wire:paul",
            "to": "did:wire:self",
            "type": "decision",
            "kind": 1,
            "timestamp": "2026-05-10T00:00:00Z",
            "body": "split across two writes",
            "sig": "fake",
        });
        // The serialized event is pure ASCII, so any byte index is a valid
        // split point.
        let line = serde_json::to_string(&event).unwrap();
        let (head, tail) = line.split_at(line.len() / 2);

        let mut w = InboxWatcher::from_dir_and_cursor(inbox, &home.join("notify.cursor")).unwrap();

        // Writer is mid-flight: no newline yet, so nothing may be consumed.
        write!(f, "{head}").unwrap();
        assert!(w.poll().unwrap().is_empty(), "partial line must not emit");

        // Line completed: the whole event shows up exactly once.
        writeln!(f, "{tail}").unwrap();
        let evs = w.poll().unwrap();
        assert_eq!(evs.len(), 1);
        assert_eq!(evs[0].event_id, "test-partial");
        assert!(w.poll().unwrap().is_empty());
    }
}
385}