1use anyhow::Result;
35use serde_json::{Value, json};
36use std::path::Path;
37
38use crate::{config, pair_invite, signing};
39
40pub struct PullResult {
42 pub written: Vec<Value>,
43 pub rejected: Vec<Value>,
44 pub advance_cursor_to: Option<String>,
48 pub blocked: bool,
52}
53
54fn inbox_already_contains(path: &std::path::Path, event_id: &str) -> bool {
62 if event_id.is_empty() {
63 return false;
64 }
65 let body = match std::fs::read_to_string(path) {
66 Ok(b) => b,
67 Err(_) => return false,
68 };
69 let needle = format!("\"event_id\":\"{event_id}\"");
73 if !body.contains(&needle) {
74 return false;
75 }
76 for line in body.lines() {
79 let trimmed = line.trim();
80 if trimmed.is_empty() {
81 continue;
82 }
83 if let Ok(v) = serde_json::from_str::<Value>(trimmed)
84 && v.get("event_id").and_then(Value::as_str) == Some(event_id)
85 {
86 return true;
87 }
88 }
89 false
90}
91
92pub fn is_known_kind(kind: u32) -> bool {
98 if kind == 1 || kind == 100 {
99 return true;
100 }
101 signing::kinds().iter().any(|(k, _)| *k == kind)
102}
103
104fn verify_error_is_transient(err: &signing::VerifyError) -> bool {
107 matches!(
108 err,
109 signing::VerifyError::UnknownAgent(_) | signing::VerifyError::UnknownKey(_, _)
110 )
111}
112
113pub fn process_events(
122 events: &[Value],
123 initial_cursor: Option<String>,
124 inbox_dir: &Path,
125) -> Result<PullResult> {
126 let binary_version = env!("CARGO_PKG_VERSION");
127 let trust_snapshot = config::read_trust()?;
128
129 let mut written = Vec::new();
130 let mut rejected = Vec::new();
131 let mut last_advanced = initial_cursor.clone();
132 let mut first_block_idx: Option<usize> = None;
133
134 for (idx, event) in events.iter().enumerate() {
135 let event_id = event
136 .get("event_id")
137 .and_then(Value::as_str)
138 .unwrap_or("")
139 .to_string();
140 let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0) as u32;
141
142 if let Some(declared) = event
148 .get("schema_version")
149 .and_then(Value::as_str)
150 {
151 let ours = signing::EVENT_SCHEMA_VERSION;
152 if signing::schema_major(declared) != signing::schema_major(ours) {
153 rejected.push(json!({
154 "event_id": event_id,
155 "reason": format!(
156 "schema_mismatch={declared} binary_supports={ours}"
157 ),
158 "blocks_cursor": true,
159 "transient": true,
160 "schema_version": declared,
161 }));
162 if first_block_idx.is_none() {
163 first_block_idx = Some(idx);
164 }
165 continue;
166 }
167 }
168
169 if !is_known_kind(kind) {
171 let reason = format!(
172 "unknown_kind={kind} binary_version={binary_version}"
173 );
174 rejected.push(json!({
175 "event_id": event_id,
176 "reason": reason,
177 "blocks_cursor": true,
178 "transient": true,
179 }));
180 if first_block_idx.is_none() {
181 first_block_idx = Some(idx);
182 }
183 continue;
184 }
185
186 let drop_paired = match pair_invite::maybe_consume_pair_drop(event) {
188 Ok(Some(_)) => true,
189 Ok(None) => false,
190 Err(e) => {
191 let peer_handle = event
196 .get("from")
197 .and_then(Value::as_str)
198 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
199 .unwrap_or_else(|| "<unknown>".to_string());
200 eprintln!(
201 "wire pull: pair_drop from {peer_handle} consume FAILED: {e}. \
202 sender will not be pinned; have them re-add or retry."
203 );
204 pair_invite::record_pair_rejection(
205 &peer_handle,
206 "pair_drop_consume_failed",
207 &e.to_string(),
208 );
209 false
210 }
211 };
212 if let Err(e) = pair_invite::maybe_consume_pair_drop_ack(event) {
213 let peer_handle = event
214 .get("from")
215 .and_then(Value::as_str)
216 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
217 .unwrap_or_else(|| "<unknown>".to_string());
218 eprintln!(
219 "wire pull: pair_drop_ack from {peer_handle} consume FAILED: {e}. \
220 their slot_token NOT recorded; we cannot `wire send` to them \
221 until they retry."
222 );
223 pair_invite::record_pair_rejection(
224 &peer_handle,
225 "pair_drop_ack_consume_failed",
226 &e.to_string(),
227 );
228 }
229 let active_trust = if drop_paired {
230 config::read_trust()?
231 } else {
232 trust_snapshot.clone()
233 };
234
235 match signing::verify_message_v31(event, &active_trust) {
236 Ok(()) => {
237 let from = event
238 .get("from")
239 .and_then(Value::as_str)
240 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
241 .unwrap_or_else(|| "unknown".to_string());
242 let path = inbox_dir.join(format!("{from}.jsonl"));
243
244 if inbox_already_contains(&path, &event_id) {
250 rejected.push(json!({
251 "event_id": event_id,
252 "reason": "duplicate event_id already in inbox",
253 "blocks_cursor": false,
254 "transient": false,
255 }));
256 if first_block_idx.is_none() {
257 last_advanced = Some(event_id.clone());
258 }
259 continue;
260 }
261
262 use std::io::Write;
263 let mut f = std::fs::OpenOptions::new()
264 .create(true)
265 .append(true)
266 .open(&path)?;
267 let mut line = serde_json::to_vec(event)?;
268 line.push(b'\n');
269 f.write_all(&line)?;
270 written.push(json!({"event_id": event_id, "from": from}));
271 if first_block_idx.is_none() {
272 last_advanced = Some(event_id.clone());
273 }
274 }
275 Err(e) if verify_error_is_transient(&e) => {
276 rejected.push(json!({
277 "event_id": event_id,
278 "reason": e.to_string(),
279 "blocks_cursor": true,
280 "transient": true,
281 }));
282 if first_block_idx.is_none() {
283 first_block_idx = Some(idx);
284 }
285 }
286 Err(e) => {
287 rejected.push(json!({
288 "event_id": event_id,
289 "reason": e.to_string(),
290 "blocks_cursor": false,
291 "transient": false,
292 }));
293 if first_block_idx.is_none() {
294 last_advanced = Some(event_id.clone());
295 }
296 }
297 }
298 }
299
300 let result = PullResult {
301 written: written.clone(),
302 rejected: rejected.clone(),
303 advance_cursor_to: last_advanced.clone(),
304 blocked: first_block_idx.is_some(),
305 };
306
307 crate::diag::emit(
311 "pull",
312 json!({
313 "events_in": events.len(),
314 "written": result.written.len(),
315 "rejected": result.rejected.len(),
316 "blocked": result.blocked,
317 "advance_to": result.advance_cursor_to,
318 }),
319 );
320
321 Ok(result)
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327 use serde_json::json;
328
329 #[test]
330 fn known_kinds_recognised() {
331 assert!(is_known_kind(1));
333 assert!(is_known_kind(100));
334 assert!(is_known_kind(1000));
336 assert!(is_known_kind(1100));
337 assert!(is_known_kind(1101));
338 assert!(is_known_kind(1201));
339 }
340
341 #[test]
342 fn unknown_kinds_rejected() {
343 assert!(!is_known_kind(0));
344 assert!(!is_known_kind(9999));
345 assert!(!is_known_kind(1099));
346 assert!(!is_known_kind(50000));
347 }
348
349 #[test]
350 fn unknown_kind_rejection_carries_binary_version_and_kind() {
351 crate::config::test_support::with_temp_home(|| {
356 crate::config::ensure_dirs().unwrap();
357 let inbox = crate::config::inbox_dir().unwrap();
358
359 let event = json!({
360 "event_id": "deadbeef",
361 "kind": 9999u32,
362 "type": "speculation",
363 "from": "did:wire:future-peer",
364 });
365
366 let result = process_events(
367 &[event],
368 Some("prior-cursor".to_string()),
369 &inbox,
370 )
371 .unwrap();
372
373 assert_eq!(result.rejected.len(), 1);
374 let reason = result.rejected[0]["reason"].as_str().unwrap();
375 assert!(
376 reason.contains("unknown_kind=9999"),
377 "reason missing kind: {reason}"
378 );
379 assert!(
380 reason.contains("binary_version="),
381 "reason missing binary_version: {reason}"
382 );
383 assert_eq!(result.rejected[0]["blocks_cursor"], true);
384
385 assert_eq!(
387 result.advance_cursor_to,
388 Some("prior-cursor".to_string()),
389 "cursor advanced past unknown kind — silent drop regression"
390 );
391 assert!(result.blocked);
392 });
393 }
394
395 #[test]
396 fn schema_mismatch_blocks_cursor_with_reason_shape() {
397 crate::config::test_support::with_temp_home(|| {
402 crate::config::ensure_dirs().unwrap();
403 let inbox = crate::config::inbox_dir().unwrap();
404 let event = json!({
405 "event_id": "future-binary",
406 "schema_version": "v4.0",
407 "kind": 1000u32,
408 "type": "decision",
409 "from": "did:wire:future",
410 });
411 let result = process_events(&[event], Some("prior".to_string()), &inbox)
412 .unwrap();
413 assert_eq!(result.rejected.len(), 1);
414 let reason = result.rejected[0]["reason"].as_str().unwrap();
415 assert!(reason.contains("schema_mismatch=v4.0"));
416 assert!(reason.contains("binary_supports=v3.1"));
417 assert_eq!(result.rejected[0]["blocks_cursor"], true);
418 assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
419 });
420 }
421
422 #[test]
423 fn schema_minor_bump_within_same_major_is_accepted() {
424 crate::config::test_support::with_temp_home(|| {
428 crate::config::ensure_dirs().unwrap();
429 let inbox = crate::config::inbox_dir().unwrap();
430 let event = json!({
431 "event_id": "minor-bump",
432 "schema_version": "v3.2",
433 "kind": 1000u32,
434 "type": "decision",
435 "from": "did:wire:peer-not-in-trust",
436 });
437 let result = process_events(&[event], Some("prior".to_string()), &inbox)
438 .unwrap();
439 let reason = result.rejected[0]["reason"].as_str().unwrap();
443 assert!(
444 !reason.contains("schema_mismatch"),
445 "minor bump should not be schema_mismatch: {reason}"
446 );
447 });
448 }
449
450 #[test]
451 fn legacy_event_without_schema_version_field_is_accepted() {
452 crate::config::test_support::with_temp_home(|| {
457 crate::config::ensure_dirs().unwrap();
458 let inbox = crate::config::inbox_dir().unwrap();
459 let event = json!({
460 "event_id": "legacy",
461 "kind": 1000u32,
462 "type": "decision",
463 "from": "did:wire:legacy-peer",
464 });
465 let result = process_events(&[event], Some("prior".to_string()), &inbox)
466 .unwrap();
467 let reason = result.rejected[0]["reason"].as_str().unwrap();
468 assert!(!reason.contains("schema_mismatch"));
469 });
470 }
471
472 #[test]
473 fn inbox_dedupe_skips_duplicate_event_id() {
474 let tmp = std::env::temp_dir().join(format!(
479 "wire-dedupe-test-{}-{}",
480 std::process::id(),
481 rand::random::<u32>()
482 ));
483 std::fs::create_dir_all(&tmp).unwrap();
484 let event_id = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
485 let existing_line = json!({
486 "event_id": event_id,
487 "from": "did:wire:peer",
488 "type": "claim",
489 "body": "first occurrence",
490 });
491 let path = tmp.join("peer.jsonl");
492 std::fs::write(&path, format!("{existing_line}\n")).unwrap();
493 assert!(inbox_already_contains(&path, event_id));
494 assert!(!inbox_already_contains(&path, "different-event-id"));
495 assert!(!inbox_already_contains(&path, ""));
496 }
497
498 #[test]
499 fn inbox_dedupe_substring_in_body_is_not_false_positive() {
500 let tmp = std::env::temp_dir().join(format!(
503 "wire-dedupe-substring-{}-{}",
504 std::process::id(),
505 rand::random::<u32>()
506 ));
507 std::fs::create_dir_all(&tmp).unwrap();
508 let target_eid = "deadbeefcafebabe";
509 let existing_line = json!({
512 "event_id": "different",
513 "from": "did:wire:peer",
514 "body": format!("the user mentioned event_id deadbeefcafebabe in passing"),
515 });
516 let path = tmp.join("peer.jsonl");
517 std::fs::write(&path, format!("{existing_line}\n")).unwrap();
518 assert!(!inbox_already_contains(&path, target_eid));
521 }
522
523 #[test]
524 fn known_kind_after_unknown_does_not_advance_cursor() {
525 crate::config::test_support::with_temp_home(|| {
529 crate::config::ensure_dirs().unwrap();
530 let inbox = crate::config::inbox_dir().unwrap();
531
532 let events = vec![
533 json!({
534 "event_id": "evt-unknown",
535 "kind": 9999u32,
536 "type": "speculation",
537 "from": "did:wire:future",
538 }),
539 json!({
540 "event_id": "evt-known-but-untrusted",
541 "kind": 1000u32,
542 "type": "decision",
543 "from": "did:wire:peer-not-in-trust",
544 }),
545 ];
546
547 let result = process_events(
548 &events,
549 Some("prior".to_string()),
550 &inbox,
551 )
552 .unwrap();
553
554 assert_eq!(result.rejected.len(), 2);
555 assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
556 assert!(result.blocked);
557 });
558 }
559}