Skip to main content

ai_memory/mcp/tools/
capture_turn.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! v0.7.0 (#1389) — MCP `memory_capture_turn` handler. Substrate-side
5//! implementation of the L4 layer of the layered-capture architecture
6//! per RFC-0001 (`docs/rfc/RFC-0001-mcp-turn-capture.md`).
7//!
8//! # What this tool does
9//!
10//! Hosts (Claude Code / Codex CLI / Gemini CLI / IDE plugins / future
11//! MCP-aware harnesses) call `memory_capture_turn` once per
12//! conversation turn to volunteer the turn content directly into the
13//! substrate. The substrate stores it idempotently by
14//! `(host_session_id, host_turn_index)` and writes a `signed_events`
15//! row tagged `layer = "L4"` so audit can prove which layer caught
16//! each turn.
17//!
18//! # Why L4 is THE FIX
19//!
20//! Layered defense (per architecture memo `f62cb182`):
21//!
22//! - **L1** — agent discipline (`memory_capture_nag`) catches the
23//!   common case "agent forgot."
24//! - **L2** — `recover-previous-session` catches SIGKILL between
25//!   sessions on the same host.
26//! - **L3** — substrate filesystem-notify watcher catches mid-session
27//!   crashes + concurrent multi-session capture.
28//! - **L4** — THIS tool — host volunteers turns directly via MCP
29//!   protocol. No transcript scraping. No format coupling. The trust
30//!   boundary is the protocol contract. **Survives 50 years of
31//!   vendor churn** because the substrate doesn't depend on any
32//!   single host's implementation details.
33//!
34//! # Performance contract
35//!
36//! Per issue #1394 + the operator's "optimal performance" directive:
37//! synchronous dispatch < 10 ms p95 under release-build conditions.
//! Substrate path: sha256 of canonical bytes + the permission and
//! governance write gates + dedup-table SELECT on
//! `(host_session_id, host_turn_index)` + (on miss) memory INSERT +
//! `transcript_line_dedup` INSERT + `signed_events` chain row in a
//! single transaction.
42//!
43//! # Idempotency contract (per RFC-0001 §"Idempotency contract")
44//!
45//! Two calls with the same `(host_session_id, host_turn_index)`
46//! produce exactly one memory. The second call returns
47//! `dedup_hit: true` + the existing memory_id. Re-delivery on host
48//! reconnect is safe.
49//!
//! # Status (v0.7.0)
//!
//! The handler is fully wired:
//! - validation and host-signature verification run in `prepare_capture_turn`;
//! - the permission and governance write gates run in `handle_capture_turn`;
//! - the dedup-keyed write is performed by `crate::storage::capture_turn_idempotent`.
//!
//! The `tool_calls` request field is accepted on the wire but not yet persisted.
58
59use crate::models::field_names;
60use std::time::Instant;
61
62use base64::Engine;
63use base64::engine::general_purpose::STANDARD as B64_STD;
64use schemars::JsonSchema;
65use serde::Deserialize;
66use serde_json::{Value, json};
67use sha2::{Digest, Sha256};
68
69use crate::mcp::param_names;
70use crate::mcp::registry::McpTool;
71use crate::models::{Memory, MemoryKind, Tier};
72use crate::signed_events::{self, SignedEvent};
73
/// Env var carrying the operator's per-host Ed25519 pubkey allowlist
/// for L4 `memory_capture_turn` signature verification (#1414).
///
/// Format: comma-separated entries, each a standard-alphabet base64
/// (`general_purpose::STANDARD`) encoding of a 32-byte Ed25519 pubkey.
/// Entries are whitespace-trimmed; blank, undecodable, or wrong-length
/// entries never match. Unset / empty = no host signatures accepted:
/// every signed request whose `host_pubkey_b64` is itself well-formed
/// (valid base64, 32 bytes) errors with `HOST_PUBKEY_NOT_ENROLLED`.
///
/// Mirrors the `AI_MEMORY_ADMIN_AGENT_IDS` shape — an operator-
/// curated allowlist read at call time, no daemon-restart required
/// for enrollment changes (each call re-reads the env). Documented
/// in CLAUDE.md §"Environment Variables".
pub(crate) const L4_HOST_PUBKEY_ALLOWLIST_ENV: &str = "AI_MEMORY_L4_HOST_PUBKEY_ALLOWLIST";

/// #1558 batch 5 wave 3 — action label for the L4 capture-turn write
/// gate (deny-message verb + the `action` field on ask/pending
/// envelopes). File-local: no other surface uses this label.
const ACTION_CAPTURE_TURN: &str = "capture_turn";
90
91/// `memory_capture_turn` request body per RFC-0001 §"Tool input schema".
92///
93/// Field-by-field doc comments become the schemars-generated
94/// `description` strings in the MCP `inputSchema`. The schema doubles
95/// as the wire contract for every MCP-aware host that volunteers
96/// turns.
97// Per the #1052 wire-truthfulness decision (Agent-4 F2): no MCP
98// tool-request struct carries `deny_unknown_fields`. The wire schema
99// must not advertise `additionalProperties: false` while the runtime
100// stays permissive. For an L4 multi-host ingest surface this is
101// load-bearing — a host that adds a top-level field must not have its
102// turns rejected wholesale (the #1052 rationale cites exactly "clients
103// with newer field sets"). Unknown extra fields are tolerated and
104// ignored; missing REQUIRED fields still error via serde. Arbitrary
105// host-specific data belongs in `metadata`.
106#[derive(Debug, Clone, Deserialize, JsonSchema)]
107pub struct MemoryCaptureTurnRequest {
108    /// Opaque identifier the host issues per conversation session.
109    /// Stable across turns within a session; distinct across
110    /// sessions. Used as one half of the dedup key
111    /// `(host_session_id, host_turn_index)`.
112    pub host_session_id: String,
113
114    /// Monotonically increasing per-`(host_session_id)` turn counter.
115    /// Starts at 0 for the first turn. The substrate uses
116    /// `(host_session_id, host_turn_index)` as the canonical dedup
117    /// key so re-delivery of the same turn is idempotent.
118    pub host_turn_index: i64,
119
120    /// Speaker classification — `user` / `assistant` / `tool_use` /
121    /// `tool_result` / `system` / `other`. Drives downstream
122    /// memory_kind assignment in the v0.8 decision-detector
123    /// classifier.
124    pub role: String,
125
126    /// Verbatim turn text. The substrate preserves this byte-for-byte;
127    /// classifiers run separately downstream via the existing
128    /// atomiser / curator surface.
129    pub content: String,
130
131    /// Identifier for the host implementation (e.g. `"claude-code"`,
132    /// `"codex"`, `"gemini"`, `"cursor"`, `"cline"`). Surfaced in the
133    /// audit trail + the operator-facing per-host coverage report.
134    /// When omitted, defaults to `"unknown"`.
135    #[serde(default)]
136    pub host_kind: Option<String>,
137
138    /// Version string for the host implementation. Surfaced in the
139    /// audit trail so future format drift can be diagnosed by host
140    /// version.
141    #[serde(default)]
142    pub host_version: Option<String>,
143
144    /// Optional summary of tool invocations within this assistant
145    /// turn. Each entry is `{tool: string, brief: string}`. The
146    /// substrate preserves the list verbatim but does not (at v0.7.0)
147    /// classify or index per-tool-call. Reserved for v0.7.x atom-
148    /// per-tool indexing; the field is wire-stable today so hosts
149    /// can already populate it without breakage.
150    #[serde(default)]
151    #[allow(dead_code)]
152    pub tool_calls: Vec<ToolCallSummary>,
153
154    /// RFC3339 instant the host emitted the turn. Used as the
155    /// recovered memory's `created_at` so the timeline matches the
156    /// original conversation rather than the capture-call wall-clock.
157    /// When omitted, the substrate stamps with its current clock.
158    #[serde(default)]
159    pub timestamp_iso: Option<String>,
160
161    /// Optional Ed25519 signature over the canonical-bytes encoding
162    /// `host_session_id || 0x00 || host_turn_index || 0x00 || role ||
163    /// 0x00 || content`. When present + verified, the substrate
164    /// writes `attest_level = "signed_by_peer"` on the resulting
165    /// memory. When absent, `attest_level = "self_signed"`.
166    #[serde(default)]
167    pub host_signature_b64: Option<String>,
168
169    /// Ed25519 pubkey the substrate should verify
170    /// `host_signature_b64` against. The pubkey MUST be pre-enrolled
171    /// via the existing federation peer-allowlist mechanism.
172    /// Unenrolled pubkeys cause the call to fail with
173    /// `HOST_PUBKEY_NOT_ENROLLED`.
174    #[serde(default)]
175    pub host_pubkey_b64: Option<String>,
176
177    /// Substrate namespace the turn lands in. Defaults to the
178    /// agent's resolved default namespace per the calling context.
179    #[serde(default)]
180    pub namespace: Option<String>,
181
182    /// Optional arbitrary metadata the host wants to preserve
183    /// alongside the turn. Reserved keys (`agent_id`, `entity_id`,
184    /// `mentioned_entity_id`) follow the existing
185    /// `crate::validate::RequestValidator` rules.
186    #[serde(default)]
187    pub metadata: Option<Value>,
188}
189
/// Summary of one tool call within an assistant turn. Mirrors the
/// `crate::recover::parsers::ToolCallSummary` shape so L2/L3 +
/// L4 capture surfaces produce the same downstream memory shape.
// Wire-truthful permissive per #1052 (see MemoryCaptureTurnRequest).
#[derive(Debug, Clone, Deserialize, JsonSchema)]
#[allow(dead_code)] // Fields are deserialized and advertised in the input
// schema but never read: `prepare_capture_turn` does not consume
// `MemoryCaptureTurnRequest::tool_calls`, so no tool-call data is
// persisted at v0.7.0. Reserved for per-tool indexing.
pub struct ToolCallSummary {
    /// Tool name (e.g. `"Bash"`, `"Read"`,
    /// `"mcp__memory__memory_store"`).
    pub tool: String,
    /// One-line target/brief. For `Bash`, the `description` arg; for
    /// `Read`, the file path; for an MCP tool, the first 1-2 fields
    /// of the request struct. Truncated to 200 chars by convention.
    pub brief: String,
}
206
207/// Zero-sized `McpTool` registration for the v0.7.0 D1.6 recipe.
208pub struct MemoryCaptureTurnTool;
209
210impl McpTool for MemoryCaptureTurnTool {
211    fn name() -> &'static str {
212        crate::mcp::registry::tool_names::MEMORY_CAPTURE_TURN
213    }
214
215    fn description() -> &'static str {
216        "L4 host-volunteered turn capture per RFC-0001 (mcp-turn-capture). \
217         Idempotent by (host_session_id, host_turn_index)."
218    }
219
220    fn docs() -> &'static str {
221        "v0.7.0 #1389 L4: host volunteers each conversation turn directly \
222         via the MCP protocol. Substrate stores it idempotently and writes \
223         a signed_events row tagged layer=L4. Replaces transcript-file \
224         scraping with a clean protocol-level contract. Full design at \
225         docs/rfc/RFC-0001-mcp-turn-capture.md. Closes the #1388 substrate \
226         failure mode at the protocol layer."
227    }
228
229    fn input_schema() -> Value {
230        crate::mcp::registry::input_schema_for::<MemoryCaptureTurnRequest>()
231    }
232
233    fn family() -> &'static str {
234        // Lifecycle — capture is a substrate-lifecycle primitive
235        // (every host-volunteered turn produces one memory row).
236        crate::profile::Family::Lifecycle.name()
237    }
238}
239
240/// Handler entrypoint dispatched from `crate::mcp::handle_request`.
241///
242/// Performs the L4 idempotent capture per RFC-0001 §"Idempotency
243/// contract":
244///
245/// 1. SELECT `memory_id` FROM `transcript_line_dedup` WHERE
246///    `(host_session_id, host_turn_index) = (?, ?)` — the canonical
247///    dedup key.
248/// 2. On hit: return `{memory_id, dedup_hit: true, layer: "L4",
249///    elapsed_ms}`. No DB write.
250/// 3. On miss: compute sha256 of canonical-bytes, `BEGIN IMMEDIATE`
251///    transaction → `memories` INSERT via the canonical
252///    `storage::insert` path → `transcript_line_dedup` INSERT →
253///    COMMIT (or ROLLBACK on any failure with the transaction
254///    rolled back atomically).
255///
256/// # Errors
257///
258/// Returns a string-stable error code per the MCP-spec error
259/// convention (see `crate::mcp::handle_request`'s 2025-03-26
260/// `§"Tool result"` comment):
261///
262/// - `INVALID_INPUT: <reason>` — request failed deserialization or
263///   schema validation.
264/// - `DEDUP_QUERY_FAILED: <detail>` — `transcript_line_dedup`
265///   SELECT I/O failure.
266/// - `MEMORY_INSERT_FAILED: <detail>` — `storage::insert` returned
267///   an error (governance refusal, validation failure, SQL error).
268/// - `DEDUP_INSERT_FAILED: <detail>` — `transcript_line_dedup`
269///   INSERT failed; transaction rolled back, no row written.
270/// - `TX_BEGIN_FAILED: <detail>` / `TX_COMMIT_FAILED: <detail>` —
271///   transaction lifecycle errors.
272/// - `HOST_PUBKEY_NOT_ENROLLED: <pubkey>` — `host_pubkey_b64` is not
273///   on the operator's L4 host-pubkey allowlist
274///   (`AI_MEMORY_L4_HOST_PUBKEY_ALLOWLIST` env). Per #1414.
275/// - `SIGNED_EVENTS_APPEND_FAILED: <detail>` — substrate failed to
276///   write the L4 audit row. Per #1415.
277///
278/// # Security (post-#1413 critical fix)
279///
280/// - **agent_id agreement** — when `req.metadata.agent_id` is present
281///   it MUST equal the resolved `caller_agent_id` (mirroring
282///   `resolve_http_agent_id`'s body-header agreement contract).
283///   Mismatch returns `INVALID_INPUT` and refuses the write.
284/// - **Signature verification** — when `host_signature_b64` and
285///   `host_pubkey_b64` are present, the pubkey is checked against the
286///   `AI_MEMORY_L4_HOST_PUBKEY_ALLOWLIST` env-var allowlist
287///   (`HOST_PUBKEY_NOT_ENROLLED` on miss) and the signature is verified
288///   via Ed25519 over the canonical-bytes encoding
289///   `host_session_id || 0x00 || host_turn_index || 0x00 || role ||
290///   0x00 || content` (`INVALID_INPUT: signature_verification_failed`
291///   on mismatch). On success the L4 audit row carries
292///   `attest_level = "signed_by_peer"`; absent both fields yields
293///   `attest_level = "self_signed"`; exactly one of the two fields
294///   present errors with `INVALID_INPUT`.
295/// - **`signed_events` chain row** — the substrate writes one row per
296///   successful capture inside the BEGIN IMMEDIATE transaction with
297///   `event_type = "memory_capture_turn"`, the resolved `attest_level`,
298///   and `payload_hash = sha256(canonical bytes)` so audit can prove
299///   which layer caught each turn (#1415).
300pub fn handle_capture_turn(
301    conn: &rusqlite::Connection,
302    params: &Value,
303    caller_agent_id: Option<&str>,
304) -> Result<Value, String> {
305    let start = Instant::now();
306    let req: MemoryCaptureTurnRequest =
307        serde_json::from_value(params.clone()).map_err(|e| format!("INVALID_INPUT: {e}"))?;
308
309    // v0.7.0 #1413 — resolve effective caller for the agent_id agreement
310    // check + signed_events row attribution. MCP stdio captures the host
311    // identity at `initialize.clientInfo.name`; when present, the
312    // dispatcher threads it via `ctx.mcp_client`. When absent, we still
313    // mint a per-request fallback so audit attribution is never empty.
314    let caller = caller_agent_id.unwrap_or("anonymous:mcp-unknown");
315
316    // v0.7.0 #1416 — all validation (agent_id agreement #1413,
317    // signature verification #1414) + Memory/SignedEvent construction
318    // is backend-agnostic and lives in `prepare_capture_turn`; the
319    // dedup-lookup + atomic three-row transaction is the sqlite SSOT
320    // `crate::storage::capture_turn_idempotent` (also reached by
321    // `SqliteStore::capture_turn_idempotent` through the SAL trait).
322    let write = prepare_capture_turn(&req, caller)?;
323    let attest_level = write.signed_event.attest_level.clone();
324
325    // v0.7.0 H1 (HIGH) — write-gate parity for the mutating
326    // `capture_turn` verb. Pre-fix, L4 turn capture persisted a Memory
327    // row WITHOUT passing through the K9 permission gate or the
328    // K3/Task-1.9 governance gate that `memory_store` enforces, so a
329    // governed namespace could be written via `memory_capture_turn`
330    // ungated. L4 capture is a store-class write: we gate it under the
331    // same `Op::MemoryStore` / `GovernedAction::Store` policy surface as
332    // `memory_store`, against the resolved target namespace. A dedup-hit
333    // is a no-op write, so gating before the idempotent transaction is
334    // safe — a denied namespace refuses uniformly whether or not the
335    // turn was already captured.
336    {
337        let gate_namespace = write.memory.namespace.clone();
338        let gate_payload = json!({
339            "id": write.memory.id,
340            "title": write.memory.title,
341            "namespace": gate_namespace,
342        });
343
344        use crate::permissions::{Op, PermissionContext, Permissions};
345        let ctx = PermissionContext {
346            op: Op::MemoryStore,
347            namespace: gate_namespace.clone(),
348            agent_id: caller.to_string(),
349            payload: gate_payload.clone(),
350        };
351        match Permissions::evaluate(&ctx, &[]) {
352            crate::permissions::Decision::Allow | crate::permissions::Decision::Modify(_) => {}
353            crate::permissions::Decision::Deny(reason) => {
354                return Err(crate::governance::deny_message(
355                    ACTION_CAPTURE_TURN,
356                    crate::governance::DenyGate::PermissionRule,
357                    &reason,
358                ));
359            }
360            crate::permissions::Decision::Ask(prompt) => {
361                return Ok(json!({
362                    "status": "ask",
363                    "reason": prompt,
364                    "action": ACTION_CAPTURE_TURN,
365                    "namespace": gate_namespace,
366                }));
367            }
368        }
369
370        use crate::models::{GovernanceDecision, GovernedAction};
371        match crate::db::enforce_governance(
372            conn,
373            GovernedAction::Store,
374            &gate_namespace,
375            caller,
376            Some(&write.memory.id),
377            Some(caller),
378            &gate_payload,
379        )
380        .map_err(|e| e.to_string())?
381        {
382            GovernanceDecision::Allow => {}
383            GovernanceDecision::Deny(refusal) => {
384                return Err(crate::governance::deny_message(
385                    ACTION_CAPTURE_TURN,
386                    crate::governance::DenyGate::Governance,
387                    &refusal.reason,
388                ));
389            }
390            GovernanceDecision::Pending(pending_id) => {
391                return Ok(json!({
392                    "status": "pending",
393                    (field_names::PENDING_ID): pending_id,
394                    "reason": crate::errors::msg::GOVERNANCE_REQUIRES_APPROVAL,
395                    "action": ACTION_CAPTURE_TURN,
396                    "namespace": gate_namespace,
397                }));
398            }
399        }
400    }
401
402    let result = crate::storage::capture_turn_idempotent(conn, &write)?;
403    let elapsed_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
404
405    if result.dedup_hit {
406        Ok(json!({
407            "memory_id": result.memory_id,
408            "dedup_hit": true,
409            "layer": "L4",
410            (field_names::ELAPSED_MS): elapsed_ms,
411        }))
412    } else {
413        Ok(json!({
414            "memory_id": result.memory_id,
415            "dedup_hit": false,
416            "layer": "L4",
417            (field_names::ATTEST_LEVEL): attest_level,
418            (field_names::ELAPSED_MS): elapsed_ms,
419        }))
420    }
421}
422
423/// v0.7.0 #1416 — backend-agnostic preparation of an L4 capture write.
424///
425/// Performs every step that does NOT touch the database — agent_id
426/// agreement (#1413), canonical-bytes hashing, host-signature
427/// verification (#1414), and construction of the [`Memory`] plus the
428/// [`SignedEvent`] audit row (#1415) — and returns a ready-to-write
429/// [`crate::models::CaptureTurnWrite`]. Both surfaces hand the bundle to
430/// the dedup-keyed transaction: the MCP sqlite handler via
431/// `crate::storage::capture_turn_idempotent`, the HTTP route via
432/// `app.store.capture_turn_idempotent` (sqlite OR postgres). Keeping the
433/// verification here means it is enforced identically across both
434/// surfaces and both backends — no adapter can skip it.
435pub(crate) fn prepare_capture_turn(
436    req: &MemoryCaptureTurnRequest,
437    caller: &str,
438) -> Result<crate::models::CaptureTurnWrite, String> {
439    // #1413 — agent_id agreement. A caller-stamped `metadata.agent_id`
440    // MUST equal the resolved caller, else an attacker with access to
441    // the surface could forge memories attributed to other identities.
442    if let Some(meta_agent) = req
443        .metadata
444        .as_ref()
445        .and_then(|v| v.get(param_names::AGENT_ID))
446        .and_then(|v| v.as_str())
447        && meta_agent != caller
448    {
449        return Err(format!(
450            "INVALID_INPUT: metadata.agent_id ({meta_agent:?}) does not match resolved caller ({caller:?})"
451        ));
452    }
453
454    // #1414 — canonical-bytes + host-signature verification. Cases:
455    //   (sig, pubkey) both Some → check allowlist + verify Ed25519
456    //   exactly one Some → INVALID_INPUT (paired-fields contract)
457    //   both None → unsigned ("self_signed") capture path
458    let canonical = format!(
459        "{}\0{}\0{}\0{}",
460        &req.host_session_id, req.host_turn_index, &req.role, &req.content
461    );
462    let sha_vec = {
463        let mut hasher = Sha256::new();
464        hasher.update(canonical.as_bytes());
465        hasher.finalize().to_vec()
466    };
467
468    let (sig_bytes_opt, attest_level): (Option<Vec<u8>>, String) =
469        verify_host_signature(req, canonical.as_bytes())?;
470
471    let host_kind = req.host_kind.as_deref().unwrap_or("unknown").to_string();
472    let now_iso = chrono::Utc::now().to_rfc3339();
473    let created_at = req.timestamp_iso.clone().unwrap_or_else(|| now_iso.clone());
474
475    let mut tags = vec![
476        "captured-via-l4".to_string(),
477        format!("host:{host_kind}"),
478        format!("role:{}", req.role),
479        format!("attest:{attest_level}"),
480    ];
481    if let Some(hv) = req.host_version.as_deref() {
482        tags.push(format!("host-version:{hv}"));
483    }
484
485    // Title MUST be unique per (host_session_id, host_turn_index)
486    // because the substrate's `storage::insert` upserts on
487    // `(title, namespace)`; without host_session_id in the title,
488    // two distinct sessions whose turn N has the same role would
489    // collide on the same memory row.
490    let title = format!(
491        "L4 capture {} {} turn {} ({})",
492        host_kind, req.host_session_id, req.host_turn_index, req.role
493    );
494
495    // #1413 — stamp `metadata.agent_id` with the resolved caller so the
496    // inserted memory carries the authenticated identity. Synthesize an
497    // object when absent; patch when present without agent_id; preserve
498    // verbatim when present with a matching agent_id (checked above).
499    let metadata = {
500        let mut m = req.metadata.clone().unwrap_or_else(|| json!({}));
501        if let Some(obj) = m.as_object_mut() {
502            obj.entry("agent_id".to_string())
503                .or_insert_with(|| Value::String(caller.to_string()));
504        }
505        m
506    };
507
508    let mem = Memory {
509        id: uuid::Uuid::new_v4().to_string(),
510        tier: Tier::Long,
511        // v0.7.0 F-E4 fix (#1436): route through DEFAULT_NAMESPACE
512        // SSOT at src/lib.rs:266 instead of the bare literal.
513        namespace: req
514            .namespace
515            .clone()
516            .unwrap_or_else(|| crate::DEFAULT_NAMESPACE.to_string()),
517        title,
518        content: req.content.clone(),
519        tags,
520        priority: 5,
521        confidence: 1.0,
522        source: "host".to_string(),
523        metadata,
524        created_at,
525        updated_at: now_iso.clone(),
526        last_accessed_at: Some(now_iso.clone()),
527        memory_kind: MemoryKind::Observation,
528        ..Memory::default()
529    };
530
531    // v0.7.0 #1415 — audit-chain row appended inside the same tx as the
532    // data rows (by the store method) so audit can prove L4 caught the
533    // turn; attest_level reflects whether the host provided a verified
534    // Ed25519 signature (#1414).
535    let signed_event = SignedEvent {
536        id: uuid::Uuid::new_v4().to_string(),
537        agent_id: caller.to_string(),
538        event_type: signed_events::event_types::MEMORY_CAPTURE_TURN.to_string(),
539        payload_hash: sha_vec.clone(),
540        signature: sig_bytes_opt,
541        attest_level,
542        timestamp: now_iso,
543        ..SignedEvent::default()
544    };
545
546    Ok(crate::models::CaptureTurnWrite {
547        memory: mem,
548        sha256: sha_vec,
549        host_kind,
550        host_session_id: req.host_session_id.clone(),
551        host_turn_index: req.host_turn_index,
552        recovered_at_ms: chrono::Utc::now().timestamp_millis(),
553        signed_event,
554    })
555}
556
557/// v0.7.0 #1414 — parse + verify the host signature pair, returning
558/// `(sig_bytes_opt, attest_level)` for downstream use in the
559/// signed_events row.
560///
561/// Contract:
562/// - both `host_signature_b64` and `host_pubkey_b64` present →
563///   pubkey allowlist check → Ed25519 verify → ("signed_by_peer")
564/// - exactly one of the two present → `INVALID_INPUT` (paired-fields)
565/// - both absent → (`None`, "self_signed")
566fn verify_host_signature(
567    req: &MemoryCaptureTurnRequest,
568    canonical_bytes: &[u8],
569) -> Result<(Option<Vec<u8>>, String), String> {
570    match (req.host_signature_b64.as_deref(), req.host_pubkey_b64.as_deref()) {
571        (None, None) => Ok((None, crate::models::AttestLevel::SelfSigned.as_str().to_string())),
572        (Some(_), None) | (None, Some(_)) => Err(
573            "INVALID_INPUT: host_signature_b64 and host_pubkey_b64 must both be present or both absent"
574                .to_string(),
575        ),
576        (Some(sig_b64), Some(pubkey_b64)) => {
577            let pubkey_bytes = B64_STD
578                .decode(pubkey_b64)
579                .map_err(|e| format!("INVALID_INPUT: host_pubkey_b64 not valid base64: {e}"))?;
580            let pubkey_arr: [u8; 32] = pubkey_bytes.try_into().map_err(|_| {
581                "INVALID_INPUT: host_pubkey_b64 must decode to 32 bytes (Ed25519)".to_string()
582            })?;
583
584            if !is_host_pubkey_enrolled(&pubkey_arr) {
585                return Err(format!("HOST_PUBKEY_NOT_ENROLLED: {pubkey_b64}"));
586            }
587
588            let verifying_key = ed25519_dalek::VerifyingKey::from_bytes(&pubkey_arr).map_err(
589                |e| format!("INVALID_INPUT: host_pubkey_b64 not a valid Ed25519 key: {e}"),
590            )?;
591
592            let sig_bytes = B64_STD
593                .decode(sig_b64)
594                .map_err(|e| format!("INVALID_INPUT: host_signature_b64 not valid base64: {e}"))?;
595            let sig_arr: [u8; 64] = sig_bytes.clone().try_into().map_err(|_| {
596                "INVALID_INPUT: host_signature_b64 must decode to 64 bytes (Ed25519)".to_string()
597            })?;
598            let signature = ed25519_dalek::Signature::from_bytes(&sig_arr);
599
600            verifying_key
601                .verify_strict(canonical_bytes, &signature)
602                .map_err(|e| {
603                    format!("INVALID_INPUT: signature_verification_failed: {e}")
604                })?;
605
606            Ok((Some(sig_bytes), crate::models::AttestLevel::SignedByPeer.as_str().to_string()))
607        }
608    }
609}
610
611/// Check the env-var allowlist for a host pubkey. Re-reads the env
612/// on every call so operators can adjust enrollment without daemon
613/// restart. An unset / empty env means no host signatures are
614/// accepted (every signed-path call yields `HOST_PUBKEY_NOT_ENROLLED`)
615/// — the conservative default per the v0.7.0 sole-authority rule.
616fn is_host_pubkey_enrolled(pubkey: &[u8; 32]) -> bool {
617    let Ok(raw) = std::env::var(L4_HOST_PUBKEY_ALLOWLIST_ENV) else {
618        return false;
619    };
620    for entry in raw.split(',').map(str::trim).filter(|s| !s.is_empty()) {
621        if let Ok(bytes) = B64_STD.decode(entry)
622            && bytes.len() == 32
623            && bytes.as_slice() == pubkey
624        {
625            return true;
626        }
627    }
628    false
629}
630
#[cfg(test)]
mod d1_6_1389_tests {
    //! D1.6 (#987) parity tests for the L4 `memory_capture_turn` tool.
    //! Shared helpers live at [`crate::mcp::parity_test_helpers`].
    use super::*;

    /// Registry metadata: canonical name, lifecycle family, and
    /// non-empty discovery text.
    #[test]
    fn capture_turn_tool_metadata() {
        assert_eq!(MemoryCaptureTurnTool::name(), "memory_capture_turn");
        assert_eq!(MemoryCaptureTurnTool::family(), "lifecycle");
        // Discovery callers need a meaningful description + docs.
        let description = MemoryCaptureTurnTool::description();
        let docs = MemoryCaptureTurnTool::docs();
        assert!(!description.is_empty());
        assert!(!docs.is_empty());
    }

    /// The schemars-derived input schema is a JSON object that
    /// advertises `properties` and marks the four core fields required.
    #[test]
    fn input_schema_is_valid_json() {
        const REQUIRED_FIELDS: [&str; 4] = ["host_session_id", "host_turn_index", "role", "content"];

        let schema = MemoryCaptureTurnTool::input_schema();
        let obj = schema.as_object().expect("schema is an object");
        assert!(
            obj.contains_key("properties"),
            "schema must advertise properties"
        );

        let required_names: Vec<&str> = obj
            .get("required")
            .and_then(Value::as_array)
            .expect("required is an array")
            .iter()
            .filter_map(Value::as_str)
            .collect();
        for name in REQUIRED_FIELDS {
            assert!(
                required_names.contains(&name),
                "required must include {name}"
            );
        }
    }
}
672
#[cfg(test)]
mod handler_tests {
    //! Handler-level tests for `memory_capture_turn`.
    //!
    //! - `handler_*` tests drive `handle_capture_turn` against a fresh
    //!   `:memory:` database. They cover request validation, tolerance of
    //!   unknown fields, and the `(host_session_id, host_turn_index)`
    //!   idempotency contract: the first call returns `dedup_hit: false`,
    //!   and a replay returns `dedup_hit: true` with the same `memory_id`.
    //! - `signed_path_*` tests drive the pure `prepare_capture_turn` to
    //!   cover host-signature verification and allowlist enrollment (#1414).
    //! - `agent_id_mismatch_rejected_in_prepare` covers the #1413
    //!   `metadata.agent_id` agreement check.
    use super::*;

    fn fresh_conn() -> rusqlite::Connection {
        crate::storage::open(std::path::Path::new(":memory:")).expect("open in-memory db")
    }

    /// Test helper — calls the handler with no MCP-handshake caller
    /// (`None`). The agent_id agreement check at #1413 is a no-op
    /// when the request body carries no `metadata.agent_id`, so
    /// every legacy test continues to pass under the new signature.
    fn call_handler(conn: &rusqlite::Connection, params: &Value) -> Result<Value, String> {
        handle_capture_turn(conn, params, None)
    }

    #[test]
    fn handler_accepts_minimal_request() {
        let conn = fresh_conn();
        let resp = call_handler(
            &conn,
            &json!({
                "host_session_id": "session-a",
                "host_turn_index": 0,
                "role": "user",
                "content": "hello"
            }),
        )
        .expect("ok");
        assert_eq!(resp["dedup_hit"].as_bool(), Some(false));
        assert_eq!(resp["layer"].as_str(), Some("L4"));
        assert!(resp["memory_id"].as_str().is_some());
    }

    /// A request carrying only `host_session_id` (three required fields
    /// absent) must be rejected with an `INVALID_INPUT`-prefixed error.
    #[test]
    fn handler_rejects_missing_required_fields() {
        let conn = fresh_conn();
        let resp = call_handler(&conn, &json!({ "host_session_id": "x" }));
        let err = resp.expect_err("missing required fields must error");
        assert!(
            err.starts_with("INVALID_INPUT"),
            "error must use INVALID_INPUT prefix, got: {err}"
        );
    }

    #[test]
    fn handler_tolerates_unknown_fields_at_runtime() {
        // #1052 wire-truthful contract: the schema does not advertise
        // `additionalProperties: false`, so the runtime must tolerate
        // (and ignore) unknown extra fields rather than reject the turn.
        // Wider host compat — a host with a newer field set must not
        // have its turns dropped wholesale.
        let conn = fresh_conn();
        let resp = call_handler(
            &conn,
            &json!({
                "host_session_id": "session-a",
                "host_turn_index": 0,
                "role": "user",
                "content": "hello",
                "an_unknown_extra_field": "tolerated and ignored"
            }),
        )
        .expect(
            "unknown extra fields are tolerated at runtime (post-#1052 wire-truthful contract)",
        );
        assert_eq!(resp["layer"].as_str(), Some("L4"));
        assert_eq!(resp["dedup_hit"].as_bool(), Some(false));
    }

    /// Exactly one required field (`content`) absent; everything else
    /// present. Complements `handler_rejects_missing_required_fields`,
    /// which strips three of the four at once.
    #[test]
    fn handler_rejects_missing_required_field() {
        // Permissive on UNKNOWN fields, but REQUIRED fields are still
        // enforced by serde (no `#[serde(default)]`). A turn missing
        // `content` must error rather than silently store an empty turn.
        let conn = fresh_conn();
        let err = call_handler(
            &conn,
            &json!({
                "host_session_id": "session-a",
                "host_turn_index": 0,
                "role": "user"
            }),
        )
        .expect_err("missing required `content` must error");
        assert!(err.starts_with("INVALID_INPUT"), "got: {err}");
    }

    #[test]
    fn handler_accepts_full_request_with_tool_calls() {
        let conn = fresh_conn();
        let resp = call_handler(
            &conn,
            &json!({
                "host_session_id": "session-a",
                "host_turn_index": 5,
                "role": "assistant",
                "content": "running command",
                "host_kind": "claude-code",
                "host_version": "1.0.0",
                "tool_calls": [
                    {"tool": "Bash", "brief": "list files"}
                ],
                "timestamp_iso": "2026-05-28T12:00:00Z",
                "namespace": "test"
            }),
        )
        .expect("ok");
        // Post-storage-tx: memory_id is a UUID. dedup_hit=false on
        // the first call. layer=L4 always.
        assert_eq!(resp["dedup_hit"].as_bool(), Some(false));
        assert_eq!(resp["layer"].as_str(), Some("L4"));
        let memory_id = resp["memory_id"].as_str().expect("memory_id is a string");
        assert!(!memory_id.is_empty(), "memory_id must be non-empty");
    }

    #[test]
    fn handler_idempotent_on_same_session_turn() {
        // Per RFC-0001 §"Idempotency contract": two calls with the
        // same (host_session_id, host_turn_index) produce exactly
        // one memory. The second call returns dedup_hit:true and
        // the existing memory_id.
        let conn = fresh_conn();
        let first = call_handler(
            &conn,
            &json!({
                "host_session_id": "session-idem",
                "host_turn_index": 0,
                "role": "user",
                "content": "operator directive"
            }),
        )
        .expect("first call ok");
        assert_eq!(first["dedup_hit"].as_bool(), Some(false));
        let first_id = first["memory_id"].as_str().unwrap().to_string();

        let second = call_handler(
            &conn,
            &json!({
                "host_session_id": "session-idem",
                "host_turn_index": 0,
                "role": "user",
                "content": "operator directive"
            }),
        )
        .expect("second call ok");
        assert_eq!(
            second["dedup_hit"].as_bool(),
            Some(true),
            "second call must dedup-hit"
        );
        assert_eq!(
            second["memory_id"].as_str().unwrap(),
            first_id,
            "second call returns the first call's memory_id"
        );
    }

    #[test]
    fn handler_distinct_session_turn_creates_separate_memories() {
        let conn = fresh_conn();
        let a = call_handler(
            &conn,
            &json!({
                "host_session_id": "session-a",
                "host_turn_index": 0,
                "role": "user",
                "content": "a"
            }),
        )
        .expect("a ok");
        let b = call_handler(
            &conn,
            &json!({
                "host_session_id": "session-b",
                "host_turn_index": 0,
                "role": "user",
                "content": "b"
            }),
        )
        .expect("b ok");
        let c = call_handler(
            &conn,
            &json!({
                "host_session_id": "session-a",
                "host_turn_index": 1,
                "role": "user",
                "content": "c"
            }),
        )
        .expect("c ok");

        assert_eq!(a["dedup_hit"].as_bool(), Some(false));
        assert_eq!(b["dedup_hit"].as_bool(), Some(false));
        assert_eq!(c["dedup_hit"].as_bool(), Some(false));

        let a_id = a["memory_id"].as_str().unwrap();
        let b_id = b["memory_id"].as_str().unwrap();
        let c_id = c["memory_id"].as_str().unwrap();
        assert_ne!(a_id, b_id, "distinct sessions produce distinct memories");
        assert_ne!(a_id, c_id, "distinct turns produce distinct memories");
        assert_ne!(b_id, c_id);
    }

    // ------------------------------------------------------------------
    // #1414 host-signature verification coverage (2026-06-11). Drives the
    // `verify_host_signature` + `is_host_pubkey_enrolled` signed-path
    // arms via the pure `prepare_capture_turn` so no governance gate /
    // connection is needed.
    // ------------------------------------------------------------------

    /// Serialize the allowlist-env mutations across these tests.
    ///
    /// A poisoned lock (a previous holder panicked) is recovered via
    /// `into_inner`, so one failing test does not cascade into every
    /// other signed-path test.
    fn allowlist_lock() -> std::sync::MutexGuard<'static, ()> {
        static LOCK: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
        LOCK.get_or_init(|| std::sync::Mutex::new(()))
            .lock()
            .unwrap_or_else(|e| e.into_inner())
    }

    /// RAII guard: sets (or removes) `L4_HOST_PUBKEY_ALLOWLIST_ENV` for the
    /// lifetime of the guard and restores the previous value on drop.
    ///
    /// Bind it *after* `allowlist_lock()`. Locals drop in reverse
    /// declaration order, so the environment is restored before the lock
    /// is released.
    struct AllowlistGuard {
        /// Value observed before `set`. It is read with `var_os` so that a
        /// non-UTF-8 prior value is restored verbatim rather than being
        /// treated as "unset" (and therefore removed) on drop.
        prev: Option<std::ffi::OsString>,
    }
    impl AllowlistGuard {
        /// Sets the allowlist env var to `value` (`None` removes it) and
        /// remembers the prior value for restoration on drop.
        #[must_use = "the allowlist env is restored as soon as the guard is dropped"]
        fn set(value: Option<&str>) -> Self {
            let prev = std::env::var_os(L4_HOST_PUBKEY_ALLOWLIST_ENV);
            // SAFETY: `set_var`/`remove_var` are unsafe because another
            // thread may read the process environment concurrently. Every
            // test in this module that creates this guard first holds
            // `allowlist_lock()`, so guarded mutations never race each other.
            // NOTE(review): the lock does not serialize environment reads
            // made by tests that do not take it (in this module or
            // elsewhere in the crate). Confirm that no such test reads the
            // environment concurrently.
            unsafe {
                match value {
                    Some(v) => std::env::set_var(L4_HOST_PUBKEY_ALLOWLIST_ENV, v),
                    None => std::env::remove_var(L4_HOST_PUBKEY_ALLOWLIST_ENV),
                }
            }
            Self { prev }
        }
    }
    impl Drop for AllowlistGuard {
        fn drop(&mut self) {
            // SAFETY: same invariant as in `AllowlistGuard::set`. Each test
            // binds this guard after `allowlist_lock()`, so the lock is
            // still held while the environment is restored here.
            unsafe {
                match &self.prev {
                    Some(v) => std::env::set_var(L4_HOST_PUBKEY_ALLOWLIST_ENV, v),
                    None => std::env::remove_var(L4_HOST_PUBKEY_ALLOWLIST_ENV),
                }
            }
        }
    }

    /// Deserialize a request from a JSON object — the same path the
    /// production handler uses (`serde_json::from_value`). Lets the
    /// signed-path tests build requests without a `Default` impl.
    fn req_from(v: Value) -> MemoryCaptureTurnRequest {
        serde_json::from_value(v).expect("valid request shape")
    }

    /// Build a deterministic signing key + the canonical-bytes signature
    /// for a `(session, turn, role, content)` tuple, mirroring the
    /// substrate's canonical encoding, and return the request JSON +
    /// the b64 pubkey for allowlist enrollment.
    fn signed_req_json(
        session: &str,
        turn: i64,
        role: &str,
        content: &str,
        key: &ed25519_dalek::SigningKey,
    ) -> (Value, String) {
        use ed25519_dalek::Signer;
        let canonical = format!("{session}\0{turn}\0{role}\0{content}");
        let sig = key.sign(canonical.as_bytes());
        let pubkey_b64 = B64_STD.encode(key.verifying_key().to_bytes());
        let sig_b64 = B64_STD.encode(sig.to_bytes());
        let v = json!({
            "host_session_id": session,
            "host_turn_index": turn,
            "role": role,
            "content": content,
            "host_signature_b64": sig_b64,
            "host_pubkey_b64": pubkey_b64,
        });
        (v, pubkey_b64)
    }

    /// Fixed-seed signing key so signatures are reproducible across runs.
    fn test_key() -> ed25519_dalek::SigningKey {
        ed25519_dalek::SigningKey::from_bytes(&[7u8; 32])
    }

    #[test]
    fn signed_path_enrolled_pubkey_verifies_signed_by_peer() {
        let _g = allowlist_lock();
        let key = test_key();
        let (v, pubkey_b64) = signed_req_json("sess-sig", 0, "user", "signed content", &key);
        let _env = AllowlistGuard::set(Some(&pubkey_b64));
        let write = prepare_capture_turn(&req_from(v), "ai:caller").expect("verify ok");
        assert_eq!(
            write.signed_event.attest_level,
            crate::models::AttestLevel::SignedByPeer.as_str()
        );
        assert!(write.signed_event.signature.is_some());
    }

    #[test]
    fn signed_path_unenrolled_pubkey_refused() {
        let _g = allowlist_lock();
        let key = test_key();
        let (v, _pubkey) = signed_req_json("sess-sig", 0, "user", "signed content", &key);
        // Allowlist empty → not enrolled.
        let _env = AllowlistGuard::set(Some(""));
        let err = prepare_capture_turn(&req_from(v), "ai:caller").expect_err("must refuse");
        assert!(err.starts_with("HOST_PUBKEY_NOT_ENROLLED"), "got: {err}");
    }

    #[test]
    fn signed_path_unset_allowlist_refuses_every_signed_call() {
        let _g = allowlist_lock();
        let key = test_key();
        let (v, _pubkey) = signed_req_json("sess-sig", 0, "user", "c", &key);
        let _env = AllowlistGuard::set(None);
        let err = prepare_capture_turn(&req_from(v), "ai:caller").expect_err("must refuse");
        assert!(err.starts_with("HOST_PUBKEY_NOT_ENROLLED"), "got: {err}");
    }

    #[test]
    fn signed_path_tampered_signature_fails_verification() {
        let _g = allowlist_lock();
        let key = test_key();
        let (mut v, pubkey_b64) = signed_req_json("sess-sig", 0, "user", "original", &key);
        // Enroll the pubkey, but corrupt the content so the signature no
        // longer matches the canonical bytes → verify_strict fails.
        v["content"] = json!("tampered");
        let _env = AllowlistGuard::set(Some(&pubkey_b64));
        let err = prepare_capture_turn(&req_from(v), "ai:caller").expect_err("verify must fail");
        assert!(err.contains("signature_verification_failed"), "got: {err}");
    }

    #[test]
    fn signed_path_paired_fields_mismatch_rejected() {
        // Like every other test that feeds signature fields into
        // `prepare_capture_turn`, hold the allowlist lock so this test
        // cannot observe the env mid-mutation by a sibling test.
        let _g = allowlist_lock();
        // Exactly one of (sig, pubkey) present → paired-fields error.
        let v = json!({
            "host_session_id": "s",
            "host_turn_index": 0,
            "role": "user",
            "content": "c",
            "host_signature_b64": "AA",
        });
        let err = prepare_capture_turn(&req_from(v), "ai:caller").expect_err("paired-fields");
        assert!(
            err.contains("must both be present or both absent"),
            "got: {err}"
        );
    }

    #[test]
    fn signed_path_bad_base64_pubkey_rejected() {
        let _g = allowlist_lock();
        let v = json!({
            "host_session_id": "s",
            "host_turn_index": 0,
            "role": "user",
            "content": "c",
            "host_signature_b64": "AA",
            "host_pubkey_b64": "!!!not-base64!!!",
        });
        let err = prepare_capture_turn(&req_from(v), "ai:caller").expect_err("bad b64");
        assert!(
            err.contains("host_pubkey_b64 not valid base64"),
            "got: {err}"
        );
    }

    #[test]
    fn signed_path_wrong_length_pubkey_rejected() {
        let _g = allowlist_lock();
        // Valid base64 but decodes to != 32 bytes.
        let short = B64_STD.encode([1u8; 16]);
        let v = json!({
            "host_session_id": "s",
            "host_turn_index": 0,
            "role": "user",
            "content": "c",
            "host_signature_b64": "AA",
            "host_pubkey_b64": short,
        });
        let err = prepare_capture_turn(&req_from(v), "ai:caller").expect_err("wrong len");
        assert!(err.contains("must decode to 32 bytes"), "got: {err}");
    }

    #[test]
    fn agent_id_mismatch_rejected_in_prepare() {
        // #1413 — metadata.agent_id must equal the resolved caller.
        let v = json!({
            "host_session_id": "s",
            "host_turn_index": 0,
            "role": "user",
            "content": "c",
            "metadata": {"agent_id": "ai:someone-else"},
        });
        let err = prepare_capture_turn(&req_from(v), "ai:caller").expect_err("agent mismatch");
        assert!(err.contains("does not match resolved caller"), "got: {err}");
    }
}