1use anyhow::Result;
35use serde_json::{Value, json};
36use std::path::Path;
37
38use crate::{config, pair_invite, signing};
39
40pub struct PullResult {
42 pub written: Vec<Value>,
43 pub rejected: Vec<Value>,
44 pub advance_cursor_to: Option<String>,
48 pub blocked: bool,
52}
53
54fn inbox_already_contains(path: &std::path::Path, event_id: &str) -> bool {
62 if event_id.is_empty() {
63 return false;
64 }
65 let body = match std::fs::read_to_string(path) {
66 Ok(b) => b,
67 Err(_) => return false,
68 };
69 let needle = format!("\"event_id\":\"{event_id}\"");
73 if !body.contains(&needle) {
74 return false;
75 }
76 for line in body.lines() {
79 let trimmed = line.trim();
80 if trimmed.is_empty() {
81 continue;
82 }
83 if let Ok(v) = serde_json::from_str::<Value>(trimmed)
84 && v.get("event_id").and_then(Value::as_str) == Some(event_id)
85 {
86 return true;
87 }
88 }
89 false
90}
91
92pub fn is_known_kind(kind: u32) -> bool {
98 if kind == 1 || kind == 100 {
99 return true;
100 }
101 signing::kinds().iter().any(|(k, _)| *k == kind)
102}
103
104fn verify_error_is_transient(err: &signing::VerifyError) -> bool {
107 matches!(
108 err,
109 signing::VerifyError::UnknownAgent(_) | signing::VerifyError::UnknownKey(_, _)
110 )
111}
112
113pub fn process_events(
122 events: &[Value],
123 initial_cursor: Option<String>,
124 inbox_dir: &Path,
125) -> Result<PullResult> {
126 let binary_version = env!("CARGO_PKG_VERSION");
127 let trust_snapshot = config::read_trust()?;
128
129 let mut written = Vec::new();
130 let mut rejected = Vec::new();
131 let mut last_advanced = initial_cursor.clone();
132 let mut first_block_idx: Option<usize> = None;
133
134 for (idx, event) in events.iter().enumerate() {
135 let event_id = event
136 .get("event_id")
137 .and_then(Value::as_str)
138 .unwrap_or("")
139 .to_string();
140 let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0) as u32;
141
142 if let Some(declared) = event.get("schema_version").and_then(Value::as_str) {
148 let ours = signing::EVENT_SCHEMA_VERSION;
149 if signing::schema_major(declared) != signing::schema_major(ours) {
150 rejected.push(json!({
151 "event_id": event_id,
152 "reason": format!(
153 "schema_mismatch={declared} binary_supports={ours}"
154 ),
155 "blocks_cursor": true,
156 "transient": true,
157 "schema_version": declared,
158 }));
159 if first_block_idx.is_none() {
160 first_block_idx = Some(idx);
161 }
162 continue;
163 }
164 }
165
166 if !is_known_kind(kind) {
168 let reason = format!("unknown_kind={kind} binary_version={binary_version}");
169 rejected.push(json!({
170 "event_id": event_id,
171 "reason": reason,
172 "blocks_cursor": true,
173 "transient": true,
174 }));
175 if first_block_idx.is_none() {
176 first_block_idx = Some(idx);
177 }
178 continue;
179 }
180
181 let drop_paired = match pair_invite::maybe_consume_pair_drop(event) {
183 Ok(Some(_)) => true,
184 Ok(None) => false,
185 Err(e) => {
186 let peer_handle = event
191 .get("from")
192 .and_then(Value::as_str)
193 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
194 .unwrap_or_else(|| "<unknown>".to_string());
195 eprintln!(
196 "wire pull: pair_drop from {peer_handle} consume FAILED: {e}. \
197 sender will not be pinned; have them re-add or retry."
198 );
199 pair_invite::record_pair_rejection(
200 &peer_handle,
201 "pair_drop_consume_failed",
202 &e.to_string(),
203 );
204 false
205 }
206 };
207 if let Err(e) = pair_invite::maybe_consume_pair_drop_ack(event) {
208 let peer_handle = event
209 .get("from")
210 .and_then(Value::as_str)
211 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
212 .unwrap_or_else(|| "<unknown>".to_string());
213 eprintln!(
214 "wire pull: pair_drop_ack from {peer_handle} consume FAILED: {e}. \
215 their slot_token NOT recorded; we cannot `wire send` to them \
216 until they retry."
217 );
218 pair_invite::record_pair_rejection(
219 &peer_handle,
220 "pair_drop_ack_consume_failed",
221 &e.to_string(),
222 );
223 }
224 let active_trust = if drop_paired {
225 config::read_trust()?
226 } else {
227 trust_snapshot.clone()
228 };
229
230 match signing::verify_message_v31(event, &active_trust) {
231 Ok(()) => {
232 let from = event
233 .get("from")
234 .and_then(Value::as_str)
235 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
236 .unwrap_or_else(|| "unknown".to_string());
237 let path = inbox_dir.join(format!("{from}.jsonl"));
238
239 if inbox_already_contains(&path, &event_id) {
245 rejected.push(json!({
246 "event_id": event_id,
247 "reason": "duplicate event_id already in inbox",
248 "blocks_cursor": false,
249 "transient": false,
250 }));
251 if first_block_idx.is_none() {
252 last_advanced = Some(event_id.clone());
253 }
254 continue;
255 }
256
257 use std::io::Write;
258 let mut f = std::fs::OpenOptions::new()
259 .create(true)
260 .append(true)
261 .open(&path)?;
262 let mut line = serde_json::to_vec(event)?;
263 line.push(b'\n');
264 f.write_all(&line)?;
265 written.push(json!({"event_id": event_id, "from": from}));
266 if first_block_idx.is_none() {
267 last_advanced = Some(event_id.clone());
268 }
269 }
270 Err(e) if verify_error_is_transient(&e) => {
271 rejected.push(json!({
272 "event_id": event_id,
273 "reason": e.to_string(),
274 "blocks_cursor": true,
275 "transient": true,
276 }));
277 if first_block_idx.is_none() {
278 first_block_idx = Some(idx);
279 }
280 }
281 Err(e) => {
282 rejected.push(json!({
283 "event_id": event_id,
284 "reason": e.to_string(),
285 "blocks_cursor": false,
286 "transient": false,
287 }));
288 if first_block_idx.is_none() {
289 last_advanced = Some(event_id.clone());
290 }
291 }
292 }
293 }
294
295 let result = PullResult {
296 written: written.clone(),
297 rejected: rejected.clone(),
298 advance_cursor_to: last_advanced.clone(),
299 blocked: first_block_idx.is_some(),
300 };
301
302 crate::diag::emit(
306 "pull",
307 json!({
308 "events_in": events.len(),
309 "written": result.written.len(),
310 "rejected": result.rejected.len(),
311 "blocked": result.blocked,
312 "advance_to": result.advance_cursor_to,
313 }),
314 );
315
316 Ok(result)
317}
318
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Creates a uniquely named scratch directory under the system temp dir.
    /// Callers remove it once their assertions have passed.
    fn scratch_dir(prefix: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "{prefix}-{}-{}",
            std::process::id(),
            rand::random::<u32>()
        ));
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    #[test]
    fn known_kinds_recognised() {
        // 1 and 100 are hard-coded in `is_known_kind`; the remaining values
        // are expected to come from the `signing::kinds()` table.
        assert!(is_known_kind(1));
        assert!(is_known_kind(100));
        assert!(is_known_kind(1000));
        assert!(is_known_kind(1100));
        assert!(is_known_kind(1101));
        assert!(is_known_kind(1201));
    }

    #[test]
    fn unknown_kinds_rejected() {
        assert!(!is_known_kind(0));
        assert!(!is_known_kind(9999));
        assert!(!is_known_kind(1099));
        assert!(!is_known_kind(50000));
    }

    #[test]
    fn unknown_kind_rejection_carries_binary_version_and_kind() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();

            let event = json!({
                "event_id": "deadbeef",
                "kind": 9999u32,
                "type": "speculation",
                "from": "did:wire:future-peer",
            });

            let result =
                process_events(&[event], Some("prior-cursor".to_string()), &inbox).unwrap();

            assert_eq!(result.rejected.len(), 1);
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(
                reason.contains("unknown_kind=9999"),
                "reason missing kind: {reason}"
            );
            assert!(
                reason.contains("binary_version="),
                "reason missing binary_version: {reason}"
            );
            assert_eq!(result.rejected[0]["blocks_cursor"], true);

            // The cursor must stay where it was: moving past an unknown kind
            // would drop the event silently.
            assert_eq!(
                result.advance_cursor_to,
                Some("prior-cursor".to_string()),
                "cursor advanced past unknown kind — silent drop regression"
            );
            assert!(result.blocked);
        });
    }

    #[test]
    fn schema_mismatch_blocks_cursor_with_reason_shape() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "future-binary",
                "schema_version": "v4.0",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:future",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
            assert_eq!(result.rejected.len(), 1);
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(reason.contains("schema_mismatch=v4.0"));
            assert!(reason.contains("binary_supports=v3.1"));
            assert_eq!(result.rejected[0]["blocks_cursor"], true);
            assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
        });
    }

    #[test]
    fn schema_minor_bump_within_same_major_is_accepted() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "minor-bump",
                "schema_version": "v3.2",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:peer-not-in-trust",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
            // The event is still expected to be rejected (its sender is not in
            // the trust store), just not for a schema reason. Check the count
            // first so a change here fails with a clear message rather than an
            // index panic.
            assert_eq!(
                result.rejected.len(),
                1,
                "expected exactly one rejection for the untrusted sender"
            );
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(
                !reason.contains("schema_mismatch"),
                "minor bump should not be schema_mismatch: {reason}"
            );
        });
    }

    #[test]
    fn legacy_event_without_schema_version_field_is_accepted() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "legacy",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:legacy-peer",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
            assert_eq!(
                result.rejected.len(),
                1,
                "expected exactly one rejection for the untrusted sender"
            );
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(!reason.contains("schema_mismatch"));
        });
    }

    #[test]
    fn inbox_dedupe_skips_duplicate_event_id() {
        let tmp = scratch_dir("wire-dedupe-test");
        let event_id = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
        let existing_line = json!({
            "event_id": event_id,
            "from": "did:wire:peer",
            "type": "claim",
            "body": "first occurrence",
        });
        let path = tmp.join("peer.jsonl");
        std::fs::write(&path, format!("{existing_line}\n")).unwrap();
        assert!(inbox_already_contains(&path, event_id));
        assert!(!inbox_already_contains(&path, "different-event-id"));
        assert!(!inbox_already_contains(&path, ""));
        // Best-effort cleanup so repeated runs do not pile up temp dirs.
        let _ = std::fs::remove_dir_all(&tmp);
    }

    #[test]
    fn inbox_dedupe_substring_in_body_is_not_false_positive() {
        let tmp = scratch_dir("wire-dedupe-substring");
        let target_eid = "deadbeefcafebabe";
        // The target id appears only inside the free-text body of a record
        // whose own event_id is different.
        let existing_line = json!({
            "event_id": "different",
            "from": "did:wire:peer",
            "body": "the user mentioned event_id deadbeefcafebabe in passing",
        });
        let path = tmp.join("peer.jsonl");
        std::fs::write(&path, format!("{existing_line}\n")).unwrap();
        assert!(!inbox_already_contains(&path, target_eid));
        // Best-effort cleanup so repeated runs do not pile up temp dirs.
        let _ = std::fs::remove_dir_all(&tmp);
    }

    #[test]
    fn known_kind_after_unknown_does_not_advance_cursor() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();

            let events = vec![
                json!({
                    "event_id": "evt-unknown",
                    "kind": 9999u32,
                    "type": "speculation",
                    "from": "did:wire:future",
                }),
                json!({
                    "event_id": "evt-known-but-untrusted",
                    "kind": 1000u32,
                    "type": "decision",
                    "from": "did:wire:peer-not-in-trust",
                }),
            ];

            let result = process_events(&events, Some("prior".to_string()), &inbox).unwrap();

            // Both events are rejected, and the block raised by the first one
            // keeps the cursor at its starting position.
            assert_eq!(result.rejected.len(), 2);
            assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
            assert!(result.blocked);
        });
    }
}