#![allow(dead_code)]
#![allow(clippy::doc_lazy_continuation)]
pub mod sqlite;
#[cfg(feature = "sal-postgres")]
pub mod postgres;
use bitflags::bitflags;
use serde::{Deserialize, Serialize};
use crate::models::{AgentRegistration, Memory, MemoryLink, Tier};
pub use crate::models::{CaptureTurnResult, CaptureTurnWrite};
use crate::quotas::QuotaStatus;
/// Value used for [`PoolConfig::max_connections`] by `PoolConfig::default()`.
const DEFAULT_MAX_CONNECTIONS: u32 = 16;
/// Value used for [`PoolConfig::min_connections`] by `PoolConfig::default()`.
const DEFAULT_MIN_CONNECTIONS: u32 = 2;
/// Value used for [`PoolConfig::acquire_timeout_secs`] by `PoolConfig::default()`.
const DEFAULT_ACQUIRE_TIMEOUT_SECS: u64 = 30;

/// Connection-pool sizing and timeout knobs.
///
/// `Default` fills every field from the `DEFAULT_*` constants of this module.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PoolConfig {
    /// Maximum number of connections (default: 16).
    pub max_connections: u32,
    /// Minimum number of connections (default: 2).
    pub min_connections: u32,
    /// Acquire timeout, in seconds per the field name (default: 30).
    pub acquire_timeout_secs: u64,
}

impl Default for PoolConfig {
    /// Builds the configuration from the module-level `DEFAULT_*` constants.
    fn default() -> Self {
        let max_connections = DEFAULT_MAX_CONNECTIONS;
        let min_connections = DEFAULT_MIN_CONNECTIONS;
        let acquire_timeout_secs = DEFAULT_ACQUIRE_TIMEOUT_SECS;
        PoolConfig {
            max_connections,
            min_connections,
            acquire_timeout_secs,
        }
    }
}
/// Selects a knowledge-graph backend.
///
/// Serialized by serde in snake_case, i.e. as `"cte"` or `"age"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum KgBackend {
/// NOTE(review): presumably a recursive-CTE based graph traversal — confirm.
Cte,
/// NOTE(review): presumably the Apache AGE graph extension — confirm.
Age,
}
impl KgBackend {
#[must_use]
pub fn as_str(self) -> &'static str {
match self {
Self::Cte => "cte",
Self::Age => "age",
}
}
}
/// One row of the result produced by `MemoryStore::kg_query`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct KgQueryRow {
/// Identifier of the node reached by the traversal (presumably a memory id — confirm).
pub target_id: String,
/// Relation label of the link (string form; exact vocabulary not visible here).
pub relation: String,
/// Depth at which the target was reached (NOTE(review): origin 0 vs 1 not visible here).
pub depth: usize,
/// Path taken to reach the target (NOTE(review): encoding/separator not visible here).
pub path: String,
}
/// One row of the result produced by `MemoryStore::kg_timeline`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct KgTimelineRow {
/// Identifier of the link target (presumably a memory id — confirm).
pub target_id: String,
/// Relation label of the link.
pub relation: String,
/// Start of the link's validity (NOTE(review): presumably an RFC3339 timestamp — confirm).
pub valid_from: String,
/// End of the link's validity, if any (presumably `None` while still valid — confirm).
pub valid_until: Option<String>,
/// Observer recorded on the link, if any (presumably an agent id — confirm).
pub observed_by: Option<String>,
/// Title (presumably of the target memory — confirm).
pub title: String,
/// Namespace of the target.
pub target_namespace: String,
}
/// Result returned by `MemoryStore::invalidate_link`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct KgInvalidateRow {
/// Whether a matching link was found (presumably `false` means nothing was changed — confirm).
pub found: bool,
/// The `valid_until` value reported for the link (presumably the newly applied one — confirm).
pub valid_until: String,
/// The `valid_until` value the link carried before, if any.
pub previous_valid_until: Option<String>,
}
impl std::fmt::Display for KgBackend {
    /// Writes the same label that [`KgBackend::as_str`] returns.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = (*self).as_str();
        formatter.write_str(label)
    }
}
/// Error type returned by every `StoreResult`-returning operation in this module.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must keep a wildcard
/// arm when matching. The `Display` text of each variant is given by its
/// `#[error(...)]` attribute.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
/// No memory exists for the given `id`.
#[error("memory not found: {id}")]
NotFound { id: String },
/// An insert collided with an existing identifier.
#[error("identifier conflict on insert: {id}")]
Conflict { id: String },
/// The caller is not permitted to perform `action` on `target`; `reason` explains why.
#[error("caller lacks permission for {action} on {target}: {reason}")]
PermissionDenied {
action: String,
target: String,
reason: String,
},
/// The named backend could not be reached or used.
#[error("backend unavailable: {backend}: {detail}")]
BackendUnavailable { backend: String, detail: String },
/// The caller supplied invalid input.
#[error("invalid input: {detail}")]
InvalidInput { detail: String },
/// A link operation was refused; `detail` is displayed verbatim with no prefix.
#[error("{detail}")]
LinkRefused { detail: String },
/// The backend does not implement the named capability (used by the
/// default bodies of many `MemoryStore` methods).
#[error("requested capability not supported by this backend: {capability}")]
UnsupportedCapability { capability: String },
/// An integrity check failed.
#[error("integrity check failed: {detail}")]
IntegrityFailed { detail: String },
/// Any other backend failure; convertible from [`BoxBackendError`] via `From`/`?`.
#[error("underlying backend error: {0}")]
Backend(#[from] BoxBackendError),
}
impl StoreError {
    /// Maps the error variant to its code constant from
    /// `crate::errors::error_codes`.
    ///
    /// `Conflict` and `LinkRefused` share the `CONFLICT` code; every other
    /// variant has its own constant.
    #[must_use]
    pub fn code(&self) -> &'static str {
        use crate::errors::error_codes as codes;
        match self {
            Self::NotFound { .. } => codes::NOT_FOUND,
            Self::Conflict { .. } | Self::LinkRefused { .. } => codes::CONFLICT,
            Self::PermissionDenied { .. } => codes::GOVERNANCE_REFUSED,
            Self::BackendUnavailable { .. } => codes::STORE_BACKEND_UNAVAILABLE,
            Self::InvalidInput { .. } => codes::VALIDATION_FAILED,
            Self::UnsupportedCapability { .. } => codes::STORE_UNSUPPORTED_CAPABILITY,
            Self::IntegrityFailed { .. } => codes::STORE_OPERATION_FAILED,
            Self::Backend(_) => codes::DATABASE_ERROR,
        }
    }
}
/// String-backed error carried by `StoreError::Backend`.
///
/// Despite the name, the payload is a plain `String`; `Display` prints that
/// message verbatim.
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct BoxBackendError(String);
impl BoxBackendError {
#[must_use]
pub fn new(msg: impl Into<String>) -> Self {
Self(msg.into())
}
}
/// Shorthand for `Result<T, StoreError>`, used throughout the store API.
pub type StoreResult<T> = Result<T, StoreError>;
/// Runs the structural integrity checks on `mem` and returns one
/// human-readable finding per failed check (empty when all pass).
///
/// Checks, in reporting order:
/// 1. `title` is non-empty after trimming,
/// 2. `content` is non-empty after trimming,
/// 3. `metadata` has an `agent_id` entry,
/// 4. `created_at` parses as RFC3339.
#[must_use]
pub fn integrity_findings(mem: &Memory) -> Vec<String> {
    let title_blank = mem.title.trim().is_empty();
    let content_blank = mem.content.trim().is_empty();
    let agent_absent = mem.metadata.get("agent_id").is_none();
    let created_at_invalid = chrono::DateTime::parse_from_rfc3339(&mem.created_at).is_err();

    // `extend` with an `Option` appends zero or one element, preserving order.
    let mut findings: Vec<String> = Vec::new();
    findings.extend(title_blank.then(|| "title is empty".to_string()));
    findings.extend(content_blank.then(|| "content is empty".to_string()));
    findings.extend(agent_absent.then(|| "metadata.agent_id missing".to_string()));
    findings.extend(
        created_at_invalid.then(|| format!("created_at is not RFC3339: '{}'", mem.created_at)),
    );
    findings
}
/// Identity and request metadata passed to store operations.
#[derive(Debug, Clone)]
pub struct CallerContext {
    /// Id of the calling agent.
    pub agent_id: String,
    /// Optional agent to act as; when set it takes precedence in
    /// [`CallerContext::effective_principal`].
    pub as_agent: Option<String>,
    /// Optional request identifier (presumably for tracing/correlation — confirm).
    pub request_id: Option<String>,
    /// Set to `true` by [`CallerContext::for_admin`], `false` by
    /// [`CallerContext::for_agent`]. NOTE(review): how backends honour this
    /// flag is not visible in this block.
    pub bypass_visibility: bool,
}

impl CallerContext {
    /// Context for an ordinary agent: no delegation, no request id, and
    /// `bypass_visibility` off.
    #[must_use]
    pub fn for_agent(agent_id: impl Into<String>) -> Self {
        CallerContext {
            bypass_visibility: false,
            request_id: None,
            as_agent: None,
            agent_id: agent_id.into(),
        }
    }

    /// Same as [`CallerContext::for_agent`] but with `bypass_visibility` on.
    #[must_use]
    pub fn for_admin(agent_id: impl Into<String>) -> Self {
        Self {
            bypass_visibility: true,
            ..Self::for_agent(agent_id)
        }
    }

    /// Admin context when `is_admin` is true, ordinary agent context otherwise.
    #[must_use]
    pub fn for_admin_checked(agent_id: impl Into<String>, is_admin: bool) -> Self {
        // The two constructors differ only in this flag.
        Self {
            bypass_visibility: is_admin,
            ..Self::for_agent(agent_id)
        }
    }

    /// The principal on whose behalf the call runs: `as_agent` when set,
    /// otherwise `agent_id`.
    #[must_use]
    pub fn effective_principal(&self) -> &str {
        match self.as_agent.as_deref() {
            Some(delegate) => delegate,
            None => self.agent_id.as_str(),
        }
    }
}
pub use crate::visibility::is_visible_to_caller;
// Capability flags reported by `MemoryStore::capabilities`. Each constant
// occupies a distinct bit of the `u32`, so flags can be combined with `|`
// and tested with `contains`.
// NOTE(review): the per-flag notes below are inferred from the flag names only —
// confirm against the adapters.
bitflags! {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Capabilities: u32 {
// Bit 0. Presumably: `begin_transaction` is implemented.
const TRANSACTIONS = 0b0000_0001;
// Bit 1. Presumably: the backend has native vector storage/search.
const NATIVE_VECTOR = 0b0000_0010;
// Bit 2. Presumably: the backend has full-text search.
const FULLTEXT = 0b0000_0100;
// Bit 3. Presumably: writes are durable.
const DURABLE = 0b0000_1000;
// Bit 4. Presumably: reads are strongly consistent.
const STRONG_CONSISTENCY = 0b0001_0000;
// Bit 5. Presumably: the backend expires rows natively.
const TTL_NATIVE = 0b0010_0000;
// Bit 6. Presumably: multi-row writes are atomic.
const ATOMIC_MULTI_WRITE = 0b0100_0000;
}
}
/// Handle for an open transaction, as returned (boxed) by
/// `MemoryStore::begin_transaction`.
///
/// Both methods take `self: Box<Self>`, so the handle is consumed and cannot
/// be used again after either call.
#[async_trait::async_trait]
pub trait Transaction: Send {
/// Commits the transaction, consuming the handle.
async fn commit(self: Box<Self>) -> StoreResult<()>;
/// Rolls the transaction back, consuming the handle.
async fn rollback(self: Box<Self>) -> StoreResult<()>;
}
/// Query filter accepted by `MemoryStore::list`, `MemoryStore::search` and
/// `MemoryStore::recall_hybrid`.
///
/// `Filter::default()` leaves every `Option` as `None`, `tags_any` empty and
/// `limit` at `0`.
/// NOTE(review): how each backend interprets the fields (in particular
/// `limit == 0`) is not visible in this file — confirm per adapter.
#[derive(Debug, Default, Clone)]
pub struct Filter {
/// Restrict to this namespace, if set.
pub namespace: Option<String>,
/// Restrict to this tier, if set.
pub tier: Option<Tier>,
/// Tag filter; presumably matches memories carrying any of these tags — confirm.
pub tags_any: Vec<String>,
/// Restrict to this agent id, if set.
pub agent_id: Option<String>,
/// Lower time bound (UTC), if set. Which timestamp column it applies to is backend-defined.
pub since: Option<chrono::DateTime<chrono::Utc>>,
/// Upper time bound (UTC), if set.
pub until: Option<chrono::DateTime<chrono::Utc>>,
/// Maximum number of results requested.
pub limit: usize,
}
/// Storage abstraction over a memory backend.
///
/// Methods declared without a body are required. Every other method ships a
/// default that does one of three things: delegates to another trait method,
/// returns a fixed neutral value, or fails with
/// `StoreError::UnsupportedCapability` naming the capability. The doc comment
/// on each method states which default applies; semantics of overriding
/// implementations are backend-specific and not visible in this file.
#[async_trait::async_trait]
pub trait MemoryStore: Send + Sync {
/// Required. Reports the capability flags of this backend.
fn capabilities(&self) -> Capabilities;
/// Schema version of the backend. Default: always `Ok(0)`.
async fn schema_version(&self) -> StoreResult<i64> {
Ok(0)
}
/// Required. Persists `memory` and returns a `String`
/// (NOTE(review): presumably the stored memory's id — confirm).
async fn store(&self, ctx: &CallerContext, memory: &Memory) -> StoreResult<String>;
/// Stores `memory` together with an optional embedding.
/// Default: ignores `_embedding` and forwards to [`MemoryStore::store`].
async fn store_with_embedding(
&self,
ctx: &CallerContext,
memory: &Memory,
_embedding: Option<&[f32]>,
) -> StoreResult<String> {
self.store(ctx, memory).await
}
/// Stores several memories.
/// Default: calls [`MemoryStore::store`] once per memory, in order, and
/// returns the results in input order. It stops at the first error; the
/// default performs no rollback of the writes that already succeeded.
async fn store_batch(
&self,
ctx: &CallerContext,
memories: &[Memory],
) -> StoreResult<Vec<String>> {
let mut ids = Vec::with_capacity(memories.len());
for memory in memories {
ids.push(self.store(ctx, memory).await?);
}
Ok(ids)
}
/// Sets or clears the embedding of the memory `_id`.
/// Default: does nothing and returns `Ok(())`.
async fn update_embedding(
&self,
_ctx: &CallerContext,
_id: &str,
_embedding: Option<&[f32]>,
) -> StoreResult<()> {
Ok(())
}
/// Lists up to `_limit` rows that still lack an embedding, as string triples.
/// `run_embedding_backfill_on_store` treats the first element as the id and
/// passes the other two to `crate::embeddings::embedding_document`
/// (presumably title and content — confirm).
/// Default: returns an empty list.
async fn list_unembedded(
&self,
_ctx: &CallerContext,
_limit: usize,
) -> StoreResult<Vec<(String, String, String)>> {
Ok(Vec::new())
}
/// Writes a batch of `(id, embedding)` pairs and returns how many were written.
/// Default: calls [`MemoryStore::update_embedding`] per entry, counting each
/// successful call, and stops at the first error.
async fn set_embeddings_batch(
&self,
ctx: &CallerContext,
entries: &[(String, Vec<f32>)],
) -> StoreResult<usize> {
let mut written = 0usize;
for (id, vec) in entries {
self.update_embedding(ctx, id, Some(vec)).await?;
written += 1;
}
Ok(written)
}
/// Default: fails with `UnsupportedCapability("GOVERNANCE_EXECUTE_PENDING")`.
async fn execute_pending_action(
&self,
_ctx: &CallerContext,
_pending_id: &str,
) -> StoreResult<Option<String>> {
Err(StoreError::UnsupportedCapability {
capability: "GOVERNANCE_EXECUTE_PENDING".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("L4_CAPTURE_TURN")`.
async fn capture_turn_idempotent(
&self,
_ctx: &CallerContext,
_write: &CaptureTurnWrite,
) -> StoreResult<CaptureTurnResult> {
Err(StoreError::UnsupportedCapability {
capability: "L4_CAPTURE_TURN".to_string(),
})
}
/// Required. Fetches the memory identified by `id`.
async fn get(&self, ctx: &CallerContext, id: &str) -> StoreResult<Memory>;
/// Required. Applies `patch` to the memory identified by `id`.
async fn update(&self, ctx: &CallerContext, id: &str, patch: UpdatePatch) -> StoreResult<()>;
/// Required. Deletes the memory identified by `id`. The default
/// [`MemoryStore::apply_remote_deletion`] relies on `StoreError::NotFound`
/// being returned for a missing row.
async fn delete(&self, ctx: &CallerContext, id: &str) -> StoreResult<()>;
/// Required. Lists memories matching `filter`.
async fn list(&self, ctx: &CallerContext, filter: &Filter) -> StoreResult<Vec<Memory>>;
/// Default: fails with `UnsupportedCapability`; the capability text states
/// that a per-adapter implementation is required.
async fn list_by_namespace_prefix(
&self,
_ctx: &CallerContext,
_prefix: &str,
_limit: usize,
) -> StoreResult<Vec<Memory>> {
Err(StoreError::UnsupportedCapability {
capability: "list_by_namespace_prefix (per-adapter implementation required; #1625)"
.to_string(),
})
}
/// Required. Searches memories for `query`, restricted by `filter`.
/// The default [`MemoryStore::recall_hybrid`] scores results by the order
/// returned here.
async fn search(
&self,
ctx: &CallerContext,
query: &str,
filter: &Filter,
) -> StoreResult<Vec<Memory>>;
/// Required. Produces a [`VerifyReport`] for the memory identified by `id`.
async fn verify(&self, ctx: &CallerContext, id: &str) -> StoreResult<VerifyReport>;
/// Opens a transaction.
/// Default: fails with `UnsupportedCapability("TRANSACTIONS")`.
async fn begin_transaction(&self, _ctx: &CallerContext) -> StoreResult<Box<dyn Transaction>> {
Err(StoreError::UnsupportedCapability {
capability: "TRANSACTIONS".to_string(),
})
}
/// Required. Persists `link`.
async fn link(&self, ctx: &CallerContext, link: &MemoryLink) -> StoreResult<()>;
/// Persists `link`, optionally signing it, and returns the attestation
/// level as a static string.
/// Default: ignores `keypair`, forwards to [`MemoryStore::link`], and
/// reports `AttestLevel::Unsigned`.
async fn link_signed(
&self,
ctx: &CallerContext,
link: &MemoryLink,
keypair: Option<&crate::identity::keypair::AgentKeypair>,
) -> StoreResult<&'static str> {
let _ = keypair;
self.link(ctx, link).await?;
Ok(crate::models::AttestLevel::Unsigned.as_str())
}
/// Required. Lists links, optionally restricted to `namespace`.
async fn list_links(&self, namespace: Option<&str>) -> StoreResult<Vec<MemoryLink>>;
/// Default: fails with `UnsupportedCapability("GET_LINKS_FOR_ANCHOR")`.
async fn get_links_for_anchor(&self, _anchor_id: &str) -> StoreResult<Vec<MemoryLink>> {
Err(StoreError::UnsupportedCapability {
capability: "GET_LINKS_FOR_ANCHOR".to_string(),
})
}
/// Required. Registers `agent`.
async fn register_agent(
&self,
ctx: &CallerContext,
agent: &AgentRegistration,
) -> StoreResult<()>;
/// Default: fails with `UnsupportedCapability("BIND_AGENT_PUBKEY")`.
async fn bind_agent_pubkey(
&self,
_ctx: &CallerContext,
_agent_id: &str,
_pubkey_b64: &str,
) -> StoreResult<()> {
Err(StoreError::UnsupportedCapability {
capability: "BIND_AGENT_PUBKEY".to_string(),
})
}
/// Looks up the public key bound to `_agent_id`.
/// Default: `Ok(None)` (no key known), rather than an error.
async fn agent_pubkey(&self, _agent_id: &str) -> StoreResult<Option<String>> {
Ok(None)
}
/// Default: fails with `UnsupportedCapability("REVOKE_AGENT_PUBKEY")`.
async fn revoke_agent_pubkey(&self, _ctx: &CallerContext, _agent_id: &str) -> StoreResult<()> {
Err(StoreError::UnsupportedCapability {
capability: "REVOKE_AGENT_PUBKEY".to_string(),
})
}
/// Downcasting hook.
/// Default: returns a reference to the unit value `()`, so `downcast_ref`
/// to any concrete store type yields `None` unless the implementation
/// overrides this method.
fn as_any(&self) -> &dyn std::any::Any {
&()
}
/// Deprecated alias that forwards to [`MemoryStore::as_any`].
#[deprecated(
since = "0.7.0",
note = "use `MemoryStore::as_any` directly; will be removed in v0.8.0"
)]
fn as_any_for_postgres(&self) -> &dyn std::any::Any {
self.as_any()
}
/// Default: fails with `UnsupportedCapability("FEDERATION_LIST_SINCE")`.
async fn list_memories_updated_since(
&self,
_since: Option<&str>,
_limit: usize,
) -> StoreResult<Vec<Memory>> {
Err(StoreError::UnsupportedCapability {
capability: "FEDERATION_LIST_SINCE".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("FEDERATION_APPLY_REMOTE")`.
async fn apply_remote_memory(
&self,
_ctx: &CallerContext,
_memory: &Memory,
) -> StoreResult<String> {
Err(StoreError::UnsupportedCapability {
capability: "FEDERATION_APPLY_REMOTE".to_string(),
})
}
/// Applies a link received from a remote peer.
/// Default: ignores `attest_level` and forwards to [`MemoryStore::link`].
async fn apply_remote_link(
&self,
ctx: &CallerContext,
link: &MemoryLink,
attest_level: &str,
) -> StoreResult<()> {
let _ = attest_level;
self.link(ctx, link).await
}
/// Applies a deletion received from a remote peer.
/// Default: calls [`MemoryStore::delete`]; returns `Ok(true)` when the
/// delete succeeded, `Ok(false)` when it reported `StoreError::NotFound`,
/// and propagates every other error.
async fn apply_remote_deletion(&self, ctx: &CallerContext, id: &str) -> StoreResult<bool> {
match self.delete(ctx, id).await {
Ok(()) => Ok(true),
Err(StoreError::NotFound { .. }) => Ok(false),
Err(e) => Err(e),
}
}
/// Recall returning `(memory, score)` pairs.
/// Default: ignores `_query_embedding`, runs [`MemoryStore::search`], and
/// assigns a synthetic score of `1.0 - 0.01 * position` in result order.
/// Note that this synthetic score reaches about zero at position 100 and is
/// negative beyond it.
async fn recall_hybrid(
&self,
ctx: &CallerContext,
query: &str,
_query_embedding: Option<&[f32]>,
filter: &Filter,
) -> StoreResult<Vec<(Memory, f64)>> {
let mems = self.search(ctx, query, filter).await?;
let scored = mems
.into_iter()
.enumerate()
.map(|(i, m)| {
#[allow(clippy::cast_precision_loss)]
let synthetic = 1.0 - (i as f64) * 0.01;
(m, synthetic)
})
.collect();
Ok(scored)
}
/// Hook invoked with the ids of recalled memories.
/// Default: does nothing and returns `Ok(())`.
async fn touch_after_recall(&self, _ids: &[String]) -> StoreResult<()> {
Ok(())
}
/// Default: fails with `UnsupportedCapability("GOVERNANCE_PENDING_DECIDE")`.
async fn pending_decide(
&self,
_ctx: &CallerContext,
_id: &str,
_approve: bool,
_decided_by: &str,
) -> StoreResult<bool> {
Err(StoreError::UnsupportedCapability {
capability: "GOVERNANCE_PENDING_DECIDE".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("GOVERNANCE_GET_PENDING")`.
async fn get_pending(
&self,
_ctx: &CallerContext,
_id: &str,
) -> StoreResult<Option<crate::models::PendingAction>> {
Err(StoreError::UnsupportedCapability {
capability: "GOVERNANCE_GET_PENDING".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("GOVERNANCE_SET_STANDARD")`.
async fn set_namespace_standard(
&self,
_ctx: &CallerContext,
_namespace: &str,
_standard_id: &str,
_parent: Option<&str>,
) -> StoreResult<()> {
Err(StoreError::UnsupportedCapability {
capability: "GOVERNANCE_SET_STANDARD".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("GOVERNANCE_CLEAR_STANDARD")`.
async fn clear_namespace_standard(
&self,
_ctx: &CallerContext,
_namespace: &str,
) -> StoreResult<bool> {
Err(StoreError::UnsupportedCapability {
capability: "GOVERNANCE_CLEAR_STANDARD".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("GOVERNANCE_GET_STANDARD")`.
/// NOTE(review): the returned pair presumably mirrors the
/// `(standard_id, parent)` arguments of `set_namespace_standard` — confirm.
async fn get_namespace_standard(
&self,
_ctx: &CallerContext,
_namespace: &str,
) -> StoreResult<Option<(String, Option<String>)>> {
Err(StoreError::UnsupportedCapability {
capability: "GOVERNANCE_GET_STANDARD".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("FORGET")`.
async fn forget(
&self,
_ctx: &CallerContext,
_namespace: Option<&str>,
_pattern: Option<&str>,
_tier: Option<&Tier>,
_archive: bool,
) -> StoreResult<usize> {
Err(StoreError::UnsupportedCapability {
capability: "FORGET".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("CONSOLIDATE")`.
async fn consolidate(
&self,
_ctx: &CallerContext,
_ids: &[String],
_title: &str,
_summary: &str,
_namespace: &str,
_tier: &Tier,
_source: &str,
_consolidator_agent_id: &str,
) -> StoreResult<String> {
Err(StoreError::UnsupportedCapability {
capability: "CONSOLIDATE".to_string(),
})
}
/// Unlike the other methods, this one returns
/// `crate::storage::reflect::ReflectError` rather than `StoreError`.
/// Default: fails with `ReflectError::Database` carrying a "not supported"
/// message.
async fn reflect(
&self,
_ctx: &CallerContext,
_input: &crate::storage::reflect::ReflectInput,
_signing_key: Option<&crate::identity::keypair::AgentKeypair>,
) -> Result<crate::storage::reflect::ReflectOutcome, crate::storage::reflect::ReflectError>
{
Err(crate::storage::reflect::ReflectError::Database(
"reflect is not supported on this storage backend".to_string(),
))
}
/// Default: fails with `UnsupportedCapability("REFLECTION_ORIGIN")`.
async fn get_reflection_origin(
&self,
_id: &str,
) -> StoreResult<Option<crate::federation::reflection_bookkeeping::ReflectionOrigin>> {
Err(StoreError::UnsupportedCapability {
capability: "REFLECTION_ORIGIN".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("RECALL_OBSERVATIONS")`.
async fn list_recall_observations(
&self,
_recall_id: Option<&str>,
_consumed: Option<bool>,
_since: Option<&str>,
_until: Option<&str>,
_limit: usize,
) -> StoreResult<Vec<crate::observations::Observation>> {
Err(StoreError::UnsupportedCapability {
capability: "RECALL_OBSERVATIONS".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("GC")`.
async fn run_gc(&self, _archive: bool) -> StoreResult<usize> {
Err(StoreError::UnsupportedCapability {
capability: "GC".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("ARCHIVE_RESTORE")`.
async fn archive_restore(&self, _ctx: &CallerContext, _id: &str) -> StoreResult<bool> {
Err(StoreError::UnsupportedCapability {
capability: "ARCHIVE_RESTORE".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("ARCHIVE_PURGE")`.
async fn archive_purge(
&self,
_ctx: &CallerContext,
_older_than_days: Option<i64>,
) -> StoreResult<usize> {
Err(StoreError::UnsupportedCapability {
capability: "ARCHIVE_PURGE".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("ARCHIVE_BY_IDS")`.
async fn archive_by_ids(
&self,
_ctx: &CallerContext,
_ids: &[String],
_reason: Option<&str>,
) -> StoreResult<usize> {
Err(StoreError::UnsupportedCapability {
capability: "ARCHIVE_BY_IDS".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("EXPORT")`.
async fn export_memories(&self) -> StoreResult<Vec<Memory>> {
Err(StoreError::UnsupportedCapability {
capability: "EXPORT".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("EXPORT_LINKS")`.
async fn export_links(&self) -> StoreResult<Vec<MemoryLink>> {
Err(StoreError::UnsupportedCapability {
capability: "EXPORT_LINKS".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("NOTIFY")`.
async fn notify(
&self,
_ctx: &CallerContext,
_target_agent: &str,
_title: &str,
_payload: &str,
_priority: Option<i32>,
_tier: Option<&Tier>,
) -> StoreResult<String> {
Err(StoreError::UnsupportedCapability {
capability: "NOTIFY".to_string(),
})
}
/// Resolves the chain of namespaces for `namespace`.
/// Default: a single-element list containing `namespace` itself.
async fn build_namespace_chain(&self, namespace: &str) -> StoreResult<Vec<String>> {
Ok(vec![namespace.to_string()])
}
/// Resolves the governance policy for `_namespace`.
/// Default: `Ok(None)` (no policy).
async fn resolve_governance_policy(
&self,
_namespace: &str,
) -> StoreResult<Option<crate::models::GovernancePolicy>> {
Ok(None)
}
/// Default: fails with `UnsupportedCapability("GOVERNANCE_CONSENSUS")`.
/// This is also what the default [`MemoryStore::approve_with_approver_type`]
/// forwards to.
async fn governance_approve_with_consensus(
&self,
_ctx: &CallerContext,
_pending_id: &str,
_approver_agent_id: &str,
) -> StoreResult<ApproveOutcome> {
Err(StoreError::UnsupportedCapability {
capability: "GOVERNANCE_CONSENSUS".to_string(),
})
}
/// Whether `_agent_id` is a registered agent.
/// Default: always `Ok(false)`.
async fn is_registered_agent(&self, _agent_id: &str) -> StoreResult<bool> {
Ok(false)
}
/// Evaluates governance for an action.
/// Default: always `GovernanceDecision::Allow`, regardless of the arguments;
/// backends that enforce governance must override this.
async fn enforce_governance_action(
&self,
_action: GovernedAction,
_namespace: &str,
_agent_id: &str,
_memory_id: Option<&str>,
_memory_owner: Option<&str>,
_payload: &serde_json::Value,
) -> StoreResult<crate::models::GovernanceDecision> {
Ok(crate::models::GovernanceDecision::Allow)
}
/// Default: fails with `UnsupportedCapability("QUOTA_STATUS")`.
async fn quota_status(&self, _agent_id: &str) -> StoreResult<QuotaStatus> {
Err(StoreError::UnsupportedCapability {
capability: "QUOTA_STATUS".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("QUOTA_STATUS_NS")`.
async fn quota_status_ns(&self, _agent_id: &str, _namespace: &str) -> StoreResult<QuotaStatus> {
Err(StoreError::UnsupportedCapability {
capability: "QUOTA_STATUS_NS".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("QUOTA_STATUS_LIST")`.
async fn quota_status_list(&self) -> StoreResult<Vec<QuotaStatus>> {
Err(StoreError::UnsupportedCapability {
capability: "QUOTA_STATUS_LIST".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("QUOTA_STATUS_LIST_NS")`.
async fn quota_status_list_ns(&self, _namespace: &str) -> StoreResult<Vec<QuotaStatus>> {
Err(StoreError::UnsupportedCapability {
capability: "QUOTA_STATUS_LIST_NS".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("VERIFY_LINK")`.
async fn verify_link(&self, _filter: VerifyFilter) -> StoreResult<VerifyLinkReport> {
Err(StoreError::UnsupportedCapability {
capability: "VERIFY_LINK".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("FIND_PATHS")`.
async fn find_paths(
&self,
_ctx: &CallerContext,
_source_id: &str,
_target_id: &str,
_max_depth: Option<usize>,
_max_results: Option<usize>,
) -> StoreResult<Vec<Vec<String>>> {
Err(StoreError::UnsupportedCapability {
capability: "FIND_PATHS".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("LIST_NAMESPACES")`.
async fn list_namespaces(&self) -> StoreResult<Vec<crate::models::NamespaceCount>> {
Err(StoreError::UnsupportedCapability {
capability: "LIST_NAMESPACES".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("GET_TAXONOMY")`.
async fn get_taxonomy(
&self,
_namespace_prefix: Option<&str>,
_max_depth: usize,
_limit: usize,
) -> StoreResult<crate::models::Taxonomy> {
Err(StoreError::UnsupportedCapability {
capability: "GET_TAXONOMY".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("LIST_AGENTS")`.
async fn list_agents(&self) -> StoreResult<Vec<AgentRegistration>> {
Err(StoreError::UnsupportedCapability {
capability: "LIST_AGENTS".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("LIST_PENDING_ACTIONS")`.
async fn list_pending_actions(
&self,
_status: Option<&str>,
_limit: usize,
) -> StoreResult<Vec<crate::models::PendingAction>> {
Err(StoreError::UnsupportedCapability {
capability: "LIST_PENDING_ACTIONS".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("ENTITY_GET_BY_ALIAS")`.
async fn entity_get_by_alias(
&self,
_alias: &str,
_namespace: Option<&str>,
) -> StoreResult<Option<crate::models::EntityRecord>> {
Err(StoreError::UnsupportedCapability {
capability: "ENTITY_GET_BY_ALIAS".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("HEALTH_CHECK")` — i.e. the
/// default is an error, not `Ok(false)`.
async fn health_check(&self) -> StoreResult<bool> {
Err(StoreError::UnsupportedCapability {
capability: "HEALTH_CHECK".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("STATS")`.
async fn stats(&self) -> StoreResult<crate::models::Stats> {
Err(StoreError::UnsupportedCapability {
capability: "STATS".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("FIND_BY_TITLE_NAMESPACE")`.
async fn find_by_title_namespace(
&self,
_title: &str,
_namespace: &str,
) -> StoreResult<Option<String>> {
Err(StoreError::UnsupportedCapability {
capability: "FIND_BY_TITLE_NAMESPACE".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("NEXT_VERSIONED_TITLE")`.
async fn next_versioned_title(
&self,
_base_title: &str,
_namespace: &str,
) -> StoreResult<String> {
Err(StoreError::UnsupportedCapability {
capability: "NEXT_VERSIONED_TITLE".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("FIND_CONTRADICTIONS")`.
async fn find_contradictions(
&self,
_title: &str,
_namespace: &str,
) -> StoreResult<Vec<Memory>> {
Err(StoreError::UnsupportedCapability {
capability: "FIND_CONTRADICTIONS".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("INVALIDATE_LINK")`.
async fn invalidate_link(
&self,
_source_id: &str,
_target_id: &str,
_relation: &str,
_valid_until: Option<&str>,
) -> StoreResult<KgInvalidateRow> {
Err(StoreError::UnsupportedCapability {
capability: "INVALIDATE_LINK".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("CHECK_DUPLICATE_WITH_TEXT")`.
async fn check_duplicate_with_text(
&self,
_query_embedding: &[f32],
_query_text: &str,
_namespace: Option<&str>,
_threshold: f32,
) -> StoreResult<crate::models::DuplicateCheck> {
Err(StoreError::UnsupportedCapability {
capability: "CHECK_DUPLICATE_WITH_TEXT".to_string(),
})
}
/// Default: forwards unchanged to
/// [`MemoryStore::governance_approve_with_consensus`].
async fn approve_with_approver_type(
&self,
ctx: &CallerContext,
pending_id: &str,
approver_agent_id: &str,
) -> StoreResult<ApproveOutcome> {
self.governance_approve_with_consensus(ctx, pending_id, approver_agent_id)
.await
}
/// Default: forwards unchanged to [`MemoryStore::pending_decide`].
async fn decide_pending_action(
&self,
ctx: &CallerContext,
id: &str,
approve: bool,
decided_by: &str,
) -> StoreResult<bool> {
self.pending_decide(ctx, id, approve, decided_by).await
}
/// Default: fails with `UnsupportedCapability("KG_QUERY")`.
async fn kg_query(
&self,
_source_id: &str,
_max_depth: usize,
_include_invalidated: bool,
) -> StoreResult<Vec<KgQueryRow>> {
Err(StoreError::UnsupportedCapability {
capability: "KG_QUERY".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("KG_TIMELINE")`.
async fn kg_timeline(
&self,
_source_id: &str,
_since: Option<&str>,
_until: Option<&str>,
_limit: Option<usize>,
) -> StoreResult<Vec<KgTimelineRow>> {
Err(StoreError::UnsupportedCapability {
capability: "KG_TIMELINE".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("ENTITY_REGISTER")`.
async fn entity_register(
&self,
_ctx: &CallerContext,
_canonical_name: &str,
_namespace: &str,
_aliases: &[String],
_extra_metadata: &serde_json::Value,
_agent_id: Option<&str>,
) -> StoreResult<crate::models::EntityRegistration> {
Err(StoreError::UnsupportedCapability {
capability: "ENTITY_REGISTER".to_string(),
})
}
/// Default: fails with `UnsupportedCapability("LIST_ARCHIVED")`.
async fn list_archived(
&self,
_namespace: Option<&str>,
_limit: usize,
_offset: usize,
) -> StoreResult<Vec<serde_json::Value>> {
Err(StoreError::UnsupportedCapability {
capability: "LIST_ARCHIVED".to_string(),
})
}
}
/// Store-layer mirror of `crate::models::GovernedAction`: the kinds of
/// action that `MemoryStore::enforce_governance_action` is asked about.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GovernedAction {
    Store,
    Delete,
    Promote,
    Reflect,
}

impl GovernedAction {
    /// Returns the lowercase static label of the action
    /// (`"store"`, `"delete"`, `"promote"` or `"reflect"`).
    #[must_use]
    pub fn as_str(self) -> &'static str {
        let label: &'static str = match self {
            GovernedAction::Store => "store",
            GovernedAction::Delete => "delete",
            GovernedAction::Promote => "promote",
            GovernedAction::Reflect => "reflect",
        };
        label
    }
}
impl From<crate::models::GovernedAction> for GovernedAction {
fn from(value: crate::models::GovernedAction) -> Self {
match value {
crate::models::GovernedAction::Store => Self::Store,
crate::models::GovernedAction::Delete => Self::Delete,
crate::models::GovernedAction::Promote => Self::Promote,
crate::models::GovernedAction::Reflect => Self::Reflect,
}
}
}
/// Outcome returned by `MemoryStore::governance_approve_with_consensus` and
/// `MemoryStore::approve_with_approver_type`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ApproveOutcome {
/// The pending action is approved.
Approved,
/// Approval is still outstanding. NOTE(review): presumably `votes` is the
/// count collected so far and `quorum` the count required — confirm.
Pending { votes: usize, quorum: u32 },
/// The approval was rejected; the string presumably carries the reason — confirm.
Rejected(String),
}
/// Partial update passed to `MemoryStore::update`.
///
/// Every field is optional and `UpdatePatch::default()` sets all of them to
/// `None`.
/// NOTE(review): presumably `None` means "leave the stored value unchanged" —
/// confirm against the adapters, which are not visible in this file.
#[derive(Debug, Default, Clone)]
pub struct UpdatePatch {
/// New title, if any.
pub title: Option<String>,
/// New content, if any.
pub content: Option<String>,
/// New tier, if any.
pub tier: Option<Tier>,
/// New namespace, if any.
pub namespace: Option<String>,
/// New tag list, if any (presumably replaces rather than merges — confirm).
pub tags: Option<Vec<String>>,
/// New priority, if any.
pub priority: Option<i32>,
/// New confidence, if any.
pub confidence: Option<f64>,
/// New metadata JSON value, if any.
pub metadata: Option<serde_json::Value>,
/// New source URI, if any.
pub source_uri: Option<String>,
/// New expiry, if any (NOTE(review): presumably an RFC3339 timestamp — confirm).
pub expires_at: Option<String>,
}
/// Report returned by `MemoryStore::verify` for a single memory.
#[derive(Debug, Clone)]
pub struct VerifyReport {
/// Id of the memory the report is about.
pub memory_id: String,
/// Whether the integrity checks passed.
pub integrity_ok: bool,
/// Human-readable findings (presumably empty when `integrity_ok` is true — confirm).
pub findings: Vec<String>,
/// Whether a signature was verified.
pub signature_verified: bool,
}
/// Selector passed to `MemoryStore::verify_link`.
///
/// All fields are optional; `VerifyFilter::default()` sets them to `None`.
/// NOTE(review): how the fields combine to pick a link is backend-defined and
/// not visible in this file.
#[derive(Debug, Clone, Default)]
pub struct VerifyFilter {
/// Source id of the link to verify, if given.
pub source_id: Option<String>,
/// Target id of the link to verify, if given.
pub target_id: Option<String>,
/// Id of the link to verify, if given.
pub link_id: Option<String>,
}
/// Report returned by `MemoryStore::verify_link` for a single link.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VerifyLinkReport {
/// Source id of the verified link.
pub source_id: String,
/// Target id of the verified link.
pub target_id: String,
/// Relation label of the verified link.
pub relation: String,
/// Whether verification succeeded.
pub verified: bool,
/// Attestation level as a string (e.g. the default `link_signed` reports
/// `AttestLevel::Unsigned.as_str()`).
pub attest_level: String,
/// Whether the link carries a signature.
pub signature_present: bool,
/// Observer recorded on the link, if any.
pub observed_by: Option<String>,
/// Human-readable findings produced during verification.
pub findings: Vec<String>,
}
/// Repeatedly embeds rows that lack an embedding and writes the vectors back,
/// returning the total number of embeddings written.
///
/// Each pass:
/// 1. asks the store for up to `batch_size` unembedded rows,
/// 2. builds one document per row via `crate::embeddings::embedding_document`,
/// 3. embeds the whole chunk with `emb.embed_batch`,
/// 4. writes the vectors with `MemoryStore::set_embeddings_batch`.
///
/// A `batch_size` of `0` selects `crate::mcp::DEFAULT_EMBED_BACKFILL_BATCH_SIZE`.
///
/// The sweep is best-effort: it never returns an error. It stops (logging a
/// warning where something went wrong) when the store reports no more rows,
/// when any step fails, when the embedder returns a different number of
/// vectors than inputs, or when a pass writes zero rows. The last condition
/// guards against spinning forever on rows that can never be written.
pub async fn run_embedding_backfill_on_store(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    emb: &dyn crate::embeddings::Embed,
    batch_size: usize,
) -> usize {
    // Zero means "use the crate-wide default".
    let batch_size = match batch_size {
        0 => crate::mcp::DEFAULT_EMBED_BACKFILL_BATCH_SIZE,
        requested => requested,
    };
    let mut total = 0usize;
    loop {
        // Step 1: fetch the next chunk; an empty chunk means we are done.
        let pending = match store.list_unembedded(ctx, batch_size).await {
            Ok(rows) if rows.is_empty() => return total,
            Ok(rows) => rows,
            Err(e) => {
                tracing::warn!("embedding backfill: unembedded scan failed: {e} (sweep stopped)");
                return total;
            }
        };

        // Step 2: one embedding document per row, in row order.
        let mut documents: Vec<String> = Vec::with_capacity(pending.len());
        for (_, first_text, second_text) in &pending {
            documents.push(crate::embeddings::embedding_document(
                first_text,
                second_text,
            ));
        }
        let text_refs: Vec<&str> = documents.iter().map(|doc| doc.as_str()).collect();

        // Step 3: embed the whole chunk at once.
        let embeddings = match emb.embed_batch(&text_refs) {
            Ok(v) => v,
            Err(e) => {
                tracing::warn!(
                    "embedding backfill: embed_batch failed for chunk of {} rows: {e} \
                     (sweep stopped; remaining rows retry on next boot)",
                    pending.len()
                );
                return total;
            }
        };
        // A length mismatch would pair vectors with the wrong rows; bail out.
        if embeddings.len() != pending.len() {
            tracing::warn!(
                "embedding backfill: embed_batch returned {} vectors for {} inputs (sweep stopped)",
                embeddings.len(),
                pending.len()
            );
            return total;
        }

        // Step 4: pair each row id with its vector and write the batch.
        let mut entries: Vec<(String, Vec<f32>)> = Vec::with_capacity(pending.len());
        for ((id, _, _), vector) in pending.iter().zip(embeddings) {
            entries.push((id.clone(), vector));
        }
        let written = match store.set_embeddings_batch(ctx, &entries).await {
            Ok(n) => n,
            Err(e) => {
                tracing::warn!(
                    "embedding backfill: set_embeddings_batch failed for chunk of {} rows: {e} \
                     (sweep stopped; remaining rows retry on next boot)",
                    entries.len()
                );
                return total;
            }
        };
        total += written;
        tracing::info!(
            "embedding backfill: wrote {written}/{} embeddings this pass ({total} total)",
            entries.len()
        );
        // No progress: the same rows would be listed again next pass.
        if written == 0 {
            tracing::warn!(
                "embedding backfill: zero-progress pass on {} candidate row(s); sweep stopped",
                pending.len()
            );
            return total;
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn caller_context_builder_defaults() {
let ctx = CallerContext::for_agent("alice");
assert_eq!(ctx.agent_id, "alice");
assert!(ctx.as_agent.is_none());
assert!(ctx.request_id.is_none());
}
#[test]
fn pool_config_default_equals_named_constants() {
let d = PoolConfig::default();
assert_eq!(d.max_connections, DEFAULT_MAX_CONNECTIONS);
assert_eq!(d.min_connections, DEFAULT_MIN_CONNECTIONS);
assert_eq!(d.acquire_timeout_secs, DEFAULT_ACQUIRE_TIMEOUT_SECS);
assert_eq!(d.max_connections, 16);
assert_eq!(d.min_connections, 2);
assert_eq!(d.acquire_timeout_secs, 30);
}
#[test]
fn capabilities_bitflags_compose() {
let caps = Capabilities::TRANSACTIONS | Capabilities::DURABLE;
assert!(caps.contains(Capabilities::TRANSACTIONS));
assert!(caps.contains(Capabilities::DURABLE));
assert!(!caps.contains(Capabilities::NATIVE_VECTOR));
}
#[test]
fn store_error_display_is_human_readable() {
let err = StoreError::NotFound {
id: "abc".to_string(),
};
assert_eq!(err.to_string(), "memory not found: abc");
let err = StoreError::PermissionDenied {
action: "read".to_string(),
target: "memory/abc".to_string(),
reason: "row-level ACL".to_string(),
};
assert!(err.to_string().contains("read"));
assert!(err.to_string().contains("row-level ACL"));
}
#[test]
fn default_begin_transaction_errors() {
let err = StoreError::UnsupportedCapability {
capability: "TRANSACTIONS".to_string(),
};
assert!(err.to_string().contains("TRANSACTIONS"));
}
#[test]
fn filter_defaults_are_empty() {
let f = Filter::default();
assert!(f.namespace.is_none());
assert!(f.tier.is_none());
assert!(f.tags_any.is_empty());
}
#[test]
fn kg_backend_serializes_snake_case() {
let cte = serde_json::to_string(&KgBackend::Cte).unwrap();
let age = serde_json::to_string(&KgBackend::Age).unwrap();
assert_eq!(cte, "\"cte\"");
assert_eq!(age, "\"age\"");
let cte_round: KgBackend = serde_json::from_str("\"cte\"").unwrap();
let age_round: KgBackend = serde_json::from_str("\"age\"").unwrap();
assert_eq!(cte_round, KgBackend::Cte);
assert_eq!(age_round, KgBackend::Age);
}
#[test]
fn kg_backend_as_str_matches_display() {
assert_eq!(KgBackend::Cte.as_str(), "cte");
assert_eq!(KgBackend::Age.as_str(), "age");
assert_eq!(format!("{}", KgBackend::Cte), "cte");
assert_eq!(format!("{}", KgBackend::Age), "age");
}
use crate::models::{AgentRegistration, Memory, MemoryLink, Tier};
use async_trait::async_trait;
struct MinimalStore;
fn dummy_memory(id: &str) -> Memory {
Memory {
id: id.to_string(),
tier: Tier::Mid,
namespace: "mock".to_string(),
title: "mock title".to_string(),
content: "mock content".to_string(),
tags: vec![],
priority: 5,
confidence: 1.0,
source: "mock".to_string(),
access_count: 0,
created_at: chrono::Utc::now().to_rfc3339(),
updated_at: chrono::Utc::now().to_rfc3339(),
last_accessed_at: None,
expires_at: None,
metadata: serde_json::json!({"agent_id": "alice"}),
reflection_depth: 0,
memory_kind: crate::models::MemoryKind::Observation,
entity_id: None,
persona_version: None,
citations: Vec::new(),
source_uri: None,
source_span: None,
confidence_source: crate::models::ConfidenceSource::CallerProvided,
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
}
}
#[async_trait]
impl MemoryStore for MinimalStore {
fn capabilities(&self) -> Capabilities {
Capabilities::DURABLE
}
async fn store(&self, _ctx: &CallerContext, mem: &Memory) -> StoreResult<String> {
Ok(mem.id.clone())
}
async fn get(&self, _ctx: &CallerContext, id: &str) -> StoreResult<Memory> {
if id == "exists" {
Ok(dummy_memory(id))
} else {
Err(StoreError::NotFound { id: id.to_string() })
}
}
async fn update(
&self,
_ctx: &CallerContext,
_id: &str,
_patch: UpdatePatch,
) -> StoreResult<()> {
Ok(())
}
async fn delete(&self, _ctx: &CallerContext, _id: &str) -> StoreResult<()> {
Ok(())
}
async fn list(&self, _ctx: &CallerContext, _filter: &Filter) -> StoreResult<Vec<Memory>> {
Ok(vec![dummy_memory("listed")])
}
async fn search(
&self,
_ctx: &CallerContext,
_query: &str,
_filter: &Filter,
) -> StoreResult<Vec<Memory>> {
Ok(vec![dummy_memory("searched")])
}
async fn verify(&self, _ctx: &CallerContext, id: &str) -> StoreResult<VerifyReport> {
Ok(VerifyReport {
memory_id: id.to_string(),
integrity_ok: true,
findings: vec![],
signature_verified: false,
})
}
async fn link(&self, _ctx: &CallerContext, _link: &MemoryLink) -> StoreResult<()> {
Ok(())
}
async fn list_links(&self, _ns: Option<&str>) -> StoreResult<Vec<MemoryLink>> {
Ok(vec![])
}
async fn register_agent(
&self,
_ctx: &CallerContext,
_agent: &AgentRegistration,
) -> StoreResult<()> {
Ok(())
}
}
#[tokio::test]
async fn default_schema_version_returns_zero() {
let s = MinimalStore;
assert_eq!(s.schema_version().await.expect("schema_version"), 0);
}
#[tokio::test]
async fn default_store_with_embedding_falls_through_to_store() {
let s = MinimalStore;
let ctx = CallerContext::for_agent("alice");
let mem = dummy_memory("with-emb");
let id = s
.store_with_embedding(&ctx, &mem, Some(&[0.1_f32, 0.2, 0.3]))
.await
.expect("store_with_embedding default");
assert_eq!(id, "with-emb");
}
#[tokio::test]
async fn default_update_embedding_is_noop() {
let s = MinimalStore;
let ctx = CallerContext::for_agent("alice");
s.update_embedding(&ctx, "any", Some(&[0.5_f32]))
.await
.expect("noop");
}
#[tokio::test]
async fn default_execute_pending_action_unsupported() {
let s = MinimalStore;
let ctx = CallerContext::for_agent("alice");
let err = s.execute_pending_action(&ctx, "any").await.unwrap_err();
assert!(matches!(err, StoreError::UnsupportedCapability { .. }));
}
#[tokio::test]
async fn default_begin_transaction_returns_unsupported() {
let s = MinimalStore;
let ctx = CallerContext::for_agent("alice");
let result = s.begin_transaction(&ctx).await.map(|_| "got txn");
let err = match result {
Ok(_) => panic!("expected UnsupportedCapability"),
Err(e) => e,
};
assert!(matches!(err, StoreError::UnsupportedCapability { .. }));
}
#[tokio::test]
async fn default_link_signed_forwards_and_reports_unsigned() {
let s = MinimalStore;
let ctx = CallerContext::for_agent("alice");
let link = MemoryLink {
source_id: "a".to_string(),
target_id: "b".to_string(),
relation: crate::models::MemoryLinkRelation::RelatedTo,
created_at: chrono::Utc::now().to_rfc3339(),
valid_from: None,
valid_until: None,
observed_by: None,
signature: None,
attest_level: None,
};
let level = s
.link_signed(&ctx, &link, None)
.await
.expect("default link_signed");
assert_eq!(level, "unsigned");
}
#[tokio::test]
async fn default_apply_remote_memory_unsupported() {
let s = MinimalStore;
let ctx = CallerContext::for_agent("alice");
let err = s
.apply_remote_memory(&ctx, &dummy_memory("rem"))
.await
.unwrap_err();
assert!(matches!(err, StoreError::UnsupportedCapability { .. }));
}
#[tokio::test]
async fn default_list_memories_updated_since_unsupported() {
let s = MinimalStore;
let err = s.list_memories_updated_since(None, 10).await.unwrap_err();
assert!(matches!(err, StoreError::UnsupportedCapability { .. }));
}
#[tokio::test]
async fn default_apply_remote_link_forwards_to_link() {
let s = MinimalStore;
let ctx = CallerContext::for_agent("alice");
let link = MemoryLink {
source_id: "a".to_string(),
target_id: "b".to_string(),
relation: crate::models::MemoryLinkRelation::RelatedTo,
created_at: chrono::Utc::now().to_rfc3339(),
valid_from: None,
valid_until: None,
observed_by: None,
signature: None,
attest_level: None,
};
s.apply_remote_link(&ctx, &link, "unsigned")
.await
.expect("apply_remote_link default");
}
/// `apply_remote_deletion` is expected to yield `true` for `MinimalStore` and
/// `false` for a store whose `delete` answers `StoreError::NotFound`.
#[tokio::test]
async fn default_apply_remote_deletion_true_on_ok_false_on_notfound() {
    /// Store whose `get` and `delete` always report `NotFound`; every other
    /// required method returns a trivial success value.
    struct NotFoundDeleter;
    #[async_trait]
    impl MemoryStore for NotFoundDeleter {
        fn capabilities(&self) -> Capabilities {
            Capabilities::DURABLE
        }
        async fn store(&self, _ctx: &CallerContext, memory: &Memory) -> StoreResult<String> {
            Ok(memory.id.clone())
        }
        async fn get(&self, _ctx: &CallerContext, id: &str) -> StoreResult<Memory> {
            Err(StoreError::NotFound { id: id.to_owned() })
        }
        async fn update(
            &self,
            _ctx: &CallerContext,
            _id: &str,
            _patch: UpdatePatch,
        ) -> StoreResult<()> {
            Ok(())
        }
        async fn delete(&self, _ctx: &CallerContext, id: &str) -> StoreResult<()> {
            Err(StoreError::NotFound { id: id.to_owned() })
        }
        async fn list(&self, _ctx: &CallerContext, _filter: &Filter) -> StoreResult<Vec<Memory>> {
            Ok(Vec::new())
        }
        async fn search(
            &self,
            _ctx: &CallerContext,
            _query: &str,
            _filter: &Filter,
        ) -> StoreResult<Vec<Memory>> {
            Ok(Vec::new())
        }
        async fn verify(&self, _ctx: &CallerContext, id: &str) -> StoreResult<VerifyReport> {
            let report = VerifyReport {
                memory_id: id.to_owned(),
                integrity_ok: true,
                findings: Vec::new(),
                signature_verified: false,
            };
            Ok(report)
        }
        async fn link(&self, _ctx: &CallerContext, _link: &MemoryLink) -> StoreResult<()> {
            Ok(())
        }
        async fn list_links(&self, _namespace: Option<&str>) -> StoreResult<Vec<MemoryLink>> {
            Ok(Vec::new())
        }
        async fn register_agent(
            &self,
            _ctx: &CallerContext,
            _agent: &AgentRegistration,
        ) -> StoreResult<()> {
            Ok(())
        }
    }

    let caller = CallerContext::for_agent("alice");

    // First half: the minimal store.
    let minimal = MinimalStore;
    let removed = minimal
        .apply_remote_deletion(&caller, "any")
        .await
        .expect("delete ok");
    assert!(removed, "Ok delete must surface as true");

    // Second half: a store whose delete returns NotFound.
    let not_found_store = NotFoundDeleter;
    let removed_missing = not_found_store
        .apply_remote_deletion(&caller, "missing")
        .await
        .expect("notfound branch");
    assert!(!removed_missing, "NotFound must surface as false");
}
/// `recall_hybrid` on `MinimalStore`, called without an embedding (`None`),
/// must return exactly one hit carrying a strictly positive score.
#[tokio::test]
async fn default_recall_hybrid_falls_back_to_search() {
    let store = MinimalStore;
    let caller = CallerContext::for_agent("alice");
    let no_filter = Filter::default();
    let outcome = store.recall_hybrid(&caller, "q", None, &no_filter).await;
    let hits = outcome.expect("default recall_hybrid");
    assert_eq!(hits.len(), 1);
    let first_score = hits[0].1;
    assert!(first_score > 0.0);
}
/// `touch_after_recall` on `MinimalStore` must succeed for arbitrary ids.
#[tokio::test]
async fn default_touch_after_recall_is_ok_for_any_ids() {
    let store = MinimalStore;
    let recalled_ids = [String::from("a"), String::from("b")];
    let outcome = store.touch_after_recall(&recalled_ids).await;
    outcome.expect("touch default ok");
}
#[tokio::test]
async fn default_governance_methods_unsupported_or_safe_default() {
let s = MinimalStore;
let ctx = CallerContext::for_agent("alice");
let err = s
.pending_decide(&ctx, "any", true, "alice")
.await
.unwrap_err();
assert!(matches!(err, StoreError::UnsupportedCapability { .. }));
let err = s.get_pending(&ctx, "any").await.unwrap_err();
assert!(matches!(err, StoreError::UnsupportedCapability { .. }));
let err = s
.set_namespace_standard(&ctx, "ns", "sid", None)
.await
.unwrap_err();
assert!(matches!(err, StoreError::UnsupportedCapability { .. }));
let err = s.clear_namespace_standard(&ctx, "ns").await.unwrap_err();
assert!(matches!(err, StoreError::UnsupportedCapability { .. }));
let err = s.get_namespace_standard(&ctx, "ns").await.unwrap_err();
assert!(matches!(err, StoreError::UnsupportedCapability { .. }));
let err = s
.governance_approve_with_consensus(&ctx, "pid", "alice")
.await
.unwrap_err();
assert!(matches!(err, StoreError::UnsupportedCapability { .. }));
let yes = s.is_registered_agent("alice").await.expect("default");
assert!(!yes);
let decision = s
.enforce_governance_action(
GovernedAction::Store,
"ns",
"alice",
None,
None,
&serde_json::json!({}),
)
.await
.expect("default Allow");
assert!(matches!(decision, crate::models::GovernanceDecision::Allow));
let chain = s.build_namespace_chain("leaf").await.expect("chain");
assert_eq!(chain, vec!["leaf".to_string()]);
let policy = s.resolve_governance_policy("ns").await.expect("policy");
assert!(policy.is_none());
}
#[tokio::test]
async fn default_lifecycle_methods_unsupported() {
let s = MinimalStore;
let ctx = CallerContext::for_agent("alice");
assert!(matches!(
s.forget(&ctx, Some("ns"), None, None, false)
.await
.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.consolidate(&ctx, &[], "t", "s", "ns", &Tier::Mid, "src", "alice")
.await
.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.run_gc(false).await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.archive_restore(&ctx, "id").await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.archive_purge(&ctx, None).await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.archive_by_ids(&ctx, &[], None).await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.export_memories().await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.export_links().await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.notify(&ctx, "agent", "t", "p", None, None)
.await
.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
}
#[tokio::test]
async fn default_quota_and_verify_methods_unsupported() {
let s = MinimalStore;
assert!(matches!(
s.quota_status("agent").await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.quota_status_list().await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.verify_link(VerifyFilter::default()).await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
let ctx = CallerContext::for_agent("alice");
assert!(matches!(
s.find_paths(&ctx, "a", "b", None, None).await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
}
/// `as_any` on a boxed `MinimalStore` must downcast to the unit type `()`.
#[test]
fn default_as_any_is_unit() {
    let boxed: Box<dyn MemoryStore> = Box::new(MinimalStore);
    let as_unit = boxed.as_any().downcast_ref::<()>();
    assert!(as_unit.is_some());
}
/// Both the deprecated `as_any_for_postgres` alias and `as_any` must downcast
/// to `()` for a boxed `MinimalStore`.
#[test]
fn arch_15_as_any_for_postgres_alias_delegates_to_as_any() {
    let boxed: Box<dyn MemoryStore> = Box::new(MinimalStore);
    // The alias is deprecated; the lint is silenced for this one call only.
    #[allow(deprecated)]
    let via_alias = boxed.as_any_for_postgres();
    let via_current = boxed.as_any();
    let alias_is_unit = via_alias.downcast_ref::<()>().is_some();
    let current_is_unit = via_current.downcast_ref::<()>().is_some();
    assert!(alias_is_unit);
    assert!(current_is_unit);
}
/// Each `GovernedAction` variant must render to its lowercase label through
/// `as_str`.
#[test]
fn governed_action_string_round_trip() {
    let expectations = [
        (GovernedAction::Store, "store"),
        (GovernedAction::Delete, "delete"),
        (GovernedAction::Promote, "promote"),
        (GovernedAction::Reflect, "reflect"),
    ];
    for (action, label) in expectations {
        assert_eq!(action.as_str(), label);
    }
}
/// Converting each `crate::models::GovernedAction` variant through `From` must
/// yield the same-named local `GovernedAction` variant.
#[test]
fn governed_action_from_models_matches_local_enum() {
    let store = GovernedAction::from(crate::models::GovernedAction::Store);
    assert!(matches!(store, GovernedAction::Store));

    let delete = GovernedAction::from(crate::models::GovernedAction::Delete);
    assert!(matches!(delete, GovernedAction::Delete));

    let promote = GovernedAction::from(crate::models::GovernedAction::Promote);
    assert!(matches!(promote, GovernedAction::Promote));

    let reflect = GovernedAction::from(crate::models::GovernedAction::Reflect);
    assert!(matches!(reflect, GovernedAction::Reflect));
}
/// The `Display` output of each constructed `StoreError` variant must contain
/// the detail text it was built with.
#[test]
fn store_error_invalid_input_and_integrity_displays() {
    let invalid_input = StoreError::InvalidInput {
        detail: String::from("missing source_id"),
    }
    .to_string();
    assert!(invalid_input.contains("missing source_id"));

    let integrity = StoreError::IntegrityFailed {
        detail: String::from("checksum mismatch"),
    }
    .to_string();
    assert!(integrity.contains("checksum mismatch"));

    let conflict = StoreError::Conflict {
        id: String::from("dup-id"),
    }
    .to_string();
    assert!(conflict.contains("dup-id"));

    let unsupported = StoreError::UnsupportedCapability {
        capability: String::from("FOO"),
    }
    .to_string();
    assert!(unsupported.contains("FOO"));

    let unavailable = StoreError::BackendUnavailable {
        backend: String::from("postgres"),
        detail: String::from("connection refused"),
    }
    .to_string();
    assert!(unavailable.contains("postgres"));
    assert!(unavailable.contains("connection refused"));

    let backend = StoreError::Backend(BoxBackendError::new("raw")).to_string();
    assert!(backend.contains("raw"));
}
/// A `BoxBackendError` must surface its message when formatted with `{}`.
#[test]
fn box_backend_error_display_round_trips() {
    let backend_error = BoxBackendError::new("a custom error");
    let rendered = format!("{backend_error}");
    assert!(rendered.contains("a custom error"));
}
/// The three `ApproveOutcome` variants must compare unequal to one another.
#[test]
fn approve_outcome_variants_distinct() {
    let approved = ApproveOutcome::Approved;
    let pending = ApproveOutcome::Pending {
        votes: 1,
        quorum: 3,
    };
    let rejected = ApproveOutcome::Rejected(String::from("nope"));
    // Pairwise checks, in the order: approved/pending, pending/rejected,
    // approved/rejected.
    assert!(approved != pending);
    assert!(pending != rejected);
    assert!(approved != rejected);
}
/// `VerifyFilter::default()` must leave all three selector fields as `None`.
#[test]
fn verify_filter_default_fields_unset() {
    let VerifyFilter {
        source_id,
        target_id,
        link_id,
    } = VerifyFilter::default();
    assert!(source_id.is_none());
    assert!(target_id.is_none());
    assert!(link_id.is_none());
}
/// A hand-built `VerifyReport` must expose exactly the values it was
/// constructed with.
#[test]
fn verify_report_construction_round_trip() {
    let report = VerifyReport {
        memory_id: String::from("id"),
        integrity_ok: true,
        findings: vec![String::from("finding")],
        signature_verified: false,
    };
    let VerifyReport {
        memory_id,
        integrity_ok,
        findings,
        signature_verified,
    } = report;
    assert_eq!(memory_id, "id");
    assert!(integrity_ok);
    assert_eq!(findings.len(), 1);
    assert!(!signature_verified);
}
#[tokio::test]
async fn default_probe_methods_unsupported() {
let s = MinimalStore;
assert!(matches!(
s.health_check().await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.stats().await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.find_by_title_namespace("t", "ns").await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.next_versioned_title("t", "ns").await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.find_contradictions("t", "ns").await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.check_duplicate_with_text(&[0.1_f32, 0.2], "title content", Some("ns"), 0.9)
.await
.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
}
#[tokio::test]
async fn default_kg_archive_methods_unsupported() {
let s = MinimalStore;
let ctx = CallerContext::for_agent("alice");
assert!(matches!(
s.kg_query("src", 2, false).await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.kg_timeline("src", None, None, Some(10))
.await
.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.entity_register(
&ctx,
"Acme",
"ns",
&["acme".to_string()],
&serde_json::json!({}),
Some("alice")
)
.await
.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.list_archived(Some("ns"), 10, 0).await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.invalidate_link("src", "tgt", "related_to", Some("2026-01-01T00:00:00Z"))
.await
.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
}
#[tokio::test]
async fn default_listing_methods_unsupported() {
let s = MinimalStore;
assert!(matches!(
s.list_namespaces().await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.get_taxonomy(Some("ns"), 5, 100).await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.list_agents().await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.list_pending_actions(Some("pending"), 50)
.await
.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
assert!(matches!(
s.entity_get_by_alias("acme", Some("ns")).await.unwrap_err(),
StoreError::UnsupportedCapability { .. }
));
}
/// The namespace-scoped quota queries on `MinimalStore` must fail with
/// `StoreError::UnsupportedCapability`.
#[tokio::test]
async fn default_quota_namespace_scoped_unsupported() {
    let store = MinimalStore;

    let single = store.quota_status_ns("alice", "ns").await.unwrap_err();
    assert!(matches!(single, StoreError::UnsupportedCapability { .. }));

    let listing = store.quota_status_list_ns("ns").await.unwrap_err();
    assert!(matches!(listing, StoreError::UnsupportedCapability { .. }));
}
/// `approve_with_approver_type` on `MinimalStore` must fail with
/// `StoreError::UnsupportedCapability`.
#[tokio::test]
async fn default_approve_with_approver_type_forwards_to_consensus() {
    let store = MinimalStore;
    let caller = CallerContext::for_agent("alice");
    let outcome = store
        .approve_with_approver_type(&caller, "pid", "alice")
        .await;
    let failure = outcome.unwrap_err();
    assert!(matches!(failure, StoreError::UnsupportedCapability { .. }));
}
/// `decide_pending_action` on `MinimalStore` must fail with
/// `StoreError::UnsupportedCapability`.
#[tokio::test]
async fn default_decide_pending_action_forwards_to_pending_decide() {
    let store = MinimalStore;
    let caller = CallerContext::for_agent("alice");
    let outcome = store
        .decide_pending_action(&caller, "any", true, "alice")
        .await;
    let failure = outcome.unwrap_err();
    assert!(matches!(failure, StoreError::UnsupportedCapability { .. }));
}
/// Even when the link already carries a signature and an attestation level,
/// `link_signed` on `MinimalStore` (called with `None` as the final argument)
/// must succeed and report `"unsigned"`.
#[tokio::test]
async fn default_link_signed_with_signature_still_forwards() {
    let store = MinimalStore;
    let caller = CallerContext::for_agent("alice");
    let stamped_at = chrono::Utc::now().to_rfc3339();
    let presigned_link = MemoryLink {
        source_id: String::from("a"),
        target_id: String::from("b"),
        relation: crate::models::MemoryLinkRelation::RelatedTo,
        created_at: stamped_at,
        valid_from: None,
        valid_until: None,
        observed_by: None,
        signature: Some(b"sig-bytes".to_vec()),
        attest_level: Some(String::from("ed25519")),
    };
    let outcome = store.link_signed(&caller, &presigned_link, None).await;
    let reported_level = outcome.expect("default link_signed forwards");
    assert_eq!(reported_level, "unsigned");
}
/// `verify_link` on `MinimalStore` must fail with
/// `StoreError::UnsupportedCapability` even when every filter field is set.
#[tokio::test]
async fn default_verify_link_with_populated_filter_unsupported() {
    let store = MinimalStore;
    let populated = VerifyFilter {
        source_id: Some(String::from("src")),
        target_id: Some(String::from("tgt")),
        link_id: Some(String::from("lid")),
    };
    let outcome = store.verify_link(populated).await;
    let failure = outcome.unwrap_err();
    assert!(matches!(failure, StoreError::UnsupportedCapability { .. }));
}
/// `schema_version` on `MinimalStore` must succeed and return `0`.
#[tokio::test]
async fn default_schema_version_zero_invariant() {
    let store = MinimalStore;
    let outcome = store.schema_version().await;
    let version = outcome.expect("default schema_version");
    assert_eq!(version, 0, "default body MUST return 0");
}
/// Test double for the embedding-backfill sweep: it advertises a fixed number
/// of un-embedded rows and reports a fixed write count per batch.
struct StalledBackfillStore {
    /// Upper bound on the rows `list_unembedded` returns (further capped by
    /// the caller's `limit`).
    rows: usize,
    /// Value `set_embeddings_batch` returns, regardless of the entries passed.
    written_per_chunk: usize,
}
/// `MemoryStore` implementation for the backfill test double. The required
/// methods return trivial values; only `list_unembedded` and
/// `set_embeddings_batch` carry the configurable behaviour.
#[async_trait]
impl MemoryStore for StalledBackfillStore {
    fn capabilities(&self) -> Capabilities {
        Capabilities::DURABLE
    }

    async fn store(&self, _ctx: &CallerContext, memory: &Memory) -> StoreResult<String> {
        Ok(memory.id.clone())
    }

    async fn get(&self, _ctx: &CallerContext, id: &str) -> StoreResult<Memory> {
        Err(StoreError::NotFound { id: id.to_owned() })
    }

    async fn update(
        &self,
        _ctx: &CallerContext,
        _id: &str,
        _patch: UpdatePatch,
    ) -> StoreResult<()> {
        Ok(())
    }

    async fn delete(&self, _ctx: &CallerContext, _id: &str) -> StoreResult<()> {
        Ok(())
    }

    async fn list(&self, _ctx: &CallerContext, _filter: &Filter) -> StoreResult<Vec<Memory>> {
        Ok(Vec::new())
    }

    async fn search(
        &self,
        _ctx: &CallerContext,
        _query: &str,
        _filter: &Filter,
    ) -> StoreResult<Vec<Memory>> {
        Ok(Vec::new())
    }

    async fn verify(&self, _ctx: &CallerContext, id: &str) -> StoreResult<VerifyReport> {
        let report = VerifyReport {
            memory_id: id.to_owned(),
            integrity_ok: true,
            findings: Vec::new(),
            signature_verified: false,
        };
        Ok(report)
    }

    async fn link(&self, _ctx: &CallerContext, _link: &MemoryLink) -> StoreResult<()> {
        Ok(())
    }

    async fn list_links(&self, _namespace: Option<&str>) -> StoreResult<Vec<MemoryLink>> {
        Ok(Vec::new())
    }

    async fn register_agent(
        &self,
        _ctx: &CallerContext,
        _agent: &AgentRegistration,
    ) -> StoreResult<()> {
        Ok(())
    }

    /// Returns `min(rows, limit)` synthetic `(id, title, content)` triples.
    async fn list_unembedded(
        &self,
        _ctx: &CallerContext,
        limit: usize,
    ) -> StoreResult<Vec<(String, String, String)>> {
        let count = self.rows.min(limit);
        let mut pending = Vec::with_capacity(count);
        for i in 0..count {
            pending.push((
                format!("stalled-{i}"),
                format!("title {i}"),
                format!("content {i}"),
            ));
        }
        Ok(pending)
    }

    /// Ignores the entries and reports the configured write count.
    async fn set_embeddings_batch(
        &self,
        _ctx: &CallerContext,
        _entries: &[(String, Vec<f32>)],
    ) -> StoreResult<usize> {
        Ok(self.written_per_chunk)
    }
}
/// Running the backfill sweep against `MinimalStore` with a chunk size of 8
/// must report zero rows written.
#[tokio::test]
async fn backfill_sweep_is_noop_on_default_list_unembedded() {
    let store = MinimalStore;
    let admin = CallerContext::for_admin(crate::identity::sentinels::EMBEDDING_BACKFILL);
    let embedder = crate::embeddings::test_support::MockEmbedder::new_ollama();
    let rows_written = run_embedding_backfill_on_store(&store, &admin, &embedder, 8).await;
    assert_eq!(
        rows_written, 0,
        "default (sqlite-shape) adapters must make the sweep a no-op"
    );
}
/// `set_embeddings_batch` on `MinimalStore` must report one written row per
/// entry supplied (two here).
#[tokio::test]
async fn default_set_embeddings_batch_loops_update_embedding() {
    let store = MinimalStore;
    let caller = CallerContext::for_agent("alice");
    let batch = vec![
        (String::from("a"), vec![0.1_f32]),
        (String::from("b"), vec![0.2_f32]),
    ];
    let outcome = store.set_embeddings_batch(&caller, &batch).await;
    let rows_written = outcome.expect("default batch write");
    assert_eq!(rows_written, 2, "default body reports one row per entry");
}
/// The backfill sweep must terminate (and report zero rows written) when the
/// store keeps listing un-embedded rows but every batch write reports `0`.
///
/// The store advertises three rows and `set_embeddings_batch` always returns
/// `0`, so a sweep without a zero-progress guard would keep re-listing the
/// same rows; the 10-second timeout turns that into a test failure.
#[tokio::test]
async fn backfill_sweep_zero_progress_guard_terminates() {
    let s = StalledBackfillStore {
        rows: 3,
        written_per_chunk: 0,
    };
    let ctx = CallerContext::for_admin(crate::identity::sentinels::EMBEDDING_BACKFILL);
    let emb = crate::embeddings::test_support::MockEmbedder::new_ollama();
    // A non-zero chunk size is required here: `StalledBackfillStore::list_unembedded`
    // returns `min(rows, limit)` rows, so a limit of 0 would yield an empty
    // listing and the sweep could stop before the guard is ever exercised.
    // NOTE(review): assumes the sweep forwards its chunk size as the
    // `list_unembedded` limit — confirm against `run_embedding_backfill_on_store`.
    let written = tokio::time::timeout(
        std::time::Duration::from_secs(10),
        run_embedding_backfill_on_store(&s, &ctx, &emb, 8),
    )
    .await
    .expect("zero-progress sweep must terminate, not loop forever");
    assert_eq!(written, 0, "zero-progress pass writes nothing");
}
/// The backfill sweep must terminate and write nothing when the embedder
/// returns a different number of vectors than the store listed rows.
#[tokio::test]
async fn backfill_sweep_stops_on_embedder_vector_count_mismatch() {
    /// Embedder whose batch call always yields exactly one vector, no matter
    /// how many texts it receives.
    struct MisalignedEmbedder;
    impl crate::embeddings::Embed for MisalignedEmbedder {
        fn embed(&self, _text: &str) -> anyhow::Result<Vec<f32>> {
            let single = vec![0.0_f32];
            Ok(single)
        }
        fn embed_batch(&self, _texts: &[&str]) -> anyhow::Result<Vec<Vec<f32>>> {
            let only_one = vec![vec![0.0_f32]];
            Ok(only_one)
        }
    }

    // Three listed rows against a one-vector batch.
    let store = StalledBackfillStore {
        rows: 3,
        written_per_chunk: 3,
    };
    let admin = CallerContext::for_admin(crate::identity::sentinels::EMBEDDING_BACKFILL);
    let deadline = std::time::Duration::from_secs(10);
    let sweep = run_embedding_backfill_on_store(&store, &admin, &MisalignedEmbedder, 8);
    let rows_written = tokio::time::timeout(deadline, sweep)
        .await
        .expect("misaligned sweep must terminate, not loop forever");
    assert_eq!(
        rows_written, 0,
        "misaligned chunk must be dropped, not written"
    );
}
/// Stateless test double used to probe `MemoryStore` methods it does not
/// override.
struct DefaultImplProbeStore;
#[async_trait::async_trait]
impl MemoryStore for DefaultImplProbeStore {
fn capabilities(&self) -> Capabilities {
Capabilities::empty()
}
async fn store(&self, _ctx: &CallerContext, _memory: &Memory) -> StoreResult<String> {
Err(StoreError::UnsupportedCapability {
capability: "store".to_string(),
})
}
async fn get(&self, _ctx: &CallerContext, id: &str) -> StoreResult<Memory> {
Err(StoreError::NotFound { id: id.to_string() })
}
async fn update(
&self,
_ctx: &CallerContext,
id: &str,
_patch: UpdatePatch,
) -> StoreResult<()> {
Err(StoreError::NotFound { id: id.to_string() })
}
async fn delete(&self, _ctx: &CallerContext, id: &str) -> StoreResult<()> {
Err(StoreError::NotFound { id: id.to_string() })
}
async fn list(&self, _ctx: &CallerContext, _filter: &Filter) -> StoreResult<Vec<Memory>> {
Ok(Vec::new())
}
async fn search(
&self,
_ctx: &CallerContext,
_query: &str,
_filter: &Filter,
) -> StoreResult<Vec<Memory>> {
Ok(Vec::new())
}
async fn verify(&self, _ctx: &CallerContext, id: &str) -> StoreResult<VerifyReport> {
Ok(VerifyReport {
memory_id: id.to_string(),
integrity_ok: true,
findings: Vec::new(),
signature_verified: false,
})
}
async fn link(&self, _ctx: &CallerContext, _link: &MemoryLink) -> StoreResult<()> {
Ok(())
}
async fn list_links(&self, _namespace: Option<&str>) -> StoreResult<Vec<MemoryLink>> {
Ok(vec![])
}
async fn register_agent(
&self,
_ctx: &CallerContext,
_agent: &AgentRegistration,
) -> StoreResult<()> {
Ok(())
}
}
/// `begin_transaction` on `DefaultImplProbeStore` must fail with
/// `UnsupportedCapability` naming the `"TRANSACTIONS"` capability.
#[test]
fn default_begin_transaction_default_impl_returns_unsupported() {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .expect("runtime");
    let probe = DefaultImplProbeStore;
    let caller = CallerContext::for_agent("test-agent");
    let outcome = runtime.block_on(probe.begin_transaction(&caller));
    match outcome {
        Ok(_) => panic!("default begin_transaction must error"),
        Err(StoreError::UnsupportedCapability { capability }) => {
            assert_eq!(capability, "TRANSACTIONS");
        }
        Err(other) => panic!("expected UnsupportedCapability, got: {other}"),
    }
}
/// Pins the `Display` text of several `StoreError` variants: exact strings for
/// `Conflict`, `InvalidInput` and the boxed backend error, substring checks
/// for `BackendUnavailable` and `IntegrityFailed`.
#[test]
fn store_error_remaining_variants_display_their_detail() {
    let conflict_text = StoreError::Conflict {
        id: String::from("dup-1"),
    }
    .to_string();
    assert_eq!(conflict_text, "identifier conflict on insert: dup-1");

    let unavailable_text = StoreError::BackendUnavailable {
        backend: String::from("postgres"),
        detail: String::from("connection refused"),
    }
    .to_string();
    assert!(unavailable_text.contains("postgres"));
    assert!(unavailable_text.contains("connection refused"));

    let invalid_text = StoreError::InvalidInput {
        detail: String::from("empty title"),
    }
    .to_string();
    assert_eq!(invalid_text, "invalid input: empty title");

    let integrity_text = StoreError::IntegrityFailed {
        detail: String::from("missing agent_id"),
    }
    .to_string();
    assert!(integrity_text.contains("missing agent_id"));

    // Conversion from the boxed backend error goes through `Into`.
    let from_boxed: StoreError = BoxBackendError::new("native driver oops").into();
    assert_eq!(
        from_boxed.to_string(),
        "underlying backend error: native driver oops"
    );
}
/// A `KgQueryRow` must serialize with the expected JSON keys and deserialize
/// back to an equal value.
#[test]
fn kg_query_row_serde_round_trips() {
    let original = KgQueryRow {
        target_id: String::from("mem-2"),
        relation: String::from("related_to"),
        depth: 2,
        path: String::from("mem-0->mem-1->mem-2"),
    };
    let encoded = serde_json::to_string(&original).unwrap();
    for fragment in ["\"target_id\":\"mem-2\"", "\"depth\":2"] {
        assert!(encoded.contains(fragment));
    }
    let decoded: KgQueryRow = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded, original);
}
/// A `KgTimelineRow` must survive a JSON round trip both with its optional
/// fields populated and with them set to `None`.
#[test]
fn kg_timeline_row_serde_round_trips_with_and_without_optionals() {
    let populated = KgTimelineRow {
        target_id: String::from("mem-9"),
        relation: String::from("supersedes"),
        valid_from: String::from("2026-01-01T00:00:00Z"),
        valid_until: Some(String::from("2026-02-01T00:00:00Z")),
        observed_by: Some(String::from("agent-a")),
        title: String::from("Title"),
        target_namespace: String::from("ns"),
    };
    let populated_json = serde_json::to_string(&populated).unwrap();
    let populated_back: KgTimelineRow = serde_json::from_str(&populated_json).unwrap();
    assert_eq!(populated_back, populated);

    // Same row with both optional fields cleared.
    let sparse = KgTimelineRow {
        valid_until: None,
        observed_by: None,
        ..populated
    };
    let sparse_json = serde_json::to_string(&sparse).unwrap();
    let sparse_back: KgTimelineRow = serde_json::from_str(&sparse_json).unwrap();
    assert_eq!(sparse_back, sparse);
}
/// A `KgInvalidateRow` must survive a JSON round trip for both the
/// `found: true` and the `found: false` shape.
#[test]
fn kg_invalidate_row_serde_round_trips_both_outcomes() {
    let hit = KgInvalidateRow {
        found: true,
        valid_until: String::from("2026-03-01T00:00:00Z"),
        previous_valid_until: Some(String::from("2026-02-01T00:00:00Z")),
    };
    let hit_json = serde_json::to_string(&hit).unwrap();
    let hit_back: KgInvalidateRow = serde_json::from_str(&hit_json).unwrap();
    assert_eq!(hit_back, hit);

    let miss = KgInvalidateRow {
        found: false,
        valid_until: String::new(),
        previous_valid_until: None,
    };
    let miss_json = serde_json::to_string(&miss).unwrap();
    let miss_back: KgInvalidateRow = serde_json::from_str(&miss_json).unwrap();
    assert_eq!(miss_back, miss);
}
/// A `VerifyReport` may carry `integrity_ok: true` alongside
/// `signature_verified: false`; the clone keeps both values and the `Debug`
/// output names the signature flag.
#[test]
fn verify_report_signature_flag_is_independent_of_integrity() {
    let report = VerifyReport {
        memory_id: String::from("mem-7"),
        integrity_ok: true,
        findings: vec![String::from("structural check only")],
        signature_verified: false,
    };
    let copy = report.clone();
    assert!(copy.integrity_ok);
    assert!(!copy.signature_verified);

    let dbg = format!("{report:?}");
    let mentions_flag = dbg.contains("signature_verified: false");
    assert!(mentions_flag, "got: {dbg}");
}
/// `UpdatePatch::default()` must leave every checked field as `None`.
#[test]
fn update_patch_default_touches_nothing() {
    let UpdatePatch {
        title,
        content,
        tier,
        namespace,
        tags,
        priority,
        confidence,
        metadata,
        ..
    } = UpdatePatch::default();
    assert!(title.is_none());
    assert!(content.is_none());
    assert!(tier.is_none());
    assert!(namespace.is_none());
    assert!(tags.is_none());
    assert!(priority.is_none());
    assert!(confidence.is_none());
    assert!(metadata.is_none());
}
/// Calls made through a `Box<dyn MemoryStore>` holding a
/// `DefaultImplProbeStore` must reach that type's own method bodies.
#[test]
fn minimal_store_dispatches_through_dyn_trait_object() {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .build()
        .expect("runtime");
    let boxed: Box<dyn MemoryStore> = Box::new(DefaultImplProbeStore);
    let caller = CallerContext::for_agent("test-agent");

    assert_eq!(boxed.capabilities(), Capabilities::empty());

    let no_filter = Filter::default();
    let listing = runtime
        .block_on(boxed.list(&caller, &no_filter))
        .expect("list");
    assert!(listing.is_empty());

    let verification = runtime
        .block_on(boxed.verify(&caller, "mem-1"))
        .expect("verify");
    assert_eq!(verification.memory_id, "mem-1");
    assert!(verification.integrity_ok);

    let lookup_failure = runtime
        .block_on(boxed.get(&caller, "missing"))
        .unwrap_err();
    assert!(matches!(lookup_failure, StoreError::NotFound { id } if id == "missing"));
}
/// `integrity_findings` must report every problem of a deliberately broken
/// memory at once (blank title, empty content, missing `metadata.agent_id`,
/// unparsable `created_at`) and report nothing once all four are repaired.
#[test]
fn integrity_findings_union_checks_1624() {
    let updated_at = chrono::Utc::now().to_rfc3339();
    let mut mem = Memory {
        id: String::from("v-1624"),
        tier: Tier::Mid,
        namespace: String::from("ns"),
        title: String::from(" "),
        content: String::new(),
        tags: vec![],
        priority: 5,
        confidence: 1.0,
        source: String::from("test"),
        access_count: 0,
        created_at: String::from("not-a-timestamp"),
        updated_at,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
        ..Memory::default()
    };

    let findings = integrity_findings(&mem);
    // Exact-match findings, checked in a fixed order.
    let reports = |wanted: &str| findings.iter().any(|f| f == wanted);
    assert!(reports("title is empty"), "{findings:?}");
    assert!(reports("content is empty"), "{findings:?}");
    assert!(reports("metadata.agent_id missing"), "{findings:?}");
    // The timestamp finding is only matched by prefix.
    let bad_timestamp = findings
        .iter()
        .any(|f| f.starts_with("created_at is not RFC3339"));
    assert!(bad_timestamp, "{findings:?}");

    // Repair all four problems; the memory must then be clean.
    mem.title = String::from("t");
    mem.content = String::from("c");
    mem.metadata = serde_json::json!({"agent_id": "alice"});
    mem.created_at = chrono::Utc::now().to_rfc3339();
    let after_repair = integrity_findings(&mem);
    assert!(after_repair.is_empty());
}
/// `CallerContext::for_admin_checked` must set `bypass_visibility` to the
/// value of its `is_admin` argument.
#[test]
fn for_admin_checked_constructor_cov() {
    let granted = CallerContext::for_admin_checked("ops:admin", true);
    let denied = CallerContext::for_admin_checked("ops:admin", false);
    assert!(granted.bypass_visibility, "is_admin=true ⇒ bypass");
    assert!(!denied.bypass_visibility, "is_admin=false ⇒ no bypass");
}
/// A `KgTimelineRow` with `valid_until: None` and a populated `observed_by`
/// must survive a JSON round trip unchanged.
#[test]
fn track_j_row_shapes_serde_roundtrip_cov() {
    let original = KgTimelineRow {
        target_id: String::from("t"),
        relation: String::from("related_to"),
        valid_from: String::from("2026-01-01T00:00:00Z"),
        valid_until: None,
        observed_by: Some(String::from("ai:obs")),
        title: String::from("ti"),
        target_namespace: String::from("ns"),
    };
    let encoded = serde_json::to_string(&original).expect("ser KgTimelineRow");
    let decoded: KgTimelineRow = serde_json::from_str(&encoded).expect("de KgTimelineRow");
    assert_eq!(decoded, original);
}
/// Calls every required `MemoryStore` method on both in-file test doubles
/// (`StalledBackfillStore` and `DefaultImplProbeStore`) and checks each one
/// returns what its implementation hard-codes, then touches a few methods that
/// `DefaultImplProbeStore` does not override.
#[tokio::test]
async fn mock_adapters_method_surface_conformance_cov() {
    let ctx = CallerContext::for_agent("cov-agent");
    let mem = {
        let now = chrono::Utc::now().to_rfc3339();
        Memory {
            id: "cov-mem".into(),
            tier: Tier::Mid,
            namespace: "cov".into(),
            title: "t".into(),
            content: "c".into(),
            tags: vec![],
            priority: 5,
            confidence: 1.0,
            source: "test".into(),
            access_count: 0,
            created_at: now.clone(),
            updated_at: now,
            last_accessed_at: None,
            expires_at: None,
            metadata: serde_json::json!({}),
            ..Memory::default()
        }
    };
    let link = MemoryLink {
        source_id: "a".into(),
        target_id: "b".into(),
        relation: crate::models::MemoryLinkRelation::RelatedTo,
        created_at: chrono::Utc::now().to_rfc3339(),
        valid_from: None,
        valid_until: None,
        observed_by: None,
        signature: None,
        attest_level: None,
    };
    let reg = AgentRegistration {
        agent_id: "ai:cov".into(),
        agent_type: "nhi".into(),
        capabilities: vec![],
        registered_at: chrono::Utc::now().to_rfc3339(),
        last_seen_at: chrono::Utc::now().to_rfc3339(),
    };
    let filter = Filter::default();

    // StalledBackfillStore: everything succeeds except `get`.
    let sb = StalledBackfillStore {
        rows: 1,
        written_per_chunk: 1,
    };
    assert!(!sb.capabilities().is_empty());
    assert_eq!(sb.store(&ctx, &mem).await.unwrap(), mem.id);
    assert!(sb.get(&ctx, "x").await.is_err());
    sb.update(&ctx, "x", UpdatePatch::default()).await.unwrap();
    sb.delete(&ctx, "x").await.unwrap();
    assert!(sb.list(&ctx, &filter).await.unwrap().is_empty());
    assert!(sb.search(&ctx, "q", &filter).await.unwrap().is_empty());
    assert!(sb.verify(&ctx, "x").await.unwrap().integrity_ok);
    sb.link(&ctx, &link).await.unwrap();
    assert!(sb.list_links(None).await.unwrap().is_empty());
    sb.register_agent(&ctx, &reg).await.unwrap();

    // DefaultImplProbeStore: store/get/update/delete fail, the rest succeed.
    let dp = DefaultImplProbeStore;
    assert!(dp.capabilities().is_empty());
    assert!(dp.store(&ctx, &mem).await.is_err());
    assert!(dp.get(&ctx, "x").await.is_err());
    assert!(dp.update(&ctx, "x", UpdatePatch::default()).await.is_err());
    assert!(dp.delete(&ctx, "x").await.is_err());
    assert!(dp.list(&ctx, &filter).await.unwrap().is_empty());
    assert!(dp.search(&ctx, "q", &filter).await.unwrap().is_empty());
    assert!(dp.verify(&ctx, "x").await.unwrap().integrity_ok);
    dp.link(&ctx, &link).await.unwrap();
    assert!(dp.list_links(None).await.unwrap().is_empty());
    dp.register_agent(&ctx, &reg).await.unwrap();

    // Methods the probe store does not override.
    assert!(
        matches!(
            dp.list_by_namespace_prefix(&ctx, "x", 10).await,
            Err(StoreError::UnsupportedCapability { .. })
        ),
        "#1625: default surfaces UnsupportedCapability"
    );
    assert!(
        dp.store_with_embedding(&ctx, &mem, Some(&[0.1]))
            .await
            .is_err()
    );
    assert!(dp.list_unembedded(&ctx, 8).await.unwrap().is_empty());
    dp.update_embedding(&ctx, "x", None).await.unwrap();
}
}