use crate::models::field_names;
use std::time::Instant;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as B64_STD;
use schemars::JsonSchema;
use serde::Deserialize;
use serde_json::{Value, json};
use sha2::{Digest, Sha256};
use crate::mcp::param_names;
use crate::mcp::registry::McpTool;
use crate::models::{Memory, MemoryKind, Tier};
use crate::signed_events::{self, SignedEvent};
/// Name of the environment variable that holds the L4 host public-key
/// allowlist. `is_host_pubkey_enrolled` reads it as a comma-separated list of
/// base64 entries and only matches entries that decode to exactly 32 bytes.
pub(crate) const L4_HOST_PUBKEY_ALLOWLIST_ENV: &str = "AI_MEMORY_L4_HOST_PUBKEY_ALLOWLIST";
/// Action label passed to `deny_message` and echoed in the `"action"` field of
/// the `ask` / `pending` responses built by `handle_capture_turn`.
const ACTION_CAPTURE_TURN: &str = "capture_turn";
// Wire-format arguments of the `memory_capture_turn` MCP tool. The JSON schema
// derived from this struct is what `MemoryCaptureTurnTool::input_schema`
// returns.
//
// NOTE(review): plain `//` comments are used on purpose. schemars is understood
// to copy `///` doc comments into the generated schema as descriptions, which
// would change the advertised schema — confirm before converting these.
#[derive(Debug, Clone, Deserialize, JsonSchema)]
pub struct MemoryCaptureTurnRequest {
// Host-side session identifier. Required. First component of the canonical
// byte string that is hashed and (optionally) signature-verified.
pub host_session_id: String,
// Index of the turn inside the host session. Required. The tool description
// states that capture is idempotent by (host_session_id, host_turn_index).
pub host_turn_index: i64,
// Role label of the turn (the tests use "user" and "assistant"). Required.
// Becomes a `role:<value>` tag and part of the canonical byte string.
pub role: String,
// Turn text. Required. Stored as the memory content and is the last component
// of the canonical byte string.
pub content: String,
// Optional host identifier; `prepare_capture_turn` falls back to "unknown".
#[serde(default)]
pub host_kind: Option<String>,
// Optional host version; when present it is added as a `host-version:` tag.
#[serde(default)]
pub host_version: Option<String>,
// Optional tool-call summaries. Accepted on the wire but not read by any code
// in this file, hence the `dead_code` allowance.
#[serde(default)]
#[allow(dead_code)]
pub tool_calls: Vec<ToolCallSummary>,
// Optional host-supplied timestamp, used verbatim as the memory's `created_at`
// when present. NOTE(review): no format validation is visible in this file.
#[serde(default)]
pub timestamp_iso: Option<String>,
// Optional base64 signature over the canonical byte string; must decode to
// 64 bytes. Must be supplied together with `host_pubkey_b64` or not at all.
#[serde(default)]
pub host_signature_b64: Option<String>,
// Optional base64 public key; must decode to 32 bytes and be present in the
// allowlist environment variable. Paired with `host_signature_b64`.
#[serde(default)]
pub host_pubkey_b64: Option<String>,
// Optional target namespace; defaults to `crate::DEFAULT_NAMESPACE`.
#[serde(default)]
pub namespace: Option<String>,
// Optional free-form metadata. A string `agent_id` inside it must equal the
// resolved caller, otherwise the request is rejected.
#[serde(default)]
pub metadata: Option<Value>,
}
// One entry of `MemoryCaptureTurnRequest::tool_calls`. Only deserialized and
// exposed through the schema; no code in this file reads the fields, hence the
// `dead_code` allowance.
//
// NOTE(review): `//` rather than `///` is deliberate — schemars is understood
// to emit doc comments into the generated schema; confirm before converting.
#[derive(Debug, Clone, Deserialize, JsonSchema)]
#[allow(dead_code)] pub struct ToolCallSummary {
// Name of the tool that was invoked (the tests pass "Bash").
pub tool: String,
// Short description of the call (the tests pass "list files").
pub brief: String,
}
pub struct MemoryCaptureTurnTool;
impl McpTool for MemoryCaptureTurnTool {
fn name() -> &'static str {
crate::mcp::registry::tool_names::MEMORY_CAPTURE_TURN
}
fn description() -> &'static str {
"L4 host-volunteered turn capture per RFC-0001 (mcp-turn-capture). \
Idempotent by (host_session_id, host_turn_index)."
}
fn docs() -> &'static str {
"v0.7.0 #1389 L4: host volunteers each conversation turn directly \
via the MCP protocol. Substrate stores it idempotently and writes \
a signed_events row tagged layer=L4. Replaces transcript-file \
scraping with a clean protocol-level contract. Full design at \
docs/rfc/RFC-0001-mcp-turn-capture.md. Closes the #1388 substrate \
failure mode at the protocol layer."
}
fn input_schema() -> Value {
crate::mcp::registry::input_schema_for::<MemoryCaptureTurnRequest>()
}
fn family() -> &'static str {
crate::profile::Family::Lifecycle.name()
}
}
pub fn handle_capture_turn(
conn: &rusqlite::Connection,
params: &Value,
caller_agent_id: Option<&str>,
) -> Result<Value, String> {
let start = Instant::now();
let req: MemoryCaptureTurnRequest =
serde_json::from_value(params.clone()).map_err(|e| format!("INVALID_INPUT: {e}"))?;
let caller = caller_agent_id.unwrap_or("anonymous:mcp-unknown");
let write = prepare_capture_turn(&req, caller)?;
let attest_level = write.signed_event.attest_level.clone();
{
let gate_namespace = write.memory.namespace.clone();
let gate_payload = json!({
"id": write.memory.id,
"title": write.memory.title,
"namespace": gate_namespace,
});
use crate::permissions::{Op, PermissionContext, Permissions};
let ctx = PermissionContext {
op: Op::MemoryStore,
namespace: gate_namespace.clone(),
agent_id: caller.to_string(),
payload: gate_payload.clone(),
};
match Permissions::evaluate(&ctx, &[]) {
crate::permissions::Decision::Allow | crate::permissions::Decision::Modify(_) => {}
crate::permissions::Decision::Deny(reason) => {
return Err(crate::governance::deny_message(
ACTION_CAPTURE_TURN,
crate::governance::DenyGate::PermissionRule,
&reason,
));
}
crate::permissions::Decision::Ask(prompt) => {
return Ok(json!({
"status": "ask",
"reason": prompt,
"action": ACTION_CAPTURE_TURN,
"namespace": gate_namespace,
}));
}
}
use crate::models::{GovernanceDecision, GovernedAction};
match crate::db::enforce_governance(
conn,
GovernedAction::Store,
&gate_namespace,
caller,
Some(&write.memory.id),
Some(caller),
&gate_payload,
)
.map_err(|e| e.to_string())?
{
GovernanceDecision::Allow => {}
GovernanceDecision::Deny(refusal) => {
return Err(crate::governance::deny_message(
ACTION_CAPTURE_TURN,
crate::governance::DenyGate::Governance,
&refusal.reason,
));
}
GovernanceDecision::Pending(pending_id) => {
return Ok(json!({
"status": "pending",
(field_names::PENDING_ID): pending_id,
"reason": crate::errors::msg::GOVERNANCE_REQUIRES_APPROVAL,
"action": ACTION_CAPTURE_TURN,
"namespace": gate_namespace,
}));
}
}
}
let result = crate::storage::capture_turn_idempotent(conn, &write)?;
let elapsed_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
if result.dedup_hit {
Ok(json!({
"memory_id": result.memory_id,
"dedup_hit": true,
"layer": "L4",
(field_names::ELAPSED_MS): elapsed_ms,
}))
} else {
Ok(json!({
"memory_id": result.memory_id,
"dedup_hit": false,
"layer": "L4",
(field_names::ATTEST_LEVEL): attest_level,
(field_names::ELAPSED_MS): elapsed_ms,
}))
}
}
/// Validates a capture-turn request and builds everything the storage layer
/// needs to persist it: the `Memory` row, the SHA-256 of the canonical byte
/// string, and the accompanying `SignedEvent`.
///
/// The canonical byte string is
/// `host_session_id \0 host_turn_index \0 role \0 content`. It is hashed, and
/// when a host signature is supplied it is the message that gets verified.
///
/// # Errors
/// * `INVALID_INPUT` when `metadata.agent_id` is a string that differs from
///   `caller`.
/// * `INVALID_INPUT` when `host_session_id` or `role` contains a NUL byte.
/// * Any error returned by `verify_host_signature`.
pub(crate) fn prepare_capture_turn(
    req: &MemoryCaptureTurnRequest,
    caller: &str,
) -> Result<crate::models::CaptureTurnWrite, String> {
    // A host may not claim a different agent identity through metadata.
    if let Some(meta_agent) = req
        .metadata
        .as_ref()
        .and_then(|v| v.get(param_names::AGENT_ID))
        .and_then(|v| v.as_str())
        && meta_agent != caller
    {
        return Err(format!(
            "INVALID_INPUT: metadata.agent_id ({meta_agent:?}) does not match resolved caller ({caller:?})"
        ));
    }

    // The canonical form is NUL-delimited. If a delimited field could itself
    // contain NUL, two different (session, turn, role, content) tuples would
    // serialize to the same bytes and one signature would cover both. Only
    // `content` is exempt, because it is the final field.
    if req.host_session_id.contains('\0') || req.role.contains('\0') {
        return Err(
            "INVALID_INPUT: host_session_id and role must not contain NUL bytes".to_string(),
        );
    }

    let canonical = format!(
        "{}\0{}\0{}\0{}",
        &req.host_session_id, req.host_turn_index, &req.role, &req.content
    );
    let sha_vec = Sha256::digest(canonical.as_bytes()).to_vec();

    let (sig_bytes_opt, attest_level): (Option<Vec<u8>>, String) =
        verify_host_signature(req, canonical.as_bytes())?;

    let host_kind = req.host_kind.as_deref().unwrap_or("unknown").to_string();

    // Read the clock once so the RFC 3339 timestamps and `recovered_at_ms`
    // describe the same instant.
    let now = chrono::Utc::now();
    let now_iso = now.to_rfc3339();
    // NOTE(review): a host-supplied `timestamp_iso` is stored verbatim; no
    // format validation happens in this function.
    let created_at = req.timestamp_iso.clone().unwrap_or_else(|| now_iso.clone());

    let mut tags = vec![
        "captured-via-l4".to_string(),
        format!("host:{host_kind}"),
        format!("role:{}", req.role),
        format!("attest:{attest_level}"),
    ];
    if let Some(hv) = req.host_version.as_deref() {
        tags.push(format!("host-version:{hv}"));
    }

    let title = format!(
        "L4 capture {} {} turn {} ({})",
        host_kind, req.host_session_id, req.host_turn_index, req.role
    );

    let metadata = {
        let mut m = req.metadata.clone().unwrap_or_else(|| json!({}));
        if let Some(obj) = m.as_object_mut() {
            // Stamp the resolved caller unconditionally. A string value was
            // already proven equal to `caller` above, so this is a no-op for
            // it; a non-string value (number, array, ...) slipped past that
            // check and must not survive into storage.
            obj.insert(
                param_names::AGENT_ID.to_string(),
                Value::String(caller.to_string()),
            );
        }
        // Non-object metadata is stored as supplied.
        m
    };

    let mem = Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: Tier::Long,
        namespace: req
            .namespace
            .clone()
            .unwrap_or_else(|| crate::DEFAULT_NAMESPACE.to_string()),
        title,
        content: req.content.clone(),
        tags,
        priority: 5,
        confidence: 1.0,
        source: "host".to_string(),
        metadata,
        created_at,
        updated_at: now_iso.clone(),
        last_accessed_at: Some(now_iso.clone()),
        memory_kind: MemoryKind::Observation,
        ..Memory::default()
    };

    let signed_event = SignedEvent {
        id: uuid::Uuid::new_v4().to_string(),
        agent_id: caller.to_string(),
        event_type: signed_events::event_types::MEMORY_CAPTURE_TURN.to_string(),
        payload_hash: sha_vec.clone(),
        signature: sig_bytes_opt,
        attest_level,
        timestamp: now_iso,
        ..SignedEvent::default()
    };

    Ok(crate::models::CaptureTurnWrite {
        memory: mem,
        sha256: sha_vec,
        host_kind,
        host_session_id: req.host_session_id.clone(),
        host_turn_index: req.host_turn_index,
        recovered_at_ms: now.timestamp_millis(),
        signed_event,
    })
}
fn verify_host_signature(
req: &MemoryCaptureTurnRequest,
canonical_bytes: &[u8],
) -> Result<(Option<Vec<u8>>, String), String> {
match (req.host_signature_b64.as_deref(), req.host_pubkey_b64.as_deref()) {
(None, None) => Ok((None, crate::models::AttestLevel::SelfSigned.as_str().to_string())),
(Some(_), None) | (None, Some(_)) => Err(
"INVALID_INPUT: host_signature_b64 and host_pubkey_b64 must both be present or both absent"
.to_string(),
),
(Some(sig_b64), Some(pubkey_b64)) => {
let pubkey_bytes = B64_STD
.decode(pubkey_b64)
.map_err(|e| format!("INVALID_INPUT: host_pubkey_b64 not valid base64: {e}"))?;
let pubkey_arr: [u8; 32] = pubkey_bytes.try_into().map_err(|_| {
"INVALID_INPUT: host_pubkey_b64 must decode to 32 bytes (Ed25519)".to_string()
})?;
if !is_host_pubkey_enrolled(&pubkey_arr) {
return Err(format!("HOST_PUBKEY_NOT_ENROLLED: {pubkey_b64}"));
}
let verifying_key = ed25519_dalek::VerifyingKey::from_bytes(&pubkey_arr).map_err(
|e| format!("INVALID_INPUT: host_pubkey_b64 not a valid Ed25519 key: {e}"),
)?;
let sig_bytes = B64_STD
.decode(sig_b64)
.map_err(|e| format!("INVALID_INPUT: host_signature_b64 not valid base64: {e}"))?;
let sig_arr: [u8; 64] = sig_bytes.clone().try_into().map_err(|_| {
"INVALID_INPUT: host_signature_b64 must decode to 64 bytes (Ed25519)".to_string()
})?;
let signature = ed25519_dalek::Signature::from_bytes(&sig_arr);
verifying_key
.verify_strict(canonical_bytes, &signature)
.map_err(|e| {
format!("INVALID_INPUT: signature_verification_failed: {e}")
})?;
Ok((Some(sig_bytes), crate::models::AttestLevel::SignedByPeer.as_str().to_string()))
}
}
}
fn is_host_pubkey_enrolled(pubkey: &[u8; 32]) -> bool {
let Ok(raw) = std::env::var(L4_HOST_PUBKEY_ALLOWLIST_ENV) else {
return false;
};
for entry in raw.split(',').map(str::trim).filter(|s| !s.is_empty()) {
if let Ok(bytes) = B64_STD.decode(entry)
&& bytes.len() == 32
&& bytes.as_slice() == pubkey
{
return true;
}
}
false
}
#[cfg(test)]
mod d1_6_1389_tests {
    use super::*;

    /// The tool exposes the expected registry name and family, and non-empty
    /// description and docs strings.
    #[test]
    fn capture_turn_tool_metadata() {
        let name = MemoryCaptureTurnTool::name();
        let family = MemoryCaptureTurnTool::family();
        assert_eq!(name, "memory_capture_turn");
        assert_eq!(family, "lifecycle");
        assert!(!MemoryCaptureTurnTool::description().is_empty());
        assert!(!MemoryCaptureTurnTool::docs().is_empty());
    }

    /// The generated schema is an object with `properties`, and its
    /// `required` list names the four mandatory request fields.
    #[test]
    fn input_schema_is_valid_json() {
        let schema = MemoryCaptureTurnTool::input_schema();
        let fields = schema.as_object().expect("schema is an object");
        assert!(
            fields.contains_key("properties"),
            "schema must advertise properties"
        );

        let required_names: Vec<&str> = fields
            .get("required")
            .and_then(Value::as_array)
            .expect("required is an array")
            .iter()
            .filter_map(Value::as_str)
            .collect();
        for name in ["host_session_id", "host_turn_index", "role", "content"] {
            assert!(
                required_names.contains(&name),
                "required must include {name}"
            );
        }
    }
}
#[cfg(test)]
mod handler_tests {
    use super::*;

    // ---- shared helpers -------------------------------------------------

    /// Opens a fresh in-memory database for a single test.
    fn fresh_conn() -> rusqlite::Connection {
        let path = std::path::Path::new(":memory:");
        crate::storage::open(path).expect("open in-memory db")
    }

    /// Invokes the handler without a resolved caller id.
    fn call_handler(conn: &rusqlite::Connection, params: &Value) -> Result<Value, String> {
        handle_capture_turn(conn, params, None)
    }

    /// Builds a request object holding only the four required fields.
    fn turn_params(session: &str, index: i64, role: &str, content: &str) -> Value {
        json!({
            "host_session_id": session,
            "host_turn_index": index,
            "role": role,
            "content": content,
        })
    }

    /// Minimal `("s", 0, "user", "c")` request with extra top-level fields set.
    fn minimal_with(extra: &[(&str, Value)]) -> Value {
        let mut request = turn_params("s", 0, "user", "c");
        for (key, value) in extra {
            request[*key] = value.clone();
        }
        request
    }

    /// Deserializes a JSON value into the request struct.
    fn req_from(v: Value) -> MemoryCaptureTurnRequest {
        serde_json::from_value(v).expect("valid request shape")
    }

    // ---- handler-level behaviour ----------------------------------------

    #[test]
    fn handler_accepts_minimal_request() {
        let conn = fresh_conn();
        let params = turn_params("session-a", 0, "user", "hello");
        let resp = call_handler(&conn, &params).expect("ok");
        assert_eq!(resp["dedup_hit"].as_bool(), Some(false));
        assert_eq!(resp["layer"].as_str(), Some("L4"));
        assert!(resp["memory_id"].as_str().is_some());
    }

    #[test]
    fn handler_rejects_missing_required_fields() {
        let conn = fresh_conn();
        let params = json!({ "host_session_id": "x" });
        let err =
            call_handler(&conn, &params).expect_err("missing required fields must error");
        assert!(
            err.starts_with("INVALID_INPUT"),
            "error must use INVALID_INPUT prefix, got: {err}"
        );
    }

    #[test]
    fn handler_tolerates_unknown_fields_at_runtime() {
        let conn = fresh_conn();
        let mut params = turn_params("session-a", 0, "user", "hello");
        params["an_unknown_extra_field"] = json!("tolerated and ignored");
        let resp = call_handler(&conn, &params).expect(
            "unknown extra fields are tolerated at runtime (post-#1052 wire-truthful contract)",
        );
        assert_eq!(resp["layer"].as_str(), Some("L4"));
        assert_eq!(resp["dedup_hit"].as_bool(), Some(false));
    }

    #[test]
    fn handler_rejects_missing_required_field() {
        let conn = fresh_conn();
        // Everything except `content`.
        let params = json!({
            "host_session_id": "session-a",
            "host_turn_index": 0,
            "role": "user"
        });
        let err =
            call_handler(&conn, &params).expect_err("missing required `content` must error");
        assert!(err.starts_with("INVALID_INPUT"), "got: {err}");
    }

    #[test]
    fn handler_accepts_full_request_with_tool_calls() {
        let conn = fresh_conn();
        let params = json!({
            "host_session_id": "session-a",
            "host_turn_index": 5,
            "role": "assistant",
            "content": "running command",
            "host_kind": "claude-code",
            "host_version": "1.0.0",
            "tool_calls": [
                {"tool": "Bash", "brief": "list files"}
            ],
            "timestamp_iso": "2026-05-28T12:00:00Z",
            "namespace": "test"
        });
        let resp = call_handler(&conn, &params).expect("ok");
        assert_eq!(resp["dedup_hit"].as_bool(), Some(false));
        assert_eq!(resp["layer"].as_str(), Some("L4"));
        let memory_id = resp["memory_id"].as_str().expect("memory_id is a string");
        assert!(!memory_id.is_empty(), "memory_id must be non-empty");
    }

    #[test]
    fn handler_idempotent_on_same_session_turn() {
        let conn = fresh_conn();
        let params = turn_params("session-idem", 0, "user", "operator directive");

        let first = call_handler(&conn, &params).expect("first call ok");
        assert_eq!(first["dedup_hit"].as_bool(), Some(false));
        let first_id = first["memory_id"].as_str().unwrap().to_string();

        let second = call_handler(&conn, &params).expect("second call ok");
        assert_eq!(
            second["dedup_hit"].as_bool(),
            Some(true),
            "second call must dedup-hit"
        );
        assert_eq!(
            second["memory_id"].as_str().unwrap(),
            first_id,
            "second call returns the first call's memory_id"
        );
    }

    #[test]
    fn handler_distinct_session_turn_creates_separate_memories() {
        let conn = fresh_conn();
        let a = call_handler(&conn, &turn_params("session-a", 0, "user", "a")).expect("a ok");
        let b = call_handler(&conn, &turn_params("session-b", 0, "user", "b")).expect("b ok");
        let c = call_handler(&conn, &turn_params("session-a", 1, "user", "c")).expect("c ok");

        for resp in [&a, &b, &c] {
            assert_eq!(resp["dedup_hit"].as_bool(), Some(false));
        }

        let id_of = |resp: &Value| resp["memory_id"].as_str().unwrap().to_string();
        let (a_id, b_id, c_id) = (id_of(&a), id_of(&b), id_of(&c));
        assert_ne!(a_id, b_id, "distinct sessions produce distinct memories");
        assert_ne!(a_id, c_id, "distinct turns produce distinct memories");
        assert_ne!(b_id, c_id);
    }

    // ---- allowlist environment plumbing ---------------------------------

    /// Serializes tests that read or write the allowlist environment
    /// variable. A poisoned lock is recovered rather than propagated.
    fn allowlist_lock() -> std::sync::MutexGuard<'static, ()> {
        use std::sync::{Mutex, OnceLock, PoisonError};
        static LOCK: OnceLock<Mutex<()>> = OnceLock::new();
        LOCK.get_or_init(Mutex::default)
            .lock()
            .unwrap_or_else(PoisonError::into_inner)
    }

    /// Sets (or clears) the allowlist variable for the guard's lifetime and
    /// restores the previous value on drop.
    struct AllowlistGuard {
        prev: Option<String>,
    }

    impl AllowlistGuard {
        /// Writes `value` into the environment, removing the variable for `None`.
        fn apply(value: Option<&str>) {
            // SAFETY: mutating the process environment is only sound while no
            // other thread accesses it. The tests in this module that use the
            // guard hold `allowlist_lock()` for its whole lifetime; environment
            // access elsewhere in the test binary is not visible from here.
            unsafe {
                match value {
                    Some(v) => std::env::set_var(L4_HOST_PUBKEY_ALLOWLIST_ENV, v),
                    None => std::env::remove_var(L4_HOST_PUBKEY_ALLOWLIST_ENV),
                }
            }
        }

        fn set(value: Option<&str>) -> Self {
            let prev = std::env::var(L4_HOST_PUBKEY_ALLOWLIST_ENV).ok();
            Self::apply(value);
            Self { prev }
        }
    }

    impl Drop for AllowlistGuard {
        fn drop(&mut self) {
            Self::apply(self.prev.as_deref());
        }
    }

    /// Builds a signed request plus the base64 public key that signed it. The
    /// signed message uses the same NUL-delimited layout as the handler.
    fn signed_req_json(
        session: &str,
        turn: i64,
        role: &str,
        content: &str,
        key: &ed25519_dalek::SigningKey,
    ) -> (Value, String) {
        use ed25519_dalek::Signer;
        let message = format!("{session}\0{turn}\0{role}\0{content}");
        let pubkey_b64 = B64_STD.encode(key.verifying_key().to_bytes());
        let sig_b64 = B64_STD.encode(key.sign(message.as_bytes()).to_bytes());

        let mut request = turn_params(session, turn, role, content);
        request["host_signature_b64"] = json!(sig_b64);
        request["host_pubkey_b64"] = json!(pubkey_b64);
        (request, pubkey_b64)
    }

    /// Deterministic signing key so the tests are reproducible.
    fn test_key() -> ed25519_dalek::SigningKey {
        ed25519_dalek::SigningKey::from_bytes(&[7u8; 32])
    }

    // ---- signed path ----------------------------------------------------

    #[test]
    fn signed_path_enrolled_pubkey_verifies_signed_by_peer() {
        let _g = allowlist_lock();
        let (request, pubkey_b64) =
            signed_req_json("sess-sig", 0, "user", "signed content", &test_key());
        let _env = AllowlistGuard::set(Some(&pubkey_b64));

        let write = prepare_capture_turn(&req_from(request), "ai:caller").expect("verify ok");
        assert_eq!(
            write.signed_event.attest_level,
            crate::models::AttestLevel::SignedByPeer.as_str()
        );
        assert!(write.signed_event.signature.is_some());
    }

    #[test]
    fn signed_path_unenrolled_pubkey_refused() {
        let _g = allowlist_lock();
        let (request, _pubkey) =
            signed_req_json("sess-sig", 0, "user", "signed content", &test_key());
        let _env = AllowlistGuard::set(Some(""));

        let err = prepare_capture_turn(&req_from(request), "ai:caller").expect_err("must refuse");
        assert!(err.starts_with("HOST_PUBKEY_NOT_ENROLLED"), "got: {err}");
    }

    #[test]
    fn signed_path_unset_allowlist_refuses_every_signed_call() {
        let _g = allowlist_lock();
        let (request, _pubkey) = signed_req_json("sess-sig", 0, "user", "c", &test_key());
        let _env = AllowlistGuard::set(None);

        let err = prepare_capture_turn(&req_from(request), "ai:caller").expect_err("must refuse");
        assert!(err.starts_with("HOST_PUBKEY_NOT_ENROLLED"), "got: {err}");
    }

    #[test]
    fn signed_path_tampered_signature_fails_verification() {
        let _g = allowlist_lock();
        let (mut request, pubkey_b64) =
            signed_req_json("sess-sig", 0, "user", "original", &test_key());
        // Change the content after signing so the signature no longer matches.
        request["content"] = json!("tampered");
        let _env = AllowlistGuard::set(Some(&pubkey_b64));

        let err =
            prepare_capture_turn(&req_from(request), "ai:caller").expect_err("verify must fail");
        assert!(err.contains("signature_verification_failed"), "got: {err}");
    }

    // ---- malformed signature fields -------------------------------------

    #[test]
    fn signed_path_paired_fields_mismatch_rejected() {
        let request = minimal_with(&[("host_signature_b64", json!("AA"))]);
        let err = prepare_capture_turn(&req_from(request), "ai:caller").expect_err("paired-fields");
        assert!(
            err.contains("must both be present or both absent"),
            "got: {err}"
        );
    }

    #[test]
    fn signed_path_bad_base64_pubkey_rejected() {
        let _g = allowlist_lock();
        let request = minimal_with(&[
            ("host_signature_b64", json!("AA")),
            ("host_pubkey_b64", json!("!!!not-base64!!!")),
        ]);
        let err = prepare_capture_turn(&req_from(request), "ai:caller").expect_err("bad b64");
        assert!(
            err.contains("host_pubkey_b64 not valid base64"),
            "got: {err}"
        );
    }

    #[test]
    fn signed_path_wrong_length_pubkey_rejected() {
        let _g = allowlist_lock();
        let short = B64_STD.encode([1u8; 16]);
        let request = minimal_with(&[
            ("host_signature_b64", json!("AA")),
            ("host_pubkey_b64", json!(short)),
        ]);
        let err = prepare_capture_turn(&req_from(request), "ai:caller").expect_err("wrong len");
        assert!(err.contains("must decode to 32 bytes"), "got: {err}");
    }

    // ---- caller identity ------------------------------------------------

    #[test]
    fn agent_id_mismatch_rejected_in_prepare() {
        let request = minimal_with(&[("metadata", json!({"agent_id": "ai:someone-else"}))]);
        let err =
            prepare_capture_turn(&req_from(request), "ai:caller").expect_err("agent mismatch");
        assert!(err.contains("does not match resolved caller"), "got: {err}");
    }
}