Skip to main content

wire/
pull.rs

1//! Pull-event processing — pure logic shared by `wire pull` and the daemon
2//! sync loop.
3//!
4//! P0.1 (0.5.11): refuse to silently advance cursor past events the running
5//! binary cannot process. The cursor only advances to the last event in the
6//! contiguous prefix that was either successfully written or rejected for a
7//! TERMINAL reason. Events rejected for TRANSIENT reasons (unknown kind,
8//! signer not yet pinned) block the cursor — so the next pull re-sees them
9//! and a future binary version or freshly-pinned peer can pick up where we
10//! left off.
11//!
12//! Without this rule, an old daemon running against a newer relay silently
13//! ate v0.5.x `pair_drop` events (kind=1100) it could neither pin nor verify,
//! advancing the cursor past them. One debugging session lost ~30 min to it.
15//!
16//! Adversarial test: `tests/pull_unknown_kind.rs` synthesises a kind=9999
17//! event, runs `process_events`, and asserts the cursor stays put + the
18//! rejection carries `binary_version=` and `unknown_kind=` so the failure is
19//! loud on every retry.
20//!
21//! Cursor advancement rules:
22//!
23//! - terminal reject (bad signature, missing field, event_id mismatch,
24//!   revoked key) → advance past, retry won't help.
25//! - transient reject (unknown kind to THIS binary, signer not in trust) →
26//!   DO NOT advance past, future state may unblock.
27//! - success → advance past.
28//!
29//! The first transient reject "blocks" the cursor; subsequent events in the
30//! batch are still processed for their inbox-write side effect but cannot
31//! advance the cursor beyond the block point. Re-pull observes the same
32//! blocking event again → visible failure mode.
33
34use anyhow::Result;
35use serde_json::{Value, json};
36use std::path::Path;
37
38use crate::{config, pair_invite, signing};
39
40/// Outcome of processing a batch of pulled events.
41pub struct PullResult {
42    pub written: Vec<Value>,
43    pub rejected: Vec<Value>,
44    /// New value for `self.last_pulled_event_id`. `None` means the cursor
45    /// was not advanced (either no events processable beyond the prior
46    /// cursor, or the first event blocked).
47    pub advance_cursor_to: Option<String>,
48    /// True if at least one event in this batch is blocking cursor advance.
49    /// Surfaces to operators in `wire pull` non-JSON output so silent stall
50    /// is visible.
51    pub blocked: bool,
52}
53
54/// Check whether a peer inbox file already contains an event with this
55/// `event_id`. Scan-based — O(file_size) — but inbox files are small and
56/// only the write path consults this (a few times per pull). Avoids
57/// pulling in a separate index file.
58///
59/// Returns false if the file doesn't exist yet, so the first write to a
60/// new peer's inbox is a no-op check.
61fn inbox_already_contains(path: &std::path::Path, event_id: &str) -> bool {
62    if event_id.is_empty() {
63        return false;
64    }
65    let body = match std::fs::read_to_string(path) {
66        Ok(b) => b,
67        Err(_) => return false,
68    };
69    // Quick substring screen first — if event_id isn't anywhere in the
70    // file, no point parsing every line. event_id appears once per event
71    // as a JSON string value, so the substring is a strong signal.
72    let needle = format!("\"event_id\":\"{event_id}\"");
73    if !body.contains(&needle) {
74        return false;
75    }
76    // Confirm by line-parse — defensive against an event_id substring
77    // appearing inside a body field. JSON parsing rejects that case.
78    for line in body.lines() {
79        let trimmed = line.trim();
80        if trimmed.is_empty() {
81            continue;
82        }
83        if let Ok(v) = serde_json::from_str::<Value>(trimmed)
84            && v.get("event_id").and_then(Value::as_str) == Some(event_id)
85        {
86            return true;
87        }
88    }
89    false
90}
91
92/// Is `kind` known to THIS binary? Used by P0.1 to refuse silent cursor
93/// advance past events from a future protocol version.
94///
95/// The Nostr-compat special cases (kind=1, kind=100) are handled in
96/// `signing::kind_class`; this function mirrors them.
97pub fn is_known_kind(kind: u32) -> bool {
98    if kind == 1 || kind == 100 {
99        return true;
100    }
101    signing::kinds().iter().any(|(k, _)| *k == kind)
102}
103
104/// Whether a `VerifyError` is transient (peer pinning may complete later)
105/// or terminal (retry won't help).
106fn verify_error_is_transient(err: &signing::VerifyError) -> bool {
107    matches!(
108        err,
109        signing::VerifyError::UnknownAgent(_) | signing::VerifyError::UnknownKey(_, _)
110    )
111}
112
113/// Process a pulled-event batch. Mutates inbox files + relay state (via
114/// `pair_invite` side effects) but returns the new cursor target rather
115/// than writing it — caller persists.
116///
117/// `initial_cursor` is the pre-pull value of `self.last_pulled_event_id`.
118/// Returned `advance_cursor_to` is what the caller should write back. If
119/// the first event blocks the cursor, `advance_cursor_to == initial_cursor`
120/// (no change).
121pub fn process_events(
122    events: &[Value],
123    initial_cursor: Option<String>,
124    inbox_dir: &Path,
125) -> Result<PullResult> {
126    let binary_version = env!("CARGO_PKG_VERSION");
127    let trust_snapshot = config::read_trust()?;
128
129    let mut written = Vec::new();
130    let mut rejected = Vec::new();
131    let mut last_advanced = initial_cursor.clone();
132    let mut first_block_idx: Option<usize> = None;
133
134    for (idx, event) in events.iter().enumerate() {
135        let event_id = event
136            .get("event_id")
137            .and_then(Value::as_str)
138            .unwrap_or("")
139            .to_string();
140        let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0) as u32;
141
142        // P0.Z (0.5.11): if the event declares a schema_version, its major
143        // must match ours. Absent field = legacy event (pre-0.5.11), accept
144        // — we can't retroactively stamp old traffic. Mismatched major =
145        // hard reject with both incoming + supported versions in reason.
146        // Format locked with spark: `schema_mismatch=<received> binary_supports=<ours>`.
147        if let Some(declared) = event
148            .get("schema_version")
149            .and_then(Value::as_str)
150        {
151            let ours = signing::EVENT_SCHEMA_VERSION;
152            if signing::schema_major(declared) != signing::schema_major(ours) {
153                rejected.push(json!({
154                    "event_id": event_id,
155                    "reason": format!(
156                        "schema_mismatch={declared} binary_supports={ours}"
157                    ),
158                    "blocks_cursor": true,
159                    "transient": true,
160                    "schema_version": declared,
161                }));
162                if first_block_idx.is_none() {
163                    first_block_idx = Some(idx);
164                }
165                continue;
166            }
167        }
168
169        // P0.1: unknown kind → transient, block cursor, fail loud.
170        if !is_known_kind(kind) {
171            let reason = format!(
172                "unknown_kind={kind} binary_version={binary_version}"
173            );
174            rejected.push(json!({
175                "event_id": event_id,
176                "reason": reason,
177                "blocks_cursor": true,
178                "transient": true,
179            }));
180            if first_block_idx.is_none() {
181                first_block_idx = Some(idx);
182            }
183            continue;
184        }
185
186        // pair_drop / pair_drop_ack — pre-trust side effects that pin sender.
187        let drop_paired = match pair_invite::maybe_consume_pair_drop(event) {
188            Ok(Some(_)) => true,
189            Ok(None) => false,
190            Err(e) => {
191                // P0.2: a pair_drop that WAS recognised (kind=1100, type=pair_drop)
192                // but FAILED during consumption is exactly the silent-fail class —
193                // sender expects to be pinned but isn't, and never finds out. Log
194                // + structured record for `wire doctor`.
195                let peer_handle = event
196                    .get("from")
197                    .and_then(Value::as_str)
198                    .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
199                    .unwrap_or_else(|| "<unknown>".to_string());
200                eprintln!(
201                    "wire pull: pair_drop from {peer_handle} consume FAILED: {e}. \
202                     sender will not be pinned; have them re-add or retry."
203                );
204                pair_invite::record_pair_rejection(
205                    &peer_handle,
206                    "pair_drop_consume_failed",
207                    &e.to_string(),
208                );
209                false
210            }
211        };
212        if let Err(e) = pair_invite::maybe_consume_pair_drop_ack(event) {
213            let peer_handle = event
214                .get("from")
215                .and_then(Value::as_str)
216                .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
217                .unwrap_or_else(|| "<unknown>".to_string());
218            eprintln!(
219                "wire pull: pair_drop_ack from {peer_handle} consume FAILED: {e}. \
220                 their slot_token NOT recorded; we cannot `wire send` to them \
221                 until they retry."
222            );
223            pair_invite::record_pair_rejection(
224                &peer_handle,
225                "pair_drop_ack_consume_failed",
226                &e.to_string(),
227            );
228        }
229        let active_trust = if drop_paired {
230            config::read_trust()?
231        } else {
232            trust_snapshot.clone()
233        };
234
235        match signing::verify_message_v31(event, &active_trust) {
236            Ok(()) => {
237                let from = event
238                    .get("from")
239                    .and_then(Value::as_str)
240                    .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
241                    .unwrap_or_else(|| "unknown".to_string());
242                let path = inbox_dir.join(format!("{from}.jsonl"));
243
244                // P0.X (0.5.11): dedupe-on-write. Spark reported 3 duplicate
245                // pair_drop_ack events landing in their inbox same second,
246                // same event_id. Relay double-store or push retry-after-
247                // success can re-deliver. Inbox should be content-unique by
248                // event_id.
249                if inbox_already_contains(&path, &event_id) {
250                    rejected.push(json!({
251                        "event_id": event_id,
252                        "reason": "duplicate event_id already in inbox",
253                        "blocks_cursor": false,
254                        "transient": false,
255                    }));
256                    if first_block_idx.is_none() {
257                        last_advanced = Some(event_id.clone());
258                    }
259                    continue;
260                }
261
262                use std::io::Write;
263                let mut f = std::fs::OpenOptions::new()
264                    .create(true)
265                    .append(true)
266                    .open(&path)?;
267                let mut line = serde_json::to_vec(event)?;
268                line.push(b'\n');
269                f.write_all(&line)?;
270                written.push(json!({"event_id": event_id, "from": from}));
271                if first_block_idx.is_none() {
272                    last_advanced = Some(event_id.clone());
273                }
274            }
275            Err(e) if verify_error_is_transient(&e) => {
276                rejected.push(json!({
277                    "event_id": event_id,
278                    "reason": e.to_string(),
279                    "blocks_cursor": true,
280                    "transient": true,
281                }));
282                if first_block_idx.is_none() {
283                    first_block_idx = Some(idx);
284                }
285            }
286            Err(e) => {
287                rejected.push(json!({
288                    "event_id": event_id,
289                    "reason": e.to_string(),
290                    "blocks_cursor": false,
291                    "transient": false,
292                }));
293                if first_block_idx.is_none() {
294                    last_advanced = Some(event_id.clone());
295                }
296            }
297        }
298    }
299
300    let result = PullResult {
301        written: written.clone(),
302        rejected: rejected.clone(),
303        advance_cursor_to: last_advanced.clone(),
304        blocked: first_block_idx.is_some(),
305    };
306
307    // P2.10: structured trace. No-op when WIRE_DIAG is not set; one line
308    // per pull when it is. Enough signal for `wire diag tail` to replay
309    // a session.
310    crate::diag::emit(
311        "pull",
312        json!({
313            "events_in": events.len(),
314            "written": result.written.len(),
315            "rejected": result.rejected.len(),
316            "blocked": result.blocked,
317            "advance_to": result.advance_cursor_to,
318        }),
319    );
320
321    Ok(result)
322}
323
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn known_kinds_recognised() {
        // Special cases.
        assert!(is_known_kind(1));
        assert!(is_known_kind(100));
        // Named v0.1 kinds.
        assert!(is_known_kind(1000));
        assert!(is_known_kind(1100));
        assert!(is_known_kind(1101));
        assert!(is_known_kind(1201));
    }

    #[test]
    fn unknown_kinds_rejected() {
        assert!(!is_known_kind(0));
        assert!(!is_known_kind(9999));
        assert!(!is_known_kind(1099));
        assert!(!is_known_kind(50000));
    }

    #[test]
    fn unknown_kind_rejection_carries_binary_version_and_kind() {
        // The silent failure must be loud. The reject reason must name both
        // the offending kind AND the binary version so an operator running
        // `wire pull --json` sees instantly which side is behind.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();

            let event = json!({
                "event_id": "deadbeef",
                "kind": 9999u32,
                "type": "speculation",
                "from": "did:wire:future-peer",
            });

            let result = process_events(
                &[event],
                Some("prior-cursor".to_string()),
                &inbox,
            )
            .unwrap();

            assert_eq!(result.rejected.len(), 1);
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(
                reason.contains("unknown_kind=9999"),
                "reason missing kind: {reason}"
            );
            assert!(
                reason.contains("binary_version="),
                "reason missing binary_version: {reason}"
            );
            assert_eq!(result.rejected[0]["blocks_cursor"], true);

            // Cursor MUST NOT advance past unknown event.
            assert_eq!(
                result.advance_cursor_to,
                Some("prior-cursor".to_string()),
                "cursor advanced past unknown kind — silent drop regression"
            );
            assert!(result.blocked);
        });
    }

    #[test]
    fn schema_mismatch_blocks_cursor_with_reason_shape() {
        // P0.Z lock-in: the rejection reason has the exact shape
        // `schema_mismatch=<received> binary_supports=<ours>` so an operator
        // running `wire pull --json | jq` can grep for it.
        // Wrong major (v4 vs v3) -> reject.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "future-binary",
                "schema_version": "v4.0",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:future",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox)
                .unwrap();
            assert_eq!(result.rejected.len(), 1);
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(reason.contains("schema_mismatch=v4.0"));
            assert!(reason.contains("binary_supports=v3.1"));
            assert_eq!(result.rejected[0]["blocks_cursor"], true);
            assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
        });
    }

    #[test]
    fn schema_minor_bump_within_same_major_is_accepted() {
        // v3.2 from a slightly-newer peer is still v3 major — must NOT be
        // rejected just because the minor differs. Otherwise we lock the
        // protocol to whoever shipped first.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "minor-bump",
                "schema_version": "v3.2",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:peer-not-in-trust",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox)
                .unwrap();
            // Schema check passes, falls through to verify which rejects
            // the unsigned, untrusted event. Either way, the reason must
            // NOT be a schema_mismatch.
            assert_eq!(
                result.rejected.len(),
                1,
                "untrusted event should be rejected by verification"
            );
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(
                !reason.contains("schema_mismatch"),
                "minor bump should not be schema_mismatch: {reason}"
            );
        });
    }

    #[test]
    fn legacy_event_without_schema_version_field_is_accepted() {
        // Pre-0.5.11 events have no schema_version. Rejecting on absence
        // would lock us out from every pre-existing inbox + every peer
        // that hasn't upgraded yet. Absent field = accept (a verify
        // rejection later is fine, just not a schema rejection).
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "legacy",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:legacy-peer",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox)
                .unwrap();
            assert_eq!(
                result.rejected.len(),
                1,
                "untrusted event should be rejected by verification"
            );
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(!reason.contains("schema_mismatch"));
        });
    }

    #[test]
    fn inbox_dedupe_skips_duplicate_event_id() {
        // P0.X smoke: the lookup that backs dedupe-on-write finds an
        // event_id already present in the inbox file, and does not match a
        // different or empty id.
        let tmp = std::env::temp_dir().join(format!(
            "wire-dedupe-test-{}-{}",
            std::process::id(),
            rand::random::<u32>()
        ));
        std::fs::create_dir_all(&tmp).unwrap();
        let event_id = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
        let existing_line = json!({
            "event_id": event_id,
            "from": "did:wire:peer",
            "type": "claim",
            "body": "first occurrence",
        });
        let path = tmp.join("peer.jsonl");
        std::fs::write(&path, format!("{existing_line}\n")).unwrap();
        assert!(inbox_already_contains(&path, event_id));
        assert!(!inbox_already_contains(&path, "different-event-id"));
        assert!(!inbox_already_contains(&path, ""));
        // Best-effort cleanup so repeated runs don't litter the temp dir.
        let _ = std::fs::remove_dir_all(&tmp);
    }

    #[test]
    fn inbox_dedupe_substring_in_body_is_not_false_positive() {
        // Adversarial: event_id substring inside a body field shouldn't
        // count as the event already being present.
        let tmp = std::env::temp_dir().join(format!(
            "wire-dedupe-substring-{}-{}",
            std::process::id(),
            rand::random::<u32>()
        ));
        std::fs::create_dir_all(&tmp).unwrap();
        let target_eid = "deadbeefcafebabe";
        // Existing line has the target eid AS A STRING INSIDE the body,
        // NOT as the event_id field.
        let existing_line = json!({
            "event_id": "different",
            "from": "did:wire:peer",
            "body": format!("the user mentioned event_id {target_eid} in passing"),
        });
        let path = tmp.join("peer.jsonl");
        std::fs::write(&path, format!("{existing_line}\n")).unwrap();
        // The id occurs only inside `body`, never as the line's own
        // `event_id` field, so the lookup must report it absent.
        assert!(!inbox_already_contains(&path, target_eid));
        // Best-effort cleanup so repeated runs don't litter the temp dir.
        let _ = std::fs::remove_dir_all(&tmp);
    }

    #[test]
    fn known_kind_after_unknown_does_not_advance_cursor() {
        // Block rule: once first event blocks, NO later event can advance
        // the cursor past it, even if later events would otherwise succeed.
        // Re-pull observes both → visible.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();

            let events = vec![
                json!({
                    "event_id": "evt-unknown",
                    "kind": 9999u32,
                    "type": "speculation",
                    "from": "did:wire:future",
                }),
                json!({
                    "event_id": "evt-known-but-untrusted",
                    "kind": 1000u32,
                    "type": "decision",
                    "from": "did:wire:peer-not-in-trust",
                }),
            ];

            let result = process_events(
                &events,
                Some("prior".to_string()),
                &inbox,
            )
            .unwrap();

            assert_eq!(result.rejected.len(), 2);
            assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
            assert!(result.blocked);
        });
    }
}