#![allow(clippy::too_many_lines)]
use crate::db;
#[cfg(feature = "sal")]
use crate::models::Memory;
use super::AppState;
#[cfg(feature = "sal")]
use super::StorageBackend;
/// Minimum content length, measured in bytes via `str::len`, below which
/// `maybe_auto_tag` and `maybe_detect_conflicts` return early without
/// calling the LLM.
const AUTO_TAG_MIN_CONTENT_LEN: usize = 50;
/// Upper bound on how many LLM-suggested tags `maybe_auto_tag` keeps; any
/// tags past this count are dropped.
const AUTO_TAG_MAX_TAGS: usize = 8;
/// Looks up memory `id` through `store`, retrying briefly when the store
/// answers `NotFound`.
///
/// A `NotFound` result is retried up to four times, sleeping 5, 10, 15 and
/// 20 ms before the successive retries. A successful lookup or any other
/// error is returned immediately. The result of the fifth and final attempt
/// is returned unchanged, including a `NotFound`.
///
/// NOTE(review): the name suggests the retries exist to ride out a short
/// read-after-write visibility lag in the backend — confirm against callers.
#[cfg(feature = "sal")]
pub(super) async fn get_with_visibility_retry(
    store: &dyn crate::store::MemoryStore,
    ctx: &crate::store::CallerContext,
    id: &str,
) -> crate::store::StoreResult<Memory> {
    // Number of retries granted after the first `NotFound`.
    const MAX_RETRIES: u32 = 4;
    // Linear backoff step: retry `k` waits `k * BACKOFF_STEP_MS` milliseconds.
    const BACKOFF_STEP_MS: u32 = 5;

    for retry in 1..=MAX_RETRIES {
        match store.get(ctx, id).await {
            Err(crate::store::StoreError::NotFound { .. }) => {
                let pause = std::time::Duration::from_millis(u64::from(BACKOFF_STEP_MS * retry));
                tokio::time::sleep(pause).await;
            }
            // Success and every non-`NotFound` error end the loop right away.
            outcome => return outcome,
        }
    }

    // Retries exhausted: whatever the last lookup says is final.
    store.get(ctx, id).await
}
pub(crate) async fn maybe_auto_tag(
app: &AppState,
title: &str,
content: &str,
operator_tags: &[String],
namespace: &str,
) -> Vec<String> {
if !operator_tags.is_empty() {
return Vec::new();
}
if content.len() < AUTO_TAG_MIN_CONTENT_LEN {
return Vec::new();
}
if namespace.starts_with('_') {
return Vec::new();
}
if app.tier_config.llm_model.is_none() {
return Vec::new();
}
let llm_arc = app.llm.clone();
if llm_arc.is_none() {
return Vec::new();
}
let auto_tag_model = app.auto_tag_model.as_ref().clone();
let title_owned = title.to_string();
let content_owned = content.to_string();
let llm_timeout = app.llm_call_timeout;
let join = tokio::time::timeout(llm_timeout, async move {
let Some(llm) = llm_arc.as_ref() else {
return Ok::<Vec<String>, anyhow::Error>(Vec::new());
};
llm.auto_tag_async(&title_owned, &content_owned, auto_tag_model.as_deref())
.await
})
.await;
match join {
Ok(Ok(tags)) => tags.into_iter().take(AUTO_TAG_MAX_TAGS).collect(),
Ok(Err(e)) => {
tracing::warn!("L5: auto_tag hook failed: {e}");
Vec::new()
}
Err(_) => {
tracing::warn!(
"H8: LLM call (auto_tag) exceeded {}s timeout — falling back to no tags",
llm_timeout.as_secs()
);
Vec::new()
}
}
}
#[allow(dead_code)]
async fn maybe_detect_conflicts(
app: &AppState,
title: &str,
content: &str,
namespace: &str,
request_override: Option<bool>,
) -> Vec<ConflictReport> {
let enabled = match request_override {
Some(b) => b,
None => app.autonomous_hooks,
};
if !enabled
|| content.len() < AUTO_TAG_MIN_CONTENT_LEN
|| namespace.starts_with('_')
|| app.tier_config.llm_model.is_none()
{
return Vec::new();
}
let llm_arc = app.llm.clone();
if llm_arc.is_none() {
return Vec::new();
}
let candidates: Vec<(String, String, String)> =
match fetch_namespace_candidates(app, namespace, title, 8).await {
Ok(v) => v,
Err(e) => {
tracing::warn!("L?: maybe_detect_conflicts candidate fetch failed: {e}");
return Vec::new();
}
};
let llm_timeout = app.llm_call_timeout;
let new_content = content.to_string();
let mut out: Vec<ConflictReport> = Vec::new();
for (cand_id, cand_title, cand_content) in candidates {
let llm_arc_cl = llm_arc.clone();
let cand_content_cl = cand_content.clone();
let new_content_cl = new_content.clone();
let join = tokio::time::timeout(llm_timeout, async move {
let Some(llm) = llm_arc_cl.as_ref() else {
return Ok::<bool, anyhow::Error>(false);
};
llm.detect_contradiction_async(&new_content_cl, &cand_content_cl)
.await
})
.await;
match join {
Ok(Ok(true)) => out.push(ConflictReport {
id: cand_id,
title: cand_title,
suggested_merge: None,
}),
Ok(Ok(false)) => {}
Ok(Err(e)) => tracing::warn!("detect_contradiction LLM error for {cand_id}: {e}"),
Err(_) => tracing::warn!(
"H8: LLM call (detect_contradiction) exceeded {}s timeout for {cand_id} — skipping",
llm_timeout.as_secs()
),
}
}
out
}
/// Lists up to `limit` memories in `namespace` whose title differs from
/// `new_title`, as `(id, title, content)` tuples.
///
/// `limit + 1` rows are requested from storage so that dropping rows titled
/// `new_title` can still leave `limit` results. With the `sal` feature and a
/// Postgres backend the rows come from `app.store.list` under an admin
/// caller context; otherwise they come from `db::list` on the locked
/// connection in `app.db`.
///
/// # Errors
///
/// Returns the storage error rendered with `to_string()`.
#[allow(dead_code)]
async fn fetch_namespace_candidates(
    app: &AppState,
    namespace: &str,
    new_title: &str,
    limit: usize,
) -> Result<Vec<(String, String, String)>, String> {
    // Over-fetch by one row to make room for the title filter below.
    let fetch_limit = limit + 1;

    #[cfg(feature = "sal")]
    if matches!(app.storage_backend, StorageBackend::Postgres) {
        let admin_ctx =
            crate::store::CallerContext::for_admin(crate::identity::sentinels::AI_HTTP_INTERNAL);
        let ns_filter = crate::store::Filter {
            namespace: Some(namespace.to_string()),
            limit: fetch_limit,
            ..crate::store::Filter::default()
        };
        let rows = match app.store.list(&admin_ctx, &ns_filter).await {
            Ok(rows) => rows,
            Err(e) => return Err(e.to_string()),
        };
        let picked: Vec<(String, String, String)> = rows
            .into_iter()
            .filter_map(|m| (m.title != new_title).then(|| (m.id, m.title, m.content)))
            .take(limit)
            .collect();
        return Ok(picked);
    }

    let guard = app.db.lock().await;
    let rows = match db::list(
        &guard.0,
        Some(namespace),
        None,
        fetch_limit,
        0,
        None,
        None,
        None,
        None,
        None,
    ) {
        Ok(rows) => rows,
        Err(e) => return Err(e.to_string()),
    };
    let picked: Vec<(String, String, String)> = rows
        .into_iter()
        .filter_map(|m| (m.title != new_title).then(|| (m.id, m.title, m.content)))
        .take(limit)
        .collect();
    Ok(picked)
}
/// An existing memory that was flagged as contradicting newly written content.
///
/// Produced by `maybe_detect_conflicts` and serializable through
/// `serde::Serialize`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ConflictReport {
/// `id` of the stored memory that was flagged.
pub id: String,
/// Title of the flagged memory.
pub title: String,
/// Optional merge proposal. `maybe_detect_conflicts` always leaves this
/// `None`, which serializes as JSON `null`.
pub suggested_merge: Option<String>,
}
// Tests for the early-return gates of `maybe_auto_tag` and
// `maybe_detect_conflicts`, the SQLite path of `fetch_namespace_candidates`,
// and the serialized shape of `ConflictReport`. No test here installs an LLM
// client, so none of them performs an LLM call.
#[cfg(test)]
#[allow(clippy::too_many_lines)]
mod cov897_tests {
use super::{
AUTO_TAG_MIN_CONTENT_LEN, ConflictReport, fetch_namespace_candidates, maybe_auto_tag,
maybe_detect_conflicts,
};
use crate::config::{FeatureTier, ResolvedScoring, ResolvedTtl};
use crate::handlers::{AppState, Db, StorageBackend};
use crate::models::{Memory, Tier};
use chrono::Utc;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
use uuid::Uuid;
/// Builds an `AppState` backed by a fresh temp-file SQLite database.
///
/// `tier` selects the tier configuration and `autonomous` sets
/// `autonomous_hooks`. The state always has `llm` set to `None`, the
/// `Sqlite` storage backend and a 30 s LLM call timeout. The temp file is
/// returned alongside the state so the caller keeps the database file alive
/// for the duration of the test.
fn build_app(tier: FeatureTier, autonomous: bool) -> (AppState, tempfile::NamedTempFile) {
let tmp = tempfile::NamedTempFile::new().expect("tempfile");
let path = tmp.path().to_path_buf();
// The first handle is dropped immediately and the file is opened again.
// NOTE(review): presumably the first open initializes the schema — confirm.
let _ = crate::db::open(&path).expect("db::open");
let conn = crate::db::open(&path).expect("reopen");
let db: Db = Arc::new(Mutex::new((
conn,
path.clone(),
ResolvedTtl::default(),
true,
)));
#[cfg(feature = "sal")]
let store: Arc<dyn crate::store::MemoryStore> =
Arc::new(crate::store::sqlite::SqliteStore::open(&path).expect("open SqliteStore"));
let app = AppState {
db,
embedder: Arc::new(None),
vector_index: Arc::new(Mutex::new(None)),
federation: Arc::new(None),
tier_config: Arc::new(tier.config()),
scoring: Arc::new(ResolvedScoring::default()),
profile: Arc::new(crate::profile::Profile::core()),
mcp_config: Arc::new(None),
active_keypair: Arc::new(None),
family_embeddings: Arc::new(RwLock::new(Some(Vec::new()))),
storage_backend: StorageBackend::Sqlite,
#[cfg(feature = "sal")]
store,
// No LLM client: every hook under test must return before calling one.
llm: Arc::new(None),
auto_tag_model: Arc::new(None),
llm_call_timeout: std::time::Duration::from_secs(30),
replay_cache: Arc::new(crate::identity::replay::ReplayCache::default()),
verify_require_nonce: false,
federation_nonce_cache: Arc::new(
crate::identity::replay::FederationNonceCache::default(),
),
autonomous_hooks: autonomous,
recall_scope: Arc::new(None),
deferred_audit_queue: Arc::new(None),
admin_agent_ids: Arc::new(Vec::new()),
rule_cache: std::sync::Arc::new(crate::governance::rule_cache::RuleCache::new()),
resolved_models: std::sync::Arc::new(crate::config::ResolvedModels::default()),
runtime: crate::runtime_context::RuntimeContext::global_arc(),
max_page_size: crate::handlers::MAX_BULK_SIZE,
};
(app, tmp)
}
/// Inserts one `Tier::Mid` memory with a fresh UUID and the current
/// timestamp into the app's SQLite database.
///
/// Uses `try_lock`, so it panics if the database mutex is already held.
fn seed_memory(app: &AppState, namespace: &str, title: &str, content: &str) {
let now = Utc::now().to_rfc3339();
let mem = Memory {
id: Uuid::new_v4().to_string(),
title: title.to_string(),
content: content.to_string(),
namespace: namespace.to_string(),
tier: Tier::Mid,
created_at: now.clone(),
updated_at: now,
..Default::default()
};
let lock = app.db.try_lock().expect("uncontended lock for seed");
crate::db::insert(&lock.0, &mem).expect("insert");
}
/// `maybe_auto_tag` returns no tags when `app.llm` is `None`, even with
/// long-enough content, no operator tags and a public namespace.
/// NOTE(review): assumes the `Smart` tier config sets `llm_model` — confirm.
#[tokio::test]
async fn cov897_maybe_auto_tag_smart_tier_no_llm_arc_short_circuits() {
let (app, _tmp) = build_app(FeatureTier::Smart, false);
let r = maybe_auto_tag(
&app,
"title",
&"x".repeat(AUTO_TAG_MIN_CONTENT_LEN + 10),
&[],
"public-ns",
)
.await;
assert!(
r.is_empty(),
"Smart tier + llm=None must short-circuit, got {r:?}"
);
}
/// With `autonomous_hooks` off and no request override, conflict detection
/// returns nothing.
#[tokio::test]
async fn cov897_detect_conflicts_disabled_by_default_returns_empty() {
let (app, _tmp) = build_app(FeatureTier::Smart, false);
let r = maybe_detect_conflicts(
&app,
"t",
&"x".repeat(AUTO_TAG_MIN_CONTENT_LEN + 10),
"ns",
None,
)
.await;
assert!(r.is_empty(), "disabled-by-config returns empty");
}
/// A request override of `Some(false)` disables detection even though
/// `autonomous_hooks` is on.
#[tokio::test]
async fn cov897_detect_conflicts_request_override_false_forces_off() {
let (app, _tmp) = build_app(FeatureTier::Smart, true);
let r = maybe_detect_conflicts(
&app,
"t",
&"x".repeat(AUTO_TAG_MIN_CONTENT_LEN + 10),
"ns",
Some(false),
)
.await;
assert!(r.is_empty(), "override=Some(false) returns empty");
}
/// Content shorter than `AUTO_TAG_MIN_CONTENT_LEN` bytes returns nothing
/// even when the override enables the hook.
#[tokio::test]
async fn cov897_detect_conflicts_short_content_returns_empty() {
let (app, _tmp) = build_app(FeatureTier::Smart, false);
let r = maybe_detect_conflicts(&app, "t", "short", "ns", Some(true)).await;
assert!(r.is_empty(), "short content returns empty");
}
/// A namespace starting with `_` returns nothing even when the override
/// enables the hook.
#[tokio::test]
async fn cov897_detect_conflicts_internal_namespace_returns_empty() {
let (app, _tmp) = build_app(FeatureTier::Smart, false);
let r = maybe_detect_conflicts(
&app,
"t",
&"x".repeat(AUTO_TAG_MIN_CONTENT_LEN + 10),
"_internal",
Some(true),
)
.await;
assert!(r.is_empty(), "internal namespace returns empty");
}
/// With the `Keyword` tier, an enabled hook still returns nothing.
/// NOTE(review): assumes the `Keyword` tier config has no `llm_model`; with
/// `llm` also `None` in this fixture the result is empty either way — confirm
/// which gate this test is meant to pin.
#[tokio::test]
async fn cov897_detect_conflicts_no_llm_model_returns_empty() {
let (app, _tmp) = build_app(FeatureTier::Keyword, false);
let r = maybe_detect_conflicts(
&app,
"t",
&"x".repeat(AUTO_TAG_MIN_CONTENT_LEN + 10),
"ns",
Some(true),
)
.await;
assert!(r.is_empty(), "no llm_model returns empty");
}
/// With the `Smart` tier but `app.llm` set to `None`, an enabled hook
/// returns nothing.
#[tokio::test]
async fn cov897_detect_conflicts_smart_tier_no_llm_arc_returns_empty() {
let (app, _tmp) = build_app(FeatureTier::Smart, false);
let r = maybe_detect_conflicts(
&app,
"t",
&"x".repeat(AUTO_TAG_MIN_CONTENT_LEN + 10),
"ns",
Some(true),
)
.await;
assert!(r.is_empty(), "Smart tier + llm=None returns empty");
}
/// Fetching candidates from a namespace with no memories succeeds and
/// yields an empty list.
#[tokio::test]
async fn cov897_fetch_candidates_empty_namespace_returns_empty() {
let (app, _tmp) = build_app(FeatureTier::Keyword, false);
let out = fetch_namespace_candidates(&app, "empty-ns", "new-title", 8)
.await
.expect("sqlite list succeeds on empty db");
assert!(out.is_empty(), "empty namespace returns no candidates");
}
/// A memory whose title equals `new_title` is excluded; the other two
/// seeded memories are returned.
#[tokio::test]
async fn cov897_fetch_candidates_filters_byte_equal_title() {
let (app, _tmp) = build_app(FeatureTier::Keyword, false);
seed_memory(&app, "ns-cand", "alpha", "content-alpha");
seed_memory(&app, "ns-cand", "beta", "content-beta");
seed_memory(&app, "ns-cand", "gamma", "content-gamma");
let out = fetch_namespace_candidates(&app, "ns-cand", "beta", 8)
.await
.expect("sqlite list succeeds");
let titles: Vec<&str> = out.iter().map(|(_, t, _)| t.as_str()).collect();
assert_eq!(out.len(), 2, "filters byte-equal title, got {titles:?}");
assert!(titles.contains(&"alpha"), "alpha present in {titles:?}");
assert!(titles.contains(&"gamma"), "gamma present in {titles:?}");
assert!(!titles.contains(&"beta"), "beta filtered from {titles:?}");
}
/// With five memories seeded and `limit` set to 2, exactly two candidates
/// come back.
#[tokio::test]
async fn cov897_fetch_candidates_honors_limit() {
let (app, _tmp) = build_app(FeatureTier::Keyword, false);
for i in 0..5 {
seed_memory(
&app,
"ns-limit",
&format!("title-{i}"),
&format!("content-{i}"),
);
}
let out = fetch_namespace_candidates(&app, "ns-limit", "no-match", 2)
.await
.expect("sqlite list succeeds");
assert_eq!(out.len(), 2, "limit honored");
}
/// Pins the serialized JSON of `ConflictReport`: `id` and `title` as
/// strings, and a `None` merge suggestion as `null`.
#[test]
fn cov897_conflict_report_serializes_to_pinned_wire_shape() {
let r = ConflictReport {
id: "mem-id-123".to_string(),
title: "conflicting title".to_string(),
suggested_merge: None,
};
let v = serde_json::to_value(&r).expect("serialize");
assert_eq!(v["id"], "mem-id-123");
assert_eq!(v["title"], "conflicting title");
assert!(
v[crate::models::field_names::SUGGESTED_MERGE].is_null(),
"None ⇒ null on the wire"
);
}
}