Skip to main content

wire/
pull.rs

1//! Pull-event processing — pure logic shared by `wire pull` and the daemon
2//! sync loop.
3//!
4//! P0.1 (0.5.11): refuse to silently advance cursor past events the running
5//! binary cannot process. The cursor only advances to the last event in the
6//! contiguous prefix that was either successfully written or rejected for a
7//! TERMINAL reason. Events rejected for TRANSIENT reasons (unknown kind,
8//! signer not yet pinned) block the cursor — so the next pull re-sees them
9//! and a future binary version or freshly-pinned peer can pick up where we
10//! left off.
11//!
12//! Without this rule, an old daemon running against a newer relay silently
13//! ate v0.5.x `pair_drop` events (kind=1100) it could neither pin nor verify,
14//! advancing the cursor past them. Today's debug session lost ~30 min to it.
15//!
16//! Adversarial test: `tests/pull_unknown_kind.rs` synthesises a kind=9999
17//! event, runs `process_events`, and asserts the cursor stays put + the
18//! rejection carries `binary_version=` and `unknown_kind=` so the failure is
19//! loud on every retry.
20//!
21//! Cursor advancement rules:
22//!
//! - terminal reject (bad signature, missing field, event_id mismatch,
//!   revoked key) → advance past, retry won't help.
//! - transient reject (schema-major mismatch, unknown kind to THIS binary,
//!   signer not in trust) → DO NOT advance past, future state may unblock.
//! - success, or a duplicate `event_id` already present in the inbox →
//!   advance past.
28//!
29//! The first transient reject "blocks" the cursor; subsequent events in the
30//! batch are still processed for their inbox-write side effect but cannot
31//! advance the cursor beyond the block point. Re-pull observes the same
32//! blocking event again → visible failure mode.
33
34use anyhow::Result;
35use serde_json::{Value, json};
36use std::path::Path;
37
38use crate::{config, pair_invite, signing};
39
40/// Outcome of processing a batch of pulled events.
41pub struct PullResult {
42    pub written: Vec<Value>,
43    pub rejected: Vec<Value>,
44    /// New value for `self.last_pulled_event_id`. `None` means the cursor
45    /// was not advanced (either no events processable beyond the prior
46    /// cursor, or the first event blocked).
47    pub advance_cursor_to: Option<String>,
48    /// True if at least one event in this batch is blocking cursor advance.
49    /// Surfaces to operators in `wire pull` non-JSON output so silent stall
50    /// is visible.
51    pub blocked: bool,
52}
53
54/// Check whether a peer inbox file already contains an event with this
55/// `event_id`. Scan-based — O(file_size) — but inbox files are small and
56/// only the write path consults this (a few times per pull). Avoids
57/// pulling in a separate index file.
58///
59/// Returns false if the file doesn't exist yet, so the first write to a
60/// new peer's inbox is a no-op check.
61fn inbox_already_contains(path: &std::path::Path, event_id: &str) -> bool {
62    if event_id.is_empty() {
63        return false;
64    }
65    let body = match std::fs::read_to_string(path) {
66        Ok(b) => b,
67        Err(_) => return false,
68    };
69    // Quick substring screen first — if event_id isn't anywhere in the
70    // file, no point parsing every line. event_id appears once per event
71    // as a JSON string value, so the substring is a strong signal.
72    let needle = format!("\"event_id\":\"{event_id}\"");
73    if !body.contains(&needle) {
74        return false;
75    }
76    // Confirm by line-parse — defensive against an event_id substring
77    // appearing inside a body field. JSON parsing rejects that case.
78    for line in body.lines() {
79        let trimmed = line.trim();
80        if trimmed.is_empty() {
81            continue;
82        }
83        if let Ok(v) = serde_json::from_str::<Value>(trimmed)
84            && v.get("event_id").and_then(Value::as_str) == Some(event_id)
85        {
86            return true;
87        }
88    }
89    false
90}
91
92/// Is `kind` known to THIS binary? Used by P0.1 to refuse silent cursor
93/// advance past events from a future protocol version.
94///
95/// The Nostr-compat special cases (kind=1, kind=100) are handled in
96/// `signing::kind_class`; this function mirrors them.
97pub fn is_known_kind(kind: u32) -> bool {
98    if kind == 1 || kind == 100 {
99        return true;
100    }
101    signing::kinds().iter().any(|(k, _)| *k == kind)
102}
103
104/// Whether a `VerifyError` is transient (peer pinning may complete later)
105/// or terminal (retry won't help).
106fn verify_error_is_transient(err: &signing::VerifyError) -> bool {
107    matches!(
108        err,
109        signing::VerifyError::UnknownAgent(_) | signing::VerifyError::UnknownKey(_, _)
110    )
111}
112
113/// Flow-like-water mutual trust for same-machine sisters: if `event`'s signer
114/// DID matches one of THIS operator's local sister sessions, pin that sister's
115/// card into `trust` at VERIFIED and return `true`. Same machine, same uid,
116/// same disk ⇒ same operator ⇒ trusted by construction — no accept step, no
117/// SAS, no gate. Returns `false` (caller keeps the original reject) when the
118/// signer isn't a known local sister, the registry is unreadable, or the
119/// sister's card can't be loaded.
120///
121/// Matching on the full DID — not the display handle — is the safety anchor: a
122/// wire DID is a hash commitment to its key, so a remote peer cannot forge a
123/// DID that collides with a local sister's without holding that sister's key
124/// (which would mean it IS the same operator).
125fn maybe_autopin_local_sister(event: &Value, trust: &mut Value) -> bool {
126    let Some(from_did) = event.get("from").and_then(Value::as_str) else {
127        return false;
128    };
129    let Ok(sessions) = crate::session::list_sessions() else {
130        return false;
131    };
132    for s in sessions {
133        if s.did.as_deref() != Some(from_did) {
134            continue;
135        }
136        let card_path = s
137            .home_dir
138            .join("config")
139            .join("wire")
140            .join("agent-card.json");
141        let Ok(bytes) = std::fs::read(&card_path) else {
142            return false;
143        };
144        let Ok(card) = serde_json::from_slice::<Value>(&bytes) else {
145            return false;
146        };
147        crate::trust::add_agent_card_pin(trust, &card, Some("VERIFIED"));
148
149        // Mutual trust must be mutual REACHABILITY: also register the sister's
150        // relay slot so our reply has somewhere to go — otherwise the receive
151        // direction works but `wire send <sister>` back fails "peer not pinned".
152        // The authoritative source is the sister's OWN relay-state self
153        // endpoints (where their `wire up` recorded the slot — for `--no-local`
154        // federation these are flat fields `self_endpoints()` synthesizes); the
155        // on-disk card carries no endpoints. Same machine, same disk ⇒ read it
156        // directly. Best-effort; failure here doesn't undo the trust pin.
157        let sister_relay_json = s.home_dir.join("config").join("wire").join("relay.json");
158        let sister_endpoints = std::fs::read(&sister_relay_json)
159            .ok()
160            .and_then(|b| serde_json::from_slice::<Value>(&b).ok())
161            .map(|rs| crate::endpoints::self_endpoints(&rs))
162            .unwrap_or_default();
163        if !sister_endpoints.is_empty() {
164            let handle = card
165                .get("handle")
166                .and_then(Value::as_str)
167                .map(str::to_string)
168                .unwrap_or_else(|| {
169                    crate::agent_card::display_handle_from_did(from_did).to_string()
170                });
171            if let Ok(mut relay_state) = crate::config::read_relay_state()
172                && crate::endpoints::pin_peer_endpoints(
173                    &mut relay_state,
174                    &handle,
175                    &sister_endpoints,
176                )
177                .is_ok()
178            {
179                let _ = crate::config::write_relay_state(&relay_state);
180            }
181        }
182        return true;
183    }
184    false
185}
186
187/// Process a pulled-event batch. Mutates inbox files + relay state (via
188/// `pair_invite` side effects) but returns the new cursor target rather
189/// than writing it — caller persists.
190///
191/// `initial_cursor` is the pre-pull value of `self.last_pulled_event_id`.
192/// Returned `advance_cursor_to` is what the caller should write back. If
193/// the first event blocks the cursor, `advance_cursor_to == initial_cursor`
194/// (no change).
195pub fn process_events(
196    events: &[Value],
197    initial_cursor: Option<String>,
198    inbox_dir: &Path,
199) -> Result<PullResult> {
200    let binary_version = env!("CARGO_PKG_VERSION");
201    let trust_snapshot = config::read_trust()?;
202
203    let mut written = Vec::new();
204    let mut rejected = Vec::new();
205    let mut last_advanced = initial_cursor.clone();
206    let mut first_block_idx: Option<usize> = None;
207
208    for (idx, event) in events.iter().enumerate() {
209        let event_id = event
210            .get("event_id")
211            .and_then(Value::as_str)
212            .unwrap_or("")
213            .to_string();
214        let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0) as u32;
215
216        // P0.Z (0.5.11): if the event declares a schema_version, its major
217        // must match ours. Absent field = legacy event (pre-0.5.11), accept
218        // — we can't retroactively stamp old traffic. Mismatched major =
219        // hard reject with both incoming + supported versions in reason.
220        // Format locked with spark: `schema_mismatch=<received> binary_supports=<ours>`.
221        if let Some(declared) = event.get("schema_version").and_then(Value::as_str) {
222            let ours = signing::EVENT_SCHEMA_VERSION;
223            if signing::schema_major(declared) != signing::schema_major(ours) {
224                rejected.push(json!({
225                    "event_id": event_id,
226                    "reason": format!(
227                        "schema_mismatch={declared} binary_supports={ours}"
228                    ),
229                    "blocks_cursor": true,
230                    "transient": true,
231                    "schema_version": declared,
232                }));
233                if first_block_idx.is_none() {
234                    first_block_idx = Some(idx);
235                }
236                continue;
237            }
238        }
239
240        // P0.1: unknown kind → transient, block cursor, fail loud.
241        if !is_known_kind(kind) {
242            let reason = format!("unknown_kind={kind} binary_version={binary_version}");
243            rejected.push(json!({
244                "event_id": event_id,
245                "reason": reason,
246                "blocks_cursor": true,
247                "transient": true,
248            }));
249            if first_block_idx.is_none() {
250                first_block_idx = Some(idx);
251            }
252            continue;
253        }
254
255        // pair_drop / pair_drop_ack — pre-trust side effects that pin sender.
256        let drop_paired = match pair_invite::maybe_consume_pair_drop(event) {
257            Ok(Some(_)) => true,
258            Ok(None) => false,
259            Err(e) => {
260                // P0.2: a pair_drop that WAS recognised (kind=1100, type=pair_drop)
261                // but FAILED during consumption is exactly the silent-fail class —
262                // sender expects to be pinned but isn't, and never finds out. Log
263                // + structured record for `wire doctor`.
264                let peer_handle = event
265                    .get("from")
266                    .and_then(Value::as_str)
267                    .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
268                    .unwrap_or_else(|| "<unknown>".to_string());
269                eprintln!(
270                    "wire pull: pair_drop from {peer_handle} consume FAILED: {e}. \
271                     sender will not be pinned; have them re-add or retry."
272                );
273                pair_invite::record_pair_rejection(
274                    &peer_handle,
275                    "pair_drop_consume_failed",
276                    &e.to_string(),
277                );
278                false
279            }
280        };
281        // pair_drop_ack carries the peer's relay coordinates (relay_url /
282        // slot_id / slot_token) and, on consume, OVERWRITES our pinned
283        // endpoints for that peer + stamps the durable bilateral marker.
284        // Those are machine-trusting side effects, so they must not run on
285        // an unverified event: a forged kind=1101 claiming `from: <peer>`
286        // with attacker relay coords would otherwise redirect all our
287        // outbound traffic to that peer into the attacker's relay. We pin
288        // the peer in trust at dial time (`cmd_add`), so a legitimate ack's
289        // sender is always already pinned and verifies here; an ack we
290        // can't verify (forged, or for a peer we never dialed) is dropped
291        // before it can touch relay state. Verify against fresh trust so an
292        // earlier pair_drop in this same batch that pinned the sender is
293        // visible.
294        if event.get("kind").and_then(Value::as_u64) == Some(1101) {
295            let ack_trust = config::read_trust()?;
296            match signing::verify_message_v31(event, &ack_trust) {
297                Ok(()) => {
298                    if let Err(e) = pair_invite::maybe_consume_pair_drop_ack(event) {
299                        let peer_handle = event
300                            .get("from")
301                            .and_then(Value::as_str)
302                            .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
303                            .unwrap_or_else(|| "<unknown>".to_string());
304                        eprintln!(
305                            "wire pull: pair_drop_ack from {peer_handle} consume FAILED: {e}. \
306                             their slot_token NOT recorded; we cannot `wire send` to them \
307                             until they retry."
308                        );
309                        pair_invite::record_pair_rejection(
310                            &peer_handle,
311                            "pair_drop_ack_consume_failed",
312                            &e.to_string(),
313                        );
314                    }
315                }
316                Err(e) => {
317                    let peer_handle = event
318                        .get("from")
319                        .and_then(Value::as_str)
320                        .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
321                        .unwrap_or_else(|| "<unknown>".to_string());
322                    eprintln!(
323                        "wire pull: DROPPING unverified pair_drop_ack from {peer_handle}: {e}. \
324                         relay endpoints NOT updated (sender not pinned, or signature forged)."
325                    );
326                    pair_invite::record_pair_rejection(
327                        &peer_handle,
328                        "pair_drop_ack_unverified",
329                        &e.to_string(),
330                    );
331                }
332            }
333        }
334        let mut active_trust = if drop_paired {
335            config::read_trust()?
336        } else {
337            trust_snapshot.clone()
338        };
339
340        // Flow-like-water: same-machine sister sessions are the same operator,
341        // same uid, same disk — mutually trusted by construction. If an inbound
342        // event is signed by a recognized local sister we haven't pinned yet,
343        // pin it VERIFIED and re-verify — no pending-inbound, no accept step, no
344        // gate. The match is on the signer's full DID against our own session
345        // registry (a DID commits to its key, so a remote peer cannot forge one
346        // that collides with a local sister's). This is what makes
347        // `wire dial <sister>` a frictionless mutual pairing: the dialer pins
348        // the target, and the target auto-pins the dialer the instant its first
349        // event arrives. Persisted so the pin outlives this pull.
350        let verify = match signing::verify_message_v31(event, &active_trust) {
351            Ok(()) => Ok(()),
352            Err(e) => {
353                if maybe_autopin_local_sister(event, &mut active_trust) {
354                    let _ = config::write_trust(&active_trust);
355                    signing::verify_message_v31(event, &active_trust)
356                } else {
357                    Err(e)
358                }
359            }
360        };
361
362        match verify {
363            Ok(()) => {
364                let from = event
365                    .get("from")
366                    .and_then(Value::as_str)
367                    .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
368                    .unwrap_or_else(|| "unknown".to_string());
369                let path = inbox_dir.join(format!("{from}.jsonl"));
370
371                // P0.X (0.5.11): dedupe-on-write. Spark reported 3 duplicate
372                // pair_drop_ack events landing in their inbox same second,
373                // same event_id. Relay double-store or push retry-after-
374                // success can re-deliver. Inbox should be content-unique by
375                // event_id.
376                if inbox_already_contains(&path, &event_id) {
377                    rejected.push(json!({
378                        "event_id": event_id,
379                        "reason": "duplicate event_id already in inbox",
380                        "blocks_cursor": false,
381                        "transient": false,
382                    }));
383                    if first_block_idx.is_none() {
384                        last_advanced = Some(event_id.clone());
385                    }
386                    continue;
387                }
388
389                use std::io::Write;
390                let mut f = std::fs::OpenOptions::new()
391                    .create(true)
392                    .append(true)
393                    .open(&path)?;
394                let mut line = serde_json::to_vec(event)?;
395                line.push(b'\n');
396                f.write_all(&line)?;
397                // v0.14.3 (#14): also surface the event timestamp so the
398                // caller (run_sync_pull) can stamp
399                // `relay_state.peers[<from>].last_inbound_event_at`
400                // — sender-side staleness needs a daemon-written
401                // signal, not inbox-file mtime (mtime breaks on
402                // backup/restore/touch and has fs-specific resolution).
403                let ts = event
404                    .get("timestamp")
405                    .and_then(Value::as_str)
406                    .unwrap_or("")
407                    .to_string();
408                written.push(json!({
409                    "event_id": event_id,
410                    "from": from,
411                    "timestamp": ts,
412                }));
413                if first_block_idx.is_none() {
414                    last_advanced = Some(event_id.clone());
415                }
416            }
417            Err(e) if verify_error_is_transient(&e) => {
418                rejected.push(json!({
419                    "event_id": event_id,
420                    "reason": e.to_string(),
421                    "blocks_cursor": true,
422                    "transient": true,
423                }));
424                if first_block_idx.is_none() {
425                    first_block_idx = Some(idx);
426                }
427            }
428            Err(e) => {
429                rejected.push(json!({
430                    "event_id": event_id,
431                    "reason": e.to_string(),
432                    "blocks_cursor": false,
433                    "transient": false,
434                }));
435                if first_block_idx.is_none() {
436                    last_advanced = Some(event_id.clone());
437                }
438            }
439        }
440    }
441
442    let result = PullResult {
443        written: written.clone(),
444        rejected: rejected.clone(),
445        advance_cursor_to: last_advanced.clone(),
446        blocked: first_block_idx.is_some(),
447    };
448
449    // P2.10: structured trace. No-op when WIRE_DIAG is not set; one line
450    // per pull when it is. Enough signal for `wire diag tail` to replay
451    // a session.
452    crate::diag::emit(
453        "pull",
454        json!({
455            "events_in": events.len(),
456            "written": result.written.len(),
457            "rejected": result.rejected.len(),
458            "blocked": result.blocked,
459            "advance_to": result.advance_cursor_to,
460        }),
461    );
462
463    Ok(result)
464}
465
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Creates a fresh, uniquely named scratch directory under the system
    /// temp dir. Callers remove it themselves once they have collected the
    /// values they want to assert on.
    fn scratch_dir(tag: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "{tag}-{}-{}",
            std::process::id(),
            rand::random::<u32>()
        ));
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    #[test]
    fn known_kinds_recognised() {
        // Special cases.
        assert!(is_known_kind(1));
        assert!(is_known_kind(100));
        // Named v0.1 kinds.
        assert!(is_known_kind(1000));
        assert!(is_known_kind(1100));
        assert!(is_known_kind(1101));
        assert!(is_known_kind(1201));
    }

    #[test]
    fn unknown_kinds_rejected() {
        assert!(!is_known_kind(0));
        assert!(!is_known_kind(9999));
        assert!(!is_known_kind(1099));
        assert!(!is_known_kind(50000));
    }

    #[test]
    fn unknown_kind_rejection_carries_binary_version_and_kind() {
        // Spark's E. rule: the silent failure must be loud. Reject reason
        // must name both the offending kind AND the binary version so an
        // operator running `wire pull --json` sees instantly which side is
        // behind.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();

            let event = json!({
                "event_id": "deadbeef",
                "kind": 9999u32,
                "type": "speculation",
                "from": "did:wire:future-peer",
            });

            let result =
                process_events(&[event], Some("prior-cursor".to_string()), &inbox).unwrap();

            assert_eq!(result.rejected.len(), 1);
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(
                reason.contains("unknown_kind=9999"),
                "reason missing kind: {reason}"
            );
            assert!(
                reason.contains("binary_version="),
                "reason missing binary_version: {reason}"
            );
            assert_eq!(result.rejected[0]["blocks_cursor"], true);

            // Cursor MUST NOT advance past unknown event.
            assert_eq!(
                result.advance_cursor_to,
                Some("prior-cursor".to_string()),
                "cursor advanced past unknown kind — silent drop regression"
            );
            assert!(result.blocked);
        });
    }

    #[test]
    fn schema_mismatch_blocks_cursor_with_reason_shape() {
        // P0.Z lock-in: format of the rejection reason. Spark + I agreed on
        // exact shape `schema_mismatch=v3.2 binary_supports=v3.1` so an
        // operator running `wire pull --json | jq` can grep for it.
        // Wrong major (v4 vs v3) -> reject.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "future-binary",
                "schema_version": "v4.0",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:future",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
            assert_eq!(result.rejected.len(), 1);
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(reason.contains("schema_mismatch=v4.0"));
            assert!(reason.contains("binary_supports=v3.1"));
            assert_eq!(result.rejected[0]["blocks_cursor"], true);
            assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
        });
    }

    #[test]
    fn schema_minor_bump_within_same_major_is_accepted() {
        // v3.2 from a slightly-newer peer is still v3 major — must NOT be
        // rejected just because the minor differs. Otherwise we lock the
        // protocol to whoever shipped first.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "minor-bump",
                "schema_version": "v3.2",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:peer-not-in-trust",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
            // Schema check passes, falls through to verify which rejects
            // for trust reasons (transient blocks_cursor=true). Either way,
            // the reason must NOT be a schema_mismatch.
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(
                !reason.contains("schema_mismatch"),
                "minor bump should not be schema_mismatch: {reason}"
            );
        });
    }

    #[test]
    fn legacy_event_without_schema_version_field_is_accepted() {
        // Pre-0.5.11 events have no schema_version. Reject on absence
        // would lock us out from every pre-existing inbox + every peer
        // that hasn't upgraded yet. Absent field = accept (transient
        // verify-rejection later is fine, just not a schema rejection).
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "legacy",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:legacy-peer",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(!reason.contains("schema_mismatch"));
        });
    }

    #[test]
    fn forged_pair_drop_ack_does_not_mutate_relay_endpoints() {
        // Security regression: a kind=1101 pair_drop_ack from a sender we
        // never pinned (forged `from`, attacker relay coords) must NOT
        // overwrite our relay endpoints. Pre-fix, the ack was consumed
        // before signature verification, letting an attacker redirect our
        // outbound traffic for the impersonated peer to their own relay.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let forged = json!({
                "event_id": "forged-ack-0001",
                "kind": 1101u32,
                "type": "pair_drop_ack",
                "from": "did:wire:victimpeer-deadbeef",
                "body": {
                    "relay_url": "https://attacker.example",
                    "slot_id": "attackerslot",
                    "slot_token": "attackertoken",
                },
            });
            // Sender is NOT in trust → verify must fail → no mutation.
            let _ = process_events(&[forged], Some("c".to_string()), &inbox).unwrap();
            let relay_state = crate::config::read_relay_state().unwrap();
            let peers = relay_state.get("peers").and_then(Value::as_object);
            assert!(
                peers.is_none_or(|m| !m.contains_key("victimpeer")),
                "forged ack must not pin endpoints for the impersonated peer; \
                 relay_state.peers = {peers:?}"
            );
        });
    }

    #[test]
    fn inbox_dedupe_skips_duplicate_event_id() {
        // P0.X smoke: spark's bug — same event_id arriving twice for the
        // same inbox file must be detectable, so only ONE inbox line is
        // ever written. This exercises the detection primitive directly.
        let tmp = scratch_dir("wire-dedupe-test");
        let event_id = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
        let existing_line = json!({
            "event_id": event_id,
            "from": "did:wire:peer",
            "type": "claim",
            "body": "first occurrence",
        });
        let path = tmp.join("peer.jsonl");
        std::fs::write(&path, format!("{existing_line}\n")).unwrap();

        // Collect results first so the scratch dir is removed even when an
        // assertion below fails.
        let finds_present = inbox_already_contains(&path, event_id);
        let finds_absent = inbox_already_contains(&path, "different-event-id");
        let finds_empty = inbox_already_contains(&path, "");
        let _ = std::fs::remove_dir_all(&tmp);

        assert!(finds_present);
        assert!(!finds_absent);
        assert!(!finds_empty);
    }

    #[test]
    fn inbox_dedupe_substring_in_body_is_not_false_positive() {
        // Adversarial: event_id substring inside a body field shouldn't
        // count as the event already being present.
        let tmp = scratch_dir("wire-dedupe-substring");
        let target_eid = "deadbeefcafebabe";
        // Existing line has the target eid AS A STRING INSIDE the body,
        // NOT as the event_id field.
        let existing_line = json!({
            "event_id": "different",
            "from": "did:wire:peer",
            "body": "the user mentioned event_id deadbeefcafebabe in passing",
        });
        let path = tmp.join("peer.jsonl");
        std::fs::write(&path, format!("{existing_line}\n")).unwrap();

        // Whatever any cheap substring pre-screen decides, the verdict must
        // come from the parsed `event_id` field — which here is "different".
        let found = inbox_already_contains(&path, target_eid);
        let _ = std::fs::remove_dir_all(&tmp);

        assert!(!found);
    }

    #[test]
    fn known_kind_after_unknown_does_not_advance_cursor() {
        // Block rule: once first event blocks, NO later event can advance
        // the cursor past it, even if later events would otherwise succeed.
        // Re-pull observes both → visible.
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();

            let events = vec![
                json!({
                    "event_id": "evt-unknown",
                    "kind": 9999u32,
                    "type": "speculation",
                    "from": "did:wire:future",
                }),
                json!({
                    "event_id": "evt-known-but-untrusted",
                    "kind": 1000u32,
                    "type": "decision",
                    "from": "did:wire:peer-not-in-trust",
                }),
            ];

            let result = process_events(&events, Some("prior".to_string()), &inbox).unwrap();

            assert_eq!(result.rejected.len(), 2);
            assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
            assert!(result.blocked);
        });
    }
}