use anyhow::Result;
use serde_json::{Value, json};
use std::path::Path;
use crate::{config, pair_invite, signing};
/// Outcome of one [`process_events`] pass over a batch of pulled events.
pub struct PullResult {
/// One `{"event_id", "from"}` object per event that was appended to an
/// inbox file.
pub written: Vec<Value>,
/// One object per event that was not appended, carrying `event_id`,
/// `reason`, `blocks_cursor` and `transient` (plus `schema_version` for
/// schema mismatches).
pub rejected: Vec<Value>,
/// Cursor value after the batch: the `event_id` of the last event that was
/// written, deduplicated or permanently rejected before any cursor-blocking
/// rejection was seen, or the caller-supplied initial cursor if there was
/// no such event.
pub advance_cursor_to: Option<String>,
/// `true` when at least one event in the batch produced a rejection with
/// `blocks_cursor` set.
pub blocked: bool,
}
/// Reports whether the JSONL inbox file at `path` already holds an event whose
/// top-level `event_id` equals `event_id`.
///
/// Returns `false` for an empty `event_id` and for a file that cannot be read
/// (including one that does not exist yet).
fn inbox_already_contains(path: &std::path::Path, event_id: &str) -> bool {
    if event_id.is_empty() {
        return false;
    }
    let Ok(body) = std::fs::read_to_string(path) else {
        return false;
    };

    // Cheap pre-filter: skip JSON parsing entirely when the compact
    // `"event_id":"…"` pair does not occur anywhere in the file. The id is
    // rendered through `Value`'s `Display` so it is JSON-escaped exactly like
    // a line produced by serde_json's compact serializer; interpolating the
    // raw string would miss ids containing `"`, `\` or control characters and
    // let a duplicate through.
    let needle = format!("\"event_id\":{}", Value::from(event_id));
    if !body.contains(&needle) {
        return false;
    }

    // The substring can also occur inside a nested object or a message body,
    // so confirm against the parsed top-level field of each line. Lines that
    // are blank or fail to parse are ignored.
    body.lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .any(|line| {
            serde_json::from_str::<Value>(line)
                .is_ok_and(|v| v.get("event_id").and_then(Value::as_str) == Some(event_id))
        })
}
/// Reports whether `kind` is an event kind this binary recognises.
///
/// Kinds `1` and `100` are always accepted; any other value must appear as the
/// first element of an entry returned by `signing::kinds()`.
pub fn is_known_kind(kind: u32) -> bool {
    // `||` short-circuits, so the kinds table is only consulted for values
    // other than the two built-in ones.
    matches!(kind, 1 | 100) || signing::kinds().iter().any(|(known, _)| *known == kind)
}
/// Returns `true` when a verification failure is one of the variants treated
/// as transient: `UnknownAgent` or `UnknownKey`. Every other variant is
/// considered permanent.
fn verify_error_is_transient(err: &signing::VerifyError) -> bool {
    type E = signing::VerifyError;
    matches!(err, E::UnknownAgent(..) | E::UnknownKey(..))
}
pub fn process_events(
events: &[Value],
initial_cursor: Option<String>,
inbox_dir: &Path,
) -> Result<PullResult> {
let binary_version = env!("CARGO_PKG_VERSION");
let trust_snapshot = config::read_trust()?;
let mut written = Vec::new();
let mut rejected = Vec::new();
let mut last_advanced = initial_cursor.clone();
let mut first_block_idx: Option<usize> = None;
for (idx, event) in events.iter().enumerate() {
let event_id = event
.get("event_id")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let kind = event.get("kind").and_then(Value::as_u64).unwrap_or(0) as u32;
if let Some(declared) = event
.get("schema_version")
.and_then(Value::as_str)
{
let ours = signing::EVENT_SCHEMA_VERSION;
if signing::schema_major(declared) != signing::schema_major(ours) {
rejected.push(json!({
"event_id": event_id,
"reason": format!(
"schema_mismatch={declared} binary_supports={ours}"
),
"blocks_cursor": true,
"transient": true,
"schema_version": declared,
}));
if first_block_idx.is_none() {
first_block_idx = Some(idx);
}
continue;
}
}
if !is_known_kind(kind) {
let reason = format!(
"unknown_kind={kind} binary_version={binary_version}"
);
rejected.push(json!({
"event_id": event_id,
"reason": reason,
"blocks_cursor": true,
"transient": true,
}));
if first_block_idx.is_none() {
first_block_idx = Some(idx);
}
continue;
}
let drop_paired = match pair_invite::maybe_consume_pair_drop(event) {
Ok(Some(_)) => true,
Ok(None) => false,
Err(e) => {
let peer_handle = event
.get("from")
.and_then(Value::as_str)
.map(|s| crate::agent_card::display_handle_from_did(s).to_string())
.unwrap_or_else(|| "<unknown>".to_string());
eprintln!(
"wire pull: pair_drop from {peer_handle} consume FAILED: {e}. \
sender will not be pinned; have them re-add or retry."
);
pair_invite::record_pair_rejection(
&peer_handle,
"pair_drop_consume_failed",
&e.to_string(),
);
false
}
};
if let Err(e) = pair_invite::maybe_consume_pair_drop_ack(event) {
let peer_handle = event
.get("from")
.and_then(Value::as_str)
.map(|s| crate::agent_card::display_handle_from_did(s).to_string())
.unwrap_or_else(|| "<unknown>".to_string());
eprintln!(
"wire pull: pair_drop_ack from {peer_handle} consume FAILED: {e}. \
their slot_token NOT recorded; we cannot `wire send` to them \
until they retry."
);
pair_invite::record_pair_rejection(
&peer_handle,
"pair_drop_ack_consume_failed",
&e.to_string(),
);
}
let active_trust = if drop_paired {
config::read_trust()?
} else {
trust_snapshot.clone()
};
match signing::verify_message_v31(event, &active_trust) {
Ok(()) => {
let from = event
.get("from")
.and_then(Value::as_str)
.map(|s| crate::agent_card::display_handle_from_did(s).to_string())
.unwrap_or_else(|| "unknown".to_string());
let path = inbox_dir.join(format!("{from}.jsonl"));
if inbox_already_contains(&path, &event_id) {
rejected.push(json!({
"event_id": event_id,
"reason": "duplicate event_id already in inbox",
"blocks_cursor": false,
"transient": false,
}));
if first_block_idx.is_none() {
last_advanced = Some(event_id.clone());
}
continue;
}
use std::io::Write;
let mut f = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)?;
let mut line = serde_json::to_vec(event)?;
line.push(b'\n');
f.write_all(&line)?;
written.push(json!({"event_id": event_id, "from": from}));
if first_block_idx.is_none() {
last_advanced = Some(event_id.clone());
}
}
Err(e) if verify_error_is_transient(&e) => {
rejected.push(json!({
"event_id": event_id,
"reason": e.to_string(),
"blocks_cursor": true,
"transient": true,
}));
if first_block_idx.is_none() {
first_block_idx = Some(idx);
}
}
Err(e) => {
rejected.push(json!({
"event_id": event_id,
"reason": e.to_string(),
"blocks_cursor": false,
"transient": false,
}));
if first_block_idx.is_none() {
last_advanced = Some(event_id.clone());
}
}
}
}
let result = PullResult {
written: written.clone(),
rejected: rejected.clone(),
advance_cursor_to: last_advanced.clone(),
blocked: first_block_idx.is_some(),
};
crate::diag::emit(
"pull",
json!({
"events_in": events.len(),
"written": result.written.len(),
"rejected": result.rejected.len(),
"blocked": result.blocked,
"advance_to": result.advance_cursor_to,
}),
);
Ok(result)
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Creates a uniquely named scratch directory under the system temp dir.
    /// Callers remove it once their assertions have run.
    fn scratch_dir(tag: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!(
            "{tag}-{}-{}",
            std::process::id(),
            rand::random::<u32>()
        ));
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    #[test]
    fn known_kinds_recognised() {
        for kind in [1, 100, 1000, 1100, 1101, 1201] {
            assert!(is_known_kind(kind), "kind {kind} should be known");
        }
    }

    #[test]
    fn unknown_kinds_rejected() {
        for kind in [0, 9999, 1099, 50000] {
            assert!(!is_known_kind(kind), "kind {kind} should be unknown");
        }
    }

    /// An unknown kind must be reported with enough context to diagnose a
    /// version skew, and must hold the cursor rather than be silently dropped.
    #[test]
    fn unknown_kind_rejection_carries_binary_version_and_kind() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "deadbeef",
                "kind": 9999u32,
                "type": "speculation",
                "from": "did:wire:future-peer",
            });
            let result = process_events(
                &[event],
                Some("prior-cursor".to_string()),
                &inbox,
            )
            .unwrap();
            assert_eq!(result.rejected.len(), 1);
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(
                reason.contains("unknown_kind=9999"),
                "reason missing kind: {reason}"
            );
            assert!(
                reason.contains("binary_version="),
                "reason missing binary_version: {reason}"
            );
            assert_eq!(result.rejected[0]["blocks_cursor"], true);
            assert_eq!(
                result.advance_cursor_to,
                Some("prior-cursor".to_string()),
                "cursor advanced past unknown kind — silent drop regression"
            );
            assert!(result.blocked);
        });
    }

    /// A different schema major blocks the cursor and names both versions.
    #[test]
    fn schema_mismatch_blocks_cursor_with_reason_shape() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "future-binary",
                "schema_version": "v4.0",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:future",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox)
                .unwrap();
            assert_eq!(result.rejected.len(), 1);
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(reason.contains("schema_mismatch=v4.0"));
            assert!(reason.contains("binary_supports=v3.1"));
            assert_eq!(result.rejected[0]["blocks_cursor"], true);
            assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
            assert!(result.blocked);
        });
    }

    /// A minor bump within the same major must get past the schema gate. The
    /// event is still rejected later (the sender is not in the trust state),
    /// but not with a schema reason.
    #[test]
    fn schema_minor_bump_within_same_major_is_accepted() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "minor-bump",
                "schema_version": "v3.2",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:peer-not-in-trust",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox)
                .unwrap();
            // Guard the index below so a failure reports the actual count
            // instead of an out-of-bounds panic.
            assert_eq!(
                result.rejected.len(),
                1,
                "expected exactly one rejection for the untrusted sender"
            );
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(
                !reason.contains("schema_mismatch"),
                "minor bump should not be schema_mismatch: {reason}"
            );
        });
    }

    /// Events that omit `schema_version` skip the schema gate entirely.
    #[test]
    fn legacy_event_without_schema_version_field_is_accepted() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let event = json!({
                "event_id": "legacy",
                "kind": 1000u32,
                "type": "decision",
                "from": "did:wire:legacy-peer",
            });
            let result = process_events(&[event], Some("prior".to_string()), &inbox)
                .unwrap();
            assert_eq!(
                result.rejected.len(),
                1,
                "expected exactly one rejection for the untrusted sender"
            );
            let reason = result.rejected[0]["reason"].as_str().unwrap();
            assert!(
                !reason.contains("schema_mismatch"),
                "legacy event should not be schema_mismatch: {reason}"
            );
        });
    }

    #[test]
    fn inbox_dedupe_skips_duplicate_event_id() {
        let tmp = scratch_dir("wire-dedupe-test");
        let event_id = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
        let existing_line = json!({
            "event_id": event_id,
            "from": "did:wire:peer",
            "type": "claim",
            "body": "first occurrence",
        });
        let path = tmp.join("peer.jsonl");
        std::fs::write(&path, format!("{existing_line}\n")).unwrap();
        assert!(inbox_already_contains(&path, event_id));
        assert!(!inbox_already_contains(&path, "different-event-id"));
        assert!(!inbox_already_contains(&path, ""));
        // Best-effort cleanup; a failure here must not fail the test.
        let _ = std::fs::remove_dir_all(&tmp);
    }

    #[test]
    fn inbox_dedupe_substring_in_body_is_not_false_positive() {
        let tmp = scratch_dir("wire-dedupe-substring");
        let target_eid = "deadbeefcafebabe";
        // The id appears only inside free text: rejected by the substring
        // pre-filter.
        let existing_line = json!({
            "event_id": "different",
            "from": "did:wire:peer",
            "body": "the user mentioned event_id deadbeefcafebabe in passing",
        });
        // The exact `"event_id":"…"` pair appears, but only in a nested
        // object: passes the pre-filter and must be rejected by the
        // per-line top-level comparison.
        let nested_line = json!({
            "event_id": "different-again",
            "from": "did:wire:peer",
            "quoted": { "event_id": target_eid },
        });
        let path = tmp.join("peer.jsonl");
        std::fs::write(&path, format!("{existing_line}\n{nested_line}\n")).unwrap();
        assert!(!inbox_already_contains(&path, target_eid));
        // Best-effort cleanup; a failure here must not fail the test.
        let _ = std::fs::remove_dir_all(&tmp);
    }

    /// Once an earlier event has blocked the cursor, later events in the same
    /// batch must not move it.
    #[test]
    fn known_kind_after_unknown_does_not_advance_cursor() {
        crate::config::test_support::with_temp_home(|| {
            crate::config::ensure_dirs().unwrap();
            let inbox = crate::config::inbox_dir().unwrap();
            let events = vec![
                json!({
                    "event_id": "evt-unknown",
                    "kind": 9999u32,
                    "type": "speculation",
                    "from": "did:wire:future",
                }),
                json!({
                    "event_id": "evt-known-but-untrusted",
                    "kind": 1000u32,
                    "type": "decision",
                    "from": "did:wire:peer-not-in-trust",
                }),
            ];
            let result = process_events(
                &events,
                Some("prior".to_string()),
                &inbox,
            )
            .unwrap();
            assert_eq!(result.rejected.len(), 2);
            assert_eq!(result.advance_cursor_to, Some("prior".to_string()));
            assert!(result.blocked);
        });
    }
}