Skip to main content

wire/
pull.rs

1//! Pull-event processing — pure logic shared by `wire pull` and the daemon
2//! sync loop.
3//!
4//! P0.1 (0.5.11): refuse to silently advance cursor past events the running
5//! binary cannot process. The cursor only advances to the last event in the
6//! contiguous prefix that was either successfully written or rejected for a
7//! TERMINAL reason. Events rejected for TRANSIENT reasons (unknown kind,
8//! signer not yet pinned) block the cursor — so the next pull re-sees them
9//! and a future binary version or freshly-pinned peer can pick up where we
10//! left off.
11//!
12//! Without this rule, an old daemon running against a newer relay silently
13//! ate v0.5.x `pair_drop` events (kind=1100) it could neither pin nor verify,
14//! advancing the cursor past them. Today's debug session lost ~30 min to it.
15//!
16//! Adversarial test: `tests/pull_unknown_kind.rs` synthesises a kind=9999
17//! event, runs `process_events`, and asserts the cursor stays put + the
18//! rejection carries `binary_version=` and `unknown_kind=` so the failure is
19//! loud on every retry.
20//!
21//! Cursor advancement rules:
22//!
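//! - terminal reject (bad signature, missing field, event_id mismatch,
//!   revoked key) → advance past, retry won't help.
//! - transient reject (event schema major version differs from ours, unknown
//!   kind to THIS binary, signer not in trust) → DO NOT advance past, future
//!   state may unblock.
//! - duplicate (event_id already present in the peer inbox) → advance past;
//!   the event is already stored.
//! - success → advance past.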
28//!
29//! The first transient reject "blocks" the cursor; subsequent events in the
30//! batch are still processed for their inbox-write side effect but cannot
31//! advance the cursor beyond the block point. Re-pull observes the same
32//! blocking event again → visible failure mode.
33
34use anyhow::Result;
35use serde_json::{Value, json};
36use std::path::Path;
37
38use crate::{config, pair_invite, signing};
39
40/// Outcome of processing a batch of pulled events.
41pub struct PullResult {
42    pub written: Vec<Value>,
43    pub rejected: Vec<Value>,
44    /// New value for `self.last_pulled_event_id`. `None` means the cursor
45    /// was not advanced (either no events processable beyond the prior
46    /// cursor, or the first event blocked).
47    pub advance_cursor_to: Option<String>,
48    /// True if at least one event in this batch is blocking cursor advance.
49    /// Surfaces to operators in `wire pull` non-JSON output so silent stall
50    /// is visible.
51    pub blocked: bool,
52}
53
54/// Check whether a peer inbox file already contains an event with this
55/// `event_id`. Scan-based — O(file_size) — but inbox files are small and
56/// only the write path consults this (a few times per pull). Avoids
57/// pulling in a separate index file.
58///
59/// Returns false if the file doesn't exist yet, so the first write to a
60/// new peer's inbox is a no-op check.
61fn inbox_already_contains(path: &std::path::Path, event_id: &str) -> bool {
62    if event_id.is_empty() {
63        return false;
64    }
65    let body = match std::fs::read_to_string(path) {
66        Ok(b) => b,
67        Err(_) => return false,
68    };
69    // Quick substring screen first — if event_id isn't anywhere in the
70    // file, no point parsing every line. event_id appears once per event
71    // as a JSON string value, so the substring is a strong signal.
72    let needle = format!("\"event_id\":\"{event_id}\"");
73    if !body.contains(&needle) {
74        return false;
75    }
76    // Confirm by line-parse — defensive against an event_id substring
77    // appearing inside a body field. JSON parsing rejects that case.
78    for line in body.lines() {
79        let trimmed = line.trim();
80        if trimmed.is_empty() {
81            continue;
82        }
83        if let Ok(v) = serde_json::from_str::<Value>(trimmed)
84            && v.get("event_id").and_then(Value::as_str) == Some(event_id)
85        {
86            return true;
87        }
88    }
89    false
90}
91
92/// Is `kind` known to THIS binary? Used by P0.1 to refuse silent cursor
93/// advance past events from a future protocol version.
94///
95/// The Nostr-compat special cases (kind=1, kind=100) are handled in
96/// `signing::kind_class`; this function mirrors them.
97pub fn is_known_kind(kind: u32) -> bool {
98    if kind == 1 || kind == 100 {
99        return true;
100    }
101    signing::kinds().iter().any(|(k, _)| *k == kind)
102}
103
104/// Whether a `VerifyError` is transient (peer pinning may complete later)
105/// or terminal (retry won't help).
106fn verify_error_is_transient(err: &signing::VerifyError) -> bool {
107    matches!(
108        err,
109        signing::VerifyError::UnknownAgent(_) | signing::VerifyError::UnknownKey(_, _)
110    )
111}
112
113/// Process a pulled-event batch. Mutates inbox files + relay state (via
114/// `pair_invite` side effects) but returns the new cursor target rather
115/// than writing it — caller persists.
116///
117/// `initial_cursor` is the pre-pull value of `self.last_pulled_event_id`.
118/// Returned `advance_cursor_to` is what the caller should write back. If
119/// the first event blocks the cursor, `advance_cursor_to == initial_cursor`
120/// (no change).
121pub fn process_events(
122    events: &[Value],
123    initial_cursor: Option<String>,
124    inbox_dir: &Path,
125) -> Result<PullResult> {
126    let binary_version = env!("CARGO_PKG_VERSION");
127    let trust_snapshot = config::read_trust()?;
128
129    let mut written = Vec::new();
130    let mut rejected = Vec::new();
131    let mut last_advanced = initial_cursor.clone();
132    let mut first_block_idx: Option<usize> = None;
133
134    for (idx, event) in events.iter().enumerate() {
135        let event_id = event
136            .get("event_id")
137            .and_then(Value::as_str)
138            .unwrap_or("")
139            .to_string();
140        let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0) as u32;
141
142        // P0.Z (0.5.11): if the event declares a schema_version, its major
143        // must match ours. Absent field = legacy event (pre-0.5.11), accept
144        // — we can't retroactively stamp old traffic. Mismatched major =
145        // hard reject with both incoming + supported versions in reason.
146        // Format locked with spark: `schema_mismatch=<received> binary_supports=<ours>`.
147        if let Some(declared) = event.get("schema_version").and_then(Value::as_str) {
148            let ours = signing::EVENT_SCHEMA_VERSION;
149            if signing::schema_major(declared) != signing::schema_major(ours) {
150                rejected.push(json!({
151                    "event_id": event_id,
152                    "reason": format!(
153                        "schema_mismatch={declared} binary_supports={ours}"
154                    ),
155                    "blocks_cursor": true,
156                    "transient": true,
157                    "schema_version": declared,
158                }));
159                if first_block_idx.is_none() {
160                    first_block_idx = Some(idx);
161                }
162                continue;
163            }
164        }
165
166        // P0.1: unknown kind → transient, block cursor, fail loud.
167        if !is_known_kind(kind) {
168            let reason = format!("unknown_kind={kind} binary_version={binary_version}");
169            rejected.push(json!({
170                "event_id": event_id,
171                "reason": reason,
172                "blocks_cursor": true,
173                "transient": true,
174            }));
175            if first_block_idx.is_none() {
176                first_block_idx = Some(idx);
177            }
178            continue;
179        }
180
181        // pair_drop / pair_drop_ack — pre-trust side effects that pin sender.
182        let drop_paired = match pair_invite::maybe_consume_pair_drop(event) {
183            Ok(Some(_)) => true,
184            Ok(None) => false,
185            Err(e) => {
186                // P0.2: a pair_drop that WAS recognised (kind=1100, type=pair_drop)
187                // but FAILED during consumption is exactly the silent-fail class —
188                // sender expects to be pinned but isn't, and never finds out. Log
189                // + structured record for `wire doctor`.
190                let peer_handle = event
191                    .get("from")
192                    .and_then(Value::as_str)
193                    .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
194                    .unwrap_or_else(|| "<unknown>".to_string());
195                eprintln!(
196                    "wire pull: pair_drop from {peer_handle} consume FAILED: {e}. \
197                     sender will not be pinned; have them re-add or retry."
198                );
199                pair_invite::record_pair_rejection(
200                    &peer_handle,
201                    "pair_drop_consume_failed",
202                    &e.to_string(),
203                );
204                false
205            }
206        };
207        if let Err(e) = pair_invite::maybe_consume_pair_drop_ack(event) {
208            let peer_handle = event
209                .get("from")
210                .and_then(Value::as_str)
211                .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
212                .unwrap_or_else(|| "<unknown>".to_string());
213            eprintln!(
214                "wire pull: pair_drop_ack from {peer_handle} consume FAILED: {e}. \
215                 their slot_token NOT recorded; we cannot `wire send` to them \
216                 until they retry."
217            );
218            pair_invite::record_pair_rejection(
219                &peer_handle,
220                "pair_drop_ack_consume_failed",
221                &e.to_string(),
222            );
223        }
224        let active_trust = if drop_paired {
225            config::read_trust()?
226        } else {
227            trust_snapshot.clone()
228        };
229
230        match signing::verify_message_v31(event, &active_trust) {
231            Ok(()) => {
232                let from = event
233                    .get("from")
234                    .and_then(Value::as_str)
235                    .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
236                    .unwrap_or_else(|| "unknown".to_string());
237                let path = inbox_dir.join(format!("{from}.jsonl"));
238
239                // P0.X (0.5.11): dedupe-on-write. Spark reported 3 duplicate
240                // pair_drop_ack events landing in their inbox same second,
241                // same event_id. Relay double-store or push retry-after-
242                // success can re-deliver. Inbox should be content-unique by
243                // event_id.
244                if inbox_already_contains(&path, &event_id) {
245                    rejected.push(json!({
246                        "event_id": event_id,
247                        "reason": "duplicate event_id already in inbox",
248                        "blocks_cursor": false,
249                        "transient": false,
250                    }));
251                    if first_block_idx.is_none() {
252                        last_advanced = Some(event_id.clone());
253                    }
254                    continue;
255                }
256
257                use std::io::Write;
258                let mut f = std::fs::OpenOptions::new()
259                    .create(true)
260                    .append(true)
261                    .open(&path)?;
262                let mut line = serde_json::to_vec(event)?;
263                line.push(b'\n');
264                f.write_all(&line)?;
265                written.push(json!({"event_id": event_id, "from": from}));
266                if first_block_idx.is_none() {
267                    last_advanced = Some(event_id.clone());
268                }
269            }
270            Err(e) if verify_error_is_transient(&e) => {
271                rejected.push(json!({
272                    "event_id": event_id,
273                    "reason": e.to_string(),
274                    "blocks_cursor": true,
275                    "transient": true,
276                }));
277                if first_block_idx.is_none() {
278                    first_block_idx = Some(idx);
279                }
280            }
281            Err(e) => {
282                rejected.push(json!({
283                    "event_id": event_id,
284                    "reason": e.to_string(),
285                    "blocks_cursor": false,
286                    "transient": false,
287                }));
288                if first_block_idx.is_none() {
289                    last_advanced = Some(event_id.clone());
290                }
291            }
292        }
293    }
294
295    let result = PullResult {
296        written: written.clone(),
297        rejected: rejected.clone(),
298        advance_cursor_to: last_advanced.clone(),
299        blocked: first_block_idx.is_some(),
300    };
301
302    // P2.10: structured trace. No-op when WIRE_DIAG is not set; one line
303    // per pull when it is. Enough signal for `wire diag tail` to replay
304    // a session.
305    crate::diag::emit(
306        "pull",
307        json!({
308            "events_in": events.len(),
309            "written": result.written.len(),
310            "rejected": result.rejected.len(),
311            "blocked": result.blocked,
312            "advance_to": result.advance_cursor_to,
313        }),
314    );
315
316    Ok(result)
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Create a uniquely-named scratch directory under the system temp dir.
    /// Callers remove it once their assertions are done.
    fn scratch_dir(prefix: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "{prefix}-{}-{}",
            std::process::id(),
            rand::random::<u32>()
        ));
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    #[test]
    fn known_kinds_recognised() {
        // Special cases.
        assert!(is_known_kind(1));
        assert!(is_known_kind(100));
        // Named v0.1 kinds.
        assert!(is_known_kind(1000));
        assert!(is_known_kind(1100));
        assert!(is_known_kind(1101));
        assert!(is_known_kind(1201));
    }

    #[test]
    fn unknown_kinds_rejected() {
        assert!(!is_known_kind(0));
        assert!(!is_known_kind(9999));
        assert!(!is_known_kind(1099));
        assert!(!is_known_kind(50000));
    }

    #[test]
    fn unknown_kind_rejection_carries_binary_version_and_kind() {
        // Spark's E. rule: the silent failure must be loud. The reject reason
        // must name both the offending kind AND the binary version. An
        // operator running `wire pull --json` then sees instantly which side
        // is behind.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();

            let event = json!({
                "event_id": "deadbeef",
                "kind": 9999u32,
                "type": "speculation",
                "from": "did:wire:future-peer",
            });

            let result =
                process_events(&[event], Some("prior-cursor".to_string()), &inbox).unwrap();

            assert_eq!(result.rejected.len(), 1);
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(
                reason.contains("unknown_kind=9999"),
                "reason missing kind: {reason}"
            );
            assert!(
                reason.contains("binary_version="),
                "reason missing binary_version: {reason}"
            );
            assert_eq!(result.rejected[0]["blocks_cursor"], true);

            // Cursor MUST NOT advance past unknown event.
            assert_eq!(
                result.advance_cursor_to,
                Some("prior-cursor".to_string()),
                "cursor advanced past unknown kind — silent drop regression"
            );
            assert!(result.blocked);
        });
    }

    #[test]
    fn schema_mismatch_blocks_cursor_with_reason_shape() {
        // P0.Z lock-in: format of the rejection reason. Spark + I agreed on
        // exact shape `schema_mismatch=v3.2 binary_supports=v3.1` so an
        // operator running `wire pull --json | jq` can grep for it.
        // Wrong major (v4 vs v3) -> reject.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "future-binary",
                "schema_version": "v4.0",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:future",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
            assert_eq!(result.rejected.len(), 1);
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(reason.contains("schema_mismatch=v4.0"));
            assert!(reason.contains("binary_supports=v3.1"));
            assert_eq!(result.rejected[0]["blocks_cursor"], true);
            assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
        });
    }

    #[test]
    fn schema_minor_bump_within_same_major_is_accepted() {
        // v3.2 from a slightly-newer peer is still v3 major — must NOT be
        // rejected just because the minor differs. Otherwise we lock the
        // protocol to whoever shipped first.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "minor-bump",
                "schema_version": "v3.2",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:peer-not-in-trust",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
            // The schema check passes, so the event falls through to verify.
            // Verify rejects it for trust reasons. Either way, the reason must
            // NOT be a schema_mismatch.
            assert_eq!(result.rejected.len(), 1, "expected exactly one rejection");
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(
                !reason.contains("schema_mismatch"),
                "minor bump should not be schema_mismatch: {reason}"
            );
        });
    }

    #[test]
    fn legacy_event_without_schema_version_field_is_accepted() {
        // Pre-0.5.11 events have no schema_version. Rejecting on absence
        // would lock us out of every pre-existing inbox and every peer that
        // hasn't upgraded yet. Absent field = accept. A verify-rejection
        // later is fine, just not a schema rejection.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "legacy",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:legacy-peer",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
            assert_eq!(result.rejected.len(), 1, "expected exactly one rejection");
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(!reason.contains("schema_mismatch"));
        });
    }

    #[test]
    fn inbox_dedupe_skips_duplicate_event_id() {
        // P0.X smoke: spark's bug — same event_id arriving twice in the
        // same inbox file must be detectable, so only ONE inbox line is
        // written and the duplicate surfaces in rejected[] with a clear
        // reason (vs silently dropping).
        let tmp = scratch_dir("wire-dedupe-test");
        let event_id = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
        let existing_line = json!({
            "event_id": event_id,
            "from": "did:wire:peer",
            "type": "claim",
            "body": "first occurrence",
        });
        let path = tmp.join("peer.jsonl");
        std::fs::write(&path, format!("{existing_line}\n")).unwrap();
        assert!(inbox_already_contains(&path, event_id));
        assert!(!inbox_already_contains(&path, "different-event-id"));
        assert!(!inbox_already_contains(&path, ""));
        let _ = std::fs::remove_dir_all(&tmp);
    }

    #[test]
    fn inbox_dedupe_substring_in_body_is_not_false_positive() {
        // Adversarial: the target event_id showing up somewhere other than a
        // line's top-level `event_id` must not count as the event already
        // being present.
        let tmp = scratch_dir("wire-dedupe-substring");
        let target_eid = "deadbeefcafebabe";
        // The existing line has the target eid as plain text INSIDE the
        // body, NOT as the event_id field.
        let existing_line = json!({
            "event_id": "different",
            "from": "did:wire:peer",
            "body": "the user mentioned event_id deadbeefcafebabe in passing",
        });
        let path = tmp.join("peer.jsonl");
        std::fs::write(&path, format!("{existing_line}\n")).unwrap();
        assert!(!inbox_already_contains(&path, target_eid));

        // A nested object carrying the target id passes the cheap substring
        // screen. Only the top-level line-parse confirmation can rule it out.
        let nested_line = json!({
            "event_id": "different-again",
            "from": "did:wire:peer",
            "meta": {"event_id": target_eid},
        });
        std::fs::write(&path, format!("{existing_line}\n{nested_line}\n")).unwrap();
        assert!(!inbox_already_contains(&path, target_eid));
        let _ = std::fs::remove_dir_all(&tmp);
    }

    #[test]
    fn known_kind_after_unknown_does_not_advance_cursor() {
        // Block rule: once the first event blocks, NO later event can
        // advance the cursor past it, even if later events would otherwise
        // succeed. Re-pull observes both → visible.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();

            let events = vec![
                json!({
                    "event_id": "evt-unknown",
                    "kind": 9999u32,
                    "type": "speculation",
                    "from": "did:wire:future",
                }),
                json!({
                    "event_id": "evt-known-but-untrusted",
                    "kind": 1000u32,
                    "type": "decision",
                    "from": "did:wire:peer-not-in-trust",
                }),
            ];

            let result = process_events(&events, Some("prior".to_string()), &inbox).unwrap();

            assert_eq!(result.rejected.len(), 2);
            assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
            assert!(result.blocked);
        });
    }
}