use crate::identity::keypair::AgentKeypair;
use crate::models::ConfidenceSource;
use crate::models::field_names;
use anyhow::Context;
use chrono::Utc;
use rusqlite::Connection;
use crate::models::{GovernancePolicy, Memory, MemoryKind, Tier};
use super::{
ConflictMode, create_link_signed, get, insert_with_conflict, resolve_governance_policy,
};
/// Ways a reflection attempt can fail.
///
/// The `Display` output for the three string-carrying variants is the payload
/// verbatim, with no prefix added.
#[derive(Debug)]
pub enum ReflectError {
    /// A caller-supplied field was rejected; the payload is the message.
    Validation(String),
    /// A requested source memory does not exist; the payload is its id.
    SourceNotFound(String),
    /// The derived depth is above the cap resolved for the target namespace.
    DepthExceeded {
        /// Depth the new reflection would have had.
        attempted: u32,
        /// Cap that was in force.
        cap: u32,
        /// Namespace whose policy supplied the cap.
        namespace: String,
    },
    /// The `pre_reflect` hook refused the operation.
    HookVeto { reason: String, code: i32 },
    /// A storage call failed; the payload is the stringified error.
    Database(String),
}

impl std::fmt::Display for ReflectError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // String-carrying variants print their payload untouched.
            Self::Validation(text) => f.write_str(text),
            Self::SourceNotFound(text) => f.write_str(text),
            Self::Database(text) => f.write_str(text),
            Self::HookVeto { reason, code } => write!(
                f,
                "pre_reflect hook vetoed reflection (code={code}): {reason}"
            ),
            Self::DepthExceeded {
                attempted,
                cap,
                namespace,
            } => write!(
                f,
                "reflection depth {attempted} would exceed namespace \
                 max_reflection_depth {cap} (namespace='{namespace}')"
            ),
        }
    }
}

impl std::error::Error for ReflectError {}
/// What a successful reflection produced. Built by `reflect_with_hooks` after
/// the transaction commits and also handed to the `post_reflect` hook.
#[derive(Debug, Clone)]
pub struct ReflectOutcome {
/// Id returned by the storage insert for the new reflection memory.
pub id: String,
/// Depth assigned to the new memory: the highest source depth (floored at
/// zero) plus one.
pub reflection_depth: i32,
/// The caller's source ids, cloned in the order they were supplied.
pub reflects_on: Vec<String>,
/// Namespace the reflection was written to.
pub namespace: String,
}
/// Verdict returned by a `pre_reflect` hook.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReflectHookDecision {
/// Let the reflection proceed.
Allow,
/// Abort the reflection; `reflect_with_hooks` turns this into
/// `ReflectError::HookVeto` carrying the same `reason` and `code`.
Deny { reason: String, code: i32 },
}
/// Optional callbacks and signing identity threaded through `reflect_with_hooks`.
pub struct ReflectHooks<'a> {
    /// Called with the caller's input before the depth-cap check and before
    /// anything is written; a `Deny` aborts the reflection.
    pub pre_reflect: Option<Box<dyn Fn(&ReflectInput) -> ReflectHookDecision + Send + Sync + 'a>>,
    /// Called with the outcome once the transaction has committed.
    pub post_reflect: Option<Box<dyn Fn(&ReflectOutcome) + Send + Sync + 'a>>,
    /// Passed through to `create_link_signed` for every link that is created.
    pub active_keypair: Option<&'a AgentKeypair>,
}

impl<'a> ReflectHooks<'a> {
    /// Hooks with no callbacks and no keypair; identical to `Default::default()`.
    #[must_use]
    pub fn empty() -> Self {
        Self::default()
    }
}

impl<'a> Default for ReflectHooks<'a> {
    fn default() -> Self {
        Self {
            active_keypair: None,
            post_reflect: None,
            pre_reflect: None,
        }
    }
}

impl<'a> std::fmt::Debug for ReflectHooks<'a> {
    /// Closures cannot be printed, so a present hook is shown as `"<fn>"`;
    /// the keypair is shown by its `agent_id` only.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const FN_PLACEHOLDER: &str = "<fn>";

        let pre = self.pre_reflect.as_ref().map(|_| FN_PLACEHOLDER);
        let post = self.post_reflect.as_ref().map(|_| FN_PLACEHOLDER);
        let signer = self.active_keypair.map(|k| k.agent_id.as_str());

        let mut out = f.debug_struct("ReflectHooks");
        out.field("pre_reflect", &pre);
        out.field("post_reflect", &post);
        out.field("active_keypair", &signer);
        out.finish()
    }
}
/// Caller-supplied description of the reflection to create. Every field is
/// validated by `reflect_with_hooks` before the database is touched.
#[derive(Debug, Clone)]
pub struct ReflectInput {
/// Ids of the memories being reflected on. Must be non-empty, each a valid
/// id, with no duplicates.
pub source_ids: Vec<String>,
/// Title of the new reflection memory.
pub title: String,
/// Body of the new reflection memory.
pub content: String,
/// Target namespace. When `None`, the namespace of the first source memory
/// is used.
pub namespace: Option<String>,
/// Tier stored on the new memory.
pub tier: Tier,
/// Tags stored on the new memory.
pub tags: Vec<String>,
/// Priority; validated, then clamped to `1..=10` when stored.
pub priority: i32,
/// Confidence; validated, then clamped to `0.0..=1.0` when stored.
pub confidence: f64,
/// Value stored in the new memory's `source` field.
pub source: String,
/// Acting agent. Written into the stored metadata under `"agent_id"`
/// (overwriting any caller-supplied value) and used for the audit event when
/// the depth cap is exceeded.
pub agent_id: String,
/// Extra metadata. A JSON object is used as the base map; any other JSON
/// value is replaced by an empty map. Reflection bookkeeping is added unless
/// the caller already supplied that key.
pub metadata: serde_json::Value,
}
/// Creates a reflection memory without any hooks or signing keypair.
///
/// Shorthand for `reflect_with_hooks` called with `ReflectHooks::empty()`.
///
/// # Errors
/// Returns whatever `reflect_with_hooks` returns for the same input.
pub fn reflect(
    conn: &Connection,
    input: &ReflectInput,
) -> std::result::Result<ReflectOutcome, ReflectError> {
    let no_hooks = ReflectHooks::empty();
    reflect_with_hooks(conn, input, &no_hooks)
}
#[allow(clippy::too_many_lines)]
pub fn reflect_with_hooks(
conn: &Connection,
input: &ReflectInput,
hooks: &ReflectHooks<'_>,
) -> std::result::Result<ReflectOutcome, ReflectError> {
use crate::validate;
validate::validate_title(&input.title).map_err(|e| ReflectError::Validation(e.to_string()))?;
validate::validate_content(&input.content)
.map_err(|e| ReflectError::Validation(e.to_string()))?;
validate::validate_tags(&input.tags).map_err(|e| ReflectError::Validation(e.to_string()))?;
validate::validate_priority(input.priority)
.map_err(|e| ReflectError::Validation(e.to_string()))?;
validate::validate_confidence(input.confidence)
.map_err(|e| ReflectError::Validation(e.to_string()))?;
validate::validate_source(&input.source)
.map_err(|e| ReflectError::Validation(e.to_string()))?;
validate::validate_agent_id(&input.agent_id)
.map_err(|e| ReflectError::Validation(e.to_string()))?;
if input.source_ids.is_empty() {
return Err(ReflectError::Validation(
"source_ids cannot be empty — a reflection must reflect on at least one source memory"
.into(),
));
}
let mut seen = std::collections::HashSet::new();
for (i, id) in input.source_ids.iter().enumerate() {
validate::validate_id(id)
.map_err(|e| ReflectError::Validation(format!("source_ids[{i}]: {e}")))?;
if !seen.insert(id.as_str()) {
return Err(ReflectError::Validation(format!(
"source_ids[{i}]: duplicate id '{id}'"
)));
}
}
if let Some(ref ns) = input.namespace {
validate::validate_namespace(ns).map_err(|e| ReflectError::Validation(e.to_string()))?;
}
validate::validate_metadata(&input.metadata)
.map_err(|e| ReflectError::Validation(e.to_string()))?;
let mut sources = Vec::with_capacity(input.source_ids.len());
for id in &input.source_ids {
match get(conn, id).map_err(|e| ReflectError::Database(e.to_string()))? {
Some(m) => sources.push(m),
None => return Err(ReflectError::SourceNotFound(id.clone())),
}
}
let max_src_depth = sources
.iter()
.map(|m| m.reflection_depth)
.max()
.unwrap_or(0);
let new_depth_i32 = max_src_depth.max(0).saturating_add(1);
#[allow(clippy::cast_sign_loss)]
let new_depth_u32: u32 = new_depth_i32 as u32;
let target_namespace = match input.namespace {
Some(ref ns) => ns.clone(),
None => sources[0].namespace.clone(),
};
let policy = resolve_governance_policy(conn, &target_namespace)
.unwrap_or_else(GovernancePolicy::default);
let cap = policy.effective_max_reflection_depth();
if let Some(pre) = hooks.pre_reflect.as_ref() {
match (pre)(input) {
ReflectHookDecision::Allow => {}
ReflectHookDecision::Deny { reason, code } => {
return Err(ReflectError::HookVeto { reason, code });
}
}
}
if new_depth_u32 > cap {
let cross_peer_refusal =
crate::federation::reflection_bookkeeping::enforce_local_cap_on_derived(
new_depth_u32,
cap,
&sources,
);
let peer_origin: Option<String> = if let Err(ref r) = cross_peer_refusal {
if let Some(ref peer) = r.imported_peer {
tracing::warn!(
target: "federation::reflection_bookkeeping",
peer = %peer,
attempted = new_depth_u32,
local_cap = cap,
namespace = %target_namespace,
"L2-2: refusing derived reflection: {}",
r,
);
}
r.imported_peer.clone()
} else {
None
};
emit_reflection_depth_exceeded_audit(
conn,
&input.agent_id,
new_depth_u32,
cap,
&target_namespace,
&input.source_ids,
&input.title,
peer_origin.as_deref(),
);
return Err(ReflectError::DepthExceeded {
attempted: new_depth_u32,
cap,
namespace: target_namespace,
});
}
let now = Utc::now().to_rfc3339();
let mut metadata = match input.metadata.clone() {
serde_json::Value::Object(map) => map,
_ => serde_json::Map::new(),
};
metadata.insert(
"agent_id".to_string(),
serde_json::Value::String(input.agent_id.clone()),
);
if !metadata.contains_key(field_names::REFLECTION_METADATA) {
let reflection_meta = serde_json::json!({
"reflected_on_source_ids": input.source_ids,
(field_names::REFLECTION_DEPTH): new_depth_i32,
"reflection_created_at": now,
});
metadata.insert(
field_names::REFLECTION_METADATA.to_string(),
reflection_meta,
);
}
let metadata_value = serde_json::Value::Object(metadata);
validate::validate_metadata(&metadata_value)
.map_err(|e| ReflectError::Validation(e.to_string()))?;
let new_mem = Memory {
id: uuid::Uuid::new_v4().to_string(),
tier: input.tier.clone(),
namespace: target_namespace.clone(),
title: input.title.clone(),
content: input.content.clone(),
tags: input.tags.clone(),
priority: input.priority.clamp(1, 10),
confidence: input.confidence.clamp(0.0, 1.0),
source: input.source.clone(),
access_count: 0,
created_at: now.clone(),
updated_at: now.clone(),
last_accessed_at: None,
expires_at: None,
metadata: metadata_value,
reflection_depth: new_depth_i32,
memory_kind: MemoryKind::Reflection,
entity_id: None,
persona_version: None,
citations: Vec::new(),
source_uri: None,
source_span: None,
confidence_source: ConfidenceSource::CallerProvided,
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
};
conn.execute_batch(super::connection::SQL_BEGIN_IMMEDIATE)
.map_err(|e| ReflectError::Database(e.to_string()))?;
let txn_result = (|| -> std::result::Result<String, ReflectError> {
let actual_id = insert_with_conflict(conn, &new_mem, ConflictMode::Error).map_err(|e| {
if e.downcast_ref::<crate::storage::ConflictError>().is_some() {
ReflectError::Validation(format!(
"reflection title collides with an existing memory in the same namespace: {e}"
))
} else {
ReflectError::Database(e.to_string())
}
})?;
for src_id in &input.source_ids {
validate::validate_link(
&actual_id,
src_id,
crate::models::MemoryLinkRelation::ReflectsOn.as_str(),
)
.map_err(|e| ReflectError::Validation(e.to_string()))?;
create_link_signed(
conn,
&actual_id,
src_id,
crate::models::MemoryLinkRelation::ReflectsOn.as_str(),
hooks.active_keypair,
)
.map_err(|e| ReflectError::Database(e.to_string()))?;
}
Ok(actual_id)
})();
match txn_result {
Ok(actual_id) => {
conn.execute_batch(super::connection::SQL_COMMIT)
.map_err(|e| ReflectError::Database(e.to_string()))?;
let outcome = ReflectOutcome {
id: actual_id,
reflection_depth: new_depth_i32,
reflects_on: input.source_ids.clone(),
namespace: target_namespace,
};
if let Some(post) = hooks.post_reflect.as_ref() {
(post)(&outcome);
}
Ok(outcome)
}
Err(e) => {
if let Err(rb) = conn.execute_batch(super::connection::SQL_ROLLBACK) {
tracing::error!("ROLLBACK failed in reflect: {}", rb);
}
Err(e)
}
}
}
pub fn canonical_cbor_reflection_depth_exceeded(
agent_id: &str,
attempted: u32,
cap: u32,
namespace: &str,
source_ids: &[String],
proposed_title: &str,
created_at: &str,
peer_origin: Option<&str>,
) -> anyhow::Result<Vec<u8>> {
use std::collections::BTreeMap;
let mut map: BTreeMap<&str, ciborium::Value> = BTreeMap::new();
map.insert("agent_id", ciborium::Value::Text(agent_id.to_string()));
map.insert("attempted", ciborium::Value::Integer(attempted.into()));
map.insert("cap", ciborium::Value::Integer(cap.into()));
map.insert(
field_names::CREATED_AT,
ciborium::Value::Text(created_at.to_string()),
);
map.insert("namespace", ciborium::Value::Text(namespace.to_string()));
if let Some(peer) = peer_origin {
map.insert(
field_names::PEER_ORIGIN,
ciborium::Value::Text(peer.to_string()),
);
}
map.insert(
"proposed_title",
ciborium::Value::Text(proposed_title.to_string()),
);
map.insert(
field_names::SOURCE_IDS,
ciborium::Value::Array(
source_ids
.iter()
.map(|s| ciborium::Value::Text(s.clone()))
.collect(),
),
);
let entries: Vec<(ciborium::Value, ciborium::Value)> = map
.into_iter()
.map(|(k, v)| (ciborium::Value::Text(k.to_string()), v))
.collect();
let value = ciborium::Value::Map(entries);
let mut out: Vec<u8> = Vec::with_capacity(256);
ciborium::ser::into_writer(&value, &mut out)
.context("CBOR encode reflection_depth_exceeded audit payload")?;
Ok(out)
}
pub(crate) fn emit_reflection_depth_exceeded_audit(
conn: &Connection,
agent_id: &str,
attempted: u32,
cap: u32,
namespace: &str,
source_ids: &[String],
proposed_title: &str,
peer_origin: Option<&str>,
) {
let created_at = Utc::now().to_rfc3339();
let cbor = match canonical_cbor_reflection_depth_exceeded(
agent_id,
attempted,
cap,
namespace,
source_ids,
proposed_title,
&created_at,
peer_origin,
) {
Ok(b) => b,
Err(e) => {
tracing::warn!(
target: crate::signed_events::SIGNED_EVENTS_TRACE_TARGET,
agent_id, attempted, cap, namespace,
"failed to encode canonical CBOR for reflection_depth_exceeded audit: {e}"
);
return;
}
};
let event_type = if peer_origin.is_some() {
"reflection.depth_exceeded.cross_peer"
} else {
"reflection.depth_exceeded"
};
let event = crate::signed_events::SignedEvent {
id: uuid::Uuid::new_v4().to_string(),
agent_id: agent_id.to_string(),
event_type: event_type.to_string(),
payload_hash: crate::signed_events::payload_hash(&cbor),
signature: None,
attest_level: crate::models::AttestLevel::Unsigned.as_str().to_string(),
timestamp: created_at,
..crate::signed_events::SignedEvent::default()
};
if let Err(e) = crate::signed_events::append_signed_event(conn, &event) {
tracing::warn!(
target: crate::signed_events::SIGNED_EVENTS_TRACE_TARGET,
agent_id, attempted, cap, namespace,
"failed to append reflection_depth_exceeded audit row: {e}"
);
}
}
/// Tests for the byte-level properties of the depth-exceeded audit payload.
#[cfg(test)]
mod l2_2_audit_tests {
    use super::canonical_cbor_reflection_depth_exceeded;

    /// Decodes a payload and returns its map keys in encoded order.
    fn decoded_keys(bytes: &[u8]) -> Vec<String> {
        let value: ciborium::Value =
            ciborium::de::from_reader(bytes).expect("payload must decode as CBOR");
        let entries = match value {
            ciborium::Value::Map(entries) => entries,
            other => panic!("payload must be a CBOR map, got {other:?}"),
        };
        entries
            .into_iter()
            .map(|(key, _)| match key {
                ciborium::Value::Text(text) => text,
                other => panic!("payload keys must be text, got {other:?}"),
            })
            .collect()
    }

    /// The peer-origin claim must change the encoded bytes, both by its
    /// presence and by its value.
    #[test]
    fn peer_origin_some_vs_none_yields_distinct_bytes() {
        let base = (
            "ai:test",
            3_u32,
            2_u32,
            "ns/l2-2",
            vec!["src-1".to_string()],
            "title",
            "2026-05-13T00:00:00+00:00",
        );
        let local = canonical_cbor_reflection_depth_exceeded(
            base.0, base.1, base.2, base.3, &base.4, base.5, base.6, None,
        )
        .expect("encode None");
        let cross = canonical_cbor_reflection_depth_exceeded(
            base.0,
            base.1,
            base.2,
            base.3,
            &base.4,
            base.5,
            base.6,
            Some("ai:peer-x"),
        )
        .expect("encode Some");
        assert_ne!(local, cross, "peer_origin claim must be byte-load-bearing");
        let cross_y = canonical_cbor_reflection_depth_exceeded(
            base.0,
            base.1,
            base.2,
            base.3,
            &base.4,
            base.5,
            base.6,
            Some("ai:peer-y"),
        )
        .expect("encode Some(other)");
        assert_ne!(
            cross, cross_y,
            "swapping the peer_origin string must change the bytes"
        );
    }

    /// Identical arguments must encode to identical bytes.
    #[test]
    fn cross_peer_encoding_is_deterministic() {
        let a = canonical_cbor_reflection_depth_exceeded(
            "ai:a",
            7,
            3,
            "ns",
            &["s1".to_string(), "s2".to_string()],
            "t",
            "2026-05-13T00:00:00+00:00",
            Some("peer-A"),
        )
        .expect("encode 1");
        let b = canonical_cbor_reflection_depth_exceeded(
            "ai:a",
            7,
            3,
            "ns",
            &["s1".to_string(), "s2".to_string()],
            "t",
            "2026-05-13T00:00:00+00:00",
            Some("peer-A"),
        )
        .expect("encode 2");
        assert_eq!(a, b, "cross-peer encoding must be byte-stable");
    }

    /// Decodes the payload and checks that the map keys really are emitted in
    /// strictly ascending string order, with and without the peer-origin entry.
    #[test]
    fn key_ordering_is_lexicographic_via_btreemap() {
        let no_peer = canonical_cbor_reflection_depth_exceeded(
            "ai:test",
            4,
            3,
            "ns",
            &["s1".to_string()],
            "title",
            "2026-05-13T00:00:00+00:00",
            None,
        )
        .expect("encode none");
        let no_peer2 = canonical_cbor_reflection_depth_exceeded(
            "ai:test",
            4,
            3,
            "ns",
            &["s1".to_string()],
            "title",
            "2026-05-13T00:00:00+00:00",
            None,
        )
        .expect("encode none again");
        assert_eq!(no_peer, no_peer2);

        let with_peer = canonical_cbor_reflection_depth_exceeded(
            "ai:test",
            4,
            3,
            "ns",
            &["s1".to_string()],
            "title",
            "2026-05-13T00:00:00+00:00",
            Some("ai:peer-x"),
        )
        .expect("encode some");

        for (label, bytes) in [("without peer", &no_peer), ("with peer", &with_peer)] {
            let keys = decoded_keys(bytes);
            assert!(
                keys.windows(2).all(|pair| pair[0] < pair[1]),
                "keys must be strictly ascending ({label}): {keys:?}"
            );
            for expected in ["agent_id", "attempted", "cap", "namespace", "proposed_title"] {
                assert!(
                    keys.iter().any(|k| k == expected),
                    "missing key '{expected}' ({label}): {keys:?}"
                );
            }
        }
        assert_eq!(
            decoded_keys(&with_peer).len(),
            decoded_keys(&no_peer).len() + 1,
            "peer_origin must add exactly one map entry"
        );
    }
}