use crate::models::ConfidenceSource;
use crate::models::field_names;
use std::path::PathBuf;
use std::sync::Arc;
use rusqlite::OptionalExtension;
use tokio::sync::Mutex;
use crate::db;
use crate::models::{AgentRegistration, Memory, MemoryLink, Tier};
use super::{
BoxBackendError, CallerContext, Capabilities, CaptureTurnResult, CaptureTurnWrite, Filter,
MemoryStore, StoreError, StoreResult, UpdatePatch, VerifyFilter, VerifyLinkReport,
VerifyReport, is_visible_to_caller,
};
use crate::quotas::{self, QuotaStatus};
/// SQLite-backed store; the `MemoryStore` implementation in this file operates on it.
pub struct SqliteStore {
// The single SQLite connection, shared behind an async mutex. The store
// methods in this file lock it before touching the database.
state: Arc<Mutex<rusqlite::Connection>>,
// Filesystem path the connection was opened from; returned by `path()` and
// handed to `db::stats`.
path: PathBuf,
}
impl SqliteStore {
    /// Opens the database at `path` via `db::open` and wraps the resulting
    /// connection in a shared async mutex.
    ///
    /// # Errors
    /// Returns `StoreError::Backend` carrying the stringified `db::open` error.
    pub fn open(path: impl Into<PathBuf>) -> StoreResult<Self> {
        let path: PathBuf = path.into();
        let connection = db::open(&path).map_err(box_err)?;
        let state = Arc::new(Mutex::new(connection));
        Ok(Self { state, path })
    }

    /// Path this store was opened with.
    #[must_use]
    pub fn path(&self) -> &std::path::Path {
        self.path.as_path()
    }
}
/// Converts any displayable error into `StoreError::Backend`, keeping only
/// its rendered message.
fn box_err<E: std::fmt::Display>(e: E) -> StoreError {
    let message = e.to_string();
    StoreError::Backend(BoxBackendError::new(message))
}
#[async_trait::async_trait]
impl MemoryStore for SqliteStore {
/// Advertises the capability flags of this backend: full-text, durable,
/// and strongly consistent.
fn capabilities(&self) -> Capabilities {
    let durable_and_consistent = Capabilities::DURABLE | Capabilities::STRONG_CONSISTENCY;
    Capabilities::FULLTEXT | durable_and_consistent
}
async fn schema_version(&self) -> StoreResult<i64> {
let conn = self.state.lock().await;
let v: i64 = conn
.query_row(
crate::storage::migrations::SELECT_SCHEMA_VERSION_SQL,
[],
|row| row.get(0),
)
.unwrap_or(0);
Ok(v)
}
/// Inserts `memory` through `db::insert` while holding the connection lock.
/// The caller context is not consulted.
///
/// # Errors
/// Returns `StoreError::Backend` when the insert fails.
async fn store(&self, _ctx: &CallerContext, memory: &Memory) -> StoreResult<String> {
    let guard = self.state.lock().await;
    let inserted = db::insert(&guard, memory);
    inserted.map_err(box_err)
}
/// Forwards `write` to `db::capture_turn_idempotent` under the connection lock.
/// The caller context is not consulted.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn capture_turn_idempotent(
    &self,
    _ctx: &CallerContext,
    write: &CaptureTurnWrite,
) -> StoreResult<CaptureTurnResult> {
    let guard = self.state.lock().await;
    let captured = db::capture_turn_idempotent(&guard, write);
    captured.map_err(box_err)
}
/// Fetches one memory by id, enforcing caller visibility.
///
/// # Errors
/// * `StoreError::NotFound` when the row does not exist, or exists but is not
///   visible to the caller (the two cases are indistinguishable to the caller).
/// * `StoreError::Backend` when the lookup fails.
async fn get(&self, ctx: &CallerContext, id: &str) -> StoreResult<Memory> {
    let guard = self.state.lock().await;
    let mem = db::get(&guard, id)
        .map_err(box_err)?
        .ok_or_else(|| StoreError::NotFound { id: id.to_string() })?;
    if !ctx.bypass_visibility && !is_visible_to_caller(&mem, ctx.effective_principal()) {
        return Err(StoreError::NotFound { id: id.to_string() });
    }
    Ok(mem)
}
/// Applies `patch` to the memory `id` via `db::update_with_expected_version`,
/// passing `None` as the final (expected-version) argument.
///
/// # Errors
/// * `StoreError::NotFound` when the database reports no matching row.
/// * `StoreError::Backend` when the update fails.
async fn update(&self, _ctx: &CallerContext, id: &str, patch: UpdatePatch) -> StoreResult<()> {
    let guard = self.state.lock().await;
    let (row_found, _content_changed) = db::update_with_expected_version(
        &guard,
        id,
        patch.title.as_deref(),
        patch.content.as_deref(),
        patch.tier.as_ref(),
        patch.namespace.as_deref(),
        patch.tags.as_ref(),
        patch.priority,
        patch.confidence,
        patch.expires_at.as_deref(),
        patch.metadata.as_ref(),
        patch.source_uri.as_deref(),
        None,
    )
    .map_err(box_err)?;
    if !row_found {
        return Err(StoreError::NotFound { id: id.to_string() });
    }
    Ok(())
}
/// Deletes the memory `id` through `db::delete`.
///
/// # Errors
/// * `StoreError::NotFound` when nothing was removed.
/// * `StoreError::Backend` when the delete fails.
async fn delete(&self, _ctx: &CallerContext, id: &str) -> StoreResult<()> {
    let guard = self.state.lock().await;
    if db::delete(&guard, id).map_err(box_err)? {
        return Ok(());
    }
    Err(StoreError::NotFound { id: id.to_string() })
}
/// Lists memories matching `filter`, then drops rows the caller may not see.
///
/// A `filter.limit` of zero is treated as 100. Only the first entry of
/// `filter.tags_any` is forwarded to `db::list`. Visibility filtering runs
/// after the database limit, so fewer than `limit` rows may come back.
///
/// # Errors
/// Returns `StoreError::Backend` when `db::list` fails.
async fn list(&self, ctx: &CallerContext, filter: &Filter) -> StoreResult<Vec<Memory>> {
    let guard = self.state.lock().await;
    let effective_limit = match filter.limit {
        0 => 100,
        n => n,
    };
    let first_tag = filter.tags_any.first().map(String::as_str);
    let since_bound = filter.since.map(|d| d.to_rfc3339());
    let until_bound = filter.until.map(|d| d.to_rfc3339());
    let mut rows = db::list(
        &guard,
        filter.namespace.as_deref(),
        filter.tier.as_ref(),
        effective_limit,
        0,
        None,
        since_bound.as_deref(),
        until_bound.as_deref(),
        first_tag,
        filter.agent_id.as_deref(),
    )
    .map_err(box_err)?;
    if !ctx.bypass_visibility {
        let caller = ctx.effective_principal();
        rows.retain(|m| is_visible_to_caller(m, caller));
    }
    Ok(rows)
}
/// Lists memories whose namespace starts with `prefix`, returning at most
/// `limit` rows.
///
/// Rows are pulled from `db::list` in fixed-size pages with no database-side
/// filter. The prefix match and the visibility check (skipped when
/// `ctx.bypass_visibility` is set) are applied in memory. Paging stops as
/// soon as `limit` matches are collected or a short page shows there are no
/// more rows.
///
/// A `limit` of zero yields an empty list.
///
/// # Errors
/// Returns `StoreError::Backend` when `db::list` fails.
async fn list_by_namespace_prefix(
    &self,
    ctx: &CallerContext,
    prefix: &str,
    limit: usize,
) -> StoreResult<Vec<Memory>> {
    const PAGE: usize = 256;
    // The cap below is only checked after a push, so without this guard a
    // zero limit would still hand back one matching row.
    if limit == 0 {
        return Ok(Vec::new());
    }
    let conn = self.state.lock().await;
    // Borrowed directly; no owned copy of the principal is needed.
    let caller = ctx.effective_principal();
    let mut out: Vec<Memory> = Vec::new();
    let mut offset = 0usize;
    loop {
        let rows = db::list(
            &conn, None, None, PAGE, offset, None, None, None, None, None,
        )
        .map_err(box_err)?;
        let page_len = rows.len();
        for m in rows {
            if !m.namespace.starts_with(prefix) {
                continue;
            }
            if !ctx.bypass_visibility && !is_visible_to_caller(&m, caller) {
                continue;
            }
            out.push(m);
            if out.len() >= limit {
                return Ok(out);
            }
        }
        // A short page means the table is exhausted.
        if page_len < PAGE {
            return Ok(out);
        }
        offset += PAGE;
    }
}
/// Runs `db::search` for `query` under `filter`, then drops rows the caller
/// may not see.
///
/// A `filter.limit` of zero is treated as 100, and only the first entry of
/// `filter.tags_any` is forwarded. Visibility filtering runs after the
/// database limit.
///
/// # Errors
/// Returns `StoreError::Backend` when `db::search` fails.
async fn search(
    &self,
    ctx: &CallerContext,
    query: &str,
    filter: &Filter,
) -> StoreResult<Vec<Memory>> {
    let guard = self.state.lock().await;
    let effective_limit = match filter.limit {
        0 => 100,
        n => n,
    };
    let first_tag = filter.tags_any.first().map(String::as_str);
    let since_bound = filter.since.map(|d| d.to_rfc3339());
    let until_bound = filter.until.map(|d| d.to_rfc3339());
    let mut hits = db::search(
        &guard,
        query,
        filter.namespace.as_deref(),
        filter.tier.as_ref(),
        effective_limit,
        None,
        since_bound.as_deref(),
        until_bound.as_deref(),
        first_tag,
        filter.agent_id.as_deref(),
        ctx.as_agent.as_deref(),
        false,
    )
    .map_err(box_err)?;
    if !ctx.bypass_visibility {
        let caller = ctx.effective_principal();
        hits.retain(|m| is_visible_to_caller(m, caller));
    }
    Ok(hits)
}
/// Builds an integrity report for the memory `id` from
/// `super::integrity_findings`. No signature check is performed here, so
/// `signature_verified` is always `false`. The caller context is not consulted.
///
/// # Errors
/// * `StoreError::NotFound` when the memory does not exist.
/// * `StoreError::Backend` when the lookup fails.
async fn verify(&self, _ctx: &CallerContext, id: &str) -> StoreResult<VerifyReport> {
    let guard = self.state.lock().await;
    let mem = db::get(&guard, id)
        .map_err(box_err)?
        .ok_or_else(|| StoreError::NotFound { id: id.to_string() })?;
    let findings = super::integrity_findings(&mem);
    let integrity_ok = findings.is_empty();
    Ok(VerifyReport {
        memory_id: id.to_string(),
        integrity_ok,
        findings,
        signature_verified: false,
    })
}
/// Creates the link described by `link` through `db::create_link`.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn link(&self, _ctx: &CallerContext, link: &MemoryLink) -> StoreResult<()> {
    let guard = self.state.lock().await;
    let relation = link.relation.as_str();
    db::create_link(&guard, &link.source_id, &link.target_id, relation).map_err(box_err)
}
/// Creates the link through `db::create_link_signed`, passing along the
/// optional `keypair`. The returned static string comes straight from that
/// call.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn link_signed(
    &self,
    _ctx: &CallerContext,
    link: &MemoryLink,
    keypair: Option<&crate::identity::keypair::AgentKeypair>,
) -> StoreResult<&'static str> {
    let guard = self.state.lock().await;
    let relation = link.relation.as_str();
    db::create_link_signed(&guard, &link.source_id, &link.target_id, relation, keypair)
        .map_err(box_err)
}
/// Returns the links `db::get_links` reports for `anchor_id`.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn get_links_for_anchor(&self, anchor_id: &str) -> StoreResult<Vec<MemoryLink>> {
    let guard = self.state.lock().await;
    let links = db::get_links(&guard, anchor_id);
    links.map_err(box_err)
}
/// Lists rows of `memory_links`, ordered by source id, target id and relation.
///
/// With `namespace = Some(ns)`, only links whose source memory lives in
/// exactly `ns` are returned; with `None`, every link is returned. A relation
/// string that fails to parse falls back to the relation type's default.
/// `attest_level` is not selected and is always `None` in the result.
///
/// # Errors
/// Returns `StoreError::Backend` when preparing, running or decoding the
/// query fails.
async fn list_links(&self, namespace: Option<&str>) -> StoreResult<Vec<MemoryLink>> {
    let guard = self.state.lock().await;
    let mut stmt = guard
        .prepare(
            "SELECT ml.source_id, ml.target_id, ml.relation, ml.created_at,
ml.valid_from, ml.valid_until, ml.observed_by, ml.signature
FROM memory_links ml
WHERE ?1 IS NULL
OR EXISTS (SELECT 1 FROM memories m
WHERE m.id = ml.source_id AND m.namespace = ?1)
ORDER BY ml.source_id, ml.target_id, ml.relation",
        )
        .map_err(box_err)?;
    let decoded = stmt
        .query_map(rusqlite::params![namespace], |row| {
            let relation_str: String = row.get(2)?;
            let relation =
                crate::models::MemoryLinkRelation::from_str(&relation_str).unwrap_or_default();
            Ok(MemoryLink {
                source_id: row.get(0)?,
                target_id: row.get(1)?,
                relation,
                created_at: row.get(3)?,
                valid_from: row.get::<_, Option<String>>(4)?,
                valid_until: row.get::<_, Option<String>>(5)?,
                observed_by: row.get::<_, Option<String>>(6)?,
                signature: row.get::<_, Option<Vec<u8>>>(7)?,
                attest_level: None,
            })
        })
        .map_err(box_err)?;
    let mut links: Vec<MemoryLink> = Vec::new();
    for link in decoded {
        links.push(link.map_err(box_err)?);
    }
    Ok(links)
}
/// Registers `agent` through `db::register_agent`; the value that call
/// returns is discarded.
///
/// # Errors
/// Returns `StoreError::Backend` when registration fails.
async fn register_agent(
    &self,
    _ctx: &CallerContext,
    agent: &AgentRegistration,
) -> StoreResult<()> {
    let guard = self.state.lock().await;
    let _registered_id = db::register_agent(
        &guard,
        &agent.agent_id,
        &agent.agent_type,
        &agent.capabilities,
    )
    .map_err(box_err)?;
    Ok(())
}
/// Binds the base64 public key `pubkey_b64` to `agent_id` through
/// `db::bind_agent_pubkey`.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn bind_agent_pubkey(
    &self,
    _ctx: &CallerContext,
    agent_id: &str,
    pubkey_b64: &str,
) -> StoreResult<()> {
    let guard = self.state.lock().await;
    let bound = db::bind_agent_pubkey(&guard, agent_id, pubkey_b64);
    bound.map_err(box_err)
}
/// Looks up the public key stored for `agent_id`, if any, via `db::agent_pubkey`.
///
/// # Errors
/// Returns `StoreError::Backend` when the lookup fails.
async fn agent_pubkey(&self, agent_id: &str) -> StoreResult<Option<String>> {
    let guard = self.state.lock().await;
    let key = db::agent_pubkey(&guard, agent_id);
    key.map_err(box_err)
}
/// Revokes the public key of `agent_id` through `db::revoke_agent_pubkey`.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn revoke_agent_pubkey(&self, _ctx: &CallerContext, agent_id: &str) -> StoreResult<()> {
    let guard = self.state.lock().await;
    let revoked = db::revoke_agent_pubkey(&guard, agent_id);
    revoked.map_err(box_err)
}
/// Returns memories from `db::memories_updated_since`, with `limit` clamped
/// into `1..=10_000` before it is passed on.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn list_memories_updated_since(
    &self,
    since: Option<&str>,
    limit: usize,
) -> StoreResult<Vec<Memory>> {
    let guard = self.state.lock().await;
    db::memories_updated_since(&guard, since, limit.clamp(1, 10_000)).map_err(box_err)
}
/// Applies a memory received from a remote peer through `db::insert_if_newer`.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn apply_remote_memory(
    &self,
    _ctx: &CallerContext,
    memory: &Memory,
) -> StoreResult<String> {
    let guard = self.state.lock().await;
    let applied = db::insert_if_newer(&guard, memory);
    applied.map_err(box_err)
}
/// Applies a link received from a remote peer through
/// `db::create_link_inbound`, recording the supplied `attest_level`.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn apply_remote_link(
    &self,
    _ctx: &CallerContext,
    link: &MemoryLink,
    attest_level: &str,
) -> StoreResult<()> {
    let guard = self.state.lock().await;
    let applied = db::create_link_inbound(&guard, link, attest_level);
    applied.map_err(box_err)
}
/// Applies a remote deletion through `db::delete`, returning that call's
/// boolean result instead of mapping `false` to an error.
///
/// # Errors
/// Returns `StoreError::Backend` when the delete fails.
async fn apply_remote_deletion(&self, _ctx: &CallerContext, id: &str) -> StoreResult<bool> {
    let guard = self.state.lock().await;
    let removed = db::delete(&guard, id);
    removed.map_err(box_err)
}
/// Recalls scored memories for `query`.
///
/// Uses `db::recall_hybrid` when a query embedding is supplied, and
/// `db::recall` otherwise. A `filter.limit` of zero is treated as 10, only
/// the first entry of `filter.tags_any` is forwarded, and scoring uses
/// `ResolvedScoring::default()`. Rows the caller may not see are dropped
/// after the database limit has been applied.
///
/// # Errors
/// Returns `StoreError::Backend` when the underlying recall fails.
async fn recall_hybrid(
    &self,
    ctx: &CallerContext,
    query: &str,
    query_embedding: Option<&[f32]>,
    filter: &Filter,
) -> StoreResult<Vec<(Memory, f64)>> {
    let guard = self.state.lock().await;
    let first_tag = filter.tags_any.first().map(String::as_str);
    let since_bound = filter.since.map(|d| d.to_rfc3339());
    let until_bound = filter.until.map(|d| d.to_rfc3339());
    let effective_limit = match filter.limit {
        0 => 10,
        n => n,
    };
    let scoring = crate::config::ResolvedScoring::default();
    let mut scored = match query_embedding {
        Some(qe) => {
            db::recall_hybrid(
                &guard,
                query,
                qe,
                filter.namespace.as_deref(),
                effective_limit,
                first_tag,
                since_bound.as_deref(),
                until_bound.as_deref(),
                None,
                crate::SECS_PER_HOUR,
                crate::SECS_PER_DAY,
                ctx.as_agent.as_deref(),
                None,
                &scoring,
                false,
                None,
            )
            .map_err(box_err)?
            .0
        }
        None => {
            db::recall(
                &guard,
                query,
                filter.namespace.as_deref(),
                effective_limit,
                first_tag,
                since_bound.as_deref(),
                until_bound.as_deref(),
                crate::SECS_PER_HOUR,
                crate::SECS_PER_DAY,
                ctx.as_agent.as_deref(),
                None,
                false,
                None,
            )
            .map_err(box_err)?
            .0
        }
    };
    if !ctx.bypass_visibility {
        let caller = ctx.effective_principal();
        scored.retain(|(m, _)| is_visible_to_caller(m, caller));
    }
    Ok(scored)
}
/// Best-effort bookkeeping after a recall: touches every id via
/// `db::touch_many` and, when confidence decay is enabled, applies a decay
/// touch per id inside one explicit transaction.
///
/// Every failure in here is logged with `tracing::warn!` and swallowed; the
/// function always returns `Ok(())`.
async fn touch_after_recall(&self, ids: &[String]) -> StoreResult<()> {
// Nothing to touch: return before taking the connection lock.
if ids.is_empty() {
return Ok(());
}
let conn = self.state.lock().await;
let id_refs: Vec<&str> = ids.iter().map(String::as_str).collect();
if let Err(e) = db::touch_many(&conn, &id_refs, crate::SECS_PER_HOUR, crate::SECS_PER_DAY) {
tracing::warn!("touch_many failed for {} memories: {e}", ids.len());
}
if crate::confidence::decay::decay_enabled() {
// The transaction is driven by hand with raw BEGIN / COMMIT / ROLLBACK
// statements. If BEGIN fails, the decay pass is skipped entirely.
if let Err(e) = conn.execute_batch(crate::storage::connection::SQL_BEGIN_IMMEDIATE) {
tracing::warn!("decay-touch BEGIN failed: {e}");
} else {
// A failure for one id is logged and does not stop the remaining ids
// or the commit.
for id in ids {
if let Err(e) = crate::confidence::decay::apply_decay_touch(&conn, id) {
tracing::warn!("confidence decay touch failed for memory {id}: {e}");
}
}
if let Err(e) = conn.execute_batch(crate::storage::connection::SQL_COMMIT) {
tracing::warn!("decay-touch COMMIT failed: {e}");
// Attempt a rollback after the failed commit; its result is ignored.
let _ = conn.execute_batch(crate::storage::connection::SQL_ROLLBACK);
}
}
}
Ok(())
}
/// Records an approve/reject decision for pending action `id` through
/// `db::decide_pending_action`.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn pending_decide(
    &self,
    _ctx: &CallerContext,
    id: &str,
    approve: bool,
    decided_by: &str,
) -> StoreResult<bool> {
    let guard = self.state.lock().await;
    let decided = db::decide_pending_action(&guard, id, approve, decided_by);
    decided.map_err(box_err)
}
/// Fetches pending action `id`, if present, via `db::get_pending_action`.
///
/// # Errors
/// Returns `StoreError::Backend` when the lookup fails.
async fn get_pending(
    &self,
    _ctx: &CallerContext,
    id: &str,
) -> StoreResult<Option<crate::models::PendingAction>> {
    let guard = self.state.lock().await;
    let pending = db::get_pending_action(&guard, id);
    pending.map_err(box_err)
}
/// Sets the standard (and optional parent) of `namespace` through
/// `db::set_namespace_standard`.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn set_namespace_standard(
    &self,
    _ctx: &CallerContext,
    namespace: &str,
    standard_id: &str,
    parent: Option<&str>,
) -> StoreResult<()> {
    let guard = self.state.lock().await;
    let written = db::set_namespace_standard(&guard, namespace, standard_id, parent);
    written.map_err(box_err)
}
/// Clears the standard of `namespace` through `db::clear_namespace_standard`,
/// returning that call's boolean result.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn clear_namespace_standard(
    &self,
    _ctx: &CallerContext,
    namespace: &str,
) -> StoreResult<bool> {
    let guard = self.state.lock().await;
    let cleared = db::clear_namespace_standard(&guard, namespace);
    cleared.map_err(box_err)
}
/// Reads `(standard_id, parent_namespace)` for `namespace` from
/// `namespace_meta`. Returns `None` when no row exists; only the first row
/// is considered.
///
/// # Errors
/// Returns `StoreError::Backend` when preparing, running or decoding the
/// query fails.
async fn get_namespace_standard(
    &self,
    _ctx: &CallerContext,
    namespace: &str,
) -> StoreResult<Option<(String, Option<String>)>> {
    let guard = self.state.lock().await;
    let mut stmt = guard
        .prepare(
            "SELECT standard_id, parent_namespace FROM namespace_meta WHERE namespace = ?1",
        )
        .map_err(box_err)?;
    let mut matches = stmt
        .query_map(rusqlite::params![namespace], |row| {
            let standard_id = row.get::<_, String>(0)?;
            let parent = row.get::<_, Option<String>>(1)?;
            Ok((standard_id, parent))
        })
        .map_err(box_err)?;
    // `Option<Result<_>>` -> `Result<Option<_>>`: absent row is Ok(None).
    matches.next().transpose().map_err(box_err)
}
/// Bulk-forgets memories through `db::forget`. At least one of `namespace`,
/// `pattern` or `tier` must be supplied; the check happens before the
/// connection lock is taken.
///
/// # Errors
/// * `StoreError::InvalidInput` when no filter at all is given.
/// * `StoreError::Backend` when the database call fails.
async fn forget(
    &self,
    _ctx: &CallerContext,
    namespace: Option<&str>,
    pattern: Option<&str>,
    tier: Option<&Tier>,
    archive: bool,
) -> StoreResult<usize> {
    let has_filter = namespace.is_some() || pattern.is_some() || tier.is_some();
    if !has_filter {
        return Err(StoreError::InvalidInput {
            detail: crate::errors::msg::FORGET_FILTER_REQUIRED.to_string(),
        });
    }
    let guard = self.state.lock().await;
    db::forget(&guard, namespace, pattern, tier, archive).map_err(box_err)
}
/// Consolidates the memories `ids` through `db::consolidate`, forwarding
/// every argument unchanged.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn consolidate(
    &self,
    _ctx: &CallerContext,
    ids: &[String],
    title: &str,
    summary: &str,
    namespace: &str,
    tier: &Tier,
    source: &str,
    consolidator_agent_id: &str,
) -> StoreResult<String> {
    let guard = self.state.lock().await;
    let consolidated = db::consolidate(
        &guard,
        ids,
        title,
        summary,
        namespace,
        tier,
        source,
        consolidator_agent_id,
    );
    consolidated.map_err(box_err)
}
/// Runs `db::reflect_with_hooks` with otherwise-empty hooks whose
/// `active_keypair` is set to `signing_key`. The `ReflectError` is returned
/// as-is rather than being wrapped in `StoreError`.
async fn reflect(
    &self,
    _ctx: &CallerContext,
    input: &crate::storage::reflect::ReflectInput,
    signing_key: Option<&crate::identity::keypair::AgentKeypair>,
) -> Result<crate::storage::reflect::ReflectOutcome, crate::storage::reflect::ReflectError>
{
    let guard = self.state.lock().await;
    let mut reflect_hooks = db::ReflectHooks::empty();
    reflect_hooks.active_keypair = signing_key;
    db::reflect_with_hooks(&guard, input, &reflect_hooks)
}
/// Looks up the reflection origin of `id` via
/// `reflection_bookkeeping::reflection_origin`.
///
/// # Errors
/// Returns `StoreError::Backend` when the lookup fails.
async fn get_reflection_origin(
    &self,
    id: &str,
) -> StoreResult<Option<crate::federation::reflection_bookkeeping::ReflectionOrigin>> {
    let guard = self.state.lock().await;
    let origin = crate::federation::reflection_bookkeeping::reflection_origin(&guard, id);
    origin.map_err(box_err)
}
/// Lists recall observations through `observations::list_observations`,
/// forwarding all filters unchanged.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn list_recall_observations(
    &self,
    recall_id: Option<&str>,
    consumed: Option<bool>,
    since: Option<&str>,
    until: Option<&str>,
    limit: usize,
) -> StoreResult<Vec<crate::observations::Observation>> {
    let guard = self.state.lock().await;
    let observed =
        crate::observations::list_observations(&guard, recall_id, consumed, since, until, limit);
    observed.map_err(box_err)
}
/// Runs `db::gc` with the given `archive` flag and returns its count.
///
/// # Errors
/// Returns `StoreError::Backend` when garbage collection fails.
async fn run_gc(&self, archive: bool) -> StoreResult<usize> {
    let guard = self.state.lock().await;
    let collected = db::gc(&guard, archive);
    collected.map_err(box_err)
}
/// Restores archived memory `id` through `db::restore_archived`, returning
/// that call's boolean result.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn archive_restore(&self, _ctx: &CallerContext, id: &str) -> StoreResult<bool> {
    let guard = self.state.lock().await;
    let restored = db::restore_archived(&guard, id);
    restored.map_err(box_err)
}
/// Purges archived rows. Callers without `bypass_visibility` go through
/// `db::purge_archive_for_caller` with their effective principal; callers
/// with it go through `db::purge_archive`.
///
/// # Errors
/// Returns `StoreError::Backend` when the purge fails.
async fn archive_purge(
    &self,
    ctx: &CallerContext,
    older_than_days: Option<i64>,
) -> StoreResult<usize> {
    let guard = self.state.lock().await;
    if !ctx.bypass_visibility {
        return db::purge_archive_for_caller(&guard, ctx.effective_principal(), older_than_days)
            .map_err(box_err);
    }
    db::purge_archive(&guard, older_than_days).map_err(box_err)
}
async fn archive_by_ids(
&self,
_ctx: &CallerContext,
ids: &[String],
reason: Option<&str>,
) -> StoreResult<usize> {
let conn = self.state.lock().await;
let mut moved = 0usize;
for id in ids {
match db::archive_memory(&conn, id, reason) {
Ok(true) => moved += 1,
Ok(false) => {}
Err(e) => return Err(box_err(e)),
}
}
Ok(moved)
}
/// Returns everything `db::export_all` yields.
///
/// # Errors
/// Returns `StoreError::Backend` when the export fails.
async fn export_memories(&self) -> StoreResult<Vec<Memory>> {
    let guard = self.state.lock().await;
    let exported = db::export_all(&guard);
    exported.map_err(box_err)
}
/// Returns everything `db::export_links` yields.
///
/// # Errors
/// Returns `StoreError::Backend` when the export fails.
async fn export_links(&self) -> StoreResult<Vec<MemoryLink>> {
    let guard = self.state.lock().await;
    let exported = db::export_links(&guard);
    exported.map_err(box_err)
}
/// Returns the namespace chain computed by `db::build_namespace_chain`.
/// That call is infallible here, so the result is always `Ok`.
async fn build_namespace_chain(&self, namespace: &str) -> StoreResult<Vec<String>> {
    let guard = self.state.lock().await;
    let chain = db::build_namespace_chain(&guard, namespace);
    Ok(chain)
}
/// Returns the policy `db::resolve_governance_policy` resolves for
/// `namespace`, if any. That call is infallible here, so the result is
/// always `Ok`.
async fn resolve_governance_policy(
    &self,
    namespace: &str,
) -> StoreResult<Option<crate::models::GovernancePolicy>> {
    let guard = self.state.lock().await;
    let policy = db::resolve_governance_policy(&guard, namespace);
    Ok(policy)
}
/// Records an approval via `db::approve_with_approver_type` and translates
/// the database outcome into the store-level `ApproveOutcome`.
///
/// # Errors
/// * `StoreError::NotFound` when the database reports the pending action as
///   missing.
/// * `StoreError::Backend` when the database call fails.
async fn governance_approve_with_consensus(
    &self,
    _ctx: &CallerContext,
    pending_id: &str,
    approver_agent_id: &str,
) -> StoreResult<super::ApproveOutcome> {
    let guard = self.state.lock().await;
    match db::approve_with_approver_type(&guard, pending_id, approver_agent_id)
        .map_err(box_err)?
    {
        db::ApproveOutcome::Approved => Ok(super::ApproveOutcome::Approved),
        db::ApproveOutcome::Pending { votes, quorum } => {
            Ok(super::ApproveOutcome::Pending { votes, quorum })
        }
        db::ApproveOutcome::Rejected(reason) => Ok(super::ApproveOutcome::Rejected(reason)),
        db::ApproveOutcome::NotFound => Err(super::StoreError::NotFound {
            id: pending_id.to_string(),
        }),
    }
}
/// Reports whether `db::is_registered_agent` knows `agent_id`. That call is
/// infallible here, so the result is always `Ok`.
async fn is_registered_agent(&self, agent_id: &str) -> StoreResult<bool> {
    let guard = self.state.lock().await;
    let registered = db::is_registered_agent(&guard, agent_id);
    Ok(registered)
}
async fn enforce_governance_action(
&self,
action: super::GovernedAction,
namespace: &str,
agent_id: &str,
memory_id: Option<&str>,
memory_owner: Option<&str>,
payload: &serde_json::Value,
) -> StoreResult<crate::models::GovernanceDecision> {
let db_action = match action {
super::GovernedAction::Store => crate::models::GovernedAction::Store,
super::GovernedAction::Delete => crate::models::GovernedAction::Delete,
super::GovernedAction::Promote => crate::models::GovernedAction::Promote,
super::GovernedAction::Reflect => crate::models::GovernedAction::Reflect,
};
let conn = self.state.lock().await;
db::enforce_governance(
&conn,
db_action,
namespace,
agent_id,
memory_id,
memory_owner,
payload,
)
.map_err(box_err)
}
/// Returns the aggregate quota status of `agent_id` from
/// `quotas::get_aggregate_status`.
///
/// # Errors
/// Returns `StoreError::Backend` when the lookup fails.
async fn quota_status(&self, agent_id: &str) -> StoreResult<QuotaStatus> {
    let guard = self.state.lock().await;
    let status = quotas::get_aggregate_status(&guard, agent_id);
    status.map_err(box_err)
}
/// Returns the quota status of `agent_id` within `namespace` from
/// `quotas::get_status`.
///
/// # Errors
/// Returns `StoreError::Backend` when the lookup fails.
async fn quota_status_ns(&self, agent_id: &str, namespace: &str) -> StoreResult<QuotaStatus> {
    let guard = self.state.lock().await;
    let status = quotas::get_status(&guard, agent_id, namespace);
    status.map_err(box_err)
}
/// Lists quota statuses via `quotas::list_status` with no namespace filter.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn quota_status_list(&self) -> StoreResult<Vec<QuotaStatus>> {
    let guard = self.state.lock().await;
    let statuses = quotas::list_status(&guard, None);
    statuses.map_err(box_err)
}
/// Lists quota statuses via `quotas::list_status`, restricted to `namespace`.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn quota_status_list_ns(&self, namespace: &str) -> StoreResult<Vec<QuotaStatus>> {
    let guard = self.state.lock().await;
    let statuses = quotas::list_status(&guard, Some(namespace));
    statuses.map_err(box_err)
}
/// Locates one row of `memory_links` and reports whether its signature
/// verifies.
///
/// The link is addressed either by `filter.link_id` (a canonical
/// `source_id|target_id|relation` triple, which takes precedence) or by
/// `filter.source_id` with an optional `filter.target_id`. When the relation
/// is not pinned, the oldest matching row by `created_at` is used.
///
/// `verified` is `true` only when a signature is stored, a public key is
/// enrolled for `observed_by`, and the signature checks out against the
/// link's signable fields. An unsigned link is reported with
/// `verified = false` and `signature_present = false`, because no
/// verification took place. A missing `attest_level` column value is
/// reported as the `Unsigned` level.
///
/// # Errors
/// * `StoreError::InvalidInput` when neither `source_id` nor `link_id` is
///   given, or `link_id` does not split into exactly three `|` parts.
/// * `StoreError::NotFound` when no matching link exists.
/// * `StoreError::Backend` when the query fails.
async fn verify_link(&self, filter: VerifyFilter) -> StoreResult<VerifyLinkReport> {
    // Columns of one `memory_links` row in SELECT order: source_id, target_id,
    // relation, valid_from, valid_until, observed_by, signature, attest_level.
    type LinkRow = (
        String,
        String,
        String,
        Option<String>,
        Option<String>,
        Option<String>,
        Option<Vec<u8>>,
        Option<String>,
    );
    // Shared row decoder for the three query shapes below.
    fn read_link_row(r: &rusqlite::Row<'_>) -> rusqlite::Result<LinkRow> {
        Ok((
            r.get::<_, String>(0)?,
            r.get::<_, String>(1)?,
            r.get::<_, String>(2)?,
            r.get::<_, Option<String>>(3)?,
            r.get::<_, Option<String>>(4)?,
            r.get::<_, Option<String>>(5)?,
            r.get::<_, Option<Vec<u8>>>(6)?,
            r.get::<_, Option<String>>(7)?,
        ))
    }

    if filter.source_id.is_none() && filter.link_id.is_none() {
        return Err(StoreError::InvalidInput {
            detail: crate::errors::msg::VERIFY_LINK_ARGS_REQUIRED.to_string(),
        });
    }
    let (source_id, target_id, relation_filter) = if let Some(link_id) =
        filter.link_id.as_deref()
    {
        let parts: Vec<&str> = link_id.split('|').collect();
        if parts.len() != 3 {
            return Err(StoreError::InvalidInput {
                detail: format!(
                    "link_id must be canonical source_id|target_id|relation triple, got {link_id}"
                ),
            });
        }
        (
            parts[0].to_string(),
            Some(parts[1].to_string()),
            Some(parts[2].to_string()),
        )
    } else {
        (filter.source_id.unwrap_or_default(), filter.target_id, None)
    };
    let conn = self.state.lock().await;
    let row: Option<LinkRow> = match (target_id.as_deref(), relation_filter.as_deref()) {
        // Fully specified triple.
        (Some(t), Some(r)) => conn
            .query_row(
                "SELECT source_id, target_id, relation, valid_from, valid_until, \
observed_by, signature, attest_level
FROM memory_links \
WHERE source_id = ?1 AND target_id = ?2 AND relation = ?3 \
LIMIT 1",
                rusqlite::params![source_id, t, r],
                read_link_row,
            )
            .optional()
            .map_err(box_err)?,
        // Source + target: oldest link between the pair.
        (Some(t), None) => conn
            .query_row(
                "SELECT source_id, target_id, relation, valid_from, valid_until, \
observed_by, signature, attest_level
FROM memory_links \
WHERE source_id = ?1 AND target_id = ?2 \
ORDER BY created_at ASC LIMIT 1",
                rusqlite::params![source_id, t],
                read_link_row,
            )
            .optional()
            .map_err(box_err)?,
        // Source only: oldest outgoing link.
        (None, _) => conn
            .query_row(
                "SELECT source_id, target_id, relation, valid_from, valid_until, \
observed_by, signature, attest_level
FROM memory_links \
WHERE source_id = ?1 \
ORDER BY created_at ASC LIMIT 1",
                rusqlite::params![source_id],
                read_link_row,
            )
            .optional()
            .map_err(box_err)?,
    };
    let Some((src, tgt, rel, vf, vu, obs, sig, attest)) = row else {
        return Err(StoreError::NotFound {
            id: format!(
                "link {source_id} -> {} {}",
                target_id.as_deref().unwrap_or("?"),
                relation_filter.as_deref().unwrap_or("?")
            ),
        });
    };
    let attest_level =
        attest.unwrap_or_else(|| crate::models::AttestLevel::Unsigned.as_str().to_string());
    let signature_present = sig.is_some();
    let mut findings: Vec<String> = Vec::new();
    let verified = if signature_present {
        let observed = obs.as_deref().unwrap_or("");
        match crate::identity::verify::lookup_peer_public_key(observed) {
            None => {
                findings.push(format!(
                    "signature present but no enrolled public key for observed_by={observed}"
                ));
                false
            }
            Some(pubkey) => {
                let signable = crate::identity::sign::SignableLink {
                    src_id: &src,
                    dst_id: &tgt,
                    relation: &rel,
                    observed_by: obs.as_deref(),
                    valid_from: vf.as_deref(),
                    valid_until: vu.as_deref(),
                };
                let sig_bytes = sig.as_deref().unwrap_or(&[]);
                match crate::identity::verify::verify(&pubkey, &signable, sig_bytes) {
                    Ok(()) => true,
                    Err(e) => {
                        findings.push(crate::errors::msg::signature_verify_failed(e));
                        false
                    }
                }
            }
        }
    } else {
        // No signature means nothing was verified; never report an unsigned
        // link as verified.
        false
    };
    Ok(VerifyLinkReport {
        source_id: src,
        target_id: tgt,
        relation: rel,
        verified,
        attest_level,
        signature_present,
        observed_by: obs,
        findings,
    })
}
/// Finds paths between two memories via `db::find_paths`, then drops every
/// path that passes through a node the caller cannot see.
///
/// A node counts as invisible when it is missing, when looking it up fails,
/// or when `is_visible_to_caller` rejects it. Per-node verdicts are cached
/// for the duration of the call. Callers with `bypass_visibility` receive
/// the unfiltered paths.
///
/// # Errors
/// Returns `StoreError::Backend` when `db::find_paths` fails.
async fn find_paths(
    &self,
    ctx: &CallerContext,
    source_id: &str,
    target_id: &str,
    max_depth: Option<usize>,
    max_results: Option<usize>,
) -> StoreResult<Vec<Vec<String>>> {
    let guard = self.state.lock().await;
    let candidates = db::find_paths(&guard, source_id, target_id, max_depth, max_results, false)
        .map_err(box_err)?;
    if ctx.bypass_visibility {
        return Ok(candidates);
    }
    let caller = ctx.effective_principal();
    let mut verdicts: std::collections::HashMap<String, bool> =
        std::collections::HashMap::new();
    let mut kept: Vec<Vec<String>> = Vec::with_capacity(candidates.len());
    for path in candidates {
        // `all` stops at the first invisible node, so later nodes of a
        // rejected path are never looked up.
        let fully_visible = path.iter().all(|node| {
            *verdicts
                .entry(node.clone())
                .or_insert_with(|| match db::get(&guard, node) {
                    Ok(Some(mem)) => is_visible_to_caller(&mem, caller),
                    Ok(None) | Err(_) => false,
                })
        });
        if fully_visible {
            kept.push(path);
        }
    }
    Ok(kept)
}
/// Returns the per-namespace counts from `db::list_namespaces`.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn list_namespaces(&self) -> StoreResult<Vec<crate::models::NamespaceCount>> {
    let guard = self.state.lock().await;
    let namespaces = db::list_namespaces(&guard);
    namespaces.map_err(box_err)
}
/// Returns the taxonomy built by `db::get_taxonomy` for the given prefix,
/// depth and limit.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn get_taxonomy(
    &self,
    namespace_prefix: Option<&str>,
    max_depth: usize,
    limit: usize,
) -> StoreResult<crate::models::Taxonomy> {
    let guard = self.state.lock().await;
    let taxonomy = db::get_taxonomy(&guard, namespace_prefix, max_depth, limit);
    taxonomy.map_err(box_err)
}
/// Returns the agent registrations from `db::list_agents`.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn list_agents(&self) -> StoreResult<Vec<AgentRegistration>> {
    let guard = self.state.lock().await;
    let agents = db::list_agents(&guard);
    agents.map_err(box_err)
}
/// Lists pending actions via `db::list_pending_actions`, forwarding the
/// optional `status` filter and the `limit`.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn list_pending_actions(
    &self,
    status: Option<&str>,
    limit: usize,
) -> StoreResult<Vec<crate::models::PendingAction>> {
    let guard = self.state.lock().await;
    let pending = db::list_pending_actions(&guard, status, limit);
    pending.map_err(box_err)
}
/// Resolves an entity by `alias` (optionally scoped to `namespace`) via
/// `db::entity_get_by_alias`.
///
/// # Errors
/// Returns `StoreError::Backend` when the lookup fails.
async fn entity_get_by_alias(
    &self,
    alias: &str,
    namespace: Option<&str>,
) -> StoreResult<Option<crate::models::EntityRecord>> {
    let guard = self.state.lock().await;
    let entity = db::entity_get_by_alias(&guard, alias, namespace);
    entity.map_err(box_err)
}
/// Returns the result of `db::health_check` on the shared connection.
///
/// # Errors
/// Returns `StoreError::Backend` when the check itself fails.
async fn health_check(&self) -> StoreResult<bool> {
    let guard = self.state.lock().await;
    let healthy = db::health_check(&guard);
    healthy.map_err(box_err)
}
/// Returns the statistics `db::stats` computes for the connection and the
/// store's file path.
///
/// # Errors
/// Returns `StoreError::Backend` when gathering statistics fails.
async fn stats(&self) -> StoreResult<crate::models::Stats> {
    let guard = self.state.lock().await;
    let snapshot = db::stats(&guard, &self.path);
    snapshot.map_err(box_err)
}
/// Stores `embedding` for memory `id` via `db::set_embedding`. A `None`
/// embedding is written as an empty slice.
///
/// # Errors
/// Returns `StoreError::Backend` when the write fails.
async fn update_embedding(
    &self,
    _ctx: &CallerContext,
    id: &str,
    embedding: Option<&[f32]>,
) -> StoreResult<()> {
    let guard = self.state.lock().await;
    let vector: &[f32] = embedding.unwrap_or(&[]);
    db::set_embedding(&guard, id, vector).map_err(box_err)
}
/// Looks up a memory by `title` within `namespace` via
/// `db::find_by_title_namespace`.
///
/// # Errors
/// Returns `StoreError::Backend` when the lookup fails.
async fn find_by_title_namespace(
    &self,
    title: &str,
    namespace: &str,
) -> StoreResult<Option<String>> {
    let guard = self.state.lock().await;
    let found = db::find_by_title_namespace(&guard, title, namespace);
    found.map_err(box_err)
}
/// Returns the title `db::next_versioned_title` derives from `base_title`
/// within `namespace`.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn next_versioned_title(&self, base_title: &str, namespace: &str) -> StoreResult<String> {
    let guard = self.state.lock().await;
    let title = db::next_versioned_title(&guard, base_title, namespace);
    title.map_err(box_err)
}
/// Returns the memories `db::find_contradictions` reports for `title` in
/// `namespace`.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn find_contradictions(&self, title: &str, namespace: &str) -> StoreResult<Vec<Memory>> {
    let guard = self.state.lock().await;
    let conflicting = db::find_contradictions(&guard, title, namespace);
    conflicting.map_err(box_err)
}
/// Invalidates a link through `db::invalidate_link`.
///
/// When the database returns a result, its `valid_until` and
/// `previous_valid_until` are copied into a row with `found = true`.
/// Otherwise a placeholder row with `found = false`, an empty `valid_until`
/// and no previous value is returned (this is not an error).
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn invalidate_link(
    &self,
    source_id: &str,
    target_id: &str,
    relation: &str,
    valid_until: Option<&str>,
) -> StoreResult<crate::store::KgInvalidateRow> {
    let guard = self.state.lock().await;
    let outcome = db::invalidate_link(&guard, source_id, target_id, relation, valid_until)
        .map_err(box_err)?;
    let row = if let Some(res) = outcome {
        crate::store::KgInvalidateRow {
            found: true,
            valid_until: res.valid_until,
            previous_valid_until: res.previous_valid_until,
        }
    } else {
        crate::store::KgInvalidateRow {
            found: false,
            valid_until: String::new(),
            previous_valid_until: None,
        }
    };
    Ok(row)
}
/// Runs `db::check_duplicate_with_text` with the given embedding, text,
/// optional namespace and similarity threshold.
///
/// # Errors
/// Returns `StoreError::Backend` when the check fails.
async fn check_duplicate_with_text(
    &self,
    query_embedding: &[f32],
    query_text: &str,
    namespace: Option<&str>,
    threshold: f32,
) -> StoreResult<crate::models::DuplicateCheck> {
    let guard = self.state.lock().await;
    let verdict =
        db::check_duplicate_with_text(&guard, query_embedding, query_text, namespace, threshold);
    verdict.map_err(box_err)
}
/// Delivers a notification by inserting a fresh memory into the target
/// agent's inbox namespace (`crate::inbox_namespace(target_agent)`).
///
/// The memory gets a new UUID, the `notify` tag and source, and metadata
/// recording the sending `ctx.agent_id` and the target agent. `priority`
/// defaults to 5 and `tier` to `Tier::Short` when not supplied.
///
/// # Errors
/// Returns `StoreError::Backend` when the insert fails.
async fn notify(
    &self,
    ctx: &CallerContext,
    target_agent: &str,
    title: &str,
    payload: &str,
    priority: Option<i32>,
    tier: Option<&Tier>,
) -> StoreResult<String> {
    let timestamp = chrono::Utc::now().to_rfc3339();
    let chosen_tier = match tier {
        Some(t) => t.clone(),
        None => Tier::Short,
    };
    let chosen_priority = priority.unwrap_or(5);
    let metadata = serde_json::json!({
        "agent_id": &ctx.agent_id,
        (field_names::TARGET_AGENT_ID): target_agent,
        "notify": true,
    });
    let notification = Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: chosen_tier,
        namespace: crate::inbox_namespace(target_agent),
        title: title.to_string(),
        content: payload.to_string(),
        tags: vec!["notify".to_string()],
        priority: chosen_priority,
        confidence: 1.0,
        source: "notify".to_string(),
        access_count: 0,
        created_at: timestamp.clone(),
        updated_at: timestamp,
        last_accessed_at: None,
        expires_at: None,
        metadata,
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };
    // The lock is only taken once the row is fully built.
    let guard = self.state.lock().await;
    db::insert(&guard, &notification).map_err(box_err)
}
/// Executes pending action `pending_id` through `db::execute_pending_action`
/// and returns that call's optional string result.
///
/// # Errors
/// Returns `StoreError::Backend` when execution fails.
async fn execute_pending_action(
    &self,
    _ctx: &CallerContext,
    pending_id: &str,
) -> StoreResult<Option<String>> {
    let guard = self.state.lock().await;
    let executed = db::execute_pending_action(&guard, pending_id);
    executed.map_err(box_err)
}
/// Records an approval via `db::approve_with_approver_type` and translates
/// the database outcome into the store-level `ApproveOutcome`.
///
/// # Errors
/// * `StoreError::NotFound` when the database reports the pending action as
///   missing.
/// * `StoreError::Backend` when the database call fails.
async fn approve_with_approver_type(
    &self,
    _ctx: &CallerContext,
    pending_id: &str,
    approver_agent_id: &str,
) -> StoreResult<super::ApproveOutcome> {
    let guard = self.state.lock().await;
    match db::approve_with_approver_type(&guard, pending_id, approver_agent_id)
        .map_err(box_err)?
    {
        db::ApproveOutcome::Approved => Ok(super::ApproveOutcome::Approved),
        db::ApproveOutcome::Pending { votes, quorum } => {
            Ok(super::ApproveOutcome::Pending { votes, quorum })
        }
        db::ApproveOutcome::Rejected(reason) => Ok(super::ApproveOutcome::Rejected(reason)),
        db::ApproveOutcome::NotFound => Err(super::StoreError::NotFound {
            id: pending_id.to_string(),
        }),
    }
}
/// Records an approve/reject decision for pending action `id` through
/// `db::decide_pending_action`.
///
/// # Errors
/// Returns `StoreError::Backend` when the database call fails.
async fn decide_pending_action(
    &self,
    _ctx: &CallerContext,
    id: &str,
    approve: bool,
    decided_by: &str,
) -> StoreResult<bool> {
    let guard = self.state.lock().await;
    let decided = db::decide_pending_action(&guard, id, approve, decided_by);
    decided.map_err(box_err)
}
/// Traverses the knowledge graph from `source_id` via `db::kg_query` (the
/// three optional filter arguments are passed as `None`) and converts each
/// node into a `KgQueryRow`.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn kg_query(
    &self,
    source_id: &str,
    max_depth: usize,
    include_invalidated: bool,
) -> StoreResult<Vec<super::KgQueryRow>> {
    let guard = self.state.lock().await;
    let nodes = db::kg_query(
        &guard,
        source_id,
        max_depth,
        None,
        None,
        None,
        include_invalidated,
    )
    .map_err(box_err)?;
    let mut rows = Vec::new();
    for node in nodes {
        rows.push(super::KgQueryRow {
            target_id: node.target_id,
            relation: node.relation,
            depth: node.depth,
            path: node.path,
        });
    }
    Ok(rows)
}
/// Fetches timeline events for `source_id` via `db::kg_timeline` and
/// converts each event into a `KgTimelineRow`.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn kg_timeline(
    &self,
    source_id: &str,
    since: Option<&str>,
    until: Option<&str>,
    limit: Option<usize>,
) -> StoreResult<Vec<super::KgTimelineRow>> {
    let guard = self.state.lock().await;
    let events = db::kg_timeline(&guard, source_id, since, until, limit).map_err(box_err)?;
    let mut rows = Vec::new();
    for event in events {
        rows.push(super::KgTimelineRow {
            target_id: event.target_id,
            relation: event.relation,
            valid_from: event.valid_from,
            valid_until: event.valid_until,
            observed_by: event.observed_by,
            title: event.title,
            target_namespace: event.target_namespace,
        });
    }
    Ok(rows)
}
/// Registers an entity through `db::entity_register`, forwarding every
/// argument unchanged.
///
/// # Errors
/// Returns `StoreError::Backend` when registration fails.
async fn entity_register(
    &self,
    _ctx: &CallerContext,
    canonical_name: &str,
    namespace: &str,
    aliases: &[String],
    extra_metadata: &serde_json::Value,
    agent_id: Option<&str>,
) -> StoreResult<crate::models::EntityRegistration> {
    let guard = self.state.lock().await;
    let registration = db::entity_register(
        &guard,
        canonical_name,
        namespace,
        aliases,
        extra_metadata,
        agent_id,
    );
    registration.map_err(box_err)
}
/// Lists archived rows as JSON values via `db::list_archived`, with an
/// optional namespace filter and `limit`/`offset` paging.
///
/// # Errors
/// Returns `StoreError::Backend` when the query fails.
async fn list_archived(
    &self,
    namespace: Option<&str>,
    limit: usize,
    offset: usize,
) -> StoreResult<Vec<serde_json::Value>> {
    let guard = self.state.lock().await;
    let archived = db::list_archived(&guard, namespace, limit, offset);
    archived.map_err(box_err)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::models::Tier;
/// Builds a mid-tier fixture memory in the `sal-test` namespace with a fresh
/// UUID, owned by agent `alice` via its metadata.
fn test_memory(title: &str, content: &str) -> Memory {
    let stamp = chrono::Utc::now().to_rfc3339();
    let id = uuid::Uuid::new_v4().to_string();
    let metadata = serde_json::json!({"agent_id": "alice"});
    Memory {
        id,
        tier: Tier::Mid,
        namespace: "sal-test".to_string(),
        title: title.to_string(),
        content: content.to_string(),
        tags: vec!["test".to_string()],
        priority: 5,
        confidence: 1.0,
        source: "test".to_string(),
        access_count: 0,
        created_at: stamp.clone(),
        updated_at: stamp,
        last_accessed_at: None,
        expires_at: None,
        metadata,
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    }
}
/// Exercises trait methods this file does not override
/// (`store_with_embedding`, `store_batch`, `list_unembedded`,
/// `set_embeddings_batch`) against a real SQLite store.
#[tokio::test]
async fn inherited_trait_defaults_roundtrip_cov() {
    let db_file = tempfile::NamedTempFile::new().expect("tempfile");
    let store = SqliteStore::open(db_file.path()).expect("open");
    let ctx = CallerContext::for_agent("alice");

    // store_with_embedding: the row must be retrievable afterwards.
    let m = test_memory("def-emb", "store_with_embedding default forwards to store");
    let embedding = [0.1f32, 0.2, 0.3];
    let id = store
        .store_with_embedding(&ctx, &m, Some(&embedding))
        .await
        .expect("store_with_embedding default");
    assert_eq!(id, m.id);
    let fetched = store.get(&ctx, &m.id).await.expect("get");
    assert!(fetched.id == m.id);

    // store_batch: one id per input row.
    let batch = vec![
        test_memory("def-batch-1", "batch row one body"),
        test_memory("def-batch-2", "batch row two body"),
    ];
    let ids = store
        .store_batch(&ctx, &batch)
        .await
        .expect("store_batch default");
    assert_eq!(ids.len(), 2);

    let unembedded = store
        .list_unembedded(&ctx, 10)
        .await
        .expect("list_unembedded default");
    assert!(unembedded.is_empty(), "default list_unembedded is empty");

    let updates = [(m.id.clone(), vec![0.4f32, 0.5])];
    let written = store
        .set_embeddings_batch(&ctx, &updates)
        .await
        .expect("set_embeddings_batch default");
    assert_eq!(
        written, 1,
        "default set_embeddings_batch counts the no-op writes"
    );
}
#[tokio::test]
async fn list_by_namespace_prefix_finds_matches_beyond_first_page_1625() {
let tmp = tempfile::NamedTempFile::new().expect("tempfile");
let store = SqliteStore::open(tmp.path()).expect("open");
let ctx = CallerContext::for_agent("alice");
for i in 0..260 {
let mut m = test_memory(&format!("bulk-{i}"), "filler row");
m.namespace = "bulk/noise".to_string();
m.priority = 9;
store.store(&ctx, &m).await.expect("store bulk");
}
for i in 0..2 {
let mut m = test_memory(&format!("pfx-{i}"), "target row");
m.namespace = "pfx/sub".to_string();
m.priority = 1;
store.store(&ctx, &m).await.expect("store pfx");
}
let got = store
.list_by_namespace_prefix(&ctx, "pfx", 10)
.await
.expect("prefix list");
assert_eq!(
got.len(),
2,
"#1625: both prefix matches must surface despite 260 noise rows sorting first"
);
assert!(got.iter().all(|m| m.namespace.starts_with("pfx")));
}
#[tokio::test]
async fn trait_update_threads_expires_at_1634() {
let tmp = tempfile::NamedTempFile::new().expect("tempfile");
let store = SqliteStore::open(tmp.path()).expect("open");
let ctx = CallerContext::for_agent("alice");
let m = test_memory("exp-1634", "expiry-thread fixture body");
store.store(&ctx, &m).await.expect("store");
let want = "2027-01-01T00:00:00+00:00";
let patch = UpdatePatch {
expires_at: Some(want.to_string()),
..Default::default()
};
store.update(&ctx, &m.id, patch).await.expect("update");
let got = store.get(&ctx, &m.id).await.expect("get");
assert_eq!(
got.expires_at.as_deref(),
Some(want),
"#1634: patch.expires_at must reach the row"
);
}
/// Stores one memory and reads it back by the id `store` returned, checking
/// that the title survives the round trip.
#[tokio::test]
async fn roundtrip_store_get() {
    let db_file = tempfile::NamedTempFile::new().expect("tempfile");
    let store = SqliteStore::open(db_file.path()).expect("open");
    let ctx = CallerContext::for_agent("alice");

    let original = test_memory("hello", "world one two three four five six seven");
    let id = store.store(&ctx, &original).await.expect("store");
    let fetched = store.get(&ctx, &id).await.expect("get");

    assert_eq!(fetched.title, "hello");
}
/// `get` on an id that was never stored must fail with `StoreError::NotFound`.
#[tokio::test]
async fn get_missing_returns_not_found() {
    let db_file = tempfile::NamedTempFile::new().expect("tempfile");
    let store = SqliteStore::open(db_file.path()).expect("open");
    let ctx = CallerContext::for_agent("alice");

    let lookup = store
        .get(&ctx, "00000000-0000-0000-0000-000000000000")
        .await;
    let err = lookup.expect_err("should be NotFound");

    assert!(matches!(err, StoreError::NotFound { .. }));
}
/// Pins the capability set reported by `SqliteStore::capabilities`: durable,
/// full-text, and strongly consistent, without native vector, transaction, or
/// atomic multi-write support.
#[tokio::test]
async fn capabilities_declare_sqlite_reality() {
    let db_file = tempfile::NamedTempFile::new().expect("tempfile");
    let store = SqliteStore::open(db_file.path()).expect("open");
    let declared = store.capabilities();

    // Flags the backend advertises.
    assert!(declared.contains(Capabilities::DURABLE));
    assert!(declared.contains(Capabilities::FULLTEXT));
    assert!(declared.contains(Capabilities::STRONG_CONSISTENCY));

    // Flags the backend must not advertise.
    assert!(!declared.contains(Capabilities::NATIVE_VECTOR));
    assert!(!declared.contains(Capabilities::TRANSACTIONS));
    assert!(!declared.contains(Capabilities::ATOMIC_MULTI_WRITE));
}
#[tokio::test]
async fn verify_flags_empty_content() {
let tmp = tempfile::NamedTempFile::new().expect("tempfile");
let store = SqliteStore::open(tmp.path()).expect("open");
let ctx = CallerContext::for_agent("alice");
let mut mem = test_memory("hello", "x content long enough to pass validate");
mem.content = "nonempty for store".to_string();
let id = store.store(&ctx, &mem).await.expect("store");
store
.update(
&ctx,
&id,
UpdatePatch {
metadata: Some(serde_json::json!({})),
..Default::default()
},
)
.await
.expect("update");
let report = store.verify(&ctx, &id).await.expect("verify");
assert!(!report.integrity_ok);
assert!(
report
.findings
.iter()
.any(|f| f.contains("metadata.agent_id"))
);
}
/// Opens a `SqliteStore` on a brand-new temp-file path that outlives this call.
///
/// The temp file is deliberately persisted rather than deleted on drop, so the
/// returned store keeps a valid backing path for the rest of the test.
/// Persisting through `into_temp_path().keep()` closes the file handle that
/// `NamedTempFile` opened; `std::mem::forget` on the `NamedTempFile` would
/// leak that handle for the lifetime of the test process.
///
/// # Panics
/// Panics if the temp file cannot be created or persisted, or if the store
/// cannot be opened.
fn fresh_store() -> SqliteStore {
    let path = tempfile::NamedTempFile::new()
        .expect("tempfile")
        .into_temp_path()
        .keep()
        .expect("persist tempfile path");
    SqliteStore::open(&path).expect("open SqliteStore")
}
/// A freshly opened store must report a strictly positive schema version.
#[tokio::test]
async fn schema_version_returns_nonzero_after_open() {
    let v = fresh_store()
        .schema_version()
        .await
        .expect("schema_version");
    assert!(v > 0, "expected positive schema_version, got {v}");
}
/// Stores one memory in `sal-test` and checks that a namespace-filtered `list`
/// (limit 10) contains its id.
#[tokio::test]
async fn list_returns_stored_memories() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let fixture = test_memory("listme", "content for list query");
    let id = store.store(&ctx, &fixture).await.expect("store");

    let by_namespace = Filter {
        namespace: Some("sal-test".to_string()),
        limit: 10,
        ..Default::default()
    };
    let listed = store.list(&ctx, &by_namespace).await.expect("list");

    let contains_stored = listed.iter().any(|m| m.id == id);
    assert!(contains_stored, "list omitted stored id");
}
/// A `Filter` with `limit: 0` must still return the stored row; the assertion
/// message describes this as falling back to a default limit.
#[tokio::test]
async fn list_default_limit_when_zero() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let fixture = test_memory("default-limit", "needs sufficient content for fts");
    store.store(&ctx, &fixture).await.expect("store");

    let zero_limit = Filter {
        namespace: Some("sal-test".to_string()),
        limit: 0,
        ..Default::default()
    };
    let listed = store
        .list(&ctx, &zero_limit)
        .await
        .expect("list zero-limit");

    assert!(
        !listed.is_empty(),
        "zero-limit should fall back to default 100"
    );
}
/// Stores a memory whose content contains the token `jellyfish` and checks
/// that `search` for that token returns it.
#[tokio::test]
async fn search_finds_keyword_match() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let fixture = test_memory("searchable", "fts5 token jellyfish for unique grep");
    store.store(&ctx, &fixture).await.expect("store");

    let top_ten = Filter {
        limit: 10,
        ..Default::default()
    };
    let hits = store
        .search(&ctx, "jellyfish", &top_ten)
        .await
        .expect("search");

    let found = hits.iter().any(|m| m.title == "searchable");
    assert!(found, "fts search missed the unique token");
}
#[tokio::test]
async fn update_missing_returns_not_found() {
let store = fresh_store();
let ctx = CallerContext::for_agent("alice");
let err = store
.update(
&ctx,
"11111111-1111-1111-1111-111111111111",
UpdatePatch {
title: Some("never".to_string()),
..Default::default()
},
)
.await
.expect_err("update missing id");
assert!(matches!(err, StoreError::NotFound { .. }));
}
/// `delete` against an id that was never stored must fail with
/// `StoreError::NotFound`.
#[tokio::test]
async fn delete_missing_returns_not_found() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let outcome = store
        .delete(&ctx, "22222222-2222-2222-2222-222222222222")
        .await;
    let err = outcome.expect_err("delete missing");

    assert!(matches!(err, StoreError::NotFound { .. }));
}
/// After a successful `delete`, `get` on the same id must report
/// `StoreError::NotFound`.
#[tokio::test]
async fn delete_then_get_chain() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let fixture = test_memory("ephemeral", "stored briefly for delete test");
    let id = store.store(&ctx, &fixture).await.expect("store");
    store.delete(&ctx, &id).await.expect("delete existing");

    let lookup = store.get(&ctx, &id).await;
    let err = lookup.expect_err("get after delete");
    assert!(matches!(err, StoreError::NotFound { .. }));
}
/// `verify` against an id that was never stored must fail with
/// `StoreError::NotFound`.
#[tokio::test]
async fn verify_missing_returns_not_found() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let outcome = store
        .verify(&ctx, "33333333-3333-3333-3333-333333333333")
        .await;
    let err = outcome.expect_err("verify missing");

    assert!(matches!(err, StoreError::NotFound { .. }));
}
/// Inserts one link between two `sal-test` memories and checks `list_links`
/// three ways: unfiltered (link present), filtered to `sal-test` (present),
/// and filtered to `nonexistent` (absent).
#[tokio::test]
async fn link_and_list_links_round_trip() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let source = test_memory("source-mem", "content for link source");
    let target = test_memory("target-mem", "content for link target");
    let a_id = store.store(&ctx, &source).await.expect("store a");
    let b_id = store.store(&ctx, &target).await.expect("store b");

    let edge = MemoryLink {
        source_id: a_id.clone(),
        target_id: b_id.clone(),
        relation: crate::models::MemoryLinkRelation::RelatedTo,
        created_at: chrono::Utc::now().to_rfc3339(),
        valid_from: None,
        valid_until: None,
        observed_by: None,
        signature: None,
        attest_level: None,
    };
    store.link(&ctx, &edge).await.expect("link insert");

    // No namespace filter.
    let listed = store.list_links(None).await.expect("list_links");
    let in_unfiltered = listed
        .iter()
        .any(|l| l.source_id == a_id && l.target_id == b_id);
    assert!(in_unfiltered, "list_links missed the just-inserted row");

    // Namespace of both endpoints.
    let same_ns = store
        .list_links(Some("sal-test"))
        .await
        .expect("list_links by ns");
    let in_same_ns = same_ns
        .iter()
        .any(|l| l.source_id == a_id && l.target_id == b_id);
    assert!(in_same_ns, "namespace filter dropped a same-ns link");

    // A namespace nothing was stored in.
    let missing_ns = store
        .list_links(Some("nonexistent"))
        .await
        .expect("list_links missing ns");
    let in_missing_ns = missing_ns
        .iter()
        .any(|l| l.source_id == a_id && l.target_id == b_id);
    assert!(
        !in_missing_ns,
        "namespace filter must exclude links whose source lives elsewhere"
    );
}
/// Links `anchor -> downstream` and `upstream -> anchor`, then checks that
/// `get_links_for_anchor(anchor)` returns exactly those two edges.
#[tokio::test]
async fn get_links_for_anchor_returns_inbound_and_outbound() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let anchor = test_memory("anchor", "central memory for the probe");
    let downstream = test_memory("downstream", "memory that anchor points to");
    let upstream = test_memory("upstream", "memory that points to anchor");
    let a_id = store.store(&ctx, &anchor).await.expect("store anchor");
    let b_id = store
        .store(&ctx, &downstream)
        .await
        .expect("store downstream");
    let c_id = store.store(&ctx, &upstream).await.expect("store upstream");

    // Outbound edge: anchor -> downstream.
    let outbound = MemoryLink {
        source_id: a_id.clone(),
        target_id: b_id.clone(),
        relation: crate::models::MemoryLinkRelation::RelatedTo,
        created_at: chrono::Utc::now().to_rfc3339(),
        valid_from: None,
        valid_until: None,
        observed_by: None,
        signature: None,
        attest_level: None,
    };
    store.link(&ctx, &outbound).await.expect("link a->b");

    // Inbound edge: upstream -> anchor.
    let inbound = MemoryLink {
        source_id: c_id.clone(),
        target_id: a_id.clone(),
        relation: crate::models::MemoryLinkRelation::Contradicts,
        created_at: chrono::Utc::now().to_rfc3339(),
        valid_from: None,
        valid_until: None,
        observed_by: None,
        signature: None,
        attest_level: None,
    };
    store.link(&ctx, &inbound).await.expect("link c->a");

    let edges = store
        .get_links_for_anchor(&a_id)
        .await
        .expect("get_links_for_anchor");
    assert_eq!(edges.len(), 2, "expected exactly 2 edges for the anchor");

    let has_outbound = edges
        .iter()
        .any(|l| l.source_id == a_id && l.target_id == b_id);
    assert!(has_outbound, "missing outbound edge anchor->downstream");

    let has_inbound = edges
        .iter()
        .any(|l| l.source_id == c_id && l.target_id == a_id);
    assert!(has_inbound, "missing inbound edge upstream->anchor");
}
/// A stored memory with no links must yield an empty result from
/// `get_links_for_anchor`.
#[tokio::test]
async fn get_links_for_anchor_empty_for_unlinked_id() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let loner = test_memory("alone", "no edges from or to this memory");
    let id = store.store(&ctx, &loner).await.expect("store");

    let edges = store
        .get_links_for_anchor(&id)
        .await
        .expect("get_links_for_anchor on unlinked id");

    assert!(edges.is_empty(), "unlinked id must yield empty vec");
}
/// Inserts a `memory_links` row directly through the raw connection with
/// `valid_from`, `valid_until`, `observed_by`, and `attest_level` populated,
/// then checks that `get_links_for_anchor` returns those four fields as written.
#[tokio::test]
async fn get_links_for_anchor_projects_attest_level_and_temporal() {
let store = fresh_store();
let ctx = CallerContext::for_agent("alice");
let a = test_memory("anchor-temp", "anchor for temporal-fields probe");
let b = test_memory("target-temp", "target for temporal-fields probe");
let a_id = store.store(&ctx, &a).await.expect("store a");
let b_id = store.store(&ctx, &b).await.expect("store b");
// Scope the connection guard so it is released before the store method below
// takes the same lock.
{
let conn = store.state.lock().await;
// Raw INSERT so the optional columns can be set to known values.
conn.execute(
"INSERT INTO memory_links (source_id, target_id, relation, created_at,
valid_from, valid_until, observed_by, attest_level)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
rusqlite::params![
&a_id,
&b_id,
"related_to",
chrono::Utc::now().to_rfc3339(),
"2026-01-01T00:00:00Z",
"2026-12-31T23:59:59Z",
"ai:tester@host",
"unsigned",
],
)
.expect("temporal insert");
}
let edges = store
.get_links_for_anchor(&a_id)
.await
.expect("get_links_for_anchor");
// Pick out the edge inserted above by its endpoints.
let row = edges
.iter()
.find(|l| l.source_id == a_id && l.target_id == b_id)
.expect("just-inserted edge");
assert_eq!(row.valid_from.as_deref(), Some("2026-01-01T00:00:00Z"));
assert_eq!(row.valid_until.as_deref(), Some("2026-12-31T23:59:59Z"));
assert_eq!(row.observed_by.as_deref(), Some("ai:tester@host"));
assert_eq!(row.attest_level.as_deref(), Some("unsigned"));
}
/// Calling `link_signed` with `None` as its third argument must succeed and
/// return the string `"unsigned"`.
#[tokio::test]
async fn link_signed_unsigned_falls_through() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let source = test_memory("ls-a", "content for ls a");
    let target = test_memory("ls-b", "content for ls b");
    let source_id = store.store(&ctx, &source).await.expect("a");
    let target_id = store.store(&ctx, &target).await.expect("b");

    let edge = MemoryLink {
        source_id,
        target_id,
        relation: crate::models::MemoryLinkRelation::Supersedes,
        created_at: chrono::Utc::now().to_rfc3339(),
        valid_from: None,
        valid_until: None,
        observed_by: None,
        signature: None,
        attest_level: None,
    };
    let level = store
        .link_signed(&ctx, &edge, None)
        .await
        .expect("link_signed unsigned path");

    assert_eq!(level, "unsigned");
}
/// After `register_agent`, `is_registered_agent` must return `true` for the
/// registered id and `false` for an id that was never registered.
#[tokio::test]
async fn register_agent_then_is_registered() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let registration = AgentRegistration {
        agent_id: "ai:tester@host".to_string(),
        agent_type: "ai".to_string(),
        capabilities: vec!["memory.read".to_string()],
        registered_at: chrono::Utc::now().to_rfc3339(),
        last_seen_at: chrono::Utc::now().to_rfc3339(),
    };
    store
        .register_agent(&ctx, &registration)
        .await
        .expect("register_agent");

    let known = store
        .is_registered_agent("ai:tester@host")
        .await
        .expect("is_registered yes");
    assert!(known, "registered agent must be detected");

    let unknown = store
        .is_registered_agent("ai:unknown@host")
        .await
        .expect("is_registered no");
    assert!(!unknown, "unknown agent must be unregistered");
}
/// With `None` as the `since` argument, `list_memories_updated_since` must
/// include a memory that was just stored.
#[tokio::test]
async fn list_memories_updated_since_no_filter() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let fixture = test_memory("since-test", "content for since-query test");
    store.store(&ctx, &fixture).await.expect("store");

    let everything = store
        .list_memories_updated_since(None, 100)
        .await
        .expect("list_since none");

    let contains_fixture = everything.iter().any(|m| m.title == "since-test");
    assert!(contains_fixture, "no-since filter must return all memories");
}
/// Applying the same remote memory twice must succeed both times and return
/// the same id.
#[tokio::test]
async fn apply_remote_memory_is_idempotent() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");
    let remote = test_memory("remote", "remote content for apply path");

    let first = store
        .apply_remote_memory(&ctx, &remote)
        .await
        .expect("apply 1");
    let second = store
        .apply_remote_memory(&ctx, &remote)
        .await
        .expect("apply 2 idempotent");

    assert_eq!(
        first, second,
        "insert_if_newer must be idempotent on same row"
    );
}
/// `apply_remote_link` with the attest level `"unsigned"` must succeed for a
/// link between two stored memories.
#[tokio::test]
// `_gate` is kept alive across the awaits below on purpose; presumably it
// serializes tests that depend on the permissions mode (see
// `crate::config::lock_permissions_mode_for_test`).
#[allow(clippy::await_holding_lock)]
async fn apply_remote_link_attest_threading() {
    let _gate = crate::config::lock_permissions_mode_for_test();
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let source = test_memory("rl-a", "content rl a");
    let target = test_memory("rl-b", "content rl b");
    let source_id = store.store(&ctx, &source).await.expect("a");
    let target_id = store.store(&ctx, &target).await.expect("b");

    let edge = MemoryLink {
        source_id,
        target_id,
        relation: crate::models::MemoryLinkRelation::DerivedFrom,
        created_at: chrono::Utc::now().to_rfc3339(),
        valid_from: None,
        valid_until: None,
        observed_by: None,
        signature: None,
        attest_level: None,
    };
    store
        .apply_remote_link(&ctx, &edge, "unsigned")
        .await
        .expect("apply_remote_link");
}
/// `apply_remote_deletion` for an id that was never stored must return
/// `Ok(false)`.
#[tokio::test]
async fn apply_remote_deletion_returns_false_for_missing() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let deleted = store
        .apply_remote_deletion(&ctx, "44444444-4444-4444-4444-444444444444")
        .await
        .expect("apply_remote_deletion missing");

    assert!(
        !deleted,
        "apply_remote_deletion must return false for missing id"
    );
}
/// Calls `recall_hybrid` with no query embedding (`None`) and checks that the
/// keyword `elephant` still produces at least one hit with a positive score.
#[tokio::test]
async fn recall_hybrid_keyword_fallback_no_embedding() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let fixture = test_memory(
        "recall-target",
        "indigo elephant chess fts5 token recall test",
    );
    store.store(&ctx, &fixture).await.expect("store");

    let top_ten = Filter {
        limit: 10,
        ..Default::default()
    };
    let hits = store
        .recall_hybrid(&ctx, "elephant", None, &top_ten)
        .await
        .expect("recall_hybrid keyword fallback");

    assert!(
        !hits.is_empty(),
        "recall_hybrid keyword fallback returned nothing"
    );
    // Each hit is a tuple; the score is read from its second element.
    assert!(hits[0].1 > 0.0, "score must be positive");
}
/// `touch_after_recall` must succeed when given an empty id slice.
#[tokio::test]
async fn touch_after_recall_is_noop_on_empty_ids() {
    let store = fresh_store();
    let outcome = store.touch_after_recall(&[]).await;
    outcome.expect("touch_after_recall empty");
}
/// `touch_after_recall` must return `Ok` for an id that was never stored.
#[tokio::test]
async fn touch_after_recall_warn_path_on_missing_id() {
    let store = fresh_store();
    let never_stored = ["55555555-5555-5555-5555-555555555555".to_string()];

    let outcome = store.touch_after_recall(&never_stored).await;

    outcome.expect("touch must tolerate unknown ids");
}
/// `forget` with all three optional selectors set to `None` must be rejected
/// with `StoreError::InvalidInput`.
#[tokio::test]
async fn forget_invalid_input_without_filter() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let outcome = store.forget(&ctx, None, None, None, false).await;
    let err = outcome.expect_err("forget without filter");

    assert!(matches!(err, StoreError::InvalidInput { .. }));
}
/// `forget` scoped to a namespace that holds nothing must succeed and report
/// zero affected rows.
#[tokio::test]
async fn forget_by_namespace_succeeds_even_on_empty() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let forgotten = store
        .forget(&ctx, Some("nonexistent-ns"), None, None, false)
        .await
        .expect("forget by ns");

    assert_eq!(forgotten, 0);
}
/// `run_gc(false)` on a fresh store must succeed and report zero.
#[tokio::test]
async fn run_gc_returns_zero_on_empty_db() {
    let collected = fresh_store().run_gc(false).await.expect("gc empty");
    assert_eq!(collected, 0);
}
/// On a fresh store, `archive_purge` must succeed and report zero purged rows
/// both with a threshold of `Some(0)` and with `None`.
#[tokio::test]
async fn archive_purge_zero_threshold_purges_all() {
    let store = fresh_store();
    let admin = CallerContext::for_admin("ops:admin");

    let with_zero_threshold = store
        .archive_purge(&admin, Some(0))
        .await
        .expect("archive_purge");
    assert_eq!(with_zero_threshold, 0);

    let without_threshold = store
        .archive_purge(&admin, None)
        .await
        .expect("archive_purge all");
    assert_eq!(without_threshold, 0);
}
/// `archive_by_ids` given only an id that was never stored must succeed and
/// report zero moved rows.
#[tokio::test]
async fn archive_by_ids_is_zero_for_unknown_ids() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");
    let unknown_ids = ["66666666-6666-6666-6666-666666666666".to_string()];

    let moved = store
        .archive_by_ids(&ctx, &unknown_ids, Some("manual"))
        .await
        .expect("archive_by_ids unknown");

    assert_eq!(moved, 0);
}
/// `archive_restore` for an id that was never archived must return `Ok(false)`.
#[tokio::test]
async fn archive_restore_returns_false_for_missing() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let was_restored = store
        .archive_restore(&ctx, "77777777-7777-7777-7777-777777777777")
        .await
        .expect("archive_restore missing");

    assert!(!was_restored);
}
/// `export_memories` must include a just-stored memory, and every link
/// returned by `export_links` must have a non-empty `source_id`.
#[tokio::test]
async fn export_memories_and_links_round_trip() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let fixture = test_memory("export-me", "content for export round trip");
    store.store(&ctx, &fixture).await.expect("store");

    let exported = store.export_memories().await.expect("export_memories");
    assert!(exported.iter().any(|m| m.title == "export-me"));

    // `all` is vacuously true for an empty export, so no separate emptiness
    // check is needed.
    let exported_links = store.export_links().await.expect("export_links");
    assert!(exported_links.iter().all(|l| !l.source_id.is_empty()));
}
/// The chain built for `project/foo` must contain `project/foo` itself.
#[tokio::test]
async fn build_namespace_chain_includes_self() {
    let chain = fresh_store()
        .build_namespace_chain("project/foo")
        .await
        .expect("build_namespace_chain");

    let includes_leaf = chain.iter().any(|s| s == "project/foo");
    assert!(includes_leaf, "chain must include leaf, got {chain:?}");
}
/// A fresh store must resolve no governance policy for an arbitrary namespace.
#[tokio::test]
async fn resolve_governance_policy_none_on_fresh_db() {
    let resolved = fresh_store()
        .resolve_governance_policy("any/ns")
        .await
        .expect("resolve_governance_policy");

    assert!(resolved.is_none(), "fresh DB must have no policy");
}
/// On a fresh store, `enforce_governance_action` for a `Store` action must
/// return `GovernanceDecision::Allow`.
#[tokio::test]
async fn enforce_governance_action_allow_on_fresh_db() {
    let store = fresh_store();
    let empty_payload = serde_json::json!({});

    let verdict = store
        .enforce_governance_action(
            super::super::GovernedAction::Store,
            "free-ns",
            "alice",
            None,
            None,
            &empty_payload,
        )
        .await
        .expect("enforce_governance_action");

    assert!(matches!(verdict, crate::models::GovernanceDecision::Allow));
}
/// A namespace with no standard set must yield `None` from
/// `get_namespace_standard`.
#[tokio::test]
async fn get_namespace_standard_none_initially() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let standard = store
        .get_namespace_standard(&ctx, "no-such-ns")
        .await
        .expect("get_namespace_standard");

    assert!(standard.is_none());
}
/// Walks the namespace-standard lifecycle: set a standard, read it back,
/// clear it (which must report `true`), and confirm it reads as `None`.
#[tokio::test]
async fn set_then_get_then_clear_namespace_standard() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");
    let namespace = "ns/with/standard";

    let standard_doc = test_memory("std-doc", "documentation for ns standard");
    let std_id = store.store(&ctx, &standard_doc).await.expect("store std");

    store
        .set_namespace_standard(&ctx, namespace, &std_id, None)
        .await
        .expect("set_namespace_standard");

    // The first tuple element of the returned standard is the memory id.
    let current = store
        .get_namespace_standard(&ctx, namespace)
        .await
        .expect("get_namespace_standard");
    let current_id = current.as_ref().map(|(s, _)| s.as_str());
    assert_eq!(current_id, Some(std_id.as_str()));

    let cleared = store
        .clear_namespace_standard(&ctx, namespace)
        .await
        .expect("clear_namespace_standard");
    assert!(cleared);

    let after_clear = store
        .get_namespace_standard(&ctx, namespace)
        .await
        .expect("get after clear");
    assert!(after_clear.is_none());
}
/// `quota_status` for an agent on a fresh store must succeed and echo the
/// requested agent id.
#[tokio::test]
async fn quota_status_auto_inserts_default_row() {
    let status = fresh_store()
        .quota_status("ai:quota-test")
        .await
        .expect("quota_status");

    assert_eq!(status.agent_id, "ai:quota-test");
}
/// After `quota_status` has been called for an agent, `quota_status_list`
/// must contain a row for that agent.
#[tokio::test]
async fn quota_status_list_returns_inserted_row() {
    let store = fresh_store();
    // Seed: the returned status is not needed, only the call itself.
    let _ = store.quota_status("ai:listed").await.expect("seed");

    let statuses = store.quota_status_list().await.expect("quota_status_list");

    assert!(statuses.iter().any(|r| r.agent_id == "ai:listed"));
}
/// `verify_link` with a default (empty) `VerifyFilter` must be rejected with
/// `StoreError::InvalidInput`.
#[tokio::test]
async fn verify_link_rejects_missing_filter() {
    let store = fresh_store();

    let outcome = store.verify_link(VerifyFilter::default()).await;
    let err = outcome.expect_err("verify_link without source/link_id");

    assert!(matches!(err, StoreError::InvalidInput { .. }));
}
#[tokio::test]
async fn verify_link_rejects_malformed_link_id() {
let store = fresh_store();
let filter = VerifyFilter {
link_id: Some("notatriple".to_string()),
..Default::default()
};
let err = store
.verify_link(filter)
.await
.expect_err("verify_link malformed link_id");
assert!(matches!(err, StoreError::InvalidInput { .. }));
}
/// Inserts a link with no signature and checks the `verify_link` report for
/// its source/target pair: endpoints echoed, `verified` true,
/// `signature_present` false, and `attest_level` equal to `"unsigned"`.
#[tokio::test]
async fn verify_link_resolves_unsigned_link() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let source = test_memory("vl-a", "content for vl a");
    let target = test_memory("vl-b", "content for vl b");
    let a_id = store.store(&ctx, &source).await.expect("a");
    let b_id = store.store(&ctx, &target).await.expect("b");

    let edge = MemoryLink {
        source_id: a_id.clone(),
        target_id: b_id.clone(),
        relation: crate::models::MemoryLinkRelation::RelatedTo,
        created_at: chrono::Utc::now().to_rfc3339(),
        valid_from: None,
        valid_until: None,
        observed_by: None,
        signature: None,
        attest_level: None,
    };
    store.link(&ctx, &edge).await.expect("insert link");

    let by_endpoints = VerifyFilter {
        source_id: Some(a_id.clone()),
        target_id: Some(b_id.clone()),
        link_id: None,
    };
    let report = store
        .verify_link(by_endpoints)
        .await
        .expect("verify_link");

    assert_eq!(report.source_id, a_id);
    assert_eq!(report.target_id, b_id);
    assert!(report.verified);
    assert!(!report.signature_present);
    assert_eq!(report.attest_level, "unsigned");
}
#[tokio::test]
async fn verify_link_source_only_resolves_first_outbound() {
let store = fresh_store();
let ctx = CallerContext::for_agent("alice");
let a = test_memory("solo-source", "content for solo source");
let b = test_memory("solo-target", "content for solo target");
let a_id = store.store(&ctx, &a).await.expect("a");
let b_id = store.store(&ctx, &b).await.expect("b");
let link = MemoryLink {
source_id: a_id.clone(),
target_id: b_id,
relation: crate::models::MemoryLinkRelation::Supersedes,
created_at: chrono::Utc::now().to_rfc3339(),
valid_from: None,
valid_until: None,
observed_by: None,
signature: None,
attest_level: None,
};
store.link(&ctx, &link).await.expect("link");
let report = store
.verify_link(VerifyFilter {
source_id: Some(a_id),
..Default::default()
})
.await
.expect("source-only verify_link");
assert!(report.verified);
}
/// `find_paths` between two ids that were never stored must succeed with an
/// empty result.
#[tokio::test]
async fn find_paths_returns_empty_for_unknown_endpoints() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");
    let from = "88888888-8888-8888-8888-888888888888";
    let to = "99999999-9999-9999-9999-999999999999";

    let found = store
        .find_paths(&ctx, from, to, None, None)
        .await
        .expect("find_paths");

    assert!(found.is_empty());
}
/// `notify` must create a memory in the target's `_inbox/<agent>` namespace
/// that carries the `notify` tag.
#[tokio::test]
async fn notify_creates_inbox_row() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let notification_id = store
        .notify(
            &ctx,
            "ai:notify-target",
            "hello",
            "payload body",
            None,
            None,
        )
        .await
        .expect("notify");

    let delivered = store
        .get(&ctx, &notification_id)
        .await
        .expect("get notify");
    assert_eq!(delivered.namespace, "_inbox/ai:notify-target");
    let tagged = delivered.tags.iter().any(|t| t == "notify");
    assert!(tagged);
}
/// Consolidates two stored memories and checks that the memory at the
/// returned id carries the requested title.
#[tokio::test]
async fn consolidate_round_trips_two_sources() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let first = test_memory("consolidate-source-a", "content a one two three four");
    let second = test_memory("consolidate-source-b", "content b one two three four");
    let a_id = store.store(&ctx, &first).await.expect("store a");
    let b_id = store.store(&ctx, &second).await.expect("store b");

    let sources = [a_id, b_id];
    let merged_id = store
        .consolidate(
            &ctx,
            &sources,
            "merged-title",
            "merged summary content for the consolidator",
            "sal-test",
            &Tier::Mid,
            "consolidate-test",
            "alice",
        )
        .await
        .expect("consolidate two sources");

    let merged = store
        .get(&ctx, &merged_id)
        .await
        .expect("get consolidated");
    assert_eq!(merged.title, "merged-title");
}
/// Regression guard (#1643): `begin_transaction` must fail with
/// `StoreError::UnsupportedCapability`.
#[tokio::test]
async fn begin_transaction_stays_unsupported_1643() {
    let db_file = tempfile::NamedTempFile::new().expect("tempfile");
    let store = SqliteStore::open(db_file.path()).expect("open");
    let ctx = CallerContext::for_agent("alice");

    // Unwrapped by hand rather than with `expect_err`, so nothing is required
    // of the success type.
    let err = if let Err(e) = store.begin_transaction(&ctx).await {
        e
    } else {
        panic!("begin_transaction must be unsupported")
    };

    assert!(
        matches!(err, StoreError::UnsupportedCapability { .. }),
        "got: {err:?}"
    );
}
/// `SqliteStore::path` must return the path the store was opened with.
#[tokio::test]
async fn store_path_accessor_returns_open_path() {
    let db_file = tempfile::NamedTempFile::new().expect("tempfile");
    let opened_at = db_file.path().to_path_buf();

    let store = SqliteStore::open(&opened_at).expect("open");

    assert_eq!(store.path(), opened_at.as_path());
}
/// `pending_decide` for an id with no pending row must return `Ok(false)`.
#[tokio::test]
async fn pending_decide_false_when_no_row_matches() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let decided = store
        .pending_decide(&ctx, "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", true, "alice")
        .await
        .expect("pending_decide miss");

    assert!(!decided, "pending_decide must return false for unknown id");
}
/// `get_pending` for an id with no pending row must return `Ok(None)`.
#[tokio::test]
async fn get_pending_returns_none_for_unknown() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let pending = store
        .get_pending(&ctx, "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb")
        .await
        .expect("get_pending miss");

    assert!(pending.is_none());
}
/// Seeds `alpha` (3 rows), `beta` (1 row), and `gamma` (2 rows), then checks
/// that `list_namespaces` reports those counts and places `alpha` ahead of
/// `beta`.
#[tokio::test]
async fn list_namespaces_groups_and_orders_by_count() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    for (ns, n) in [("alpha", 3usize), ("beta", 1usize), ("gamma", 2usize)] {
        for i in 0..n {
            let mut m = test_memory(&format!("{ns}-{i}"), "content body for the namespace probe");
            m.namespace = ns.to_string();
            store.store(&ctx, &m).await.expect("store");
        }
    }

    let rows = store.list_namespaces().await.expect("list_namespaces");

    // Look every namespace up first (the namespace name doubles as the panic
    // message when it is absent), then compare the counts.
    let count_of = |name: &str| {
        rows.iter()
            .find(|r| r.namespace == name)
            .expect(name)
            .count
    };
    let (alpha, beta, gamma) = (count_of("alpha"), count_of("beta"), count_of("gamma"));
    assert_eq!(alpha, 3);
    assert_eq!(beta, 1);
    assert_eq!(gamma, 2);

    let alpha_pos = rows
        .iter()
        .position(|r| r.namespace == "alpha")
        .expect("alpha pos");
    let beta_pos = rows
        .iter()
        .position(|r| r.namespace == "beta")
        .expect("beta pos");
    assert!(
        alpha_pos < beta_pos,
        "expected alpha (count=3) before beta (count=1)"
    );
}
/// `list_namespaces` on a fresh store must succeed with an empty result.
#[tokio::test]
async fn list_namespaces_empty_store_returns_empty_vec() {
    let namespaces = fresh_store()
        .list_namespaces()
        .await
        .expect("list_namespaces on empty store");

    assert!(namespaces.is_empty(), "empty store must yield empty vec");
}
/// Seeds four memories across `alphaone`, `alphaone/team`, and
/// `alphaone/team/secrets`, then checks the taxonomy for prefix `alphaone`:
/// total count 4, root node `alphaone` with subtree count 4, not truncated.
#[tokio::test]
async fn get_taxonomy_assembles_hierarchical_tree() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let seed = [
        ("alphaone", 1usize),
        ("alphaone/team", 2usize),
        ("alphaone/team/secrets", 1usize),
    ];
    for (ns, n) in seed {
        for i in 0..n {
            let mut m = test_memory(&format!("{ns}-{i}"), "taxonomy fixture body content");
            m.namespace = ns.to_string();
            store.store(&ctx, &m).await.expect("store");
        }
    }

    let taxonomy = store
        .get_taxonomy(Some("alphaone"), 8, 100)
        .await
        .expect("get_taxonomy");

    assert_eq!(taxonomy.total_count, 4, "total prefix count");
    assert_eq!(taxonomy.tree.namespace, "alphaone");
    assert_eq!(taxonomy.tree.subtree_count, 4);
    assert!(!taxonomy.truncated);
}
/// The taxonomy for a prefix nothing was stored under must have a total count
/// of zero and a root with no children.
#[tokio::test]
async fn get_taxonomy_empty_prefix_yields_empty_total() {
    let taxonomy = fresh_store()
        .get_taxonomy(Some("nonexistent"), 8, 100)
        .await
        .expect("get_taxonomy");

    assert_eq!(taxonomy.total_count, 0);
    assert!(taxonomy.tree.children.is_empty());
}
/// Registers one agent with empty timestamps and checks that `list_agents`
/// returns exactly that agent, with its type and capabilities preserved and a
/// non-empty `registered_at`.
#[tokio::test]
async fn list_agents_roundtrip_through_register() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("daemon");

    let registration = AgentRegistration {
        agent_id: "ai:tester@host".to_string(),
        agent_type: "test".to_string(),
        capabilities: vec!["recall".to_string(), "store".to_string()],
        registered_at: String::new(),
        last_seen_at: String::new(),
    };
    store
        .register_agent(&ctx, &registration)
        .await
        .expect("register_agent");

    let listed = store.list_agents().await.expect("list_agents");
    assert_eq!(listed.len(), 1);

    let only = &listed[0];
    assert_eq!(only.agent_id, "ai:tester@host");
    assert_eq!(only.agent_type, "test");
    assert!(only.capabilities.iter().any(|c| c == "recall"));
    // The input left `registered_at` empty, so a non-empty value here was
    // filled in during registration.
    assert!(!only.registered_at.is_empty());
}
/// `list_agents` on a fresh store must succeed with an empty result.
#[tokio::test]
async fn list_agents_empty_store_returns_empty_vec() {
    let agents = fresh_store().list_agents().await.expect("list_agents");
    assert!(agents.is_empty());
}
/// Queues two pending `Store` actions directly through the `db` layer, then
/// checks `list_pending_actions` with no status filter (2 rows), with
/// `pending` (2 rows), and with `approved` (0 rows).
#[tokio::test]
async fn list_pending_actions_filters_by_status() {
    use crate::models::GovernedAction;

    let store = fresh_store();
    {
        // Scope the connection guard so it is released before the store
        // methods below take the same lock.
        let conn = store.state.lock().await;
        let queued = [
            (
                "alice",
                serde_json::json!({"title":"t","content":"c"}),
                "queue 1",
            ),
            (
                "bob",
                serde_json::json!({"title":"t2","content":"c2"}),
                "queue 2",
            ),
        ];
        for (requester, payload, label) in queued {
            db::queue_pending_action(
                &conn,
                GovernedAction::Store,
                "ns",
                None,
                requester,
                &payload,
            )
            .expect(label);
        }
    }

    let unfiltered = store
        .list_pending_actions(None, 100)
        .await
        .expect("list all");
    assert_eq!(unfiltered.len(), 2);

    let still_pending = store
        .list_pending_actions(Some("pending"), 100)
        .await
        .expect("list pending");
    assert_eq!(still_pending.len(), 2, "both rows start pending");

    let already_approved = store
        .list_pending_actions(Some("approved"), 100)
        .await
        .expect("list approved");
    assert!(already_approved.is_empty(), "no approved rows yet");
}
#[tokio::test]
async fn entity_get_by_alias_resolves_canonical_record() {
let store = fresh_store();
let ctx = CallerContext::for_agent("alice");
let mut m = test_memory("alphaone-co", "company entity row body fixture");
m.namespace = "alphaone".to_string();
m.metadata = serde_json::json!({
"kind": "entity",
"agent_id": "alice",
});
let id = store.store(&ctx, &m).await.expect("store");
{
let conn = store.state.lock().await;
conn.execute(
"INSERT INTO entity_aliases (entity_id, alias, created_at) VALUES (?1, ?2, ?3)",
rusqlite::params![&id, "AlphaOne", chrono::Utc::now().to_rfc3339()],
)
.expect("insert alias");
}
let rec = store
.entity_get_by_alias("AlphaOne", Some("alphaone"))
.await
.expect("entity_get_by_alias");
let rec = rec.expect("entity must resolve");
assert_eq!(rec.entity_id, id);
assert_eq!(rec.canonical_name, "alphaone-co");
assert_eq!(rec.namespace, "alphaone");
assert!(rec.aliases.iter().any(|a| a == "AlphaOne"));
}
/// An alias that was never registered must resolve to `None`.
#[tokio::test]
async fn entity_get_by_alias_returns_none_for_unknown() {
    let resolved = fresh_store()
        .entity_get_by_alias("never-registered", None)
        .await
        .expect("entity_get_by_alias miss");

    assert!(resolved.is_none());
}
/// A whitespace-only alias must resolve to `None` rather than error.
#[tokio::test]
async fn entity_get_by_alias_empty_alias_returns_none() {
    let resolved = fresh_store()
        .entity_get_by_alias(" ", None)
        .await
        .expect("entity_get_by_alias whitespace");

    assert!(resolved.is_none());
}
/// `health_check` on a freshly opened store must return `Ok(true)`.
#[tokio::test]
async fn health_check_returns_true_on_open_store() {
    let healthy = fresh_store().health_check().await.expect("health_check");
    assert!(healthy);
}
/// After storing three memories in one namespace, `stats` must report them in
/// the total, in the per-namespace breakdown, and with a non-zero size.
#[tokio::test]
async fn stats_projects_full_shape() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    for index in 0..3 {
        let title = format!("title-{index}");
        let mut memory = test_memory(&title, "stats fixture body content adequate length");
        memory.namespace = "alphaone".to_string();
        store.store(&ctx, &memory).await.expect("store");
    }

    let stats = store.stats().await.expect("stats");
    assert_eq!(stats.total, 3);

    let alphaone_row = stats
        .by_namespace
        .iter()
        .find(|row| row.namespace == "alphaone")
        .expect("alphaone in stats.by_namespace");
    assert_eq!(alphaone_row.count, 3);

    assert!(stats.db_size_bytes > 0, "expected non-zero db file size");
}
/// `update_embedding` must leave a non-empty blob in the `embedding` column
/// of the targeted `memories` row.
#[tokio::test]
async fn update_embedding_persists_via_set_embedding() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let memory = test_memory("with-embed", "embedding-fixture body content");
    let id = store.store(&ctx, &memory).await.expect("store");

    let embedding = vec![0.1_f32, 0.2, 0.3, 0.4];
    store
        .update_embedding(&ctx, &id, Some(&embedding))
        .await
        .expect("update_embedding");

    // Read the column back through the raw connection to check what was
    // actually persisted.
    let select_sql = "SELECT embedding FROM memories WHERE id = ?1";
    let conn = store.state.lock().await;
    let stored_blob: Vec<u8> = conn
        .query_row(select_sql, rusqlite::params![&id], |row| row.get(0))
        .expect("read embedding");
    assert!(!stored_blob.is_empty(), "embedding blob should be populated");
}
/// Looking a stored memory up by its own title and namespace must return the
/// id that `store` handed back.
#[tokio::test]
async fn find_by_title_namespace_resolves_id() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let memory = test_memory("conflict-target", "find_by_title body");
    let stored_id = store.store(&ctx, &memory).await.expect("store");

    let lookup = store
        .find_by_title_namespace(&memory.title, &memory.namespace)
        .await
        .expect("find_by_title_namespace");
    assert_eq!(lookup.as_deref(), Some(stored_id.as_str()));
}
/// A title that was never stored must produce `None`, not an error.
#[tokio::test]
async fn find_by_title_namespace_returns_none_for_unknown() {
    let store = fresh_store();
    let lookup = store
        .find_by_title_namespace("never-stored", "alphaone")
        .await;
    let resolved = lookup.expect("find_by_title_namespace miss");
    assert!(resolved.is_none());
}
/// On an empty store the requested title is returned unchanged.
#[tokio::test]
async fn next_versioned_title_first_use_returns_base() {
    let store = fresh_store();
    let outcome = store.next_versioned_title("My Title", "alphaone").await;
    let chosen = outcome.expect("next_versioned_title");
    assert_eq!(chosen, "My Title");
}
/// When the title already exists in the namespace, the next free title
/// carries a ` (2)` suffix.
#[tokio::test]
async fn next_versioned_title_appends_suffix_on_collision() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    // Occupy the base title first so the lookup collides.
    let mut existing = test_memory("dup-title", "versioned body content");
    existing.namespace = "alphaone".to_string();
    store.store(&ctx, &existing).await.expect("store");

    let chosen = store
        .next_versioned_title("dup-title", "alphaone")
        .await
        .expect("next_versioned_title");
    assert_eq!(chosen, "dup-title (2)");
}
/// `find_contradictions` must surface the row whose text matches the query
/// and leave out an unrelated row stored in the same namespace.
#[tokio::test]
async fn find_contradictions_returns_fts_matches() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let mut matching = test_memory("rust language semantics", "rust language safety guarantees");
    matching.namespace = "alphaone".to_string();
    store.store(&ctx, &matching).await.expect("store a");

    let mut unrelated = test_memory(
        "completely unrelated cookbook",
        "fish stew recipe and instructions",
    );
    unrelated.namespace = "alphaone".to_string();
    store.store(&ctx, &unrelated).await.expect("store b");

    let hits = store
        .find_contradictions("rust language", "alphaone")
        .await
        .expect("find_contradictions");

    let surfaced_match = hits.iter().any(|hit| hit.title.contains("rust language"));
    assert!(
        surfaced_match,
        "FTS match should surface the rust-language row"
    );

    let cookbook_absent = hits.iter().all(|hit| !hit.title.contains("cookbook"));
    assert!(cookbook_absent, "unrelated row must not appear");
}
/// Invalidating an existing link must report it as found, echo the supplied
/// `valid_until`, and report no previous `valid_until` for a fresh link.
#[tokio::test]
async fn invalidate_link_marks_found_with_previous_value() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let src_id = store
        .store(&ctx, &test_memory("src-row", "source memory body content"))
        .await
        .expect("store src");
    let dst_id = store
        .store(&ctx, &test_memory("dst-row", "destination memory body content"))
        .await
        .expect("store dst");

    // Link created with every optional field unset, so nothing has
    // invalidated it yet.
    let link = crate::models::MemoryLink {
        source_id: src_id.clone(),
        target_id: dst_id.clone(),
        relation: crate::models::MemoryLinkRelation::RelatedTo,
        created_at: chrono::Utc::now().to_rfc3339(),
        valid_from: None,
        valid_until: None,
        observed_by: None,
        attest_level: None,
        signature: None,
    };
    store.link(&ctx, &link).await.expect("create link");

    let cutoff = "2030-01-01T00:00:00Z";
    let outcome = store
        .invalidate_link(&src_id, &dst_id, "related_to", Some(cutoff))
        .await
        .expect("invalidate_link");

    assert!(outcome.found, "link must be marked found");
    assert_eq!(outcome.valid_until, cutoff);
    assert!(outcome.previous_valid_until.is_none(), "no prior invalidation");
}
/// Invalidating a link that does not exist must succeed with `found == false`
/// and an empty `valid_until`.
#[tokio::test]
async fn invalidate_link_returns_not_found_for_unknown_triple() {
    let store = fresh_store();
    let attempt = store
        .invalidate_link("nope-src", "nope-dst", "related_to", None)
        .await;
    let outcome = attempt.expect("invalidate_link miss");
    assert!(!outcome.found);
    assert!(outcome.valid_until.is_empty());
}
/// With an empty embedding slice and text built as "<title> <content>" of a
/// stored memory, the check must flag a duplicate with similarity 1.0.
#[tokio::test]
async fn check_duplicate_with_text_exact_content_hash_short_circuits() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let mut stored = test_memory("dup-test-title", "dup-test body content");
    stored.namespace = "alphaone".to_string();
    store.store(&ctx, &stored).await.expect("store");

    let query_text = format!("{} {}", stored.title, stored.content);
    let verdict = store
        .check_duplicate_with_text(&[], &query_text, Some("alphaone"), 0.8)
        .await
        .expect("check_duplicate_with_text");

    assert!(verdict.is_duplicate);
    let nearest = verdict.nearest.expect("nearest must be populated on dup");
    let distance_from_one = (nearest.similarity - 1.0).abs();
    assert!(distance_from_one < f32::EPSILON);
}
/// On an empty store the duplicate check must report no duplicate and zero
/// scanned candidates.
#[tokio::test]
async fn check_duplicate_with_text_no_match_returns_false() {
    let store = fresh_store();
    let attempt = store
        .check_duplicate_with_text(&[], "no-match text", Some("alphaone"), 0.8)
        .await;
    let verdict = attempt.expect("check_duplicate_with_text empty");
    assert!(!verdict.is_duplicate);
    assert_eq!(verdict.candidates_scanned, 0);
}
/// The first `decide_pending_action` on a queued row must return `true`; a
/// repeat decision on the same row must return `false`.
#[tokio::test]
async fn fx_c2_batch5_decide_pending_action_alias_matches_pending_decide() {
    use crate::models::GovernedAction;

    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    // Queue the action through the raw connection. The guard is dropped at
    // the end of the block, before the store methods are called.
    let pending_id = {
        let conn = store.state.lock().await;
        let payload = serde_json::json!({"title":"t","content":"c"});
        db::queue_pending_action(
            &conn,
            GovernedAction::Store,
            "ns-decide-alias",
            None,
            "alice",
            &payload,
        )
        .expect("queue")
    };

    let first = store
        .decide_pending_action(&ctx, &pending_id, true, "alice")
        .await
        .expect("decide_pending_action");
    assert!(first, "first decide must transition the row");

    let repeat = store
        .decide_pending_action(&ctx, &pending_id, true, "alice")
        .await
        .expect("decide_pending_action second");
    assert!(!repeat, "already-decided rows must be no-op");
}
/// Approving a freshly queued action through the store must yield
/// `ApproveOutcome::Approved`.
#[tokio::test]
async fn fx_c2_batch5_approve_with_approver_type_matches_governance_path() {
    use crate::models::GovernedAction;

    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    // Queue the action through the raw connection. The guard is dropped at
    // the end of the block, before the store method is called.
    let pending_id = {
        let conn = store.state.lock().await;
        let payload = serde_json::json!({"title":"t","content":"c"});
        db::queue_pending_action(
            &conn,
            GovernedAction::Store,
            "ns-approve-alias",
            None,
            "alice",
            &payload,
        )
        .expect("queue")
    };

    let outcome = store
        .approve_with_approver_type(&ctx, &pending_id, "approver")
        .await
        .expect("approve_with_approver_type");
    assert!(matches!(outcome, crate::store::ApproveOutcome::Approved));
}
/// Executing a queued and approved `Store` action must return `Some(..)`.
#[tokio::test]
async fn fx_c2_batch5_execute_pending_action_sqlite_override() {
    use crate::models::GovernedAction;

    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    // The queued payload is a serialized memory fixture.
    let fixture = test_memory("fx-c2-b5-exec", "executed payload");
    let memory_payload = serde_json::to_value(fixture).expect("serialize memory");

    // Queue and approve through the raw connection. The guard is dropped at
    // the end of the block, before the store method is called.
    let pending_id = {
        let conn = store.state.lock().await;
        let queued = db::queue_pending_action(
            &conn,
            GovernedAction::Store,
            "alphaone",
            None,
            "alice",
            &memory_payload,
        )
        .expect("queue");
        db::approve_with_approver_type(&conn, &queued, "alice").expect("approve");
        queued
    };

    let executed = store
        .execute_pending_action(&ctx, &pending_id)
        .await
        .expect("execute_pending_action");
    assert!(executed.is_some(), "store action must return a memory id");
}
/// A single link from one memory to another must come back from `kg_query`
/// as exactly one row, pointing at the target, at depth 1.
#[tokio::test]
async fn fx_c2_batch5_kg_query_returns_outbound_neighbors() {
    use crate::models::{MemoryLink, MemoryLinkRelation};

    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let source_memory = test_memory("kg-src", "source body");
    let src = store.store(&ctx, &source_memory).await.expect("src");
    let target_memory = test_memory("kg-dst", "target body");
    let dst = store.store(&ctx, &target_memory).await.expect("dst");

    // One timestamp is used for both `created_at` and `valid_from`.
    let observed_at = chrono::Utc::now().to_rfc3339();
    let link = MemoryLink {
        source_id: src.clone(),
        target_id: dst.clone(),
        relation: MemoryLinkRelation::RelatedTo,
        created_at: observed_at.clone(),
        valid_from: Some(observed_at),
        valid_until: None,
        observed_by: Some("alice".to_string()),
        attest_level: Some("unsigned".to_string()),
        signature: None,
    };
    store.link(&ctx, &link).await.expect("link");

    let rows = store.kg_query(&src, 2, false).await.expect("kg_query");
    assert_eq!(rows.len(), 1, "exactly one neighbor expected");
    let neighbor = &rows[0];
    assert_eq!(neighbor.target_id, dst);
    assert_eq!(neighbor.depth, 1);
}
/// Timeline events must come back ordered by `valid_from` ascending, even
/// though the newer link is inserted before the older one.
#[tokio::test]
async fn fx_c2_batch5_kg_timeline_orders_by_valid_from() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let src = store
        .store(&ctx, &test_memory("tl-src", "tl source body"))
        .await
        .expect("src");
    let dst_old = store
        .store(&ctx, &test_memory("tl-dst-old", "tl old body"))
        .await
        .expect("dst-old");
    let dst_new = store
        .store(&ctx, &test_memory("tl-dst-new", "tl new body"))
        .await
        .expect("dst-new");

    // Insert the links with raw SQL, newer first, so insertion order cannot
    // accidentally satisfy the ordering assertions below. The guard is
    // dropped at the end of the block.
    {
        let conn = store.state.lock().await;
        let insert_link_sql = "INSERT INTO memory_links \
            (source_id, target_id, relation, created_at, valid_from, attest_level) \
            VALUES (?1, ?2, 'related_to', ?3, ?4, 'unsigned')";
        // (target, created_at, valid_from, message used if the insert fails)
        let fixtures = [
            (
                &dst_new,
                "2030-01-02T00:00:01Z",
                "2030-01-02T00:00:00Z",
                "insert new link",
            ),
            (
                &dst_old,
                "2030-01-01T00:00:01Z",
                "2030-01-01T00:00:00Z",
                "insert old link",
            ),
        ];
        for (target, created_at, valid_from, failure) in fixtures {
            conn.execute(
                insert_link_sql,
                rusqlite::params![&src, target, created_at, valid_from],
            )
            .expect(failure);
        }
    }

    let events = store
        .kg_timeline(&src, None, None, None)
        .await
        .expect("kg_timeline");
    assert_eq!(events.len(), 2, "two timeline events expected");
    assert_eq!(events[0].target_id, dst_old, "older event first");
    assert_eq!(events[1].target_id, dst_new, "newer event second");
}
/// The first `entity_register` call for a name must report `created == true`
/// and echo the canonical name, namespace, and supplied alias.
#[tokio::test]
async fn fx_c2_batch5_entity_register_creates_new_entity() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let aliases = ["ACME".to_string(), "acme-corp".to_string()];
    let metadata = serde_json::json!({"website":"https://acme.example"});
    let registration = store
        .entity_register(
            &ctx,
            "Acme Corp",
            "alphaone-test",
            &aliases,
            &metadata,
            Some("alice"),
        )
        .await
        .expect("entity_register");

    assert!(
        registration.created,
        "first registration must create the entity row"
    );
    assert_eq!(registration.canonical_name, "Acme Corp");
    assert_eq!(registration.namespace, "alphaone-test");
    assert!(registration.aliases.iter().any(|alias| alias == "ACME"));
}
/// Registering the same name and namespace twice must not create a second
/// entity, and the result must carry the aliases from both calls.
#[tokio::test]
async fn fx_c2_batch5_entity_register_unions_aliases_on_reregister() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");
    let empty_metadata = serde_json::json!({});

    // First registration contributes the `beta` alias.
    let first_aliases = ["beta".to_string()];
    store
        .entity_register(
            &ctx,
            "BetaCo",
            "alphaone-test",
            &first_aliases,
            &empty_metadata,
            Some("alice"),
        )
        .await
        .expect("first");

    // Second registration for the same name contributes `BETA-CORP`.
    let second_aliases = ["BETA-CORP".to_string()];
    let registration = store
        .entity_register(
            &ctx,
            "BetaCo",
            "alphaone-test",
            &second_aliases,
            &empty_metadata,
            Some("alice"),
        )
        .await
        .expect("reregister");

    assert!(
        !registration.created,
        "re-registration must NOT create a new row"
    );
    assert!(registration.aliases.iter().any(|alias| alias == "beta"));
    assert!(registration.aliases.iter().any(|alias| alias == "BETA-CORP"));
}
/// After `forget` reports at least one affected row, `list_archived` for
/// `sal-test` must return exactly the memory that was stored.
///
/// NOTE(review): the fixture's namespace is never set here, so this appears
/// to rely on `test_memory` defaulting to `sal-test` — confirm.
#[tokio::test]
async fn fx_c2_batch5_list_archived_returns_archived_rows() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    let fixture = test_memory("archived-row", "to be archived");
    let stored_id = store.store(&ctx, &fixture).await.expect("store");

    let archived_count = store
        .forget(&ctx, Some("sal-test"), None, None, true)
        .await
        .expect("forget");
    assert!(archived_count > 0, "forget must archive at least one row");

    let listed = store
        .list_archived(Some("sal-test"), 100, 0)
        .await
        .expect("list_archived");
    assert_eq!(listed.len(), 1, "one archived row expected");

    let listed_id = listed[0].get("id").and_then(|value| value.as_str());
    assert_eq!(
        listed_id,
        Some(stored_id.as_str()),
        "archived row id must match"
    );
}
/// With one forgotten row in each of two namespaces, a namespace-filtered
/// `list_archived` must return only that namespace's row, while the
/// unfiltered listing returns both.
#[tokio::test]
async fn fx_c2_batch5_list_archived_namespace_filter_excludes_other_tenants() {
    let store = fresh_store();
    let ctx = CallerContext::for_agent("alice");

    // Store one row per tenant: (title, namespace, message used on failure).
    let seeds = [
        ("ns-a-row", "tenant-a", "store-a"),
        ("ns-b-row", "tenant-b", "store-b"),
    ];
    for (title, namespace, failure) in seeds {
        let mut memory = test_memory(title, "body");
        memory.namespace = namespace.to_string();
        store.store(&ctx, &memory).await.expect(failure);
    }

    // Forget each tenant's rows, in the same order they were stored.
    let purges = [("tenant-a", "forget-a"), ("tenant-b", "forget-b")];
    for (namespace, failure) in purges {
        store
            .forget(&ctx, Some(namespace), None, None, true)
            .await
            .expect(failure);
    }

    let tenant_a = store
        .list_archived(Some("tenant-a"), 100, 0)
        .await
        .expect("list a");
    assert_eq!(tenant_a.len(), 1);
    let listed_namespace = tenant_a[0].get("namespace").and_then(|value| value.as_str());
    assert_eq!(listed_namespace, Some("tenant-a"));

    let global = store.list_archived(None, 100, 0).await.expect("list all");
    assert_eq!(global.len(), 2, "global list must surface both tenants");
}
}