use super::*;
impl KnowledgeBase {
/// Adds a new note or skill chunk and returns its id.
///
/// `kind` must be `"note"` or `"skill"` and `source` must be one of `"chat"`,
/// `"manual"`, `"doc"` or `"agent"`; any other value is rejected with
/// `InnateError::InvalidState`.
///
/// Outcomes that do not insert a row:
/// - the sanitizer discards `content`: returns `Ok` with an empty string;
/// - the content hash is marked invalidated: returns `InnateError::InvalidState`;
/// - a non-spark chunk in state `active` or `pending` already carries the same
///   content hash: returns the id of the oldest such chunk.
///
/// Otherwise the chunk row (plus its two vector rows when both embeddings were
/// computed) is written inside one immediate transaction, which is rolled back
/// if any step fails.
pub fn add(
    &self,
    content: &str,
    kind: &str,
    trigger_desc: Option<&str>,
    anti_trigger_desc: Option<&str>,
    source: &str,
    skill_name: Option<&str>,
) -> Result<String> {
    match kind {
        "note" | "skill" => {}
        _ => return Err(InnateError::InvalidState(format!("invalid kind: {kind}"))),
    }
    match source {
        "chat" | "manual" | "doc" | "agent" => {}
        _ => {
            return Err(InnateError::InvalidState(format!(
                "invalid source: {source}"
            )))
        }
    }

    let (content, action) = self.sanitize_content(content);
    if action == SanitizeAction::Discard {
        return Ok(String::new());
    }

    // A trigger that the sanitizer discards is treated as if none was supplied.
    let trigger_clean = match trigger_desc.map(|raw| self.sanitizer.sanitize(raw)) {
        Some((cleaned, verdict)) if verdict != SanitizeAction::Discard => Some(cleaned),
        _ => None,
    };
    let anti_trigger_clean = match anti_trigger_desc.map(|raw| self.sanitizer.sanitize(raw)) {
        Some((cleaned, verdict)) if verdict != SanitizeAction::Discard => Some(cleaned),
        _ => None,
    };

    let h = content_hash(&content);
    if self.storage.is_hash_invalidated(&h)? {
        return Err(InnateError::InvalidState(
            "content hash is invalidated".into(),
        ));
    }

    // Dedupe: reuse the oldest live non-spark chunk with identical content.
    let duplicates = self.storage.query_chunks_params(
        "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
        rusqlite::params![h],
    )?;
    let duplicate_id = duplicates
        .first()
        .and_then(|hit| hit.get("id"))
        .and_then(Value::as_str);
    if let Some(id) = duplicate_id {
        return Ok(id.to_string());
    }

    let now = utc_now_iso();
    let chunk_id = gen_uuid();

    // Agent-sourced chunks start pending; skills are installed and protected;
    // everything else is a plain captured, active chunk.
    let redacted = action == SanitizeAction::Redact;
    let (origin, state, base_conf, prot, init_state_reason) =
        match (source == "agent", kind == "skill") {
            (true, _) => ("captured", "pending", 0.60, 0, "init:captured_agent"),
            (false, true) => ("installed", "active", 0.85, 1, "init:installed"),
            (false, false) => ("captured", "active", 0.60, 0, "init:captured"),
        };
    // Redacted content always starts at the lower confidence.
    let conf = if redacted { 0.4 } else { base_conf };

    // Without a usable trigger, the content itself is embedded as the trigger.
    let trigger_text = trigger_clean.as_deref().unwrap_or(&content);
    let content_embedding = self.embedding.embed_content(&content);
    let trigger_embedding = self.embedding.embed_trigger(trigger_text);
    let (cvec, tvec, embed_ver, final_state_reason) =
        if let (Ok(cv), Ok(tv)) = (content_embedding, trigger_embedding) {
            (cv, tv, 1i64, init_state_reason.to_string())
        } else {
            // Either embedding failed: store the row without vectors and
            // record the intended state in the reason.
            (
                vec![],
                vec![],
                0i64,
                format!("embedding_pending:target={state}"),
            )
        };

    let tokens = estimate_tokens(&content) as i64;
    let row = ChunkRow {
        id: chunk_id.clone(),
        skill_name: skill_name.map(str::to_string),
        content,
        trigger_desc: trigger_clean,
        anti_trigger_desc: anti_trigger_clean,
        content_hash: h,
        token_count: Some(tokens),
        origin: origin.to_string(),
        source: Some(source.to_string()),
        protected: prot,
        state: state.to_string(),
        state_reason: Some(final_state_reason),
        confidence: conf,
        confidence_reason: Some(format!("init:{origin}")),
        version: 1,
        embed_version: embed_ver,
        created_at: now.clone(),
        updated_at: now,
        ..Default::default()
    };

    self.storage.begin_immediate()?;
    let written = (|| -> Result<()> {
        self.storage.insert_chunk(&row)?;
        if embed_ver > 0 {
            let content_blob = pack_embedding(&cvec);
            self.storage.insert_vec_content(&chunk_id, &content_blob)?;
            let trigger_blob = pack_embedding(&tvec);
            self.storage.insert_vec_trigger(&chunk_id, &trigger_blob)?;
        }
        self.storage.commit()
    })();
    match written {
        Ok(()) => Ok(chunk_id),
        Err(err) => {
            // Best-effort rollback; the original failure is what gets reported.
            let _ = self.storage.rollback();
            Err(err)
        }
    }
}
/// Records a new spark chunk (origin `spark`, maturity `seed`, state `active`,
/// confidence 0.5) and returns its id.
///
/// Returns `Ok` with an empty string when the sanitizer discards `content`,
/// and `InnateError::InvalidState` when the content hash is invalidated.
/// Ids returned by a `recall` over the sanitized content are stored
/// comma-joined in `related_ids`; a failing recall is ignored and simply
/// leaves the spark without related ids.
pub fn spark(
    &self,
    content: &str,
    trigger_desc: Option<&str>,
    anti_trigger_desc: Option<&str>,
) -> Result<String> {
    let (content, action) = self.sanitize_content(content);
    if action == SanitizeAction::Discard {
        return Ok(String::new());
    }

    // A trigger that the sanitizer discards is treated as if none was supplied.
    let trigger_clean = match trigger_desc.map(|raw| self.sanitizer.sanitize(raw)) {
        Some((cleaned, verdict)) if verdict != SanitizeAction::Discard => Some(cleaned),
        _ => None,
    };
    let anti_trigger_clean = match anti_trigger_desc.map(|raw| self.sanitizer.sanitize(raw)) {
        Some((cleaned, verdict)) if verdict != SanitizeAction::Discard => Some(cleaned),
        _ => None,
    };

    let h = content_hash(&content);
    if self.storage.is_hash_invalidated(&h)? {
        return Err(InnateError::InvalidState(
            "content hash is invalidated".into(),
        ));
    }

    // Best effort: a recall error must not prevent the spark from being stored.
    let related: Vec<String> = match self.recall(
        &content,
        2000,
        false,
        false,
        Some(5),
        "sdk",
        "false",
        false,
        "off",
    ) {
        Ok(recalled) => recalled
            .knowledge
            .iter()
            .filter_map(|hit| hit["id"].as_str().map(str::to_string))
            .collect(),
        Err(_) => Vec::new(),
    };

    let now = utc_now_iso();
    let chunk_id = gen_uuid();
    let tokens = estimate_tokens(&content) as i64;

    // Without a usable trigger, the content itself is embedded as the trigger.
    let trigger_text = trigger_clean.as_deref().unwrap_or(&content);
    let content_embedding = self.embedding.embed_content(&content);
    let trigger_embedding = self.embedding.embed_trigger(trigger_text);
    let (cvec, tvec, embed_ver, state_reason) =
        if let (Ok(cv), Ok(tv)) = (content_embedding, trigger_embedding) {
            (cv, tv, 1i64, "init:spark".to_string())
        } else {
            // Either embedding failed: store the row without vectors.
            (
                vec![],
                vec![],
                0i64,
                "embedding_pending:target=active".to_string(),
            )
        };

    let related_ids = (!related.is_empty()).then(|| related.join(","));
    let row = ChunkRow {
        id: chunk_id.clone(),
        content,
        trigger_desc: trigger_clean,
        anti_trigger_desc: anti_trigger_clean,
        content_hash: h,
        token_count: Some(tokens),
        origin: "spark".to_string(),
        maturity: Some("seed".to_string()),
        related_ids,
        state: "active".to_string(),
        state_reason: Some(state_reason),
        confidence: 0.5,
        version: 1,
        embed_version: embed_ver,
        created_at: now.clone(),
        updated_at: now,
        ..Default::default()
    };

    self.storage.begin_immediate()?;
    let written = (|| -> Result<()> {
        self.storage.insert_chunk(&row)?;
        if embed_ver > 0 {
            let content_blob = pack_embedding(&cvec);
            self.storage.insert_vec_content(&chunk_id, &content_blob)?;
            let trigger_blob = pack_embedding(&tvec);
            self.storage.insert_vec_trigger(&chunk_id, &trigger_blob)?;
        }
        self.storage.commit()
    })();
    match written {
        Ok(()) => Ok(chunk_id),
        Err(err) => {
            // Best-effort rollback; the original failure is what gets reported.
            let _ = self.storage.rollback();
            Err(err)
        }
    }
}
/// Advances a spark one maturity step: `seed` -> `sprouting` -> `incubating`.
///
/// A missing `maturity` value is read as `seed`.
///
/// # Errors
/// - `InnateError::ChunkNotFound` if the id is unknown or the chunk's origin
///   is not `spark`.
/// - `InnateError::InvalidState` if the current maturity is anything other
///   than `seed` or `sprouting`, or if `to` is not the next step.
///
/// Asking for the maturity a `seed` or `sprouting` spark already has is a
/// successful no-op.
pub fn mature_spark(&self, spark_id: &str, to: &str) -> Result<()> {
    let chunk = match self.storage.get_chunk(spark_id)? {
        Some(found) => found,
        None => return Err(InnateError::ChunkNotFound(spark_id.to_string())),
    };
    let is_spark = chunk.get("origin").and_then(Value::as_str) == Some("spark");
    if !is_spark {
        return Err(InnateError::ChunkNotFound(spark_id.to_string()));
    }

    let current = chunk
        .get("maturity")
        .and_then(Value::as_str)
        .unwrap_or("seed");
    // Each stage that can still mature has exactly one successor.
    let next_stage = match current {
        "seed" => "sprouting",
        "sprouting" => "incubating",
        _ => {
            return Err(InnateError::InvalidState(format!(
                "spark {spark_id} already {current}"
            )))
        }
    };
    if to == current {
        return Ok(());
    }
    if to != next_stage {
        return Err(InnateError::InvalidState(format!(
            "invalid spark maturity transition: {current} -> {to}"
        )));
    }

    let now = utc_now_iso();
    self.storage.begin_immediate()?;
    let updated = self.storage.query_chunks_params(
        "UPDATE chunks SET maturity=?, updated_at=? WHERE id=?",
        rusqlite::params![to, now, spark_id],
    );
    let outcome = match updated {
        Ok(_) => self.storage.commit(),
        Err(err) => Err(err),
    };
    if let Err(err) = outcome {
        // Best-effort rollback; the original failure is what gets reported.
        let _ = self.storage.rollback();
        return Err(err);
    }
    Ok(())
}
/// Promotes a spark into a regular chunk and returns the resulting chunk id.
///
/// `to` must be `"note"` or `"skill"`. The spark's stored content is sanitized
/// again before promotion.
///
/// If a non-spark chunk in state `active` or `pending` already has the same
/// content hash, no new row is created: the spark is only marked `promoted`
/// and the existing chunk's id is returned. Otherwise a new chunk is inserted
/// with `parent_id` set to the spark, together with its two embedding rows,
/// and the spark is marked `promoted` in the same transaction.
///
/// # Errors
/// - `InnateError::ChunkNotFound` if the id is unknown or not a spark.
/// - `InnateError::InvalidState` if the spark is already `promoted` or
///   `dropped`, if `to` is invalid, if sanitizing discards the content, or if
///   the content hash is invalidated.
/// - Any embedding or storage error; embedding failures are returned before
///   a transaction is opened.
pub fn promote_spark(&self, spark_id: &str, to: &str) -> Result<String> {
    let spark = match self.storage.get_chunk(spark_id)? {
        Some(found) => found,
        None => return Err(InnateError::ChunkNotFound(spark_id.to_string())),
    };
    let is_spark = spark.get("origin").and_then(Value::as_str) == Some("spark");
    if !is_spark {
        return Err(InnateError::ChunkNotFound(spark_id.to_string()));
    }

    let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
    if matches!(maturity, "promoted" | "dropped") {
        return Err(InnateError::InvalidState(format!(
            "spark {spark_id} already {maturity}"
        )));
    }
    match to {
        "note" | "skill" => {}
        _ => {
            return Err(InnateError::InvalidState(format!(
                "invalid spark promotion target: {to}"
            )))
        }
    }

    let stored_content = spark.get("content").and_then(Value::as_str).unwrap_or("");
    let (content, action) = self.sanitize_content(stored_content);
    if action == SanitizeAction::Discard {
        return Err(InnateError::InvalidState(
            "sanitize discard on promote".into(),
        ));
    }

    let promoted_hash = content_hash(&content);
    if self.storage.is_hash_invalidated(&promoted_hash)? {
        return Err(InnateError::InvalidState(
            "spark content hash is invalidated".into(),
        ));
    }

    let now = utc_now_iso();

    // Dedupe: when identical live content already exists, only flag the spark.
    let duplicates = self.storage.query_chunks_params(
        "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
        rusqlite::params![promoted_hash],
    )?;
    let duplicate_id = duplicates
        .first()
        .and_then(|hit| hit.get("id"))
        .and_then(Value::as_str)
        .map(str::to_string);
    if let Some(id) = duplicate_id {
        self.storage.begin_immediate()?;
        let marked = self
            .storage
            .query_chunks_params(
                "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
                rusqlite::params![now, spark_id],
            )
            .and_then(|_| self.storage.commit());
        return match marked {
            Ok(_) => Ok(id),
            Err(err) => {
                // Best-effort rollback; the original failure is what gets reported.
                let _ = self.storage.rollback();
                Err(err)
            }
        };
    }

    // Skills are installed and protected; notes are plain captured chunks.
    let (origin, prot, base_conf, state_reason) = match to {
        "skill" => ("installed", 1, 0.85, "init:installed"),
        _ => ("captured", 0, 0.60, "init:captured"),
    };
    // Redacted content always starts at the lower confidence.
    let conf: f64 = if action == SanitizeAction::Redact {
        0.4
    } else {
        base_conf
    };

    let new_id = gen_uuid();
    let trigger = spark.get("trigger_desc").and_then(Value::as_str);
    let anti = spark.get("anti_trigger_desc").and_then(Value::as_str);
    let tokens = estimate_tokens(&content) as i64;
    let row = ChunkRow {
        id: new_id.clone(),
        content: content.clone(),
        trigger_desc: trigger.map(str::to_string),
        anti_trigger_desc: anti.map(str::to_string),
        content_hash: promoted_hash,
        token_count: Some(tokens),
        origin: origin.to_string(),
        source: Some("manual".to_string()),
        protected: prot,
        state: "active".to_string(),
        state_reason: Some(state_reason.to_string()),
        confidence: conf,
        confidence_reason: Some("manual_set".to_string()),
        parent_id: Some(spark_id.to_string()),
        version: 1,
        embed_version: 1,
        created_at: now.clone(),
        updated_at: now.clone(),
        ..Default::default()
    };

    // Unlike plain inserts, promotion requires both embeddings up front.
    let cvec = self.embedding.embed_content(&content)?;
    let tvec = self.embedding.embed_trigger(trigger.unwrap_or(&content))?;

    self.storage.begin_immediate()?;
    let written = (|| -> Result<()> {
        self.storage.insert_chunk(&row)?;
        let content_blob = pack_embedding(&cvec);
        self.storage.insert_vec_content(&new_id, &content_blob)?;
        let trigger_blob = pack_embedding(&tvec);
        self.storage.insert_vec_trigger(&new_id, &trigger_blob)?;
        self.storage.query_chunks_params(
            "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
            rusqlite::params![now, spark_id],
        )?;
        self.storage.commit()
    })();
    match written {
        Ok(()) => Ok(new_id),
        Err(err) => {
            // Best-effort rollback; the original failure is what gets reported.
            let _ = self.storage.rollback();
            Err(err)
        }
    }
}
/// Marks a spark as `dropped`, storing `dropped` or `dropped:<reason>` as its
/// state reason.
///
/// Dropping a spark that is already `dropped` is a successful no-op.
///
/// # Errors
/// - `InnateError::ChunkNotFound` if the id is unknown or not a spark.
/// - `InnateError::InvalidState` if the spark has already been promoted.
pub fn drop_spark(&self, spark_id: &str, reason: &str) -> Result<()> {
    let spark = match self.storage.get_chunk(spark_id)? {
        Some(found) => found,
        None => return Err(InnateError::ChunkNotFound(spark_id.to_string())),
    };
    let is_spark = spark.get("origin").and_then(Value::as_str) == Some("spark");
    if !is_spark {
        return Err(InnateError::ChunkNotFound(spark_id.to_string()));
    }

    match spark.get("maturity").and_then(Value::as_str).unwrap_or("") {
        "promoted" => {
            return Err(InnateError::InvalidState(format!(
                "spark {spark_id} already promoted"
            )))
        }
        "dropped" => return Ok(()),
        _ => {}
    }

    let now = utc_now_iso();
    let reason_str = match reason {
        "" => "dropped".to_string(),
        _ => format!("dropped:{reason}"),
    };

    self.storage.begin_immediate()?;
    let updated = self.storage.query_chunks_params(
        "UPDATE chunks SET maturity='dropped', state_reason=?, updated_at=? WHERE id=?",
        rusqlite::params![reason_str, now, spark_id],
    );
    let outcome = match updated {
        Ok(_) => self.storage.commit(),
        Err(err) => Err(err),
    };
    if let Err(err) = outcome {
        // Best-effort rollback; the original failure is what gets reported.
        let _ = self.storage.rollback();
        return Err(err);
    }
    Ok(())
}
/// Approves a pending chunk: sets its state to `active` with reason
/// `approved` and its confidence reason to `manual_set`.
///
/// Approving a chunk that is already `active` is a successful no-op.
///
/// # Errors
/// - `InnateError::ChunkNotFound` if the id is unknown.
/// - `InnateError::InvalidState` if the chunk is a spark, or if its state is
///   neither `active` nor `pending`.
pub fn approve(&self, chunk_id: &str) -> Result<()> {
    let chunk = match self.storage.get_chunk(chunk_id)? {
        Some(found) => found,
        None => return Err(InnateError::ChunkNotFound(chunk_id.to_string())),
    };
    if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
        return Err(InnateError::InvalidState(
            "spark lifecycle uses promote_spark() or invalidate()".into(),
        ));
    }

    match chunk.get("state").and_then(Value::as_str) {
        Some("active") => return Ok(()),
        Some("pending") => {}
        _ => {
            return Err(InnateError::InvalidState(
                "approve requires pending chunk".into(),
            ))
        }
    }

    let now = utc_now_iso();
    self.storage.begin_immediate()?;
    let approved = (|| -> Result<()> {
        self.storage
            .update_chunk_state(chunk_id, "active", Some("approved"), &now)?;
        self.storage.query_chunks_params(
            "UPDATE chunks SET confidence_reason='manual_set', updated_at=? WHERE id=?",
            rusqlite::params![now, chunk_id],
        )?;
        self.storage.commit()
    })();
    match approved {
        Ok(()) => Ok(()),
        Err(err) => {
            // Best-effort rollback; the original failure is what gets reported.
            let _ = self.storage.rollback();
            Err(err)
        }
    }
}
/// Moves a non-spark chunk to state `archived`, recording `reason` as the
/// state reason.
///
/// # Errors
/// - `InnateError::ChunkNotFound` if the id is unknown.
/// - `InnateError::InvalidState` if the chunk is a spark.
pub fn archive(&self, chunk_id: &str, reason: &str) -> Result<()> {
    let chunk = match self.storage.get_chunk(chunk_id)? {
        Some(found) => found,
        None => return Err(InnateError::ChunkNotFound(chunk_id.to_string())),
    };
    let is_spark = chunk.get("origin").and_then(Value::as_str) == Some("spark");
    if is_spark {
        return Err(InnateError::InvalidState(
            "spark lifecycle uses drop_spark() or invalidate()".into(),
        ));
    }

    let now = utc_now_iso();
    self.storage.begin_immediate()?;
    let archived = match self
        .storage
        .update_chunk_state(chunk_id, "archived", Some(reason), &now)
    {
        Ok(_) => self.storage.commit(),
        Err(err) => Err(err),
    };
    if archived.is_err() {
        // Best-effort rollback; the original failure is what gets reported.
        let _ = self.storage.rollback();
    }
    archived
}
/// Invalidates a chunk together with every other chunk that shares its
/// content hash, and records that hash as invalidated.
///
/// Inside one immediate transaction this:
/// 1. archives the target chunk with confidence 0.0 and a state reason of
///    `invalidated` or `invalidated:<reason>`;
/// 2. archives every other chunk with the same content hash, using the state
///    reason `invalidated:same_hash`;
/// 3. deletes the `confidence_evidence` rows of all chunks with that hash;
/// 4. registers the hash through `insert_invalidated_hash`.
///
/// There is no origin check here, so sparks are handled like any other chunk.
///
/// # Errors
/// Returns `InnateError::ChunkNotFound` if the id is unknown, or the first
/// storage error encountered; in the latter case a rollback is attempted.
pub fn invalidate(&self, chunk_id: &str, reason: &str) -> Result<()> {
let chunk = self
.storage
.get_chunk(chunk_id)?
.ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
// A missing or non-string content_hash falls back to the empty string.
let h = chunk
.get("content_hash")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let now = utc_now_iso();
let reason_str = if reason.is_empty() {
"invalidated".to_string()
} else {
format!("invalidated:{reason}")
};
self.storage.begin_immediate()?;
let result = (|| -> Result<()> {
// Step 1: archive the requested chunk with the caller-supplied reason.
self.storage.query_chunks_params(
"UPDATE chunks
SET state='archived', confidence=0.0, confidence_base=0.0,
confidence_reason='invalidated', state_reason=?,
state_updated_at=?, updated_at=?
WHERE id=?",
rusqlite::params![reason_str, now, now, chunk_id],
)?;
// Step 2: archive all other chunks that carry the same content hash.
self.storage.query_chunks_params(
"UPDATE chunks
SET state='archived', confidence=0.0, confidence_base=0.0,
confidence_reason='invalidated',
state_reason='invalidated:same_hash',
state_updated_at=?, updated_at=?
WHERE content_hash=? AND id!=?",
rusqlite::params![now, now, h, chunk_id],
)?;
// Step 3: drop confidence evidence for every chunk with this hash.
self.storage.conn_execute(
"DELETE FROM confidence_evidence
WHERE chunk_id IN (SELECT id FROM chunks WHERE content_hash=?)",
rusqlite::params![h],
)?;
// Step 4: record the hash; note the raw `reason` is passed here, not the
// prefixed `reason_str`.
self.storage
.insert_invalidated_hash(&h, Some(reason), &now)?;
self.storage.commit()
})();
if result.is_err() {
// Best-effort rollback; the original error is still returned below.
let _ = self.storage.rollback();
}
result
}
/// Restores an archived chunk to `active` and resets its learned statistics.
///
/// Restoring a chunk that is already `active` is a successful no-op. Only the
/// row identified by `chunk_id` is reactivated.
///
/// Inside one immediate transaction this:
/// - sets the chunk's state to `active` with reason `restore`;
/// - if the chunk's state reason started with `invalidated`, removes its
///   content hash from `invalidated_hashes`;
/// - resets confidence to 0.5, zeroes the usage counters, clears the
///   last-used/success/decay timestamps and sets `evidence_cutoff_at` to now;
/// - deletes the chunk's rows in `confidence_evidence`,
///   `chunk_success_traces`, `chunk_context_stats_base` and
///   `chunk_context_stats`;
/// - rejects the chunk's `pending`/`accepted` governance proposals and
///   appends "restored by user" to their reason.
///
/// # Errors
/// - `InnateError::ChunkNotFound` if the id is unknown.
/// - `InnateError::InvalidState` if the chunk is neither `active` nor
///   `archived`.
/// - The first storage error encountered; a rollback is attempted in that case.
pub fn restore(&self, chunk_id: &str) -> Result<()> {
    let chunk = self
        .storage
        .get_chunk(chunk_id)?
        .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
    let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
    if state == "active" {
        return Ok(());
    }
    if state != "archived" {
        return Err(InnateError::InvalidState(
            "restore requires archived chunk".into(),
        ));
    }
    let was_invalidated = chunk
        .get("state_reason")
        .and_then(Value::as_str)
        .map(|r| r.starts_with("invalidated"))
        .unwrap_or(false);
    // A missing or non-string content_hash falls back to the empty string.
    let h = chunk
        .get("content_hash")
        .and_then(Value::as_str)
        .unwrap_or("")
        .to_string();
    let now = utc_now_iso();
    self.storage.begin_immediate()?;
    let result = (|| -> Result<()> {
        self.storage
            .update_chunk_state(chunk_id, "active", Some("restore"), &now)?;
        if was_invalidated {
            // Lift the hash ban so the restored content is accepted again.
            self.storage.query_chunks_params(
                "DELETE FROM invalidated_hashes WHERE content_hash=?",
                rusqlite::params![h],
            )?;
        }
        // Start over from a neutral confidence with all counters cleared.
        self.storage.query_chunks_params(
            "UPDATE chunks
SET confidence_base=0.5, confidence=0.5,
confidence_reason='restore',
selected_count=0, selected_count_base=0,
used_count=0, used_count_base=0,
used_success_count=0, used_success_count_base=0,
success_trace_ids_count=0,
last_used_at=NULL, last_used_base=NULL,
last_success_at=NULL, last_decayed_at=NULL,
evidence_cutoff_at=?, updated_at=?
WHERE id=?",
            rusqlite::params![now, now, chunk_id],
        )?;
        self.storage.conn_execute(
            "DELETE FROM confidence_evidence WHERE chunk_id=?",
            rusqlite::params![chunk_id],
        )?;
        self.storage.conn_execute(
            "DELETE FROM chunk_success_traces WHERE chunk_id=?",
            rusqlite::params![chunk_id],
        )?;
        self.storage.conn_execute(
            "DELETE FROM chunk_context_stats_base WHERE chunk_id=?",
            rusqlite::params![chunk_id],
        )?;
        self.storage.conn_execute(
            "DELETE FROM chunk_context_stats WHERE chunk_id=?",
            rusqlite::params![chunk_id],
        )?;
        // In SQLite `NULL || 'text'` is NULL, so a bare `reason || ...` would
        // silently drop the note for proposals whose reason is NULL. The
        // COALESCE keeps the output unchanged for every non-NULL reason and
        // yields plain 'restored by user' for a NULL one.
        self.storage.conn_execute(
            "UPDATE governance_proposals
SET state='rejected', reason=COALESCE(reason || '; ', '') || 'restored by user', updated_at=?
WHERE chunk_id=? AND state IN ('pending','accepted')",
            rusqlite::params![now, chunk_id],
        )?;
        self.storage.commit()
    })();
    if result.is_err() {
        // Best-effort rollback; the original error is still returned below.
        let _ = self.storage.rollback();
    }
    result
}
}