use serde_json::{Value, json};
use crate::identity::keypair::AgentKeypair;
use crate::llm::OllamaClient;
use crate::mcp::param_names;
use crate::models::{GovernancePolicy, Memory, MemoryLinkRelation};
use crate::{db, hnsw::VectorIndex};
use super::AUTONOMY_MIN_CONTENT_LEN;
pub(super) struct SynthesisOutcome {
pub counts: Option<crate::synthesis::SynthesisCounts>,
pub updates: Vec<(String, String)>,
pub deletes: Vec<String>,
pub failed_reason: Option<String>,
}
impl SynthesisOutcome {
pub(super) fn empty() -> Self {
Self {
counts: None,
updates: Vec::new(),
deletes: Vec::new(),
failed_reason: None,
}
}
}
pub(super) fn run_synthesis_pass(
llm: &OllamaClient,
mem: &Memory,
agent_id: &str,
existing: &[Memory],
ns_policy: &GovernancePolicy,
) -> Result<SynthesisOutcome, String> {
let cands: Vec<&Memory> = existing
.iter()
.filter(|c| c.id != mem.id && c.title != mem.title)
.collect();
if cands.is_empty() {
return Ok(SynthesisOutcome::empty());
}
let cap = ns_policy.effective_synthesis_max_candidate_chars();
match crate::synthesis::synthesise_with_cap(llm, &mem.title, &mem.content, &cands, cap) {
Ok(resp) => {
let counts = crate::synthesis::SynthesisCounts::from_response(&resp);
tracing::info!(
target: "synthesis",
namespace = %mem.namespace,
add = counts.add,
update = counts.update,
delete = counts.delete,
no_op = counts.no_op,
"synthesis batch decision",
);
let delete_cap = ns_policy.effective_synthesis_max_deletes_per_call() as usize;
if counts.delete > delete_cap {
tracing::warn!(
target: "synthesis",
namespace = %mem.namespace,
requested = counts.delete,
cap = delete_cap,
"synthesis.refused_unbounded_delete",
);
return Err(format!(
"GOVERNANCE_REFUSED: synthesis batch attempted {} \
deletes, exceeding namespace cap of {} (K10 approval \
required for unbounded-delete; raise \
`synthesis_max_deletes_per_call` to opt in per-namespace)",
counts.delete, delete_cap
));
}
if counts.update > 1 {
tracing::warn!(
target: "synthesis",
namespace = %mem.namespace,
update_count = counts.update,
"synthesis_decisions.update_count > 1; honouring all updates in sequence",
);
}
let mut updates: Vec<(String, String)> = Vec::new();
let mut deletes: Vec<String> = Vec::new();
for v in &resp.verdicts {
match v.verb {
crate::synthesis::SynthesisVerb::Update => {
let merged = v
.merged_content
.clone()
.unwrap_or_else(|| mem.content.clone());
updates.push((v.candidate_id.clone(), merged));
}
crate::synthesis::SynthesisVerb::Delete => {
if k9_allows_synthesis_delete(&mem.namespace, agent_id, &v.candidate_id) {
deletes.push(v.candidate_id.clone());
}
}
crate::synthesis::SynthesisVerb::Add
| crate::synthesis::SynthesisVerb::NoOp => {}
}
}
Ok(SynthesisOutcome {
counts: Some(counts),
updates,
deletes,
failed_reason: None,
})
}
Err(e) => {
let reason = e.to_string();
tracing::warn!(
target: "synthesis",
namespace = %mem.namespace,
"synthesis call failed: {reason}",
);
match ns_policy.effective_synthesis_failure_mode() {
crate::models::SynthesisFailureMode::BlockWrite => Err(format!(
"SYNTHESIS_FAILED: namespace policy `block_write` refuses \
the store while the curator is unavailable: {reason}"
)),
crate::models::SynthesisFailureMode::FallThrough => Ok(SynthesisOutcome {
counts: None,
updates: Vec::new(),
deletes: Vec::new(),
failed_reason: Some(reason),
}),
}
}
}
}
fn k9_allows_synthesis_delete(namespace: &str, agent_id: &str, candidate_id: &str) -> bool {
use crate::permissions::{Decision, Op, PermissionContext, Permissions};
let payload = json!({
"id": candidate_id,
"via": "synthesis_verdict",
});
let ctx = PermissionContext {
op: Op::MemoryDelete,
namespace: namespace.to_string(),
agent_id: agent_id.to_string(),
payload,
};
match Permissions::evaluate(&ctx, &[]) {
Decision::Allow | Decision::Modify(_) => true,
Decision::Deny(reason) => {
tracing::warn!(
target: "synthesis",
namespace = %namespace,
candidate_id = %candidate_id,
"synthesis delete verdict denied by K9: {reason}",
);
false
}
Decision::Ask(reason) => {
tracing::warn!(
target: "synthesis",
namespace = %namespace,
candidate_id = %candidate_id,
"synthesis delete verdict held for approval (ask): {reason}; \
skipping in this batch",
);
false
}
}
}
pub(super) fn apply_synthesis_updates_and_deletes(
conn: &rusqlite::Connection,
mem: &Memory,
existing: &[Memory],
embedder: Option<&dyn crate::embeddings::Embed>,
vector_index: Option<&VectorIndex>,
outcome: &SynthesisOutcome,
active_keypair: Option<&AgentKeypair>,
) -> Option<Value> {
let primary_update = outcome.updates.first().cloned();
let (primary_id, _) = primary_update.as_ref()?;
if conn
.execute_batch(crate::storage::connection::SQL_BEGIN_IMMEDIATE)
.is_err()
{
return None;
}
let mut deferred_index_ops: Vec<(String, Vec<f32>)> = Vec::new();
let mut updates_emitted: usize = 0;
for (cand_id, merged_content) in &outcome.updates {
let Some(target) = existing.iter().find(|c| c.id == *cand_id).cloned() else {
tracing::warn!(
target: "synthesis",
"synthesis update target {cand_id} not found in candidate set",
);
continue;
};
let preserved_metadata =
crate::identity::preserve_agent_id(&target.metadata, &mem.metadata);
let upd = db::update(
conn,
cand_id,
None,
Some(merged_content.as_str()),
Some(&mem.tier),
None,
Some(&mem.tags),
Some(mem.priority),
Some(mem.confidence),
None,
Some(&preserved_metadata),
);
let (_found, content_changed) = match upd {
Ok(v) => v,
Err(e) => {
tracing::warn!(
target: "synthesis",
"synthesis update failed for {cand_id}: {e}; rolling back merge",
);
let _ = conn.execute_batch(crate::storage::connection::SQL_ROLLBACK);
return None;
}
};
if content_changed && let Some(emb) = embedder {
let text = crate::embeddings::embedding_document(&target.title, &merged_content);
if let Ok(embedding) = emb.embed(&text) {
let _ = db::set_embedding(conn, cand_id, &embedding);
if vector_index.is_some() {
deferred_index_ops.push((cand_id.to_string(), embedding));
}
}
}
let mut provenance_row = mem.clone();
if updates_emitted > 0 {
provenance_row.id = uuid::Uuid::new_v4().to_string();
}
provenance_row.content = merged_content.clone();
provenance_row.metadata =
crate::identity::preserve_agent_id(&target.metadata, &mem.metadata);
provenance_row.title = format!("{} (sup ⟶ {})", mem.title, &target.id);
match db::insert(conn, &provenance_row) {
Ok(provenance_id) => {
if let Err(e) = db::create_link_signed(
conn,
&provenance_id,
&target.id,
MemoryLinkRelation::Supersedes.as_str(),
active_keypair,
) {
tracing::warn!(
target: "synthesis",
"synthesis supersedes link emit failed for {} -> {}: {e}",
provenance_id,
target.id,
);
}
}
Err(e) => {
tracing::warn!(
target: "synthesis",
"synthesis provenance-row insert failed for {} (target={}): {e}",
mem.id,
target.id,
);
}
}
updates_emitted += 1;
}
for del_id in &outcome.deletes {
if del_id == primary_id {
continue;
}
if let Err(e) = db::delete(conn, del_id) {
tracing::warn!(
target: "synthesis",
"synthesis delete failed for {del_id}: {e}; rolling back merge",
);
let _ = conn.execute_batch(crate::storage::connection::SQL_ROLLBACK);
return None;
}
}
if conn
.execute_batch(crate::storage::connection::SQL_COMMIT)
.is_err()
{
let _ = conn.execute_batch(crate::storage::connection::SQL_ROLLBACK);
return None;
}
if let Some(idx) = vector_index {
for (id, embedding) in deferred_index_ops {
idx.remove(&id);
idx.insert(id, embedding);
}
}
let target = existing.iter().find(|c| c.id == *primary_id).cloned()?;
let preserved_metadata = crate::identity::preserve_agent_id(&target.metadata, &mem.metadata);
let echoed_agent_id = preserved_metadata
.get(param_names::AGENT_ID)
.and_then(|v| v.as_str())
.map(str::to_string);
let mut resp = json!({
"id": target.id,
"tier": mem.tier,
"title": target.title,
"namespace": mem.namespace,
"agent_id": echoed_agent_id,
"duplicate": true,
"action": "synthesised: update existing memory",
});
if let Some(c) = &outcome.counts {
resp["synthesis_decisions"] = c.to_json();
}
if let Some(reason) = &outcome.failed_reason {
resp["synthesis_failed"] = json!(true);
resp["synthesis_failed_reason"] = json!(reason);
}
Some(resp)
}
pub(super) fn pending_synthesis_delete_targets(outcome: &SynthesisOutcome) -> Vec<String> {
if !outcome.updates.is_empty() {
return Vec::new();
}
outcome.deletes.clone()
}
pub(super) fn apply_pending_synthesis_deletes_with_links(
conn: &rusqlite::Connection,
new_id: &str,
pending_deletes: &[String],
active_keypair: Option<&AgentKeypair>,
) {
for del_id in pending_deletes {
if let Err(e) = db::create_link_signed(
conn,
new_id,
del_id,
MemoryLinkRelation::Supersedes.as_str(),
active_keypair,
) {
tracing::warn!(
target: "synthesis",
"synthesis supersedes link emit failed for {new_id} -> {del_id}: {e}",
);
}
if let Err(e) = db::delete(conn, del_id) {
tracing::warn!(
target: "synthesis",
"synthesis delete failed for {del_id}: {e}",
);
}
}
}
pub(super) fn synthesis_eligible(
autonomous_hooks: bool,
llm_present: bool,
content_len: usize,
namespace: &str,
ns_policy: &GovernancePolicy,
) -> bool {
autonomous_hooks
&& llm_present
&& content_len >= AUTONOMY_MIN_CONTENT_LEN
&& !namespace.starts_with('_')
&& !ns_policy.effective_legacy_per_pair_classifier()
}