wire/pull.rs
1//! Pull-event processing — pure logic shared by `wire pull` and the daemon
2//! sync loop.
3//!
4//! P0.1 (0.5.11): refuse to silently advance cursor past events the running
5//! binary cannot process. The cursor only advances to the last event in the
6//! contiguous prefix that was either successfully written or rejected for a
7//! TERMINAL reason. Events rejected for TRANSIENT reasons (unknown kind,
8//! signer not yet pinned) block the cursor — so the next pull re-sees them
9//! and a future binary version or freshly-pinned peer can pick up where we
10//! left off.
11//!
12//! Without this rule, an old daemon running against a newer relay silently
13//! ate v0.5.x `pair_drop` events (kind=1100) it could neither pin nor verify,
14//! advancing the cursor past them. Today's debug session lost ~30 min to it.
15//!
16//! Adversarial test: `tests/pull_unknown_kind.rs` synthesises a kind=9999
17//! event, runs `process_events`, and asserts the cursor stays put + the
18//! rejection carries `binary_version=` and `unknown_kind=` so the failure is
19//! loud on every retry.
20//!
21//! Cursor advancement rules:
22//!
23//! - terminal reject (bad signature, missing field, event_id mismatch,
24//! revoked key) → advance past, retry won't help.
25//! - transient reject (unknown kind to THIS binary, signer not in trust) →
26//! DO NOT advance past, future state may unblock.
27//! - success → advance past.
28//!
29//! The first transient reject "blocks" the cursor; subsequent events in the
30//! batch are still processed for their inbox-write side effect but cannot
31//! advance the cursor beyond the block point. Re-pull observes the same
32//! blocking event again → visible failure mode.
33
34use anyhow::Result;
35use serde_json::{Value, json};
36use std::path::Path;
37
38use crate::{config, pair_invite, signing};
39
40/// Outcome of processing a batch of pulled events.
41pub struct PullResult {
42 pub written: Vec<Value>,
43 pub rejected: Vec<Value>,
44 /// New value for `self.last_pulled_event_id`. `None` means the cursor
45 /// was not advanced (either no events processable beyond the prior
46 /// cursor, or the first event blocked).
47 pub advance_cursor_to: Option<String>,
48 /// True if at least one event in this batch is blocking cursor advance.
49 /// Surfaces to operators in `wire pull` non-JSON output so silent stall
50 /// is visible.
51 pub blocked: bool,
52}
53
54/// Check whether a peer inbox file already contains an event with this
55/// `event_id`. Scan-based — O(file_size) — but inbox files are small and
56/// only the write path consults this (a few times per pull). Avoids
57/// pulling in a separate index file.
58///
59/// Returns false if the file doesn't exist yet, so the first write to a
60/// new peer's inbox is a no-op check.
61fn inbox_already_contains(path: &std::path::Path, event_id: &str) -> bool {
62 if event_id.is_empty() {
63 return false;
64 }
65 let body = match std::fs::read_to_string(path) {
66 Ok(b) => b,
67 Err(_) => return false,
68 };
69 // Quick substring screen first — if event_id isn't anywhere in the
70 // file, no point parsing every line. event_id appears once per event
71 // as a JSON string value, so the substring is a strong signal.
72 let needle = format!("\"event_id\":\"{event_id}\"");
73 if !body.contains(&needle) {
74 return false;
75 }
76 // Confirm by line-parse — defensive against an event_id substring
77 // appearing inside a body field. JSON parsing rejects that case.
78 for line in body.lines() {
79 let trimmed = line.trim();
80 if trimmed.is_empty() {
81 continue;
82 }
83 if let Ok(v) = serde_json::from_str::<Value>(trimmed)
84 && v.get("event_id").and_then(Value::as_str) == Some(event_id)
85 {
86 return true;
87 }
88 }
89 false
90}
91
92/// Is `kind` known to THIS binary? Used by P0.1 to refuse silent cursor
93/// advance past events from a future protocol version.
94///
95/// The Nostr-compat special cases (kind=1, kind=100) are handled in
96/// `signing::kind_class`; this function mirrors them.
97pub fn is_known_kind(kind: u32) -> bool {
98 if kind == 1 || kind == 100 {
99 return true;
100 }
101 signing::kinds().iter().any(|(k, _)| *k == kind)
102}
103
104/// Whether a `VerifyError` is transient (peer pinning may complete later)
105/// or terminal (retry won't help).
106fn verify_error_is_transient(err: &signing::VerifyError) -> bool {
107 matches!(
108 err,
109 signing::VerifyError::UnknownAgent(_) | signing::VerifyError::UnknownKey(_, _)
110 )
111}
112
113/// Flow-like-water mutual trust for same-machine sisters: if `event`'s signer
114/// DID matches one of THIS operator's local sister sessions, pin that sister's
115/// card into `trust` at VERIFIED and return `true`. Same machine, same uid,
116/// same disk ⇒ same operator ⇒ trusted by construction — no accept step, no
117/// SAS, no gate. Returns `false` (caller keeps the original reject) when the
118/// signer isn't a known local sister, the registry is unreadable, or the
119/// sister's card can't be loaded.
120///
121/// Matching on the full DID — not the display handle — is the safety anchor: a
122/// wire DID is a hash commitment to its key, so a remote peer cannot forge a
123/// DID that collides with a local sister's without holding that sister's key
124/// (which would mean it IS the same operator).
125fn maybe_autopin_local_sister(event: &Value, trust: &mut Value) -> bool {
126 let Some(from_did) = event.get("from").and_then(Value::as_str) else {
127 return false;
128 };
129 let Ok(sessions) = crate::session::list_sessions() else {
130 return false;
131 };
132 for s in sessions {
133 if s.did.as_deref() != Some(from_did) {
134 continue;
135 }
136 let card_path = s
137 .home_dir
138 .join("config")
139 .join("wire")
140 .join("agent-card.json");
141 let Ok(bytes) = std::fs::read(&card_path) else {
142 return false;
143 };
144 let Ok(card) = serde_json::from_slice::<Value>(&bytes) else {
145 return false;
146 };
147 crate::trust::add_agent_card_pin(trust, &card, Some("VERIFIED"));
148
149 // Mutual trust must be mutual REACHABILITY: also register the sister's
150 // relay slot so our reply has somewhere to go — otherwise the receive
151 // direction works but `wire send <sister>` back fails "peer not pinned".
152 // The authoritative source is the sister's OWN relay-state self
153 // endpoints (where their `wire up` recorded the slot — for `--no-local`
154 // federation these are flat fields `self_endpoints()` synthesizes); the
155 // on-disk card carries no endpoints. Same machine, same disk ⇒ read it
156 // directly. Best-effort; failure here doesn't undo the trust pin.
157 let sister_relay_json = s.home_dir.join("config").join("wire").join("relay.json");
158 let sister_endpoints = std::fs::read(&sister_relay_json)
159 .ok()
160 .and_then(|b| serde_json::from_slice::<Value>(&b).ok())
161 .map(|rs| crate::endpoints::self_endpoints(&rs))
162 .unwrap_or_default();
163 if !sister_endpoints.is_empty() {
164 let handle = card
165 .get("handle")
166 .and_then(Value::as_str)
167 .map(str::to_string)
168 .unwrap_or_else(|| {
169 crate::agent_card::display_handle_from_did(from_did).to_string()
170 });
171 if let Ok(mut relay_state) = crate::config::read_relay_state()
172 && crate::endpoints::pin_peer_endpoints(
173 &mut relay_state,
174 &handle,
175 &sister_endpoints,
176 )
177 .is_ok()
178 {
179 let _ = crate::config::write_relay_state(&relay_state);
180 }
181 }
182 return true;
183 }
184 false
185}
186
187/// Process a pulled-event batch. Mutates inbox files + relay state (via
188/// `pair_invite` side effects) but returns the new cursor target rather
189/// than writing it — caller persists.
190///
191/// `initial_cursor` is the pre-pull value of `self.last_pulled_event_id`.
192/// Returned `advance_cursor_to` is what the caller should write back. If
193/// the first event blocks the cursor, `advance_cursor_to == initial_cursor`
194/// (no change).
195pub fn process_events(
196 events: &[Value],
197 initial_cursor: Option<String>,
198 inbox_dir: &Path,
199) -> Result<PullResult> {
200 let binary_version = env!("CARGO_PKG_VERSION");
201 let trust_snapshot = config::read_trust()?;
202
203 let mut written = Vec::new();
204 let mut rejected = Vec::new();
205 let mut last_advanced = initial_cursor.clone();
206 let mut first_block_idx: Option<usize> = None;
207
208 for (idx, event) in events.iter().enumerate() {
209 let event_id = event
210 .get("event_id")
211 .and_then(Value::as_str)
212 .unwrap_or("")
213 .to_string();
214 let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0) as u32;
215
216 // P0.Z (0.5.11): if the event declares a schema_version, its major
217 // must match ours. Absent field = legacy event (pre-0.5.11), accept
218 // — we can't retroactively stamp old traffic. Mismatched major =
219 // hard reject with both incoming + supported versions in reason.
220 // Format locked with spark: `schema_mismatch=<received> binary_supports=<ours>`.
221 if let Some(declared) = event.get("schema_version").and_then(Value::as_str) {
222 let ours = signing::EVENT_SCHEMA_VERSION;
223 if signing::schema_major(declared) != signing::schema_major(ours) {
224 rejected.push(json!({
225 "event_id": event_id,
226 "reason": format!(
227 "schema_mismatch={declared} binary_supports={ours}"
228 ),
229 "blocks_cursor": true,
230 "transient": true,
231 "schema_version": declared,
232 }));
233 if first_block_idx.is_none() {
234 first_block_idx = Some(idx);
235 }
236 continue;
237 }
238 }
239
240 // P0.1: unknown kind → transient, block cursor, fail loud.
241 if !is_known_kind(kind) {
242 let reason = format!("unknown_kind={kind} binary_version={binary_version}");
243 rejected.push(json!({
244 "event_id": event_id,
245 "reason": reason,
246 "blocks_cursor": true,
247 "transient": true,
248 }));
249 if first_block_idx.is_none() {
250 first_block_idx = Some(idx);
251 }
252 continue;
253 }
254
255 // pair_drop / pair_drop_ack — pre-trust side effects that pin sender.
256 let drop_paired = match pair_invite::maybe_consume_pair_drop(event) {
257 Ok(Some(_)) => true,
258 Ok(None) => false,
259 Err(e) => {
260 // P0.2: a pair_drop that WAS recognised (kind=1100, type=pair_drop)
261 // but FAILED during consumption is exactly the silent-fail class —
262 // sender expects to be pinned but isn't, and never finds out. Log
263 // + structured record for `wire doctor`.
264 let peer_handle = event
265 .get("from")
266 .and_then(Value::as_str)
267 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
268 .unwrap_or_else(|| "<unknown>".to_string());
269 eprintln!(
270 "wire pull: pair_drop from {peer_handle} consume FAILED: {e}. \
271 sender will not be pinned; have them re-add or retry."
272 );
273 pair_invite::record_pair_rejection(
274 &peer_handle,
275 "pair_drop_consume_failed",
276 &e.to_string(),
277 );
278 false
279 }
280 };
281 // pair_drop_ack carries the peer's relay coordinates (relay_url /
282 // slot_id / slot_token) and, on consume, OVERWRITES our pinned
283 // endpoints for that peer + stamps the durable bilateral marker.
284 // Those are machine-trusting side effects, so they must not run on
285 // an unverified event: a forged kind=1101 claiming `from: <peer>`
286 // with attacker relay coords would otherwise redirect all our
287 // outbound traffic to that peer into the attacker's relay. We pin
288 // the peer in trust at dial time (`cmd_add`), so a legitimate ack's
289 // sender is always already pinned and verifies here; an ack we
290 // can't verify (forged, or for a peer we never dialed) is dropped
291 // before it can touch relay state. Verify against fresh trust so an
292 // earlier pair_drop in this same batch that pinned the sender is
293 // visible.
294 if event.get("kind").and_then(Value::as_u64) == Some(1101) {
295 let ack_trust = config::read_trust()?;
296 match signing::verify_message_v31(event, &ack_trust) {
297 Ok(()) => {
298 if let Err(e) = pair_invite::maybe_consume_pair_drop_ack(event) {
299 let peer_handle = event
300 .get("from")
301 .and_then(Value::as_str)
302 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
303 .unwrap_or_else(|| "<unknown>".to_string());
304 eprintln!(
305 "wire pull: pair_drop_ack from {peer_handle} consume FAILED: {e}. \
306 their slot_token NOT recorded; we cannot `wire send` to them \
307 until they retry."
308 );
309 pair_invite::record_pair_rejection(
310 &peer_handle,
311 "pair_drop_ack_consume_failed",
312 &e.to_string(),
313 );
314 }
315 }
316 Err(e) => {
317 let peer_handle = event
318 .get("from")
319 .and_then(Value::as_str)
320 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
321 .unwrap_or_else(|| "<unknown>".to_string());
322 eprintln!(
323 "wire pull: DROPPING unverified pair_drop_ack from {peer_handle}: {e}. \
324 relay endpoints NOT updated (sender not pinned, or signature forged)."
325 );
326 pair_invite::record_pair_rejection(
327 &peer_handle,
328 "pair_drop_ack_unverified",
329 &e.to_string(),
330 );
331 }
332 }
333 }
334 let mut active_trust = if drop_paired {
335 config::read_trust()?
336 } else {
337 trust_snapshot.clone()
338 };
339
340 // Flow-like-water: same-machine sister sessions are the same operator,
341 // same uid, same disk — mutually trusted by construction. If an inbound
342 // event is signed by a recognized local sister we haven't pinned yet,
343 // pin it VERIFIED and re-verify — no pending-inbound, no accept step, no
344 // gate. The match is on the signer's full DID against our own session
345 // registry (a DID commits to its key, so a remote peer cannot forge one
346 // that collides with a local sister's). This is what makes
347 // `wire dial <sister>` a frictionless mutual pairing: the dialer pins
348 // the target, and the target auto-pins the dialer the instant its first
349 // event arrives. Persisted so the pin outlives this pull.
350 let verify = match signing::verify_message_v31(event, &active_trust) {
351 Ok(()) => Ok(()),
352 Err(e) => {
353 if maybe_autopin_local_sister(event, &mut active_trust) {
354 let _ = config::write_trust(&active_trust);
355 signing::verify_message_v31(event, &active_trust)
356 } else {
357 Err(e)
358 }
359 }
360 };
361
362 match verify {
363 Ok(()) => {
364 let from = event
365 .get("from")
366 .and_then(Value::as_str)
367 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
368 .unwrap_or_else(|| "unknown".to_string());
369 let path = inbox_dir.join(format!("{from}.jsonl"));
370
371 // P0.X (0.5.11): dedupe-on-write. Spark reported 3 duplicate
372 // pair_drop_ack events landing in their inbox same second,
373 // same event_id. Relay double-store or push retry-after-
374 // success can re-deliver. Inbox should be content-unique by
375 // event_id.
376 if inbox_already_contains(&path, &event_id) {
377 rejected.push(json!({
378 "event_id": event_id,
379 "reason": "duplicate event_id already in inbox",
380 "blocks_cursor": false,
381 "transient": false,
382 }));
383 if first_block_idx.is_none() {
384 last_advanced = Some(event_id.clone());
385 }
386 continue;
387 }
388
389 use std::io::Write;
390 let mut f = std::fs::OpenOptions::new()
391 .create(true)
392 .append(true)
393 .open(&path)?;
394 let mut line = serde_json::to_vec(event)?;
395 line.push(b'\n');
396 f.write_all(&line)?;
397 // v0.14.3 (#14): also surface the event timestamp so the
398 // caller (run_sync_pull) can stamp
399 // `relay_state.peers[<from>].last_inbound_event_at`
400 // — sender-side staleness needs a daemon-written
401 // signal, not inbox-file mtime (mtime breaks on
402 // backup/restore/touch and has fs-specific resolution).
403 let ts = event
404 .get("timestamp")
405 .and_then(Value::as_str)
406 .unwrap_or("")
407 .to_string();
408 written.push(json!({
409 "event_id": event_id,
410 "from": from,
411 "timestamp": ts,
412 }));
413 if first_block_idx.is_none() {
414 last_advanced = Some(event_id.clone());
415 }
416 }
417 Err(e) if verify_error_is_transient(&e) => {
418 rejected.push(json!({
419 "event_id": event_id,
420 "reason": e.to_string(),
421 "blocks_cursor": true,
422 "transient": true,
423 }));
424 if first_block_idx.is_none() {
425 first_block_idx = Some(idx);
426 }
427 }
428 Err(e) => {
429 rejected.push(json!({
430 "event_id": event_id,
431 "reason": e.to_string(),
432 "blocks_cursor": false,
433 "transient": false,
434 }));
435 if first_block_idx.is_none() {
436 last_advanced = Some(event_id.clone());
437 }
438 }
439 }
440 }
441
442 let result = PullResult {
443 written: written.clone(),
444 rejected: rejected.clone(),
445 advance_cursor_to: last_advanced.clone(),
446 blocked: first_block_idx.is_some(),
447 };
448
449 // P2.10: structured trace. No-op when WIRE_DIAG is not set; one line
450 // per pull when it is. Enough signal for `wire diag tail` to replay
451 // a session.
452 crate::diag::emit(
453 "pull",
454 json!({
455 "events_in": events.len(),
456 "written": result.written.len(),
457 "rejected": result.rejected.len(),
458 "blocked": result.blocked,
459 "advance_to": result.advance_cursor_to,
460 }),
461 );
462
463 Ok(result)
464}
465
466#[cfg(test)]
467mod tests {
468 use super::*;
469 use serde_json::json;
470
471 #[test]
472 fn known_kinds_recognised() {
473 // Special cases.
474 assert!(is_known_kind(1));
475 assert!(is_known_kind(100));
476 // Named v0.1 kinds.
477 assert!(is_known_kind(1000));
478 assert!(is_known_kind(1100));
479 assert!(is_known_kind(1101));
480 assert!(is_known_kind(1201));
481 }
482
483 #[test]
484 fn unknown_kinds_rejected() {
485 assert!(!is_known_kind(0));
486 assert!(!is_known_kind(9999));
487 assert!(!is_known_kind(1099));
488 assert!(!is_known_kind(50000));
489 }
490
491 #[test]
492 fn unknown_kind_rejection_carries_binary_version_and_kind() {
493 // Spark's E. rule: the silent failure must be loud. Reject reason
494 // must name both the offending kind AND the binary version so an
495 // operator running `wire pull --json` sees instantly which side is
496 // behind.
497 crate::config::test_support::with_temp_home(|| {
498 crate::config::ensure_dirs().unwrap();
499 let inbox = crate::config::inbox_dir().unwrap();
500
501 let event = json!({
502 "event_id": "deadbeef",
503 "kind": 9999u32,
504 "type": "speculation",
505 "from": "did:wire:future-peer",
506 });
507
508 let result =
509 process_events(&[event], Some("prior-cursor".to_string()), &inbox).unwrap();
510
511 assert_eq!(result.rejected.len(), 1);
512 let reason = result.rejected[0]["reason"].as_str().unwrap();
513 assert!(
514 reason.contains("unknown_kind=9999"),
515 "reason missing kind: {reason}"
516 );
517 assert!(
518 reason.contains("binary_version="),
519 "reason missing binary_version: {reason}"
520 );
521 assert_eq!(result.rejected[0]["blocks_cursor"], true);
522
523 // Cursor MUST NOT advance past unknown event.
524 assert_eq!(
525 result.advance_cursor_to,
526 Some("prior-cursor".to_string()),
527 "cursor advanced past unknown kind — silent drop regression"
528 );
529 assert!(result.blocked);
530 });
531 }
532
533 #[test]
534 fn schema_mismatch_blocks_cursor_with_reason_shape() {
535 // P0.Z lock-in: format of the rejection reason. Spark + I agreed on
536 // exact shape `schema_mismatch=v3.2 binary_supports=v3.1` so an
537 // operator running `wire pull --json | jq` can grep for it.
538 // Wrong major (v4 vs v3) -> reject.
539 crate::config::test_support::with_temp_home(|| {
540 crate::config::ensure_dirs().unwrap();
541 let inbox = crate::config::inbox_dir().unwrap();
542 let event = json!({
543 "event_id": "future-binary",
544 "schema_version": "v4.0",
545 "kind": 1000u32,
546 "type": "decision",
547 "from": "did:wire:future",
548 });
549 let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
550 assert_eq!(result.rejected.len(), 1);
551 let reason = result.rejected[0]["reason"].as_str().unwrap();
552 assert!(reason.contains("schema_mismatch=v4.0"));
553 assert!(reason.contains("binary_supports=v3.1"));
554 assert_eq!(result.rejected[0]["blocks_cursor"], true);
555 assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
556 });
557 }
558
559 #[test]
560 fn schema_minor_bump_within_same_major_is_accepted() {
561 // v3.2 from a slightly-newer peer is still v3 major — must NOT be
562 // rejected just because the minor differs. Otherwise we lock the
563 // protocol to whoever shipped first.
564 crate::config::test_support::with_temp_home(|| {
565 crate::config::ensure_dirs().unwrap();
566 let inbox = crate::config::inbox_dir().unwrap();
567 let event = json!({
568 "event_id": "minor-bump",
569 "schema_version": "v3.2",
570 "kind": 1000u32,
571 "type": "decision",
572 "from": "did:wire:peer-not-in-trust",
573 });
574 let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
575 // Schema check passes, falls through to verify which rejects
576 // for trust reasons (transient blocks_cursor=true). Either way,
577 // the reason must NOT be a schema_mismatch.
578 let reason = result.rejected[0]["reason"].as_str().unwrap();
579 assert!(
580 !reason.contains("schema_mismatch"),
581 "minor bump should not be schema_mismatch: {reason}"
582 );
583 });
584 }
585
586 #[test]
587 fn legacy_event_without_schema_version_field_is_accepted() {
588 // Pre-0.5.11 events have no schema_version. Reject on absence
589 // would lock us out from every pre-existing inbox + every peer
590 // that hasn't upgraded yet. Absent field = accept (transient
591 // verify-rejection later is fine, just not a schema rejection).
592 crate::config::test_support::with_temp_home(|| {
593 crate::config::ensure_dirs().unwrap();
594 let inbox = crate::config::inbox_dir().unwrap();
595 let event = json!({
596 "event_id": "legacy",
597 "kind": 1000u32,
598 "type": "decision",
599 "from": "did:wire:legacy-peer",
600 });
601 let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
602 let reason = result.rejected[0]["reason"].as_str().unwrap();
603 assert!(!reason.contains("schema_mismatch"));
604 });
605 }
606
607 #[test]
608 fn forged_pair_drop_ack_does_not_mutate_relay_endpoints() {
609 // Security regression: a kind=1101 pair_drop_ack from a sender we
610 // never pinned (forged `from`, attacker relay coords) must NOT
611 // overwrite our relay endpoints. Pre-fix, the ack was consumed
612 // before signature verification, letting an attacker redirect our
613 // outbound traffic for the impersonated peer to their own relay.
614 crate::config::test_support::with_temp_home(|| {
615 crate::config::ensure_dirs().unwrap();
616 let inbox = crate::config::inbox_dir().unwrap();
617 let forged = json!({
618 "event_id": "forged-ack-0001",
619 "kind": 1101u32,
620 "type": "pair_drop_ack",
621 "from": "did:wire:victimpeer-deadbeef",
622 "body": {
623 "relay_url": "https://attacker.example",
624 "slot_id": "attackerslot",
625 "slot_token": "attackertoken",
626 },
627 });
628 // Sender is NOT in trust → verify must fail → no mutation.
629 let _ = process_events(&[forged], Some("c".to_string()), &inbox).unwrap();
630 let relay_state = crate::config::read_relay_state().unwrap();
631 let peers = relay_state.get("peers").and_then(Value::as_object);
632 assert!(
633 peers.is_none_or(|m| !m.contains_key("victimpeer")),
634 "forged ack must not pin endpoints for the impersonated peer; \
635 relay_state.peers = {peers:?}"
636 );
637 });
638 }
639
640 #[test]
641 fn inbox_dedupe_skips_duplicate_event_id() {
642 // P0.X smoke: spark's bug — same event_id arriving twice in the
643 // same inbox file produces only ONE inbox line. The pull result
644 // surfaces the duplicate as rejected[] with a clear reason so
645 // operators see what's happening (vs silently dropping).
646 let tmp = std::env::temp_dir().join(format!(
647 "wire-dedupe-test-{}-{}",
648 std::process::id(),
649 rand::random::<u32>()
650 ));
651 std::fs::create_dir_all(&tmp).unwrap();
652 let event_id = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
653 let existing_line = json!({
654 "event_id": event_id,
655 "from": "did:wire:peer",
656 "type": "claim",
657 "body": "first occurrence",
658 });
659 let path = tmp.join("peer.jsonl");
660 std::fs::write(&path, format!("{existing_line}\n")).unwrap();
661 assert!(inbox_already_contains(&path, event_id));
662 assert!(!inbox_already_contains(&path, "different-event-id"));
663 assert!(!inbox_already_contains(&path, ""));
664 }
665
666 #[test]
667 fn inbox_dedupe_substring_in_body_is_not_false_positive() {
668 // Adversarial: event_id substring inside a body field shouldn't
669 // count as the event already being present.
670 let tmp = std::env::temp_dir().join(format!(
671 "wire-dedupe-substring-{}-{}",
672 std::process::id(),
673 rand::random::<u32>()
674 ));
675 std::fs::create_dir_all(&tmp).unwrap();
676 let target_eid = "deadbeefcafebabe";
677 // Existing line has the target eid AS A STRING INSIDE the body,
678 // NOT as the event_id field.
679 let existing_line = json!({
680 "event_id": "different",
681 "from": "did:wire:peer",
682 "body": format!("the user mentioned event_id deadbeefcafebabe in passing"),
683 });
684 let path = tmp.join("peer.jsonl");
685 std::fs::write(&path, format!("{existing_line}\n")).unwrap();
686 // Substring screen sees the eid in the body, but the line-parse
687 // confirmation rejects it.
688 assert!(!inbox_already_contains(&path, target_eid));
689 }
690
691 #[test]
692 fn known_kind_after_unknown_does_not_advance_cursor() {
693 // Block rule: once first event blocks, NO later event can advance
694 // the cursor past it, even if later events would otherwise succeed.
695 // Re-pull observes both → visible.
696 crate::config::test_support::with_temp_home(|| {
697 crate::config::ensure_dirs().unwrap();
698 let inbox = crate::config::inbox_dir().unwrap();
699
700 let events = vec![
701 json!({
702 "event_id": "evt-unknown",
703 "kind": 9999u32,
704 "type": "speculation",
705 "from": "did:wire:future",
706 }),
707 json!({
708 "event_id": "evt-known-but-untrusted",
709 "kind": 1000u32,
710 "type": "decision",
711 "from": "did:wire:peer-not-in-trust",
712 }),
713 ];
714
715 let result = process_events(&events, Some("prior".to_string()), &inbox).unwrap();
716
717 assert_eq!(result.rejected.len(), 2);
718 assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
719 assert!(result.blocked);
720 });
721 }
722}