use crate::memory_core::palace::Drawer;
use anyhow::{Context, Result, anyhow};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
/// Tunable settings for the semantic consolidation pass.
#[derive(Debug, Clone)]
pub struct SemanticConsolidationConfig {
    /// Whether the pass is enabled. NOTE(review): not read anywhere in this
    /// file; presumably checked by the caller — confirm.
    pub enabled: bool,
    /// Model identifier. NOTE(review): not read by `SemanticConsolidator`
    /// itself; presumably handed to a backend constructor — confirm.
    pub model: String,
    /// Similarity cut-off. NOTE(review): not read anywhere in this file.
    pub similarity_threshold: f32,
    /// Number of drawers sent to the backend per batch.
    pub max_batch_size: usize,
    /// Upper bound on backend calls attempted in one `consolidate` run.
    pub max_calls_per_cycle: usize,
}

impl Default for SemanticConsolidationConfig {
    /// Enabled, Claude Haiku model id, 0.75 threshold, batches of 8,
    /// at most 20 calls per cycle.
    fn default() -> Self {
        let model = String::from("anthropic/claude-haiku-4-5");
        SemanticConsolidationConfig {
            enabled: true,
            model,
            similarity_threshold: 0.75,
            max_batch_size: 8,
            max_calls_per_cycle: 20,
        }
    }
}
/// A single consolidation step returned by an `Inference` backend.
///
/// Serialized as an internally tagged object: the `action` field holds the
/// snake_case variant name (`alias`, `merge` or `flag`) and the variant's
/// fields sit alongside it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "action", rename_all = "snake_case")]
pub enum ConsolidationAction {
/// `from` is another name for the canonical term `to`.
Alias { from: String, to: String },
/// Replace the drawers listed in `superseded_ids` with a single drawer
/// holding `canonical_content`.
Merge {
canonical_content: String,
superseded_ids: Vec<Uuid>,
},
/// Mark `drawer_id` for review, with a free-text `reason`.
Flag { drawer_id: Uuid, reason: String },
}
/// Accumulated outcome of one `SemanticConsolidator::consolidate` run.
#[derive(Debug, Default)]
pub struct ConsolidationResult {
/// One entry per `Merge` action applied.
pub canonical_drawers: Vec<CanonicalDrawer>,
/// `(from, to)` pairs, one per `Alias` action.
pub aliases: Vec<(String, String)>,
/// IDs named by `Merge` actions, without duplicates.
pub superseded_ids: Vec<Uuid>,
/// `(drawer_id, reason)` pairs, one per `Flag` action.
pub flagged_ids: Vec<(Uuid, String)>,
/// Number of backend calls that returned successfully (failed calls are not counted here).
pub llm_calls: usize,
/// Number of batches answered from the in-memory cache.
pub cache_hits: usize,
}
/// A merged replacement drawer produced from a `Merge` action.
#[derive(Debug, Clone)]
pub struct CanonicalDrawer {
/// The merged text, taken from the action's `canonical_content`.
pub content: String,
/// Highest importance among the superseded drawers found in the batch, never below 0.5.
pub importance: f32,
/// Union of those drawers' tags, in first-seen order.
pub tags: Vec<String>,
/// IDs the merge action listed as superseded.
pub canonical_for: Vec<Uuid>,
}
/// A backend that can propose consolidation actions for a batch of drawers.
#[async_trait]
pub trait Inference: Send + Sync {
/// Short identifier for the backend.
fn name(&self) -> &str;
/// Returns the actions proposed for `drawers`, or an error if the backend call fails.
async fn consolidate(&self, drawers: &[Drawer]) -> Result<Vec<ConsolidationAction>>;
}
/// Reports whether any inference backend can be used.
///
/// Returns `true` when `openrouter_api_key` is non-blank, when
/// `local_model_enabled` is set, or — checked last — when the
/// `OPENROUTER_API_KEY` environment variable holds a non-blank value.
pub fn inference_available(openrouter_api_key: &str, local_model_enabled: bool) -> bool {
    let has_inline_key = !openrouter_api_key.trim().is_empty();
    if has_inline_key || local_model_enabled {
        return true;
    }
    // Fall back to the process environment; an unset or unreadable variable
    // counts the same as a blank one.
    match std::env::var("OPENROUTER_API_KEY") {
        Ok(value) => !value.trim().is_empty(),
        Err(_) => false,
    }
}
/// Inference backend for the OpenRouter chat-completions API.
pub struct OpenRouterInference {
    /// Bearer token sent with each request.
    api_key: String,
    /// Model identifier placed in the request body.
    model: String,
}

impl OpenRouterInference {
    /// Builds a backend from anything convertible into owned strings.
    pub fn new(api_key: impl Into<String>, model: impl Into<String>) -> Self {
        let (api_key, model) = (api_key.into(), model.into());
        Self { api_key, model }
    }
}
/// OpenRouter chat-completions endpoint that `OpenRouterInference` posts to.
const OPENROUTER_COMPLETIONS_URL: &str = "https://openrouter.ai/api/v1/chat/completions";
/// System prompt sent with every consolidation request by both HTTP backends.
/// It asks the model for a JSON array of actions in the shape that
/// `ConsolidationAction` deserializes.
const CONSOLIDATION_PROMPT_SYSTEM: &str = r#"You are a knowledge consolidation assistant for a personal memory system. Given a batch of memory entries (drawers), identify:
1. Aliases: different names for the same concept (e.g. "ts" = "trusty-search")
2. Merge candidates: closely related facts that should be one canonical summary
3. Contradictions: entries that conflict (flag for human review; do NOT auto-resolve)
Return a JSON array of actions. Each action MUST have an "action" field.
Valid action types:
- {"action": "alias", "from": "<term>", "to": "<canonical_term>"}
- {"action": "merge", "canonical_content": "<single best summary>", "superseded_ids": ["<uuid>", ...]}
- {"action": "flag", "drawer_id": "<uuid>", "reason": "<why contradictory>"}
Rules:
- Be conservative: only merge if the entries express the SAME fact.
- Preserve nuance: if entries are related but distinct, do NOT merge.
- Return an empty array [] if no consolidation is warranted.
- The canonical_content for a merge MUST be a complete, standalone sentence or paragraph.
- Return ONLY the JSON array, no other text."#;
/// Renders `drawers` as the user message for a consolidation request.
///
/// Each drawer becomes an `ID: …` line followed by a `Content: …` line;
/// consecutive drawers are separated by a `---` line. An empty slice yields
/// an empty string.
fn build_consolidation_prompt(drawers: &[Drawer]) -> String {
    drawers
        .iter()
        .map(|drawer| format!("ID: {}\nContent: {}\n", drawer.id, drawer.content))
        .collect::<Vec<String>>()
        .join("---\n")
}
/// Top-level shape of an OpenAI-style chat-completions response; only
/// `choices` is read.
#[derive(Deserialize)]
struct OpenAiChatResponse {
choices: Vec<OpenAiChoice>,
}
/// One entry of the response's `choices` array; only `message` is read.
#[derive(Deserialize)]
struct OpenAiChoice {
message: OpenAiMessage,
}
/// Message payload of a choice; `content` may be absent or null.
#[derive(Deserialize)]
struct OpenAiMessage {
content: Option<String>,
}
#[async_trait]
impl Inference for OpenRouterInference {
    /// Backend identifier.
    fn name(&self) -> &str {
        "openrouter"
    }

    /// Sends `drawers` to OpenRouter and parses the returned actions.
    ///
    /// Returns an empty list without any network traffic when `drawers` is
    /// empty, and also when the response carries no message content.
    ///
    /// # Errors
    ///
    /// Fails when the API key is blank, the HTTP client cannot be built, the
    /// request cannot be sent, the server answers with a non-success status,
    /// or the response envelope is not valid JSON of the expected shape.
    async fn consolidate(&self, drawers: &[Drawer]) -> Result<Vec<ConsolidationAction>> {
        // `inference_available` treats a whitespace-only key as missing, so do
        // the same here. The trimmed key is also what gets sent: stray
        // whitespace or a trailing newline (common when the key comes from a
        // file or env var) must not end up in the Authorization header.
        let api_key = self.api_key.trim();
        if api_key.is_empty() {
            return Err(anyhow!("OpenRouter API key is empty"));
        }
        if drawers.is_empty() {
            return Ok(vec![]);
        }

        let user_content = build_consolidation_prompt(drawers);
        let messages = vec![
            serde_json::json!({"role": "system", "content": CONSOLIDATION_PROMPT_SYSTEM}),
            serde_json::json!({"role": "user", "content": user_content}),
        ];
        let body = serde_json::json!({
            "model": self.model,
            "messages": messages,
            "stream": false,
        });

        let client = reqwest::Client::builder()
            .connect_timeout(std::time::Duration::from_secs(10))
            .timeout(std::time::Duration::from_secs(120))
            .build()
            .context("build reqwest client for OpenRouterInference")?;
        let resp = client
            .post(OPENROUTER_COMPLETIONS_URL)
            .bearer_auth(api_key)
            .header("HTTP-Referer", "https://github.com/bobmatnyc/trusty-tools")
            .header("X-Title", "trusty-memory")
            .json(&body)
            .send()
            .await
            .context("POST OpenRouter consolidation")?;

        let status = resp.status();
        if !status.is_success() {
            // Best effort: an unreadable error body is reported as empty text.
            let text = resp.text().await.unwrap_or_default();
            return Err(anyhow!("OpenRouter HTTP {status}: {text}"));
        }

        let payload: OpenAiChatResponse = resp
            .json()
            .await
            .context("parse OpenRouter consolidation response")?;
        // Only the first choice is used; a missing choice or missing content
        // is parsed as an empty string, which yields no actions.
        let raw_content = payload
            .choices
            .into_iter()
            .next()
            .and_then(|c| c.message.content)
            .unwrap_or_default();
        parse_consolidation_actions(&raw_content)
    }
}
/// Inference backend for an Ollama server's OpenAI-compatible endpoint.
pub struct OllamaInference {
    /// Server root; a trailing `/` is tolerated when the request URL is built.
    base_url: String,
    /// Model identifier placed in the request body.
    model: String,
}

impl OllamaInference {
    /// Builds a backend from anything convertible into owned strings.
    pub fn new(base_url: impl Into<String>, model: impl Into<String>) -> Self {
        let (base_url, model) = (base_url.into(), model.into());
        Self { base_url, model }
    }
}
#[async_trait]
impl Inference for OllamaInference {
    /// Backend identifier.
    fn name(&self) -> &str {
        "ollama"
    }

    /// Sends `drawers` to the Ollama server's `/v1/chat/completions` route
    /// and parses the returned actions.
    ///
    /// An empty `drawers` slice short-circuits to an empty list. Errors cover
    /// client construction, transport failures, non-success HTTP statuses and
    /// an undecodable response envelope.
    async fn consolidate(&self, drawers: &[Drawer]) -> Result<Vec<ConsolidationAction>> {
        if drawers.is_empty() {
            return Ok(Vec::new());
        }

        // Tolerate a trailing slash on the configured base URL.
        let url = format!(
            "{}/v1/chat/completions",
            self.base_url.trim_end_matches('/')
        );
        let prompt = build_consolidation_prompt(drawers);
        let request_body = serde_json::json!({
            "model": self.model,
            "messages": [
                {"role": "system", "content": CONSOLIDATION_PROMPT_SYSTEM},
                {"role": "user", "content": prompt}
            ],
            "stream": false,
        });

        let http = reqwest::Client::builder()
            .connect_timeout(std::time::Duration::from_secs(5))
            .timeout(std::time::Duration::from_secs(120))
            .build()
            .context("build reqwest client for OllamaInference")?;
        let response = http
            .post(&url)
            .json(&request_body)
            .send()
            .await
            .with_context(|| format!("POST {url}"))?;

        let status = response.status();
        if !status.is_success() {
            // Best effort: an unreadable error body is reported as empty text.
            let text = response.text().await.unwrap_or_default();
            return Err(anyhow!("Ollama HTTP {status}: {text}"));
        }

        let envelope: OpenAiChatResponse = response
            .json()
            .await
            .context("parse Ollama consolidation response")?;
        // Only the first choice matters; no choice or no content parses as "".
        let first_choice_text = match envelope.choices.into_iter().next() {
            Some(choice) => choice.message.content.unwrap_or_default(),
            None => String::new(),
        };
        parse_consolidation_actions(&first_choice_text)
    }
}
pub struct MockInference {
pub fixture_actions: Vec<ConsolidationAction>,
pub call_count: std::sync::Arc<std::sync::atomic::AtomicUsize>,
}
impl MockInference {
pub fn new(fixture_actions: Vec<ConsolidationAction>) -> Self {
Self {
fixture_actions,
call_count: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)),
}
}
pub fn no_op() -> Self {
Self::new(vec![])
}
}
#[async_trait]
impl Inference for MockInference {
    /// Backend identifier.
    fn name(&self) -> &str {
        "mock"
    }

    /// Bumps the call counter and returns a copy of the fixture actions; the
    /// drawers themselves are ignored.
    async fn consolidate(&self, _drawers: &[Drawer]) -> Result<Vec<ConsolidationAction>> {
        use std::sync::atomic::Ordering;

        let _calls_before = self.call_count.fetch_add(1, Ordering::Relaxed);
        let canned = self.fixture_actions.clone();
        Ok(canned)
    }
}
/// Extracts consolidation actions from a model's raw text reply.
///
/// Surrounding whitespace and Markdown code fences (with or without a `json`
/// tag) are stripped, then the text from the first `[` to the last `]` is
/// parsed as a JSON array of actions. When no usable span exists, or the span
/// does not deserialize, the reply is treated as "no actions": the function
/// logs at debug level and still returns `Ok` with an empty list.
pub fn parse_consolidation_actions(raw: &str) -> Result<Vec<ConsolidationAction>> {
    let unfenced = raw
        .trim()
        .trim_start_matches("```json")
        .trim_start_matches("```")
        .trim_end_matches("```")
        .trim();

    // Without brackets the whole text is the candidate span.
    let open = unfenced.find('[').unwrap_or(0);
    let close = match unfenced.rfind(']') {
        Some(idx) => idx + 1,
        None => unfenced.len(),
    };
    if open >= close {
        return Ok(Vec::new());
    }

    let json_slice = &unfenced[open..close];
    let parsed = serde_json::from_str::<Vec<ConsolidationAction>>(json_slice);
    Ok(parsed.unwrap_or_else(|e| {
        tracing::debug!(
            raw = json_slice,
            error = %e,
            "failed to parse consolidation actions; treating as empty"
        );
        Vec::new()
    }))
}
/// In-memory cache of backend results, keyed by the string from `batch_cache_key`.
type ConsolidationCache = HashMap<String, Vec<ConsolidationAction>>;
/// Runs an `Inference` backend over drawers in batches and folds the returned
/// actions into a `ConsolidationResult`, caching successful results per batch.
pub struct SemanticConsolidator {
// Backend that proposes the actions.
inference: Arc<dyn Inference>,
/// Batch size and call budget used by `consolidate`.
pub config: SemanticConsolidationConfig,
// Successful backend results, kept for the lifetime of this consolidator.
cache: parking_lot::Mutex<ConsolidationCache>,
}
impl SemanticConsolidator {
    /// Creates a consolidator around `inference` with an empty result cache.
    pub fn new(inference: Arc<dyn Inference>, config: SemanticConsolidationConfig) -> Self {
        Self {
            inference,
            config,
            cache: parking_lot::Mutex::new(HashMap::new()),
        }
    }

    /// Consolidates `drawers` batch by batch and returns the accumulated result.
    ///
    /// Drawers are split into chunks of `config.max_batch_size` (a value of 0
    /// is treated as 1). Each chunk is answered from the cache when an
    /// identical chunk was seen before, otherwise by the backend. Backend
    /// failures are logged and the chunk is skipped; they still consume call
    /// budget but are neither cached nor counted in `llm_calls`. Processing
    /// stops once `config.max_calls_per_cycle` backend calls have been
    /// attempted.
    pub async fn consolidate(&self, drawers: &[Drawer]) -> ConsolidationResult {
        let mut result = ConsolidationResult::default();
        let mut calls_made = 0usize;

        // `slice::chunks` panics on a chunk size of zero, and `config` is a
        // public field, so clamp instead of trusting the caller.
        let batch_size = self.config.max_batch_size.max(1);

        for batch in drawers.chunks(batch_size) {
            if calls_made >= self.config.max_calls_per_cycle {
                tracing::debug!(
                    budget = self.config.max_calls_per_cycle,
                    "semantic consolidation call budget exhausted"
                );
                break;
            }

            let cache_key = batch_cache_key(batch);
            // Scope the guard so the lock is released before the `.await` below.
            let cached = {
                let guard = self.cache.lock();
                guard.get(&cache_key).cloned()
            };

            let actions = if let Some(actions) = cached {
                result.cache_hits += 1;
                tracing::debug!(key = %cache_key, "semantic consolidation cache hit");
                actions
            } else {
                match self.inference.consolidate(batch).await {
                    Ok(actions) => {
                        calls_made += 1;
                        result.llm_calls += 1;
                        self.cache.lock().insert(cache_key, actions.clone());
                        actions
                    }
                    Err(e) => {
                        tracing::warn!(
                            error = %e,
                            "semantic consolidation LLM call failed; skipping batch"
                        );
                        // A failed attempt still spends budget, so a broken
                        // backend cannot be retried without bound in one cycle.
                        calls_made += 1;
                        vec![]
                    }
                }
            };

            apply_actions_to_result(actions, batch, &mut result);
        }

        result
    }
}
/// Derives the cache key for a batch from its drawers' ids and contents.
///
/// Drawers are visited in ascending id order, so the key does not depend on
/// the order of `batch`. The key is the hex encoding of the first 16 bytes of
/// a SHA-256 digest over each id's bytes followed by that drawer's content.
fn batch_cache_key(batch: &[Drawer]) -> String {
    use sha2::Digest;
    use std::collections::BTreeMap;

    // Keyed by id: iteration is sorted, and a repeated id keeps its last content.
    let mut by_id: BTreeMap<Uuid, &str> = BTreeMap::new();
    for drawer in batch {
        by_id.insert(drawer.id, drawer.content.as_str());
    }

    let mut hasher = sha2::Sha256::new();
    by_id.iter().for_each(|(id, content)| {
        hasher.update(id.as_bytes());
        hasher.update(content.as_bytes());
    });
    let digest = hasher.finalize();
    ::hex::encode(&digest[..16])
}
/// Folds one batch's `actions` into `result`.
///
/// * `Alias` and `Flag` actions are appended as-is.
/// * `Merge` actions add a canonical drawer whose importance is the highest
///   among the referenced drawers present in `batch` (with a floor of 0.5)
///   and whose tags are the union of those drawers' tags in first-seen order.
///   The referenced ids are added to `result.superseded_ids` without
///   duplicates.
fn apply_actions_to_result(
    actions: Vec<ConsolidationAction>,
    batch: &[Drawer],
    result: &mut ConsolidationResult,
) {
    for action in actions {
        match action {
            ConsolidationAction::Alias { from, to } => result.aliases.push((from, to)),
            ConsolidationAction::Flag { drawer_id, reason } => {
                result.flagged_ids.push((drawer_id, reason))
            }
            ConsolidationAction::Merge {
                canonical_content,
                superseded_ids,
            } => {
                // Drawers of this batch that the merge refers to.
                let members: Vec<&Drawer> = batch
                    .iter()
                    .filter(|drawer| superseded_ids.contains(&drawer.id))
                    .collect();

                let importance = members
                    .iter()
                    .map(|drawer| drawer.importance)
                    .fold(0.5f32, f32::max);

                let mut tags: Vec<String> = Vec::new();
                for drawer in &members {
                    for tag in &drawer.tags {
                        if !tags.contains(tag) {
                            tags.push(tag.clone());
                        }
                    }
                }

                for id in &superseded_ids {
                    if !result.superseded_ids.contains(id) {
                        result.superseded_ids.push(*id);
                    }
                }

                result.canonical_drawers.push(CanonicalDrawer {
                    content: canonical_content,
                    importance,
                    tags,
                    canonical_for: superseded_ids,
                });
            }
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::memory_core::palace::Drawer;
use uuid::Uuid;
/// Builds a drawer in a fresh random room with the given content and importance.
fn make_drawer(content: &str, importance: f32) -> Drawer {
    let mut drawer = Drawer::new(Uuid::new_v4(), content);
    drawer.importance = importance;
    drawer
}
// Pins every default value of the configuration.
#[test]
fn semantic_config_defaults() {
    let SemanticConsolidationConfig {
        enabled,
        model,
        similarity_threshold,
        max_batch_size,
        max_calls_per_cycle,
    } = SemanticConsolidationConfig::default();

    assert!(enabled);
    assert_eq!(model, "anthropic/claude-haiku-4-5");
    assert!((similarity_threshold - 0.75).abs() < 1e-6);
    assert_eq!(max_batch_size, 8);
    assert_eq!(max_calls_per_cycle, 20);
}
// Each action variant deserializes from its tagged JSON form.
#[test]
fn consolidation_action_deserializes() {
    // alias
    let alias: ConsolidationAction =
        serde_json::from_str(r#"{"action":"alias","from":"ts","to":"trusty-search"}"#).unwrap();
    let expected_alias = ConsolidationAction::Alias {
        from: "ts".into(),
        to: "trusty-search".into(),
    };
    assert_eq!(alias, expected_alias);

    // merge
    let id = Uuid::new_v4();
    let merge_json = format!(
        r#"{{"action":"merge","canonical_content":"trusty-search is a hybrid search daemon","superseded_ids":["{id}"]}}"#
    );
    let merge: ConsolidationAction = serde_json::from_str(&merge_json).unwrap();
    match merge {
        ConsolidationAction::Merge {
            canonical_content,
            superseded_ids,
        } => {
            assert_eq!(canonical_content, "trusty-search is a hybrid search daemon");
            assert_eq!(superseded_ids, vec![id]);
        }
        _ => panic!("expected Merge"),
    }

    // flag
    let flag_json =
        format!(r#"{{"action":"flag","drawer_id":"{id}","reason":"contradicts other entry"}}"#);
    let flag: ConsolidationAction = serde_json::from_str(&flag_json).unwrap();
    let expected_flag = ConsolidationAction::Flag {
        drawer_id: id,
        reason: "contradicts other entry".into(),
    };
    assert_eq!(flag, expected_flag);
}
// With no inline key and no local model, the answer depends only on the
// OPENROUTER_API_KEY environment variable.
#[test]
fn inference_available_false_without_key() {
    // `inference_available` falls back to the process environment, so a
    // hard-coded `false` expectation fails on any machine or CI job that has
    // the key exported. Derive the expectation from the environment instead:
    // in a clean environment this still asserts `false`.
    let ambient_key_present = std::env::var("OPENROUTER_API_KEY")
        .map(|key| !key.trim().is_empty())
        .unwrap_or(false);

    assert_eq!(inference_available("", false), ambient_key_present);
    assert_eq!(inference_available(" ", false), ambient_key_present);
}
// A non-blank inline key is enough on its own.
#[test]
fn inference_available_true_with_inline_key() {
    let available = inference_available("sk-test-key", false);
    assert!(available);
}
// An enabled local model is enough even without any key.
#[test]
fn inference_available_true_with_local_model() {
    let available = inference_available("", true);
    assert!(available);
}
// A plain JSON array with two actions parses into two actions.
#[test]
fn parse_consolidation_actions_round_trips() {
    let id = Uuid::new_v4();
    let raw = format!(
        r#"[{{"action":"alias","from":"ts","to":"trusty-search"}},{{"action":"flag","drawer_id":"{id}","reason":"test"}}]"#
    );
    let parsed = parse_consolidation_actions(&raw).unwrap();
    assert_eq!(parsed.len(), 2);
}
// A reply wrapped in a fenced `json` code block still parses.
#[test]
fn parse_handles_markdown_fence() {
    let fenced = "```json\n[{\"action\":\"alias\",\"from\":\"a\",\"to\":\"b\"}]\n```";
    let parsed = parse_consolidation_actions(fenced).unwrap();
    assert_eq!(parsed.len(), 1);
}
// Non-JSON prose is treated as "no actions" rather than an error.
#[test]
fn parse_returns_empty_on_garbage() {
    let parsed = parse_consolidation_actions("sorry, I cannot help with that").unwrap();
    assert_eq!(parsed.len(), 0);
}
// Hashing the same batch twice yields the same key.
#[test]
fn batch_cache_key_is_deterministic() {
    let batch = [
        make_drawer("alpha content", 0.7),
        make_drawer("beta content", 0.5),
    ];
    let first = batch_cache_key(&batch);
    let second = batch_cache_key(&batch);
    assert_eq!(first, second);
}
// Two different single-drawer batches get different keys.
#[test]
fn batch_cache_key_differs_for_different_content() {
    let alpha = [make_drawer("alpha content", 0.7)];
    let other = [make_drawer("totally different", 0.5)];
    assert_ne!(batch_cache_key(&alpha), batch_cache_key(&other));
}
// A Merge action yields one canonical drawer and supersedes both inputs,
// using a single backend call.
#[tokio::test]
async fn consolidator_merges_cluster() {
    let (id1, id2) = (Uuid::new_v4(), Uuid::new_v4());
    let mut first = make_drawer("ts is a search tool", 0.8);
    first.id = id1;
    let mut second = make_drawer("trusty-search is a hybrid search daemon", 0.6);
    second.id = id2;

    let canonical = "trusty-search (ts) is a hybrid BM25+vector search daemon";
    let mock = Arc::new(MockInference::new(vec![ConsolidationAction::Merge {
        canonical_content: canonical.to_string(),
        superseded_ids: vec![id1, id2],
    }]));
    let call_count = Arc::clone(&mock.call_count);

    let consolidator = SemanticConsolidator::new(
        mock,
        SemanticConsolidationConfig {
            max_batch_size: 8,
            max_calls_per_cycle: 20,
            ..Default::default()
        },
    );
    let result = consolidator.consolidate(&[first, second]).await;

    assert_eq!(result.canonical_drawers.len(), 1);
    assert_eq!(result.canonical_drawers[0].content, canonical);
    for id in [id1, id2] {
        assert!(result.superseded_ids.contains(&id));
    }
    assert_eq!(call_count.load(std::sync::atomic::Ordering::Relaxed), 1);
}
// Consolidating the same batch twice hits the backend once; the second run
// is served from the cache and still reports the alias.
#[tokio::test]
async fn consolidator_caches_repeated_batches() {
    let drawer = make_drawer("trusty-memory is a palace storage engine", 0.7);
    let alias = ConsolidationAction::Alias {
        from: "tm".to_string(),
        to: "trusty-memory".to_string(),
    };
    let mock = Arc::new(MockInference::new(vec![alias]));
    let call_count = Arc::clone(&mock.call_count);
    let cfg = SemanticConsolidationConfig {
        max_batch_size: 8,
        max_calls_per_cycle: 20,
        ..Default::default()
    };
    let consolidator = SemanticConsolidator::new(mock, cfg);

    let batch = std::slice::from_ref(&drawer);
    let first = consolidator.consolidate(batch).await;
    let second = consolidator.consolidate(batch).await;

    assert_eq!(call_count.load(std::sync::atomic::Ordering::Relaxed), 1);
    assert_eq!(first.cache_hits, 0);
    assert_eq!(second.cache_hits, 1);
    assert_eq!(first.aliases.len(), 1);
    assert_eq!(second.aliases.len(), 1);
}
// Ten drawers in batches of two would need five calls; a budget of three
// stops the run after the third.
#[tokio::test]
async fn consolidator_respects_call_budget() {
    let mut drawers: Vec<Drawer> = Vec::with_capacity(10);
    for i in 0..10 {
        drawers.push(make_drawer(&format!("drawer content {i}"), 0.5));
    }

    let mock = Arc::new(MockInference::no_op());
    let call_count = Arc::clone(&mock.call_count);
    let cfg = SemanticConsolidationConfig {
        max_batch_size: 2,
        max_calls_per_cycle: 3,
        ..Default::default()
    };
    let consolidator = SemanticConsolidator::new(mock, cfg);

    let result = consolidator.consolidate(&drawers).await;

    let observed_calls = call_count.load(std::sync::atomic::Ordering::Relaxed);
    assert_eq!(observed_calls, 3, "should stop at budget of 3 calls");
    assert_eq!(result.llm_calls, 3);
}
// An Alias action ends up in the result's alias list unchanged.
#[tokio::test]
async fn consolidator_collects_aliases() {
    let drawer = make_drawer("ts stands for trusty-search", 0.5);
    let mock = Arc::new(MockInference::new(vec![ConsolidationAction::Alias {
        from: "ts".into(),
        to: "trusty-search".into(),
    }]));
    let consolidator = SemanticConsolidator::new(mock, SemanticConsolidationConfig::default());

    let result = consolidator.consolidate(&[drawer]).await;

    let expected = vec![("ts".to_string(), "trusty-search".to_string())];
    assert_eq!(result.aliases, expected);
}
// A Flag action is recorded without superseding or merging anything.
#[tokio::test]
async fn consolidator_flags_contradictions() {
    let drawer = make_drawer("trusty-search uses PostgreSQL for storage", 0.7);
    let id = drawer.id;
    let flag = ConsolidationAction::Flag {
        drawer_id: id,
        reason: "contradicts: trusty-search uses redb".into(),
    };
    let mock = Arc::new(MockInference::new(vec![flag]));
    let consolidator = SemanticConsolidator::new(mock, SemanticConsolidationConfig::default());

    let result = consolidator.consolidate(&[drawer]).await;

    assert_eq!(result.flagged_ids.len(), 1);
    assert_eq!(result.flagged_ids[0].0, id);
    assert!(result.superseded_ids.is_empty());
    assert!(result.canonical_drawers.is_empty());
}
}