1use anyhow::Result;
35use serde_json::{Value, json};
36use std::path::Path;
37
38use crate::{config, pair_invite, signing};
39
/// Outcome of one [`process_events`] pass over a batch of pulled events.
pub struct PullResult {
    /// One summary object (`event_id`, `from`, `timestamp`) per event that
    /// passed verification and was appended to an inbox file.
    pub written: Vec<Value>,
    /// One object per event that was not written, carrying `event_id`,
    /// `reason`, `blocks_cursor` and `transient` (schema mismatches also
    /// carry `schema_version`).
    pub rejected: Vec<Value>,
    /// `event_id` of the last event handled before the first cursor-blocking
    /// rejection, or the cursor the caller passed in when no event could be
    /// advanced past.
    pub advance_cursor_to: Option<String>,
    /// `true` when at least one rejection in the batch had
    /// `blocks_cursor: true`.
    pub blocked: bool,
}
53
54fn inbox_already_contains(path: &std::path::Path, event_id: &str) -> bool {
62 if event_id.is_empty() {
63 return false;
64 }
65 let body = match std::fs::read_to_string(path) {
66 Ok(b) => b,
67 Err(_) => return false,
68 };
69 let needle = format!("\"event_id\":\"{event_id}\"");
73 if !body.contains(&needle) {
74 return false;
75 }
76 for line in body.lines() {
79 let trimmed = line.trim();
80 if trimmed.is_empty() {
81 continue;
82 }
83 if let Ok(v) = serde_json::from_str::<Value>(trimmed)
84 && v.get("event_id").and_then(Value::as_str) == Some(event_id)
85 {
86 return true;
87 }
88 }
89 false
90}
91
92pub fn is_known_kind(kind: u32) -> bool {
98 if kind == 1 || kind == 100 {
99 return true;
100 }
101 signing::kinds().iter().any(|(k, _)| *k == kind)
102}
103
104fn verify_error_is_transient(err: &signing::VerifyError) -> bool {
107 matches!(
108 err,
109 signing::VerifyError::UnknownAgent(_) | signing::VerifyError::UnknownKey(_, _)
110 )
111}
112
113pub fn process_events(
122 events: &[Value],
123 initial_cursor: Option<String>,
124 inbox_dir: &Path,
125) -> Result<PullResult> {
126 let binary_version = env!("CARGO_PKG_VERSION");
127 let trust_snapshot = config::read_trust()?;
128
129 let mut written = Vec::new();
130 let mut rejected = Vec::new();
131 let mut last_advanced = initial_cursor.clone();
132 let mut first_block_idx: Option<usize> = None;
133
134 for (idx, event) in events.iter().enumerate() {
135 let event_id = event
136 .get("event_id")
137 .and_then(Value::as_str)
138 .unwrap_or("")
139 .to_string();
140 let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0) as u32;
141
142 if let Some(declared) = event.get("schema_version").and_then(Value::as_str) {
148 let ours = signing::EVENT_SCHEMA_VERSION;
149 if signing::schema_major(declared) != signing::schema_major(ours) {
150 rejected.push(json!({
151 "event_id": event_id,
152 "reason": format!(
153 "schema_mismatch={declared} binary_supports={ours}"
154 ),
155 "blocks_cursor": true,
156 "transient": true,
157 "schema_version": declared,
158 }));
159 if first_block_idx.is_none() {
160 first_block_idx = Some(idx);
161 }
162 continue;
163 }
164 }
165
166 if !is_known_kind(kind) {
168 let reason = format!("unknown_kind={kind} binary_version={binary_version}");
169 rejected.push(json!({
170 "event_id": event_id,
171 "reason": reason,
172 "blocks_cursor": true,
173 "transient": true,
174 }));
175 if first_block_idx.is_none() {
176 first_block_idx = Some(idx);
177 }
178 continue;
179 }
180
181 let drop_paired = match pair_invite::maybe_consume_pair_drop(event) {
183 Ok(Some(_)) => true,
184 Ok(None) => false,
185 Err(e) => {
186 let peer_handle = event
191 .get("from")
192 .and_then(Value::as_str)
193 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
194 .unwrap_or_else(|| "<unknown>".to_string());
195 eprintln!(
196 "wire pull: pair_drop from {peer_handle} consume FAILED: {e}. \
197 sender will not be pinned; have them re-add or retry."
198 );
199 pair_invite::record_pair_rejection(
200 &peer_handle,
201 "pair_drop_consume_failed",
202 &e.to_string(),
203 );
204 false
205 }
206 };
207 if let Err(e) = pair_invite::maybe_consume_pair_drop_ack(event) {
208 let peer_handle = event
209 .get("from")
210 .and_then(Value::as_str)
211 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
212 .unwrap_or_else(|| "<unknown>".to_string());
213 eprintln!(
214 "wire pull: pair_drop_ack from {peer_handle} consume FAILED: {e}. \
215 their slot_token NOT recorded; we cannot `wire send` to them \
216 until they retry."
217 );
218 pair_invite::record_pair_rejection(
219 &peer_handle,
220 "pair_drop_ack_consume_failed",
221 &e.to_string(),
222 );
223 }
224 let active_trust = if drop_paired {
225 config::read_trust()?
226 } else {
227 trust_snapshot.clone()
228 };
229
230 match signing::verify_message_v31(event, &active_trust) {
231 Ok(()) => {
232 let from = event
233 .get("from")
234 .and_then(Value::as_str)
235 .map(|s| crate::agent_card::display_handle_from_did(s).to_string())
236 .unwrap_or_else(|| "unknown".to_string());
237 let path = inbox_dir.join(format!("{from}.jsonl"));
238
239 if inbox_already_contains(&path, &event_id) {
245 rejected.push(json!({
246 "event_id": event_id,
247 "reason": "duplicate event_id already in inbox",
248 "blocks_cursor": false,
249 "transient": false,
250 }));
251 if first_block_idx.is_none() {
252 last_advanced = Some(event_id.clone());
253 }
254 continue;
255 }
256
257 use std::io::Write;
258 let mut f = std::fs::OpenOptions::new()
259 .create(true)
260 .append(true)
261 .open(&path)?;
262 let mut line = serde_json::to_vec(event)?;
263 line.push(b'\n');
264 f.write_all(&line)?;
265 let ts = event
272 .get("timestamp")
273 .and_then(Value::as_str)
274 .unwrap_or("")
275 .to_string();
276 written.push(json!({
277 "event_id": event_id,
278 "from": from,
279 "timestamp": ts,
280 }));
281 if first_block_idx.is_none() {
282 last_advanced = Some(event_id.clone());
283 }
284 }
285 Err(e) if verify_error_is_transient(&e) => {
286 rejected.push(json!({
287 "event_id": event_id,
288 "reason": e.to_string(),
289 "blocks_cursor": true,
290 "transient": true,
291 }));
292 if first_block_idx.is_none() {
293 first_block_idx = Some(idx);
294 }
295 }
296 Err(e) => {
297 rejected.push(json!({
298 "event_id": event_id,
299 "reason": e.to_string(),
300 "blocks_cursor": false,
301 "transient": false,
302 }));
303 if first_block_idx.is_none() {
304 last_advanced = Some(event_id.clone());
305 }
306 }
307 }
308 }
309
310 let result = PullResult {
311 written: written.clone(),
312 rejected: rejected.clone(),
313 advance_cursor_to: last_advanced.clone(),
314 blocked: first_block_idx.is_some(),
315 };
316
317 crate::diag::emit(
321 "pull",
322 json!({
323 "events_in": events.len(),
324 "written": result.written.len(),
325 "rejected": result.rejected.len(),
326 "blocked": result.blocked,
327 "advance_to": result.advance_cursor_to,
328 }),
329 );
330
331 Ok(result)
332}
333
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Scratch directory under the system temp dir that is removed when the
    /// guard is dropped, including when a test assertion panics.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        /// Creates `<tmp>/<label>-<pid>-<random u32>`.
        fn new(label: &str) -> Self {
            let dir = std::env::temp_dir().join(format!(
                "{label}-{}-{}",
                std::process::id(),
                rand::random::<u32>()
            ));
            std::fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }

        /// Path of `name` inside the scratch directory.
        fn file(&self, name: &str) -> std::path::PathBuf {
            self.0.join(name)
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort cleanup; a failure here must not mask the test result.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Returns the `reason` of the first rejection, failing with a readable
    /// message (rather than an index panic) when there is none.
    fn first_rejection_reason(result: &PullResult) -> &str {
        result
            .rejected
            .first()
            .and_then(|entry| entry["reason"].as_str())
            .expect("expected at least one rejection with a string `reason`")
    }

    #[test]
    fn known_kinds_recognised() {
        assert!(is_known_kind(1));
        assert!(is_known_kind(100));
        assert!(is_known_kind(1000));
        assert!(is_known_kind(1100));
        assert!(is_known_kind(1101));
        assert!(is_known_kind(1201));
    }

    #[test]
    fn unknown_kinds_rejected() {
        assert!(!is_known_kind(0));
        assert!(!is_known_kind(9999));
        assert!(!is_known_kind(1099));
        assert!(!is_known_kind(50000));
    }

    #[test]
    fn unknown_kind_rejection_carries_binary_version_and_kind() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();

            let event = json!({
                "event_id": "deadbeef",
                "kind": 9999u32,
                "type": "speculation",
                "from": "did:wire:future-peer",
            });

            let result =
                process_events(&[event], Some("prior-cursor".to_string()), &inbox).unwrap();

            assert_eq!(result.rejected.len(), 1);
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(
                reason.contains("unknown_kind=9999"),
                "reason missing kind: {reason}"
            );
            assert!(
                reason.contains("binary_version="),
                "reason missing binary_version: {reason}"
            );
            assert_eq!(result.rejected[0]["blocks_cursor"], true);

            // The cursor must stay on the value that was passed in.
            assert_eq!(
                result.advance_cursor_to,
                Some("prior-cursor".to_string()),
                "cursor advanced past unknown kind — silent drop regression"
            );
            assert!(result.blocked);
        });
    }

    #[test]
    fn schema_mismatch_blocks_cursor_with_reason_shape() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "future-binary",
                "schema_version": "v4.0",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:future",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
            assert_eq!(result.rejected.len(), 1);
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(reason.contains("schema_mismatch=v4.0"));
            assert!(reason.contains("binary_supports=v3.1"));
            assert_eq!(result.rejected[0]["blocks_cursor"], true);
            assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
        });
    }

    #[test]
    fn schema_minor_bump_within_same_major_is_accepted() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "minor-bump",
                "schema_version": "v3.2",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:peer-not-in-trust",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
            // The event is still expected to be rejected further down the
            // pipeline; what matters is that the schema gate was not the cause.
            let reason = first_rejection_reason(&result);
            assert!(
                !reason.contains("schema_mismatch"),
                "minor bump should not be schema_mismatch: {reason}"
            );
        });
    }

    #[test]
    fn legacy_event_without_schema_version_field_is_accepted() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "legacy",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:legacy-peer",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox).unwrap();
            // As above: a rejection is expected, just not a schema one.
            let reason = first_rejection_reason(&result);
            assert!(!reason.contains("schema_mismatch"));
        });
    }

    #[test]
    fn inbox_dedupe_skips_duplicate_event_id() {
        let tmp = ScratchDir::new("wire-dedupe-test");
        let event_id = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
        let existing_line = json!({
            "event_id": event_id,
            "from": "did:wire:peer",
            "type": "claim",
            "body": "first occurrence",
        });
        let path = tmp.file("peer.jsonl");
        std::fs::write(&path, format!("{existing_line}\n")).unwrap();
        assert!(inbox_already_contains(&path, event_id));
        assert!(!inbox_already_contains(&path, "different-event-id"));
        assert!(!inbox_already_contains(&path, ""));
    }

    #[test]
    fn inbox_dedupe_substring_in_body_is_not_false_positive() {
        let tmp = ScratchDir::new("wire-dedupe-substring");
        let target_eid = "deadbeefcafebabe";
        // The target id appears only inside the free-text body, never as the
        // line's own `event_id`.
        let existing_line = json!({
            "event_id": "different",
            "from": "did:wire:peer",
            "body": "the user mentioned event_id deadbeefcafebabe in passing",
        });
        let path = tmp.file("peer.jsonl");
        std::fs::write(&path, format!("{existing_line}\n")).unwrap();
        assert!(!inbox_already_contains(&path, target_eid));
    }

    #[test]
    fn known_kind_after_unknown_does_not_advance_cursor() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();

            let events = vec![
                json!({
                    "event_id": "evt-unknown",
                    "kind": 9999u32,
                    "type": "speculation",
                    "from": "did:wire:future",
                }),
                json!({
                    "event_id": "evt-known-but-untrusted",
                    "kind": 1000u32,
                    "type": "decision",
                    "from": "did:wire:peer-not-in-trust",
                }),
            ];

            let result = process_events(&events, Some("prior".to_string()), &inbox).unwrap();

            assert_eq!(result.rejected.len(), 2);
            assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
            assert!(result.blocked);
        });
    }
}