use serde_json::{json, Value};
use kyma_memory::types::{CreateMemory, MemoryType, RecallFilter};
use kyma_memory::{rows, MemoryWriter, DEFAULT_DATABASE, EDGE_INVALIDATES, NODE_TABLE};
use super::memory_extract::{decide_conflict, ConflictOp, ExtractedMemory};
use super::memory_tools::fetch_latest_node;
use super::state::AgentState;
use super::tools::{execute_sql, SharedToolCtx};
#[derive(Debug, Default, Clone)]
pub struct ConflictTally {
pub added: i64,
pub updated: i64,
pub noop: i64,
pub invalidated: i64,
}
impl ConflictTally {
pub fn written(&self) -> i64 {
self.added + self.updated
}
pub fn merge(&mut self, other: &ConflictTally) {
self.added += other.added;
self.updated += other.updated;
self.noop += other.noop;
self.invalidated += other.invalidated;
}
pub fn to_json(&self) -> Value {
json!({
"added": self.added,
"updated": self.updated,
"noop": self.noop,
"invalidated": self.invalidated,
})
}
}
const NEIGHBOURS: usize = 5;
pub async fn consolidate_memory(
state: &AgentState,
shared: &SharedToolCtx,
writer: &MemoryWriter,
realm: &str,
m: &ExtractedMemory,
references: Vec<String>,
provenance: Value,
) -> ConflictTally {
let mut tally = ConflictTally::default();
let content = m.content.trim();
if content.is_empty() {
return tally;
}
let kind = MemoryType::parse(&m.kind);
let similar = nearest(shared, writer, realm, kind, content).await;
let decision = match decide_conflict(state, content, &similar).await {
Ok(d) => d,
Err(e) => {
tracing::debug!(error = %e, "conflict decision failed; defaulting to ADD");
super::memory_extract::ConflictDecision {
op: ConflictOp::Add,
target_id: None,
merged_content: None,
reason: Some("decision error".into()),
}
}
};
match decision.op {
ConflictOp::Noop => {
tally.noop += 1;
}
ConflictOp::Add => {
if add_memory(writer, realm, m, kind, content, references, provenance)
.await
.is_some()
{
tally.added += 1;
}
}
ConflictOp::Update => match decision.target_id.as_deref() {
Some(target) => {
let new_content = decision.merged_content.as_deref().unwrap_or(content);
if update_memory(shared, writer, target, new_content).await {
tally.updated += 1;
} else if add_memory(writer, realm, m, kind, content, references, provenance)
.await
.is_some()
{
tally.added += 1; }
}
None => {
if add_memory(writer, realm, m, kind, content, references, provenance)
.await
.is_some()
{
tally.added += 1;
}
}
},
ConflictOp::Invalidate => {
match add_memory(writer, realm, m, kind, content, references, provenance).await {
Some(new_node_id) => {
tally.added += 1;
if let Some(target) = decision.target_id.as_deref() {
if invalidate_memory(shared, writer, realm, target, &new_node_id).await {
tally.invalidated += 1;
}
}
}
None => {}
}
}
}
tally
}
async fn nearest(
shared: &SharedToolCtx,
writer: &MemoryWriter,
realm: &str,
kind: MemoryType,
content: &str,
) -> Vec<(String, String)> {
let qvec = match writer.embed_one(content).await {
Ok(v) => v,
Err(_) => return Vec::new(),
};
let filter = RecallFilter {
realms: vec![realm.to_string()],
memory_type: Some(kind),
..Default::default()
};
let sql = kyma_memory::sql::recall_sql(NODE_TABLE, &qvec, &filter, NEIGHBOURS, None);
let res = execute_sql(shared, DEFAULT_DATABASE, &sql, NEIGHBOURS).await;
res.get("rows")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(|r| {
let id = r.get("id").and_then(Value::as_str)?;
let c = r
.get("content")
.or_else(|| r.get("content_preview"))
.and_then(Value::as_str)
.unwrap_or("");
Some((id.to_string(), c.to_string()))
})
.collect()
})
.unwrap_or_default()
}
async fn add_memory(
writer: &MemoryWriter,
realm: &str,
m: &ExtractedMemory,
kind: MemoryType,
content: &str,
references: Vec<String>,
provenance: Value,
) -> Option<String> {
let mut cm = CreateMemory::new(content);
cm.title = m.title.clone();
cm.memory_type = kind;
cm.realm = realm.to_string();
cm.importance = m.importance.clamp(0.0, 1.0);
cm.references = references;
cm.valid_at = m.valid_at.clone();
cm.provenance = Some(provenance);
cm.tags = vec!["source:extraction".to_string()];
match writer.save(&cm).await {
Ok(id) => Some(rows::node_id(&id)),
Err(e) => {
tracing::debug!(error = %e, "add_memory failed");
None
}
}
}
async fn update_memory(
shared: &SharedToolCtx,
writer: &MemoryWriter,
target_node_id: &str,
new_content: &str,
) -> bool {
let mut row = match fetch_latest_node(shared, target_node_id).await {
Ok(r) => r,
Err(_) => return false,
};
let emb = match writer.embed_one(new_content).await {
Ok(v) => v,
Err(_) => return false,
};
let now = now_rfc3339();
row["content"] = json!(new_content);
row["content_preview"] = json!(rows::preview(new_content));
row["embedding"] = json!(emb);
row["updated_at"] = json!(now);
row["valid_at"] = json!(now);
writer.append_node_rows(vec![row]).await.is_ok()
}
async fn invalidate_memory(
shared: &SharedToolCtx,
writer: &MemoryWriter,
realm: &str,
target_node_id: &str,
replacement_node_id: &str,
) -> bool {
let mut row = match fetch_latest_node(shared, target_node_id).await {
Ok(r) => r,
Err(_) => return false,
};
let now = now_rfc3339();
row["invalid_at"] = json!(now);
row["superseded_by"] = json!(replacement_node_id);
row["updated_at"] = json!(now);
if writer.append_node_rows(vec![row]).await.is_err() {
return false;
}
let _ = writer
.link(replacement_node_id, target_node_id, EDGE_INVALIDATES, realm, None)
.await;
true
}
fn now_rfc3339() -> String {
chrono::Utc::now().to_rfc3339()
}