use crate::embeddings::Embed;
use crate::hnsw::VectorIndex;
use crate::llm::OllamaClient;
use crate::models::Tier;
use crate::models::field_names;
use crate::{db, validate};
use serde_json::{Value, json};
use std::path::Path;
pub(super) fn handle_consolidate(
conn: &rusqlite::Connection,
db_path: &Path,
params: &Value,
llm: Option<&OllamaClient>,
embedder: Option<&dyn Embed>,
vector_index: Option<&VectorIndex>,
mcp_client: Option<&str>,
) -> Result<Value, String> {
let ids_arr = params["ids"]
.as_array()
.ok_or("ids is required (array of memory IDs)")?;
let mut ids = Vec::with_capacity(ids_arr.len());
for (i, v) in ids_arr.iter().enumerate() {
match v.as_str() {
Some(s) => {
validate::validate_id(s).map_err(|e| e.to_string())?;
ids.push(s.to_string());
}
None => return Err(format!("ids[{i}] must be a string")),
}
}
let title = params["title"]
.as_str()
.ok_or(crate::errors::msg::TITLE_REQUIRED)?;
let namespace = params["namespace"]
.as_str()
.unwrap_or(crate::DEFAULT_NAMESPACE);
let summary: String = if let Some(s) = params["summary"].as_str() {
s.to_string()
} else if let Some(llm_client) = llm {
let mut memory_pairs: Vec<(String, String)> = Vec::new();
for id in &ids {
match db::get(conn, id) {
Ok(Some(mem)) => memory_pairs.push((mem.title, mem.content)),
Ok(None) => return Err(crate::errors::msg::memory_not_found(id)),
Err(e) => return Err(e.to_string()),
}
}
llm_client
.summarize_memories(&memory_pairs)
.map_err(|e| format!("LLM summarization failed: {e}"))?
} else {
return Err(
"summary is required (or use smart/autonomous tier for auto-summarization)".into(),
);
};
validate::RequestValidator::validate_consolidate(&ids, title, &summary, namespace)
.map_err(|e| e.to_string())?;
{
use crate::permissions::{Op, PermissionContext, Permissions};
let agent_id = crate::identity::resolve_agent_id(params["agent_id"].as_str(), mcp_client)
.map_err(|e| e.to_string())?;
let ctx = PermissionContext {
op: Op::MemoryConsolidate,
namespace: namespace.to_string(),
agent_id,
payload: json!({
"title": title,
"summary_chars": summary.len(),
(field_names::SOURCE_IDS): ids,
}),
};
match Permissions::evaluate(&ctx, &[]) {
crate::permissions::Decision::Allow | crate::permissions::Decision::Modify(_) => {}
crate::permissions::Decision::Deny(reason) => {
return Err(crate::governance::deny_message(
crate::audit::OP_CONSOLIDATE,
crate::governance::DenyGate::PermissionRule,
&reason,
));
}
crate::permissions::Decision::Ask(prompt) => {
return Ok(json!({
"status": "ask",
"reason": prompt,
"action": crate::audit::OP_CONSOLIDATE,
"namespace": namespace,
"source_count": ids.len(),
}));
}
}
}
let auto_generated = params["summary"].as_str().is_none();
if let Some(idx) = vector_index {
for id in &ids {
idx.remove(id);
}
}
let explicit_agent_id = params["agent_id"].as_str();
let consolidator_agent_id = crate::identity::resolve_agent_id(explicit_agent_id, mcp_client)
.map_err(|e| e.to_string())?;
let new_id = db::consolidate(
conn,
&ids,
title,
&summary,
namespace,
&Tier::Long,
crate::db::CONSOLIDATION_SOURCE,
&consolidator_agent_id,
)
.map_err(|e| e.to_string())?;
if let Some(emb) = embedder {
let text = format!("{title} {summary}");
match emb.embed(&text) {
Ok(embedding) => {
if let Err(e) = db::set_embedding(conn, &new_id, &embedding) {
tracing::warn!(
"failed to store embedding for consolidated {}: {}",
&new_id,
e
);
}
if let Some(idx) = vector_index {
for id in &ids {
idx.remove(id);
}
idx.insert(new_id.clone(), embedding);
}
}
Err(e) => {
tracing::warn!(
"failed to generate embedding for consolidated {}: {}",
&new_id,
e
);
}
}
}
let mut result = json!({"id": new_id, (field_names::CONSOLIDATED): ids.len()});
if auto_generated {
result["auto_summary"] = json!(true);
result["summary_preview"] = json!(summary.chars().take(200).collect::<String>());
}
let standard_ids: Vec<&str> = ids
.iter()
.filter(|id| db::is_namespace_standard(conn, id))
.map(std::string::String::as_str)
.collect();
if !standard_ids.is_empty() {
result["warning"] = json!(format!(
"consolidated memories included namespace standard(s): {}. Re-set the standard to the new memory ID: {}",
standard_ids.join(", "),
new_id
));
}
let details = serde_json::to_value(crate::subscriptions::ConsolidatedEventDetails {
source_ids: ids.clone(),
source_count: ids.len(),
})
.ok();
crate::subscriptions::dispatch_event_with_details(
conn,
crate::subscriptions::webhook_events::MEMORY_CONSOLIDATED,
&new_id,
namespace,
Some(&consolidator_agent_id),
db_path,
details,
);
Ok(result)
}
use crate::mcp::registry::McpTool;
use schemars::JsonSchema;
use serde::Deserialize;
// Typed mirror of the `memory_consolidate` tool arguments, used to derive the
// tool's input schema.
//
// NOTE: plain `//` comments are used deliberately throughout this struct.
// With a `JsonSchema` derive, `///` doc comments become schema descriptions,
// which would change the derived schema that `consolidate_parity_986` checks.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema)]
// NOTE(review): presumably the fields are never read (the struct appears to
// exist only for schema derivation via `input_schema_for`), hence the
// `dead_code` allowance — confirm.
#[allow(dead_code)]
pub struct ConsolidateRequest {
    // Memory IDs to merge; mirrors `params["ids"]` in `handle_consolidate`.
    pub ids: Vec<String>,
    // Title of the consolidated memory.
    pub title: String,
    // Explicit summary. When omitted, the handler asks the LLM if one is
    // configured, and refuses otherwise.
    #[serde(default)]
    pub summary: Option<String>,
    // Target namespace; the handler falls back to `crate::DEFAULT_NAMESPACE`.
    #[serde(default)]
    pub namespace: Option<String>,
    // Explicit consolidator identity (description text is part of the schema).
    #[schemars(description = "#908 consolidator agent_id.")]
    #[serde(default)]
    pub agent_id: Option<String>,
}
/// Registry descriptor for the `memory_consolidate` MCP tool.
// NOTE(review): `dead_code` is presumably allowed because the type is only
// reached through registry/test code outside this view — confirm.
#[allow(dead_code)]
pub struct ConsolidateTool;
impl McpTool for ConsolidateTool {
    /// Registered tool name (`tool_names::MEMORY_CONSOLIDATE`).
    fn name() -> &'static str {
        crate::mcp::registry::tool_names::MEMORY_CONSOLIDATE
    }

    /// Short human-readable description of the tool.
    fn description() -> &'static str {
        "Consolidate multiple memories into one long-term summary."
    }

    /// Extended usage notes: limits, side effects, and where provenance is kept.
    fn docs() -> &'static str {
        concat!(
            "Merge 2-100 sources into one long-tier memory; deletes sources; provenance recorded in ",
            "metadata.derived_from + metadata.consolidated_from_agents (NOT KG-traversable link rows). ",
            "LLM auto-generates summary if omitted (smart/autonomous tier)."
        )
    }

    /// Input schema derived from [`ConsolidateRequest`].
    fn input_schema() -> Value {
        crate::mcp::registry::input_schema_for::<ConsolidateRequest>()
    }

    /// Profile family the tool is grouped under (`Family::Power`).
    fn family() -> &'static str {
        crate::profile::Family::Power.name()
    }
}
#[cfg(test)]
mod d1_5_986_tests {
    use super::*;
    use crate::mcp::parity_test_helpers::{
        assert_descriptions_match, assert_property_set_parity, derived_props_for,
    };

    /// Tool name under which the parity helpers look up the reference schema.
    const TOOL_NAME: &str = "memory_consolidate";

    /// The schema derived from `ConsolidateRequest` must agree with the
    /// `memory_consolidate` reference in both property set and descriptions.
    #[test]
    fn consolidate_parity_986() {
        let props = derived_props_for::<ConsolidateRequest>();
        assert_property_set_parity(TOOL_NAME, &props);
        assert_descriptions_match(TOOL_NAME, &props);
    }

    /// Tool name and profile family are pinned.
    #[test]
    fn consolidate_tool_metadata_986() {
        assert_eq!(ConsolidateTool::name(), TOOL_NAME);
        assert_eq!(ConsolidateTool::family(), "power");
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::embeddings::test_support::MockEmbedder;
    use crate::models::{Memory, MemoryKind};
    use crate::storage as db;
    use serde_json::json;

    /// Open a fresh database backed by a temp file.
    ///
    /// The `NamedTempFile` is returned so the caller keeps it alive for as long
    /// as the connection is used.
    fn fresh_db() -> (rusqlite::Connection, tempfile::NamedTempFile) {
        let tmp = tempfile::NamedTempFile::new().expect("tempfile");
        let conn = db::open(tmp.path()).expect("db::open");
        (conn, tmp)
    }

    /// Run `handle_consolidate` with no LLM, embedder, vector index, or MCP client.
    fn consolidate_plain(
        conn: &rusqlite::Connection,
        db_path: &Path,
        params: &Value,
    ) -> Result<Value, String> {
        handle_consolidate(conn, db_path, params, None, None, None, None)
    }

    /// Insert a mid-tier observation titled `title` into namespace `ns`.
    ///
    /// The memory's metadata carries `agent_id = "ai:test"`. Returns the ID
    /// reported by `db::insert`.
    fn seed_observation(conn: &rusqlite::Connection, ns: &str, title: &str) -> String {
        let now = chrono::Utc::now().to_rfc3339();
        let mem = Memory {
            id: uuid::Uuid::new_v4().to_string(),
            tier: Tier::Mid,
            namespace: ns.to_string(),
            title: title.to_string(),
            content: format!("body for {title}"),
            tags: vec![],
            priority: 5,
            confidence: 1.0,
            source: "test".to_string(),
            access_count: 0,
            created_at: now.clone(),
            updated_at: now,
            last_accessed_at: None,
            expires_at: None,
            metadata: json!({"agent_id": "ai:test"}),
            reflection_depth: 0,
            memory_kind: MemoryKind::Observation,
            entity_id: None,
            persona_version: None,
            citations: Vec::new(),
            source_uri: None,
            source_span: None,
            confidence_source: crate::models::ConfidenceSource::CallerProvided,
            confidence_signals: None,
            confidence_decayed_at: None,
            version: 1,
        };
        db::insert(conn, &mem).expect("insert")
    }

    #[test]
    fn missing_ids_errors() {
        let (conn, tmp) = fresh_db();
        let err = consolidate_plain(&conn, tmp.path(), &json!({"title": "t", "summary": "s"}))
            .unwrap_err();
        assert!(err.contains("ids"), "got: {err}");
    }

    #[test]
    fn non_string_id_errors() {
        let (conn, tmp) = fresh_db();
        let err = consolidate_plain(
            &conn,
            tmp.path(),
            &json!({"ids": [42], "title": "t", "summary": "s"}),
        )
        .unwrap_err();
        assert!(err.contains("must be a string"), "got: {err}");
    }

    #[test]
    fn invalid_id_rejected() {
        let (conn, tmp) = fresh_db();
        let err = consolidate_plain(
            &conn,
            tmp.path(),
            &json!({"ids": [" "], "title": "t", "summary": "s"}),
        )
        .unwrap_err();
        assert!(!err.is_empty());
    }

    #[test]
    fn missing_title_errors() {
        let (conn, tmp) = fresh_db();
        let err = consolidate_plain(
            &conn,
            tmp.path(),
            &json!({"ids": ["11111111-2222-3333-4444-555555555555"], "summary": "s"}),
        )
        .unwrap_err();
        assert!(err.contains("title"), "got: {err}");
    }

    #[test]
    fn no_summary_no_llm_refused() {
        let (conn, tmp) = fresh_db();
        let a = seed_observation(&conn, "cn-ns", "a");
        let err = consolidate_plain(&conn, tmp.path(), &json!({"ids": [a], "title": "consolidated"}))
            .unwrap_err();
        assert!(err.contains("summary is required"), "got: {err}");
    }

    #[test]
    fn happy_path_consolidates_two() {
        let (conn, tmp) = fresh_db();
        let a = seed_observation(&conn, "cn-ns2", "a");
        let b = seed_observation(&conn, "cn-ns2", "b");
        let resp = consolidate_plain(
            &conn,
            tmp.path(),
            &json!({
                "ids": [a, b],
                "title": "consolidated",
                "summary": "the merged summary text",
                "namespace": "cn-ns2",
            }),
        )
        .expect("ok");
        assert!(resp["id"].is_string());
        assert_eq!(resp["consolidated"].as_u64(), Some(2));
    }

    #[test]
    fn happy_path_with_embedder_stores_embedding() {
        let (conn, tmp) = fresh_db();
        let a = seed_observation(&conn, "cn-emb", "a");
        let b = seed_observation(&conn, "cn-emb", "b");
        let emb = MockEmbedder::new_local().unwrap();
        let resp = handle_consolidate(
            &conn,
            tmp.path(),
            &json!({
                "ids": [a, b],
                "title": "consolidated-emb",
                "summary": "merged",
                "namespace": "cn-emb",
            }),
            None,
            Some(&emb),
            None,
            None,
        )
        .expect("ok");
        let new_id = resp["id"].as_str().unwrap();
        // Surface SQL errors directly rather than masking them as a zero count.
        let has_emb: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM memories WHERE id = ?1 AND embedding IS NOT NULL",
                rusqlite::params![new_id],
                |r| r.get(0),
            )
            .expect("embedding count query");
        assert_eq!(has_emb, 1);
    }

    /// With no explicit summary, the mocked Ollama `/api/chat` reply is used as
    /// the summary and flagged in the response.
    #[tokio::test(flavor = "multi_thread")]
    async fn llm_summary_auto_generated() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/api/chat"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
                "message": {"content": "auto-summary text"},
                "done": true,
            })))
            .mount(&server)
            .await;
        Mock::given(method("GET"))
            .and(path("/api/tags"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({"models": []})))
            .mount(&server)
            .await;
        let uri = server.uri();
        // The handler is synchronous; run it off the async runtime's worker.
        let resp = tokio::task::spawn_blocking(move || {
            let (conn, tmp) = fresh_db();
            let a = seed_observation(&conn, "cn-llm", "a");
            let b = seed_observation(&conn, "cn-llm", "b");
            let client = crate::llm::OllamaClient::new_with_url(&uri, "test-model").unwrap();
            handle_consolidate(
                &conn,
                tmp.path(),
                &json!({
                    "ids": [a, b],
                    "title": "consolidated-auto",
                    "namespace": "cn-llm",
                }),
                Some(&client),
                None,
                None,
                None,
            )
            .expect("ok")
        })
        .await
        .unwrap();
        assert_eq!(resp["auto_summary"], true);
        assert!(resp["summary_preview"].is_string());
    }

    /// An HTTP 500 from the LLM is reported with the summarization prefix.
    #[tokio::test(flavor = "multi_thread")]
    async fn llm_summary_error_surfaced() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/api/chat"))
            .respond_with(ResponseTemplate::new(500).set_body_string("boom"))
            .mount(&server)
            .await;
        Mock::given(method("GET"))
            .and(path("/api/tags"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({"models": []})))
            .mount(&server)
            .await;
        let uri = server.uri();
        let err = tokio::task::spawn_blocking(move || {
            let (conn, tmp) = fresh_db();
            let a = seed_observation(&conn, "cn-llm-err", "a");
            let client = crate::llm::OllamaClient::new_with_url(&uri, "test-model").unwrap();
            handle_consolidate(
                &conn,
                tmp.path(),
                &json!({
                    "ids": [a],
                    "title": "consolidated-err",
                    "namespace": "cn-llm-err",
                }),
                Some(&client),
                None,
                None,
                None,
            )
            .err()
            .unwrap_or_default()
        })
        .await
        .unwrap();
        assert!(err.contains("LLM summarization failed"), "got: {err}");
    }

    /// On the LLM path, a nonexistent source ID fails before any chat request.
    #[tokio::test(flavor = "multi_thread")]
    async fn llm_path_missing_source_errors() {
        use wiremock::matchers::{method, path};
        use wiremock::{Mock, MockServer, ResponseTemplate};
        let server = MockServer::start().await;
        Mock::given(method("GET"))
            .and(path("/api/tags"))
            .respond_with(ResponseTemplate::new(200).set_body_json(json!({"models": []})))
            .mount(&server)
            .await;
        let uri = server.uri();
        let err = tokio::task::spawn_blocking(move || {
            let (conn, tmp) = fresh_db();
            let client = crate::llm::OllamaClient::new_with_url(&uri, "test-model").unwrap();
            handle_consolidate(
                &conn,
                tmp.path(),
                &json!({
                    "ids": ["11111111-2222-3333-4444-555555555555"],
                    "title": "consolidated-missing",
                    "namespace": "cn-llm-miss",
                }),
                Some(&client),
                None,
                None,
                None,
            )
            .err()
            .unwrap_or_default()
        })
        .await
        .unwrap();
        assert!(err.contains("memory not found"), "got: {err}");
    }

    #[test]
    fn no_warning_when_no_standard() {
        let (conn, tmp) = fresh_db();
        let a = seed_observation(&conn, "cn-no-std", "a");
        let b = seed_observation(&conn, "cn-no-std", "b");
        let resp = consolidate_plain(
            &conn,
            tmp.path(),
            &json!({
                "ids": [a, b],
                "title": "no-standard",
                "summary": "merged",
                "namespace": "cn-no-std",
            }),
        )
        .expect("ok");
        assert!(resp.get("warning").is_none());
    }

    /// Provenance contract (#1599).
    ///
    /// Source IDs and authors live only in the new memory's metadata; no
    /// `MemoryLink` rows are created.
    #[test]
    fn provenance_is_metadata_only_zero_link_rows_1599() {
        let (conn, tmp) = fresh_db();
        let a = seed_observation(&conn, "cn-prov", "a");
        let b = seed_observation(&conn, "cn-prov", "b");
        let c = seed_observation(&conn, "cn-prov", "c");
        let resp = consolidate_plain(
            &conn,
            tmp.path(),
            &json!({
                "ids": [a, b, c],
                "title": "provenance-contract",
                "summary": "merged",
                "namespace": "cn-prov",
            }),
        )
        .expect("ok");
        let new_id = resp["id"].as_str().expect("new id");
        let mem = db::get(&conn, new_id).expect("get").expect("row exists");
        let derived_key = crate::models::MemoryLinkRelation::DerivedFrom.as_str();
        let derived: Vec<&str> = mem.metadata[derived_key]
            .as_array()
            .expect("metadata.derived_from must be an array")
            .iter()
            .filter_map(serde_json::Value::as_str)
            .collect();
        assert_eq!(derived.len(), 3, "derived_from must carry every source");
        for src in [&a, &b, &c] {
            assert!(
                derived.contains(&src.as_str()),
                "derived_from missing source {src}"
            );
        }
        let agents = mem.metadata["consolidated_from_agents"]
            .as_array()
            .expect("metadata.consolidated_from_agents must be an array");
        assert!(
            agents.iter().any(|v| v.as_str() == Some("ai:test")),
            "source author must be preserved, got: {agents:?}"
        );
        let links_resp = super::super::link::handle_get_links(&conn, &json!({"id": new_id}), None)
            .expect("get_links ok");
        assert_eq!(
            links_resp["count"].as_u64(),
            Some(0),
            "consolidate must not mint MemoryLink rows, got: {links_resp}"
        );
        assert_eq!(
            links_resp["links"].as_array().map(Vec::len),
            Some(0),
            "links array must be empty"
        );
    }
}