ai_memory/mcp/tools/capture_turn.rs
1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 (#1389) — MCP `memory_capture_turn` handler. Substrate-side
5//! implementation of the L4 layer of the layered-capture architecture
6//! per RFC-0001 (`docs/rfc/RFC-0001-mcp-turn-capture.md`).
7//!
8//! # What this tool does
9//!
10//! Hosts (Claude Code / Codex CLI / Gemini CLI / IDE plugins / future
11//! MCP-aware harnesses) call `memory_capture_turn` once per
12//! conversation turn to volunteer the turn content directly into the
13//! substrate. The substrate stores it idempotently by
14//! `(host_session_id, host_turn_index)` and writes a `signed_events`
15//! row tagged `layer = "L4"` so audit can prove which layer caught
16//! each turn.
17//!
18//! # Why L4 is THE FIX
19//!
20//! Layered defense (per architecture memo `f62cb182`):
21//!
22//! - **L1** — agent discipline (`memory_capture_nag`) catches the
23//! common case "agent forgot."
24//! - **L2** — `recover-previous-session` catches SIGKILL between
25//! sessions on the same host.
26//! - **L3** — substrate filesystem-notify watcher catches mid-session
27//! crashes + concurrent multi-session capture.
28//! - **L4** — THIS tool — host volunteers turns directly via MCP
29//! protocol. No transcript scraping. No format coupling. The trust
30//! boundary is the protocol contract. **Survives 50 years of
31//! vendor churn** because the substrate doesn't depend on any
32//! single host's implementation details.
33//!
34//! # Performance contract
35//!
36//! Per issue #1394 + the operator's "optimal performance" directive:
37//! synchronous dispatch < 10 ms p95 under release-build conditions.
38//! Substrate path: sha256 of canonical bytes + dedup-table SELECT on
39//! `(host_session_id, host_turn_index)` + (on miss) memory INSERT +
40//! `transcript_line_dedup` INSERT + `signed_events` chain row in a
41//! single transaction.
42//!
43//! # Idempotency contract (per RFC-0001 §"Idempotency contract")
44//!
45//! Two calls with the same `(host_session_id, host_turn_index)`
46//! produce exactly one memory. The second call returns
47//! `dedup_hit: true` + the existing memory_id. Re-delivery on host
48//! reconnect is safe.
49//!
//! # Status (v0.7.0 ship slice)
//!
//! The Request struct, Tool impl, dispatch wiring and the storage
//! transaction have all landed: [`handle_capture_turn`] runs the
//! permission + governance write gates and then persists the turn through
//! `crate::storage::capture_turn_idempotent`, returning the real
//! `memory_id` and `dedup_hit` flag. The `tool_calls` request field is
//! accepted on the wire but not yet persisted.
58
59use crate::models::field_names;
60use std::time::Instant;
61
62use base64::Engine;
63use base64::engine::general_purpose::STANDARD as B64_STD;
64use schemars::JsonSchema;
65use serde::Deserialize;
66use serde_json::{Value, json};
67use sha2::{Digest, Sha256};
68
69use crate::mcp::param_names;
70use crate::mcp::registry::McpTool;
71use crate::models::{Memory, MemoryKind, Tier};
72use crate::signed_events::{self, SignedEvent};
73
/// Name of the environment variable holding the operator-curated
/// allowlist of per-host Ed25519 public keys that the L4
/// `memory_capture_turn` signature check accepts (#1414).
///
/// Format: comma-separated, standard-base64 encodings of 32-byte keys;
/// blank entries are skipped. When the variable is unset or lists no
/// valid keys, every signed capture (one carrying `host_signature_b64` +
/// `host_pubkey_b64`) is rejected with `HOST_PUBKEY_NOT_ENROLLED`.
///
/// The variable is re-read on every call, so enrollment changes take
/// effect without a daemon restart. This is the same operator-allowlist
/// pattern as `AI_MEMORY_ADMIN_AGENT_IDS`. Listed in CLAUDE.md
/// §"Environment Variables".
pub(crate) const L4_HOST_PUBKEY_ALLOWLIST_ENV: &str = "AI_MEMORY_L4_HOST_PUBKEY_ALLOWLIST";

/// #1558 batch 5 wave 3 — `action` label of the L4 capture-turn write
/// gate. It is used both as the verb in deny messages and as the `action`
/// field of `ask` / `pending` envelopes. Only this file uses it.
const ACTION_CAPTURE_TURN: &str = "capture_turn";
90
91/// `memory_capture_turn` request body per RFC-0001 §"Tool input schema".
92///
93/// Field-by-field doc comments become the schemars-generated
94/// `description` strings in the MCP `inputSchema`. The schema doubles
95/// as the wire contract for every MCP-aware host that volunteers
96/// turns.
97// Per the #1052 wire-truthfulness decision (Agent-4 F2): no MCP
98// tool-request struct carries `deny_unknown_fields`. The wire schema
99// must not advertise `additionalProperties: false` while the runtime
100// stays permissive. For an L4 multi-host ingest surface this is
101// load-bearing — a host that adds a top-level field must not have its
102// turns rejected wholesale (the #1052 rationale cites exactly "clients
103// with newer field sets"). Unknown extra fields are tolerated and
104// ignored; missing REQUIRED fields still error via serde. Arbitrary
105// host-specific data belongs in `metadata`.
106#[derive(Debug, Clone, Deserialize, JsonSchema)]
107pub struct MemoryCaptureTurnRequest {
108 /// Opaque identifier the host issues per conversation session.
109 /// Stable across turns within a session; distinct across
110 /// sessions. Used as one half of the dedup key
111 /// `(host_session_id, host_turn_index)`.
112 pub host_session_id: String,
113
114 /// Monotonically increasing per-`(host_session_id)` turn counter.
115 /// Starts at 0 for the first turn. The substrate uses
116 /// `(host_session_id, host_turn_index)` as the canonical dedup
117 /// key so re-delivery of the same turn is idempotent.
118 pub host_turn_index: i64,
119
120 /// Speaker classification — `user` / `assistant` / `tool_use` /
121 /// `tool_result` / `system` / `other`. Drives downstream
122 /// memory_kind assignment in the v0.8 decision-detector
123 /// classifier.
124 pub role: String,
125
126 /// Verbatim turn text. The substrate preserves this byte-for-byte;
127 /// classifiers run separately downstream via the existing
128 /// atomiser / curator surface.
129 pub content: String,
130
131 /// Identifier for the host implementation (e.g. `"claude-code"`,
132 /// `"codex"`, `"gemini"`, `"cursor"`, `"cline"`). Surfaced in the
133 /// audit trail + the operator-facing per-host coverage report.
134 /// When omitted, defaults to `"unknown"`.
135 #[serde(default)]
136 pub host_kind: Option<String>,
137
138 /// Version string for the host implementation. Surfaced in the
139 /// audit trail so future format drift can be diagnosed by host
140 /// version.
141 #[serde(default)]
142 pub host_version: Option<String>,
143
144 /// Optional summary of tool invocations within this assistant
145 /// turn. Each entry is `{tool: string, brief: string}`. The
146 /// substrate preserves the list verbatim but does not (at v0.7.0)
147 /// classify or index per-tool-call. Reserved for v0.7.x atom-
148 /// per-tool indexing; the field is wire-stable today so hosts
149 /// can already populate it without breakage.
150 #[serde(default)]
151 #[allow(dead_code)]
152 pub tool_calls: Vec<ToolCallSummary>,
153
154 /// RFC3339 instant the host emitted the turn. Used as the
155 /// recovered memory's `created_at` so the timeline matches the
156 /// original conversation rather than the capture-call wall-clock.
157 /// When omitted, the substrate stamps with its current clock.
158 #[serde(default)]
159 pub timestamp_iso: Option<String>,
160
161 /// Optional Ed25519 signature over the canonical-bytes encoding
162 /// `host_session_id || 0x00 || host_turn_index || 0x00 || role ||
163 /// 0x00 || content`. When present + verified, the substrate
164 /// writes `attest_level = "signed_by_peer"` on the resulting
165 /// memory. When absent, `attest_level = "self_signed"`.
166 #[serde(default)]
167 pub host_signature_b64: Option<String>,
168
169 /// Ed25519 pubkey the substrate should verify
170 /// `host_signature_b64` against. The pubkey MUST be pre-enrolled
171 /// via the existing federation peer-allowlist mechanism.
172 /// Unenrolled pubkeys cause the call to fail with
173 /// `HOST_PUBKEY_NOT_ENROLLED`.
174 #[serde(default)]
175 pub host_pubkey_b64: Option<String>,
176
177 /// Substrate namespace the turn lands in. Defaults to the
178 /// agent's resolved default namespace per the calling context.
179 #[serde(default)]
180 pub namespace: Option<String>,
181
182 /// Optional arbitrary metadata the host wants to preserve
183 /// alongside the turn. Reserved keys (`agent_id`, `entity_id`,
184 /// `mentioned_entity_id`) follow the existing
185 /// `crate::validate::RequestValidator` rules.
186 #[serde(default)]
187 pub metadata: Option<Value>,
188}
189
190/// Summary of one tool call within an assistant turn. Mirrors the
191/// `crate::recover::parsers::ToolCallSummary` shape so L2/L3 +
192/// L4 capture surfaces produce the same downstream memory shape.
193// Wire-truthful permissive per #1052 (see MemoryCaptureTurnRequest).
194#[derive(Debug, Clone, Deserialize, JsonSchema)]
195#[allow(dead_code)] // Skeleton phase — fields read by the storage
196// transaction landing in the follow-up slice.
197pub struct ToolCallSummary {
198 /// Tool name (e.g. `"Bash"`, `"Read"`,
199 /// `"mcp__memory__memory_store"`).
200 pub tool: String,
201 /// One-line target/brief. For `Bash`, the `description` arg; for
202 /// `Read`, the file path; for an MCP tool, the first 1-2 fields
203 /// of the request struct. Truncated to 200 chars by convention.
204 pub brief: String,
205}
206
207/// Zero-sized `McpTool` registration for the v0.7.0 D1.6 recipe.
208pub struct MemoryCaptureTurnTool;
209
210impl McpTool for MemoryCaptureTurnTool {
211 fn name() -> &'static str {
212 crate::mcp::registry::tool_names::MEMORY_CAPTURE_TURN
213 }
214
215 fn description() -> &'static str {
216 "L4 host-volunteered turn capture per RFC-0001 (mcp-turn-capture). \
217 Idempotent by (host_session_id, host_turn_index)."
218 }
219
220 fn docs() -> &'static str {
221 "v0.7.0 #1389 L4: host volunteers each conversation turn directly \
222 via the MCP protocol. Substrate stores it idempotently and writes \
223 a signed_events row tagged layer=L4. Replaces transcript-file \
224 scraping with a clean protocol-level contract. Full design at \
225 docs/rfc/RFC-0001-mcp-turn-capture.md. Closes the #1388 substrate \
226 failure mode at the protocol layer."
227 }
228
229 fn input_schema() -> Value {
230 crate::mcp::registry::input_schema_for::<MemoryCaptureTurnRequest>()
231 }
232
233 fn family() -> &'static str {
234 // Lifecycle — capture is a substrate-lifecycle primitive
235 // (every host-volunteered turn produces one memory row).
236 crate::profile::Family::Lifecycle.name()
237 }
238}
239
240/// Handler entrypoint dispatched from `crate::mcp::handle_request`.
241///
242/// Performs the L4 idempotent capture per RFC-0001 §"Idempotency
243/// contract":
244///
245/// 1. SELECT `memory_id` FROM `transcript_line_dedup` WHERE
246/// `(host_session_id, host_turn_index) = (?, ?)` — the canonical
247/// dedup key.
248/// 2. On hit: return `{memory_id, dedup_hit: true, layer: "L4",
249/// elapsed_ms}`. No DB write.
250/// 3. On miss: compute sha256 of canonical-bytes, `BEGIN IMMEDIATE`
251/// transaction → `memories` INSERT via the canonical
252/// `storage::insert` path → `transcript_line_dedup` INSERT →
253/// COMMIT (or ROLLBACK on any failure with the transaction
254/// rolled back atomically).
255///
256/// # Errors
257///
258/// Returns a string-stable error code per the MCP-spec error
259/// convention (see `crate::mcp::handle_request`'s 2025-03-26
260/// `§"Tool result"` comment):
261///
262/// - `INVALID_INPUT: <reason>` — request failed deserialization or
263/// schema validation.
264/// - `DEDUP_QUERY_FAILED: <detail>` — `transcript_line_dedup`
265/// SELECT I/O failure.
266/// - `MEMORY_INSERT_FAILED: <detail>` — `storage::insert` returned
267/// an error (governance refusal, validation failure, SQL error).
268/// - `DEDUP_INSERT_FAILED: <detail>` — `transcript_line_dedup`
269/// INSERT failed; transaction rolled back, no row written.
270/// - `TX_BEGIN_FAILED: <detail>` / `TX_COMMIT_FAILED: <detail>` —
271/// transaction lifecycle errors.
272/// - `HOST_PUBKEY_NOT_ENROLLED: <pubkey>` — `host_pubkey_b64` is not
273/// on the operator's L4 host-pubkey allowlist
274/// (`AI_MEMORY_L4_HOST_PUBKEY_ALLOWLIST` env). Per #1414.
275/// - `SIGNED_EVENTS_APPEND_FAILED: <detail>` — substrate failed to
276/// write the L4 audit row. Per #1415.
277///
278/// # Security (post-#1413 critical fix)
279///
280/// - **agent_id agreement** — when `req.metadata.agent_id` is present
281/// it MUST equal the resolved `caller_agent_id` (mirroring
282/// `resolve_http_agent_id`'s body-header agreement contract).
283/// Mismatch returns `INVALID_INPUT` and refuses the write.
284/// - **Signature verification** — when `host_signature_b64` and
285/// `host_pubkey_b64` are present, the pubkey is checked against the
286/// `AI_MEMORY_L4_HOST_PUBKEY_ALLOWLIST` env-var allowlist
287/// (`HOST_PUBKEY_NOT_ENROLLED` on miss) and the signature is verified
288/// via Ed25519 over the canonical-bytes encoding
289/// `host_session_id || 0x00 || host_turn_index || 0x00 || role ||
290/// 0x00 || content` (`INVALID_INPUT: signature_verification_failed`
291/// on mismatch). On success the L4 audit row carries
292/// `attest_level = "signed_by_peer"`; absent both fields yields
293/// `attest_level = "self_signed"`; exactly one of the two fields
294/// present errors with `INVALID_INPUT`.
295/// - **`signed_events` chain row** — the substrate writes one row per
296/// successful capture inside the BEGIN IMMEDIATE transaction with
297/// `event_type = "memory_capture_turn"`, the resolved `attest_level`,
298/// and `payload_hash = sha256(canonical bytes)` so audit can prove
299/// which layer caught each turn (#1415).
300pub fn handle_capture_turn(
301 conn: &rusqlite::Connection,
302 params: &Value,
303 caller_agent_id: Option<&str>,
304) -> Result<Value, String> {
305 let start = Instant::now();
306 let req: MemoryCaptureTurnRequest =
307 serde_json::from_value(params.clone()).map_err(|e| format!("INVALID_INPUT: {e}"))?;
308
309 // v0.7.0 #1413 — resolve effective caller for the agent_id agreement
310 // check + signed_events row attribution. MCP stdio captures the host
311 // identity at `initialize.clientInfo.name`; when present, the
312 // dispatcher threads it via `ctx.mcp_client`. When absent, we still
313 // mint a per-request fallback so audit attribution is never empty.
314 let caller = caller_agent_id.unwrap_or("anonymous:mcp-unknown");
315
316 // v0.7.0 #1416 — all validation (agent_id agreement #1413,
317 // signature verification #1414) + Memory/SignedEvent construction
318 // is backend-agnostic and lives in `prepare_capture_turn`; the
319 // dedup-lookup + atomic three-row transaction is the sqlite SSOT
320 // `crate::storage::capture_turn_idempotent` (also reached by
321 // `SqliteStore::capture_turn_idempotent` through the SAL trait).
322 let write = prepare_capture_turn(&req, caller)?;
323 let attest_level = write.signed_event.attest_level.clone();
324
325 // v0.7.0 H1 (HIGH) — write-gate parity for the mutating
326 // `capture_turn` verb. Pre-fix, L4 turn capture persisted a Memory
327 // row WITHOUT passing through the K9 permission gate or the
328 // K3/Task-1.9 governance gate that `memory_store` enforces, so a
329 // governed namespace could be written via `memory_capture_turn`
330 // ungated. L4 capture is a store-class write: we gate it under the
331 // same `Op::MemoryStore` / `GovernedAction::Store` policy surface as
332 // `memory_store`, against the resolved target namespace. A dedup-hit
333 // is a no-op write, so gating before the idempotent transaction is
334 // safe — a denied namespace refuses uniformly whether or not the
335 // turn was already captured.
336 {
337 let gate_namespace = write.memory.namespace.clone();
338 let gate_payload = json!({
339 "id": write.memory.id,
340 "title": write.memory.title,
341 "namespace": gate_namespace,
342 });
343
344 use crate::permissions::{Op, PermissionContext, Permissions};
345 let ctx = PermissionContext {
346 op: Op::MemoryStore,
347 namespace: gate_namespace.clone(),
348 agent_id: caller.to_string(),
349 payload: gate_payload.clone(),
350 };
351 match Permissions::evaluate(&ctx, &[]) {
352 crate::permissions::Decision::Allow | crate::permissions::Decision::Modify(_) => {}
353 crate::permissions::Decision::Deny(reason) => {
354 return Err(crate::governance::deny_message(
355 ACTION_CAPTURE_TURN,
356 crate::governance::DenyGate::PermissionRule,
357 &reason,
358 ));
359 }
360 crate::permissions::Decision::Ask(prompt) => {
361 return Ok(json!({
362 "status": "ask",
363 "reason": prompt,
364 "action": ACTION_CAPTURE_TURN,
365 "namespace": gate_namespace,
366 }));
367 }
368 }
369
370 use crate::models::{GovernanceDecision, GovernedAction};
371 match crate::db::enforce_governance(
372 conn,
373 GovernedAction::Store,
374 &gate_namespace,
375 caller,
376 Some(&write.memory.id),
377 Some(caller),
378 &gate_payload,
379 )
380 .map_err(|e| e.to_string())?
381 {
382 GovernanceDecision::Allow => {}
383 GovernanceDecision::Deny(refusal) => {
384 return Err(crate::governance::deny_message(
385 ACTION_CAPTURE_TURN,
386 crate::governance::DenyGate::Governance,
387 &refusal.reason,
388 ));
389 }
390 GovernanceDecision::Pending(pending_id) => {
391 return Ok(json!({
392 "status": "pending",
393 (field_names::PENDING_ID): pending_id,
394 "reason": crate::errors::msg::GOVERNANCE_REQUIRES_APPROVAL,
395 "action": ACTION_CAPTURE_TURN,
396 "namespace": gate_namespace,
397 }));
398 }
399 }
400 }
401
402 let result = crate::storage::capture_turn_idempotent(conn, &write)?;
403 let elapsed_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
404
405 if result.dedup_hit {
406 Ok(json!({
407 "memory_id": result.memory_id,
408 "dedup_hit": true,
409 "layer": "L4",
410 (field_names::ELAPSED_MS): elapsed_ms,
411 }))
412 } else {
413 Ok(json!({
414 "memory_id": result.memory_id,
415 "dedup_hit": false,
416 "layer": "L4",
417 (field_names::ATTEST_LEVEL): attest_level,
418 (field_names::ELAPSED_MS): elapsed_ms,
419 }))
420 }
421}
422
423/// v0.7.0 #1416 — backend-agnostic preparation of an L4 capture write.
424///
425/// Performs every step that does NOT touch the database — agent_id
426/// agreement (#1413), canonical-bytes hashing, host-signature
427/// verification (#1414), and construction of the [`Memory`] plus the
428/// [`SignedEvent`] audit row (#1415) — and returns a ready-to-write
429/// [`crate::models::CaptureTurnWrite`]. Both surfaces hand the bundle to
430/// the dedup-keyed transaction: the MCP sqlite handler via
431/// `crate::storage::capture_turn_idempotent`, the HTTP route via
432/// `app.store.capture_turn_idempotent` (sqlite OR postgres). Keeping the
433/// verification here means it is enforced identically across both
434/// surfaces and both backends — no adapter can skip it.
435pub(crate) fn prepare_capture_turn(
436 req: &MemoryCaptureTurnRequest,
437 caller: &str,
438) -> Result<crate::models::CaptureTurnWrite, String> {
439 // #1413 — agent_id agreement. A caller-stamped `metadata.agent_id`
440 // MUST equal the resolved caller, else an attacker with access to
441 // the surface could forge memories attributed to other identities.
442 if let Some(meta_agent) = req
443 .metadata
444 .as_ref()
445 .and_then(|v| v.get(param_names::AGENT_ID))
446 .and_then(|v| v.as_str())
447 && meta_agent != caller
448 {
449 return Err(format!(
450 "INVALID_INPUT: metadata.agent_id ({meta_agent:?}) does not match resolved caller ({caller:?})"
451 ));
452 }
453
454 // #1414 — canonical-bytes + host-signature verification. Cases:
455 // (sig, pubkey) both Some → check allowlist + verify Ed25519
456 // exactly one Some → INVALID_INPUT (paired-fields contract)
457 // both None → unsigned ("self_signed") capture path
458 let canonical = format!(
459 "{}\0{}\0{}\0{}",
460 &req.host_session_id, req.host_turn_index, &req.role, &req.content
461 );
462 let sha_vec = {
463 let mut hasher = Sha256::new();
464 hasher.update(canonical.as_bytes());
465 hasher.finalize().to_vec()
466 };
467
468 let (sig_bytes_opt, attest_level): (Option<Vec<u8>>, String) =
469 verify_host_signature(req, canonical.as_bytes())?;
470
471 let host_kind = req.host_kind.as_deref().unwrap_or("unknown").to_string();
472 let now_iso = chrono::Utc::now().to_rfc3339();
473 let created_at = req.timestamp_iso.clone().unwrap_or_else(|| now_iso.clone());
474
475 let mut tags = vec![
476 "captured-via-l4".to_string(),
477 format!("host:{host_kind}"),
478 format!("role:{}", req.role),
479 format!("attest:{attest_level}"),
480 ];
481 if let Some(hv) = req.host_version.as_deref() {
482 tags.push(format!("host-version:{hv}"));
483 }
484
485 // Title MUST be unique per (host_session_id, host_turn_index)
486 // because the substrate's `storage::insert` upserts on
487 // `(title, namespace)`; without host_session_id in the title,
488 // two distinct sessions whose turn N has the same role would
489 // collide on the same memory row.
490 let title = format!(
491 "L4 capture {} {} turn {} ({})",
492 host_kind, req.host_session_id, req.host_turn_index, req.role
493 );
494
495 // #1413 — stamp `metadata.agent_id` with the resolved caller so the
496 // inserted memory carries the authenticated identity. Synthesize an
497 // object when absent; patch when present without agent_id; preserve
498 // verbatim when present with a matching agent_id (checked above).
499 let metadata = {
500 let mut m = req.metadata.clone().unwrap_or_else(|| json!({}));
501 if let Some(obj) = m.as_object_mut() {
502 obj.entry("agent_id".to_string())
503 .or_insert_with(|| Value::String(caller.to_string()));
504 }
505 m
506 };
507
508 let mem = Memory {
509 id: uuid::Uuid::new_v4().to_string(),
510 tier: Tier::Long,
511 // v0.7.0 F-E4 fix (#1436): route through DEFAULT_NAMESPACE
512 // SSOT at src/lib.rs:266 instead of the bare literal.
513 namespace: req
514 .namespace
515 .clone()
516 .unwrap_or_else(|| crate::DEFAULT_NAMESPACE.to_string()),
517 title,
518 content: req.content.clone(),
519 tags,
520 priority: 5,
521 confidence: 1.0,
522 source: "host".to_string(),
523 metadata,
524 created_at,
525 updated_at: now_iso.clone(),
526 last_accessed_at: Some(now_iso.clone()),
527 memory_kind: MemoryKind::Observation,
528 ..Memory::default()
529 };
530
531 // v0.7.0 #1415 — audit-chain row appended inside the same tx as the
532 // data rows (by the store method) so audit can prove L4 caught the
533 // turn; attest_level reflects whether the host provided a verified
534 // Ed25519 signature (#1414).
535 let signed_event = SignedEvent {
536 id: uuid::Uuid::new_v4().to_string(),
537 agent_id: caller.to_string(),
538 event_type: signed_events::event_types::MEMORY_CAPTURE_TURN.to_string(),
539 payload_hash: sha_vec.clone(),
540 signature: sig_bytes_opt,
541 attest_level,
542 timestamp: now_iso,
543 ..SignedEvent::default()
544 };
545
546 Ok(crate::models::CaptureTurnWrite {
547 memory: mem,
548 sha256: sha_vec,
549 host_kind,
550 host_session_id: req.host_session_id.clone(),
551 host_turn_index: req.host_turn_index,
552 recovered_at_ms: chrono::Utc::now().timestamp_millis(),
553 signed_event,
554 })
555}
556
557/// v0.7.0 #1414 — parse + verify the host signature pair, returning
558/// `(sig_bytes_opt, attest_level)` for downstream use in the
559/// signed_events row.
560///
561/// Contract:
562/// - both `host_signature_b64` and `host_pubkey_b64` present →
563/// pubkey allowlist check → Ed25519 verify → ("signed_by_peer")
564/// - exactly one of the two present → `INVALID_INPUT` (paired-fields)
565/// - both absent → (`None`, "self_signed")
566fn verify_host_signature(
567 req: &MemoryCaptureTurnRequest,
568 canonical_bytes: &[u8],
569) -> Result<(Option<Vec<u8>>, String), String> {
570 match (req.host_signature_b64.as_deref(), req.host_pubkey_b64.as_deref()) {
571 (None, None) => Ok((None, crate::models::AttestLevel::SelfSigned.as_str().to_string())),
572 (Some(_), None) | (None, Some(_)) => Err(
573 "INVALID_INPUT: host_signature_b64 and host_pubkey_b64 must both be present or both absent"
574 .to_string(),
575 ),
576 (Some(sig_b64), Some(pubkey_b64)) => {
577 let pubkey_bytes = B64_STD
578 .decode(pubkey_b64)
579 .map_err(|e| format!("INVALID_INPUT: host_pubkey_b64 not valid base64: {e}"))?;
580 let pubkey_arr: [u8; 32] = pubkey_bytes.try_into().map_err(|_| {
581 "INVALID_INPUT: host_pubkey_b64 must decode to 32 bytes (Ed25519)".to_string()
582 })?;
583
584 if !is_host_pubkey_enrolled(&pubkey_arr) {
585 return Err(format!("HOST_PUBKEY_NOT_ENROLLED: {pubkey_b64}"));
586 }
587
588 let verifying_key = ed25519_dalek::VerifyingKey::from_bytes(&pubkey_arr).map_err(
589 |e| format!("INVALID_INPUT: host_pubkey_b64 not a valid Ed25519 key: {e}"),
590 )?;
591
592 let sig_bytes = B64_STD
593 .decode(sig_b64)
594 .map_err(|e| format!("INVALID_INPUT: host_signature_b64 not valid base64: {e}"))?;
595 let sig_arr: [u8; 64] = sig_bytes.clone().try_into().map_err(|_| {
596 "INVALID_INPUT: host_signature_b64 must decode to 64 bytes (Ed25519)".to_string()
597 })?;
598 let signature = ed25519_dalek::Signature::from_bytes(&sig_arr);
599
600 verifying_key
601 .verify_strict(canonical_bytes, &signature)
602 .map_err(|e| {
603 format!("INVALID_INPUT: signature_verification_failed: {e}")
604 })?;
605
606 Ok((Some(sig_bytes), crate::models::AttestLevel::SignedByPeer.as_str().to_string()))
607 }
608 }
609}
610
611/// Check the env-var allowlist for a host pubkey. Re-reads the env
612/// on every call so operators can adjust enrollment without daemon
613/// restart. An unset / empty env means no host signatures are
614/// accepted (every signed-path call yields `HOST_PUBKEY_NOT_ENROLLED`)
615/// — the conservative default per the v0.7.0 sole-authority rule.
616fn is_host_pubkey_enrolled(pubkey: &[u8; 32]) -> bool {
617 let Ok(raw) = std::env::var(L4_HOST_PUBKEY_ALLOWLIST_ENV) else {
618 return false;
619 };
620 for entry in raw.split(',').map(str::trim).filter(|s| !s.is_empty()) {
621 if let Ok(bytes) = B64_STD.decode(entry)
622 && bytes.len() == 32
623 && bytes.as_slice() == pubkey
624 {
625 return true;
626 }
627 }
628 false
629}
630
#[cfg(test)]
mod d1_6_1389_tests {
    //! D1.6 (#987) parity tests for the L4 `memory_capture_turn` tool:
    //! registry metadata and the advertised input schema. Shared helpers
    //! live at [`crate::mcp::parity_test_helpers`].
    use super::*;

    /// Fields every host must send; the schema must mark each as required.
    const REQUIRED_FIELDS: [&str; 4] = ["host_session_id", "host_turn_index", "role", "content"];

    #[test]
    fn capture_turn_tool_metadata() {
        assert_eq!(MemoryCaptureTurnTool::name(), "memory_capture_turn");
        assert_eq!(MemoryCaptureTurnTool::family(), "lifecycle");
        // Discovery callers should see a meaningful tool, so neither the
        // short description nor the long-form docs may be blank.
        let texts = [
            MemoryCaptureTurnTool::description(),
            MemoryCaptureTurnTool::docs(),
        ];
        assert!(texts.iter().all(|text| !text.is_empty()));
    }

    #[test]
    fn input_schema_is_valid_json() {
        // Per the JSON Schema draft spec, the schemars-derived schema must
        // be a JSON object exposing `properties` and a `required` array.
        let schema = MemoryCaptureTurnTool::input_schema();
        let object = schema.as_object().expect("schema is an object");
        assert!(
            object.contains_key("properties"),
            "schema must advertise properties"
        );

        let required: Vec<&str> = object
            .get("required")
            .and_then(Value::as_array)
            .expect("required is an array")
            .iter()
            .filter_map(Value::as_str)
            .collect();
        for field in REQUIRED_FIELDS {
            assert!(required.contains(&field), "required must include {field}");
        }
    }
}
672
673#[cfg(test)]
674mod handler_tests {
675 //! Skeleton handler tests — the wire shape is stable; the
676 //! placeholder behavior is exercised here. The substantive
677 //! storage tests land in the follow-up commit alongside the
678 //! transactional path.
679 use super::*;
680
681 fn fresh_conn() -> rusqlite::Connection {
682 crate::storage::open(std::path::Path::new(":memory:")).expect("open in-memory db")
683 }
684
685 /// Test helper — calls the handler with no MCP-handshake caller
686 /// (`None`). The agent_id agreement check at #1413 is a no-op
687 /// when the request body carries no `metadata.agent_id`, so
688 /// every legacy test continues to pass under the new signature.
689 fn call_handler(conn: &rusqlite::Connection, params: &Value) -> Result<Value, String> {
690 handle_capture_turn(conn, params, None)
691 }
692
693 #[test]
694 fn handler_accepts_minimal_request() {
695 let conn = fresh_conn();
696 let resp = call_handler(
697 &conn,
698 &json!({
699 "host_session_id": "session-a",
700 "host_turn_index": 0,
701 "role": "user",
702 "content": "hello"
703 }),
704 )
705 .expect("ok");
706 assert_eq!(resp["dedup_hit"].as_bool(), Some(false));
707 assert_eq!(resp["layer"].as_str(), Some("L4"));
708 assert!(resp["memory_id"].as_str().is_some());
709 }
710
711 #[test]
712 fn handler_rejects_missing_required_fields() {
713 let conn = fresh_conn();
714 let resp = call_handler(&conn, &json!({ "host_session_id": "x" }));
715 let err = resp.expect_err("missing required fields must error");
716 assert!(
717 err.starts_with("INVALID_INPUT"),
718 "error must use INVALID_INPUT prefix, got: {err}"
719 );
720 }
721
722 #[test]
723 fn handler_tolerates_unknown_fields_at_runtime() {
724 // #1052 wire-truthful contract: the schema does not advertise
725 // `additionalProperties: false`, so the runtime must tolerate
726 // (and ignore) unknown extra fields rather than reject the turn.
727 // Wider host compat — a host with a newer field set must not
728 // have its turns dropped wholesale.
729 let conn = fresh_conn();
730 let resp = call_handler(
731 &conn,
732 &json!({
733 "host_session_id": "session-a",
734 "host_turn_index": 0,
735 "role": "user",
736 "content": "hello",
737 "an_unknown_extra_field": "tolerated and ignored"
738 }),
739 )
740 .expect(
741 "unknown extra fields are tolerated at runtime (post-#1052 wire-truthful contract)",
742 );
743 assert_eq!(resp["layer"].as_str(), Some("L4"));
744 assert_eq!(resp["dedup_hit"].as_bool(), Some(false));
745 }
746
747 #[test]
748 fn handler_rejects_missing_required_field() {
749 // Permissive on UNKNOWN fields, but REQUIRED fields are still
750 // enforced by serde (no `#[serde(default)]`). A turn missing
751 // `content` must error rather than silently store an empty turn.
752 let conn = fresh_conn();
753 let err = call_handler(
754 &conn,
755 &json!({
756 "host_session_id": "session-a",
757 "host_turn_index": 0,
758 "role": "user"
759 }),
760 )
761 .expect_err("missing required `content` must error");
762 assert!(err.starts_with("INVALID_INPUT"), "got: {err}");
763 }
764
765 #[test]
766 fn handler_accepts_full_request_with_tool_calls() {
767 let conn = fresh_conn();
768 let resp = call_handler(
769 &conn,
770 &json!({
771 "host_session_id": "session-a",
772 "host_turn_index": 5,
773 "role": "assistant",
774 "content": "running command",
775 "host_kind": "claude-code",
776 "host_version": "1.0.0",
777 "tool_calls": [
778 {"tool": "Bash", "brief": "list files"}
779 ],
780 "timestamp_iso": "2026-05-28T12:00:00Z",
781 "namespace": "test"
782 }),
783 )
784 .expect("ok");
785 // Post-storage-tx: memory_id is a UUID. dedup_hit=false on
786 // the first call. layer=L4 always.
787 assert_eq!(resp["dedup_hit"].as_bool(), Some(false));
788 assert_eq!(resp["layer"].as_str(), Some("L4"));
789 let memory_id = resp["memory_id"].as_str().expect("memory_id is a string");
790 assert!(!memory_id.is_empty(), "memory_id must be non-empty");
791 }
792
#[test]
fn handler_idempotent_on_same_session_turn() {
    // RFC-0001 §"Idempotency contract": repeating a capture for the same
    // (host_session_id, host_turn_index) yields exactly one memory. The
    // repeat reports dedup_hit:true and echoes the original memory_id.
    let conn = fresh_conn();
    let request = json!({
        "host_session_id": "session-idem",
        "host_turn_index": 0,
        "role": "user",
        "content": "operator directive"
    });

    let first = call_handler(&conn, &request).expect("first call ok");
    assert_eq!(first["dedup_hit"].as_bool(), Some(false));
    let original_id = first["memory_id"].as_str().unwrap();

    let second = call_handler(&conn, &request).expect("second call ok");
    assert_eq!(
        second["dedup_hit"].as_bool(),
        Some(true),
        "second call must dedup-hit"
    );
    assert_eq!(
        second["memory_id"].as_str().unwrap(),
        original_id,
        "second call returns the first call's memory_id"
    );
}
834
#[test]
fn handler_distinct_session_turn_creates_separate_memories() {
    // Any change in (host_session_id, host_turn_index) is a new turn and
    // must yield its own memory, never a dedup hit.
    let conn = fresh_conn();
    let capture = |session: &str, turn: i64, content: &str| {
        call_handler(
            &conn,
            &json!({
                "host_session_id": session,
                "host_turn_index": turn,
                "role": "user",
                "content": content
            }),
        )
    };

    let a = capture("session-a", 0, "a").expect("a ok");
    let b = capture("session-b", 0, "b").expect("b ok");
    let c = capture("session-a", 1, "c").expect("c ok");

    for resp in [&a, &b, &c] {
        assert_eq!(resp["dedup_hit"].as_bool(), Some(false));
    }

    let id_a = a["memory_id"].as_str().unwrap();
    let id_b = b["memory_id"].as_str().unwrap();
    let id_c = c["memory_id"].as_str().unwrap();
    assert_ne!(id_a, id_b, "distinct sessions produce distinct memories");
    assert_ne!(id_a, id_c, "distinct turns produce distinct memories");
    assert_ne!(id_b, id_c);
}
880
881 // ------------------------------------------------------------------
882 // #1414 host-signature verification coverage (2026-06-11). Drives the
883 // `verify_host_signature` + `is_host_pubkey_enrolled` signed-path
884 // arms via the pure `prepare_capture_turn` so no governance gate /
885 // connection is needed.
886 // ------------------------------------------------------------------
887
/// Take the process-wide mutex that serializes the allowlist env-var
/// mutations performed by the signed-path tests.
///
/// A poisoned mutex (some earlier test panicked while holding it) is
/// recovered rather than propagated, so one failure does not cascade.
fn allowlist_lock() -> std::sync::MutexGuard<'static, ()> {
    static ENV_MUTEX: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
    match ENV_MUTEX.get_or_init(std::sync::Mutex::default).lock() {
        Ok(guard) => guard,
        // The protected data is `()`, so there is nothing to repair.
        Err(poisoned) => poisoned.into_inner(),
    }
}
895
896 struct AllowlistGuard {
897 prev: Option<String>,
898 }
899 impl AllowlistGuard {
900 fn set(value: Option<&str>) -> Self {
901 let prev = std::env::var(L4_HOST_PUBKEY_ALLOWLIST_ENV).ok();
902 unsafe {
903 match value {
904 Some(v) => std::env::set_var(L4_HOST_PUBKEY_ALLOWLIST_ENV, v),
905 None => std::env::remove_var(L4_HOST_PUBKEY_ALLOWLIST_ENV),
906 }
907 }
908 Self { prev }
909 }
910 }
911 impl Drop for AllowlistGuard {
912 fn drop(&mut self) {
913 unsafe {
914 match &self.prev {
915 Some(v) => std::env::set_var(L4_HOST_PUBKEY_ALLOWLIST_ENV, v),
916 None => std::env::remove_var(L4_HOST_PUBKEY_ALLOWLIST_ENV),
917 }
918 }
919 }
920 }
921
922 /// Deserialize a request from a JSON object — the same path the
923 /// production handler uses (`serde_json::from_value`). Lets the
924 /// signed-path tests build requests without a `Default` impl.
925 fn req_from(v: Value) -> MemoryCaptureTurnRequest {
926 serde_json::from_value(v).expect("valid request shape")
927 }
928
929 /// Build a deterministic signing key + the canonical-bytes signature
930 /// for a `(session, turn, role, content)` tuple, mirroring the
931 /// substrate's canonical encoding, and return the request JSON +
932 /// the b64 pubkey for allowlist enrollment.
933 fn signed_req_json(
934 session: &str,
935 turn: i64,
936 role: &str,
937 content: &str,
938 key: &ed25519_dalek::SigningKey,
939 ) -> (Value, String) {
940 use ed25519_dalek::Signer;
941 let canonical = format!("{session}\0{turn}\0{role}\0{content}");
942 let sig = key.sign(canonical.as_bytes());
943 let pubkey_b64 = B64_STD.encode(key.verifying_key().to_bytes());
944 let sig_b64 = B64_STD.encode(sig.to_bytes());
945 let v = json!({
946 "host_session_id": session,
947 "host_turn_index": turn,
948 "role": role,
949 "content": content,
950 "host_signature_b64": sig_b64,
951 "host_pubkey_b64": pubkey_b64,
952 });
953 (v, pubkey_b64)
954 }
955
956 fn test_key() -> ed25519_dalek::SigningKey {
957 ed25519_dalek::SigningKey::from_bytes(&[7u8; 32])
958 }
959
#[test]
fn signed_path_enrolled_pubkey_verifies_signed_by_peer() {
    let _serialized = allowlist_lock();
    let (request, enrolled_pubkey) =
        signed_req_json("sess-sig", 0, "user", "signed content", &test_key());
    // Enroll exactly the signing key's pubkey, so verification succeeds.
    let _env = AllowlistGuard::set(Some(enrolled_pubkey.as_str()));

    let prepared = prepare_capture_turn(&req_from(request), "ai:caller").expect("verify ok");

    let expected_level = crate::models::AttestLevel::SignedByPeer.as_str();
    assert_eq!(prepared.signed_event.attest_level, expected_level);
    assert!(prepared.signed_event.signature.is_some());
}
973
#[test]
fn signed_path_unenrolled_pubkey_refused() {
    let _serialized = allowlist_lock();
    let (request, _unenrolled_pubkey) =
        signed_req_json("sess-sig", 0, "user", "signed content", &test_key());
    // An empty allowlist enrolls nobody.
    let _env = AllowlistGuard::set(Some(""));

    let err = prepare_capture_turn(&req_from(request), "ai:caller").expect_err("must refuse");
    assert!(err.starts_with("HOST_PUBKEY_NOT_ENROLLED"), "got: {err}");
}
984
#[test]
fn signed_path_unset_allowlist_refuses_every_signed_call() {
    let _serialized = allowlist_lock();
    let (request, _unenrolled_pubkey) = signed_req_json("sess-sig", 0, "user", "c", &test_key());
    // With the allowlist variable removed entirely, no pubkey is enrolled.
    let _env = AllowlistGuard::set(None);

    let err = prepare_capture_turn(&req_from(request), "ai:caller").expect_err("must refuse");
    assert!(err.starts_with("HOST_PUBKEY_NOT_ENROLLED"), "got: {err}");
}
994
#[test]
fn signed_path_tampered_signature_fails_verification() {
    let _serialized = allowlist_lock();
    let (mut request, enrolled_pubkey) =
        signed_req_json("sess-sig", 0, "user", "original", &test_key());
    // The signature still covers "original". Swapping the content makes
    // the canonical bytes diverge, so verify_strict must fail even though
    // the pubkey is enrolled.
    request["content"] = json!("tampered");
    let _env = AllowlistGuard::set(Some(enrolled_pubkey.as_str()));

    let err = prepare_capture_turn(&req_from(request), "ai:caller").expect_err("verify must fail");
    assert!(err.contains("signature_verification_failed"), "got: {err}");
}
1007
#[test]
fn signed_path_paired_fields_mismatch_rejected() {
    // A signature without its pubkey: the two fields travel as a pair, so
    // supplying only one half must be rejected.
    let request = req_from(json!({
        "host_session_id": "s",
        "host_turn_index": 0,
        "role": "user",
        "content": "c",
        "host_signature_b64": "AA",
    }));

    let err = prepare_capture_turn(&request, "ai:caller").expect_err("paired-fields");
    assert!(err.contains("must both be present or both absent"), "got: {err}");
}
1024
#[test]
fn signed_path_bad_base64_pubkey_rejected() {
    let _serialized = allowlist_lock();
    // The pubkey field is not decodable base64 at all.
    let request = req_from(json!({
        "host_session_id": "s",
        "host_turn_index": 0,
        "role": "user",
        "content": "c",
        "host_signature_b64": "AA",
        "host_pubkey_b64": "!!!not-base64!!!",
    }));

    let err = prepare_capture_turn(&request, "ai:caller").expect_err("bad b64");
    assert!(err.contains("host_pubkey_b64 not valid base64"), "got: {err}");
}
1042
#[test]
fn signed_path_wrong_length_pubkey_rejected() {
    let _serialized = allowlist_lock();
    // Well-formed base64, but it decodes to 16 bytes rather than the 32 an
    // ed25519 pubkey needs.
    let sixteen_byte_pubkey = B64_STD.encode([1u8; 16]);
    let request = req_from(json!({
        "host_session_id": "s",
        "host_turn_index": 0,
        "role": "user",
        "content": "c",
        "host_signature_b64": "AA",
        "host_pubkey_b64": sixteen_byte_pubkey,
    }));

    let err = prepare_capture_turn(&request, "ai:caller").expect_err("wrong len");
    assert!(err.contains("must decode to 32 bytes"), "got: {err}");
}
1059
#[test]
fn agent_id_mismatch_rejected_in_prepare() {
    // #1413: a `metadata.agent_id` that differs from the resolved caller
    // must be refused.
    let request = req_from(json!({
        "host_session_id": "s",
        "host_turn_index": 0,
        "role": "user",
        "content": "c",
        "metadata": {"agent_id": "ai:someone-else"},
    }));

    let err = prepare_capture_turn(&request, "ai:caller").expect_err("agent mismatch");
    assert!(err.contains("does not match resolved caller"), "got: {err}");
}
1072 }
1073}