use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use surrealdb::sql::Thing;
use crate::knowledge::KnowledgeEntry;
use super::connection::normalize_datetime;
use super::{RecordId, SurrealConnection, SurrealDatabase};
/// Flat row shape deserialized from `knowledge` SELECT results.
///
/// Field names match the aliases produced by
/// `SurrealDatabase::knowledge_select_fields`. Almost every field carries a
/// serde default so that a row missing the column still deserializes; only
/// `id`, `title` and `category_id` are required.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(super) struct SurrealKnowledgeRecord {
/// Record id without the `kn-` prefix (`into_knowledge_entry` adds it).
pub id: String,
pub title: String,
#[serde(default)]
pub body: Option<String>,
#[serde(default)]
pub summary: Option<String>,
#[serde(default)]
pub file_path: Option<String>,
#[serde(default)]
pub content_hash: Option<String>,
#[serde(default)]
pub ephemeral: bool,
#[serde(default)]
pub owner: Option<String>,
/// Defaults to `"public"` when absent from the row.
#[serde(default = "default_visibility")]
pub visibility: String,
/// Id part of the linked `category` record.
pub category_id: String,
#[serde(default)]
pub source_type_id: Option<String>,
#[serde(default)]
pub entry_type_id: Option<String>,
#[serde(default)]
pub content_type_id: Option<String>,
#[serde(default)]
pub source_project_id: Option<String>,
#[serde(default)]
pub source_agent_id: Option<String>,
#[serde(default)]
pub session_id: Option<String>,
/// Timestamps arrive as strings (the select list casts them with `<string>`).
#[serde(default)]
pub created_at: Option<String>,
#[serde(default)]
pub updated_at: Option<String>,
#[serde(default)]
pub resonance: i32,
#[serde(default)]
pub resonance_type: Option<String>,
#[serde(default)]
pub last_activated: Option<String>,
#[serde(default)]
pub activation_count: i32,
#[serde(default)]
pub decay_rate: f64,
#[serde(default)]
pub anchors: Vec<String>,
#[serde(default)]
pub wake_phrases: Vec<String>,
#[serde(default)]
pub triggers: Vec<String>,
#[serde(default)]
pub wake_order: Option<i32>,
#[serde(default)]
pub wake_phrase: Option<String>,
#[serde(default)]
pub embedding: Option<Vec<f32>>,
#[serde(default)]
pub embedding_model: Option<String>,
#[serde(default)]
pub embedded_at: Option<String>,
#[serde(default)]
pub chunk_count: i32,
/// Defaults to `"markdown"` when absent from the row.
#[serde(default = "default_format")]
pub format: String,
}
/// Serde default for `SurrealKnowledgeRecord::visibility`.
fn default_visibility() -> String {
    String::from("public")
}
/// Serde default for `SurrealKnowledgeRecord::format`.
fn default_format() -> String {
    String::from("markdown")
}
impl SurrealKnowledgeRecord {
    /// Consumes the row and builds the public [`KnowledgeEntry`].
    ///
    /// The stored id is prefixed with `kn-`; `tags` and `applicability` are
    /// supplied by the caller because they are not part of the row itself.
    /// `effective_resonance` is left unset (`None`).
    pub fn into_knowledge_entry(
        self,
        tags: Vec<String>,
        applicability: Vec<String>,
    ) -> KnowledgeEntry {
        // Destructure once so every field is moved exactly one time and a
        // newly added struct field cannot be silently forgotten here.
        let Self {
            id,
            title,
            body,
            summary,
            file_path,
            content_hash,
            ephemeral,
            owner,
            visibility,
            category_id,
            source_type_id,
            entry_type_id,
            content_type_id,
            source_project_id,
            source_agent_id,
            session_id,
            created_at,
            updated_at,
            resonance,
            resonance_type,
            last_activated,
            activation_count,
            decay_rate,
            anchors,
            wake_phrases,
            triggers,
            wake_order,
            wake_phrase,
            embedding,
            embedding_model,
            embedded_at,
            chunk_count,
            format,
        } = self;

        KnowledgeEntry {
            id: format!("kn-{}", id),
            category_id,
            title,
            body,
            summary,
            file_path,
            content_hash,
            ephemeral,
            owner,
            visibility,
            source_type_id,
            entry_type_id,
            content_type_id,
            source_project_id,
            source_agent_id,
            session_id,
            created_at,
            updated_at,
            tags,
            applicability,
            resonance,
            resonance_type,
            last_activated,
            activation_count,
            decay_rate,
            anchors,
            wake_phrases,
            triggers,
            wake_order,
            wake_phrase,
            embedding,
            embedding_model,
            embedded_at,
            chunk_count,
            format,
            effective_resonance: None,
        }
    }
}
impl SurrealDatabase {
/// Returns the SurrealQL projection list used by every `knowledge` SELECT in this file.
///
/// The aliases line up with the fields of `SurrealKnowledgeRecord`:
/// record links are flattened to their id part with `meta::id(..)`, datetimes
/// are cast to strings, and optional columns are replaced with a fallback
/// (`0`, `0.0`, `[]`, `null` or `'markdown'`) when the stored value is falsy.
pub(super) fn knowledge_select_fields() -> &'static str {
"meta::id(id) AS id, title, body, summary, file_path, content_hash, ephemeral,
owner, visibility,
meta::id(category) AS category_id,
meta::id(source_type) AS source_type_id,
meta::id(entry_type) AS entry_type_id,
meta::id(content_type) AS content_type_id,
IF source_project THEN meta::id(source_project) ELSE null END AS source_project_id,
IF source_agent THEN meta::id(source_agent) ELSE null END AS source_agent_id,
IF session THEN meta::id(session) ELSE null END AS session_id,
<string>created_at AS created_at, <string>updated_at AS updated_at,
IF resonance THEN resonance ELSE 0 END AS resonance,
IF resonance_type THEN <string>resonance_type ELSE null END AS resonance_type,
IF last_activated THEN <string>last_activated ELSE null END AS last_activated,
IF activation_count THEN activation_count ELSE 0 END AS activation_count,
IF decay_rate THEN decay_rate ELSE 0.0 END AS decay_rate,
IF anchors THEN anchors ELSE [] END AS anchors,
IF wake_phrases THEN wake_phrases ELSE [] END AS wake_phrases,
IF triggers THEN triggers ELSE [] END AS triggers,
IF wake_order THEN wake_order ELSE null END AS wake_order,
IF wake_phrase THEN wake_phrase ELSE null END AS wake_phrase,
IF embedding THEN embedding ELSE null END AS embedding,
IF embedding_model THEN embedding_model ELSE null END AS embedding_model,
IF embedded_at THEN <string>embedded_at ELSE null END AS embedded_at,
IF chunk_count THEN chunk_count ELSE 0 END AS chunk_count,
IF format THEN format ELSE 'markdown' END AS format"
}
pub(super) fn build_visibility_filter(
ctx: &crate::store::AgentContext,
) -> (String, Option<String>) {
if ctx.include_private {
if let Some(ref agent) = ctx.agent_id {
(
"AND ((visibility = 'public') OR (visibility = 'private' AND owner = $current_agent))".to_string(),
Some(agent.clone())
)
} else {
("AND (visibility = 'public')".to_string(), None)
}
} else {
("AND (visibility = 'public')".to_string(), None)
}
}
/// Returns the SurrealQL expression that computes a row's decayed resonance.
///
/// Rows whose `resonance_type` is `foundational` or `transformative` keep
/// their raw `resonance`. All others are multiplied by
/// `base ^ (days_since_reference / 7)`, where the reference is
/// `last_activated` (falling back to `created_at`) and `base` is 0.90 for
/// resonance <= 3, 0.95 for resonance <= 5 and 0.975 otherwise.
pub(super) fn effective_resonance_expr() -> &'static str {
"IF resonance_type IN ['foundational', 'transformative'] THEN resonance \
ELSE resonance * math::pow(\
IF resonance <= 3 THEN 0.90 \
ELSE IF resonance <= 5 THEN 0.95 \
ELSE 0.975 \
END, \
duration::days(time::now() - (last_activated ?? created_at)) / 7.0\
) \
END"
}
/// Computes the decayed ("effective") resonance of an entry in Rust.
///
/// Uses the same constants as the SurrealQL expression returned by
/// `effective_resonance_expr`: `foundational` / `transformative` entries do
/// not decay; everything else is multiplied by `base ^ weeks_elapsed`, with
/// `base` chosen from the raw resonance (<= 3: 0.90, <= 5: 0.95, else 0.975).
///
/// The reference timestamp is `last_activated`, or `created_at` when
/// `last_activated` is absent. A missing or unparseable timestamp counts as
/// zero elapsed time. Elapsed time is clamped to be non-negative so that a
/// timestamp in the future (clock skew, bad import) can never raise the
/// result above the stored resonance.
fn compute_effective_resonance(entry: &KnowledgeEntry) -> f64 {
    let resonance = entry.resonance as f64;

    if matches!(
        entry.resonance_type.as_deref(),
        Some("foundational" | "transformative")
    ) {
        return resonance;
    }

    let reference_ts = entry
        .last_activated
        .as_deref()
        .or(entry.created_at.as_deref());

    let weeks_elapsed = reference_ts
        .and_then(|ts| chrono::DateTime::parse_from_rfc3339(ts).ok())
        .map(|dt| {
            let elapsed = chrono::Utc::now() - dt.to_utc();
            elapsed.num_seconds() as f64 / (7.0 * 86400.0)
        })
        .unwrap_or(0.0)
        // A negative exponent would turn decay into amplification.
        .max(0.0);

    let decay_base: f64 = match entry.resonance {
        r if r <= 3 => 0.90,
        r if r <= 5 => 0.95,
        _ => 0.975,
    };

    resonance * decay_base.powf(weeks_elapsed)
}
/// Builds the `AND (...)` clause that bounds effective resonance.
///
/// Emits one comparison per bound present on `filter` (`min_resonance`,
/// `max_resonance`), each against the expression from
/// `effective_resonance_expr`. Returns an empty string when neither bound
/// is set.
pub(super) fn build_resonance_filter(filter: &crate::store::KnowledgeFilter) -> String {
    let expr = Self::effective_resonance_expr();

    let lower = filter
        .min_resonance
        .map(|min| format!("({}) >= {}", expr, min));
    let upper = filter
        .max_resonance
        .map(|max| format!("({}) <= {}", expr, max));

    let bounds: Vec<String> = lower.into_iter().chain(upper).collect();
    if bounds.is_empty() {
        return String::new();
    }
    format!("AND ({})", bounds.join(" AND "))
}
/// Returns `true` when `name` is 1..=64 bytes long and consists solely of
/// ASCII letters, ASCII digits, `_` or `-`.
///
/// Category names are spliced into query text by `build_category_filter`,
/// so anything outside this alphabet is rejected.
fn is_valid_category_name(name: &str) -> bool {
    if !matches!(name.len(), 1..=64) {
        return false;
    }
    // Every byte of a multi-byte UTF-8 sequence is >= 0x80 and therefore
    // fails the ASCII test, so a byte scan is equivalent to a char scan.
    name.bytes()
        .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'_' | b'-'))
}
/// Builds the `AND category ...` clause for `filter.categories`.
///
/// * No categories requested (`None` or empty list): returns an empty
///   string, i.e. no restriction.
/// * Categories requested: names failing `is_valid_category_name` are
///   dropped (they are interpolated into the query text, so only the safe
///   alphabet is allowed). One surviving name yields an equality test;
///   several yield an `IN [...]` test.
/// * Categories requested but none valid: returns `AND category IN []`,
///   which matches nothing. Previously the filter was silently dropped in
///   this case and the query returned rows from every category, which also
///   disagreed with the in-memory category check applied to chunk hits in
///   semantic search.
pub(super) fn build_category_filter(filter: &crate::store::KnowledgeFilter) -> String {
    let Some(requested) = filter.categories.as_ref().filter(|cats| !cats.is_empty()) else {
        return String::new();
    };

    let things: Vec<String> = requested
        .iter()
        .filter(|name| Self::is_valid_category_name(name))
        .map(|name| format!("type::thing('category', '{}')", name))
        .collect();

    match things.as_slice() {
        // Fail closed: an explicit filter that cannot match must not widen the result set.
        [] => "AND category IN []".to_string(),
        [only] => format!("AND category = {}", only),
        _ => format!("AND category IN [{}]", things.join(", ")),
    }
}
/// Blocking wrapper around `upsert_knowledge_async`, driven on the runtime
/// returned by `Self::runtime()`.
pub fn upsert_knowledge_internal(&self, entry: &KnowledgeEntry) -> Result<RecordId> {
    let upsert = self.upsert_knowledge_async(entry);
    Self::runtime().block_on(upsert)
}
/// Creates or replaces the `knowledge` record for `entry`, then rebuilds its
/// `tagged_with` and `applies_to` edges.
///
/// The work is issued as a sequence of independent queries (record upsert,
/// edge delete, per-tag upsert + relate, edge delete, per-applicability
/// upsert + relate); any query-level or statement-level error aborts with
/// `Err` at that point. Returns the record id of the upserted row.
async fn upsert_knowledge_async(&self, entry: &KnowledgeEntry) -> Result<RecordId> {
// The stored id never carries the public `kn-` prefix.
let id_part = entry.id.strip_prefix("kn-").unwrap_or(&entry.id);
let record_id = RecordId::new("knowledge", id_part);
// Base statement: fields that are always written.
let mut query = "UPSERT type::thing('knowledge', $id) SET
title = $title,
body = $body,
summary = $summary,
file_path = $file_path,
content_hash = $content_hash,
ephemeral = $ephemeral,
owner = $owner,
visibility = $visibility,
category = type::thing('category', $category_id),
source_type = type::thing('source_type', $source_type_id),
entry_type = type::thing('entry_type', $entry_type_id),
content_type = type::thing('content_type', $content_type_id),
resonance = $resonance,
resonance_type = $resonance_type,
activation_count = $activation_count,
decay_rate = $decay_rate,
anchors = $anchors,
wake_phrases = $wake_phrases,
triggers = $triggers,
wake_order = $wake_order,
wake_phrase = $wake_phrase,
embedding = $embedding,
embedding_model = $embedding_model,
chunk_count = $chunk_count,
format = $format"
.to_string();
// Optional links and timestamps are appended only when present on the
// entry; when absent the corresponding column is not mentioned at all.
if entry.source_project_id.is_some() {
query.push_str(", source_project = type::thing('project', $source_project_id)");
}
if entry.source_agent_id.is_some() {
query.push_str(", source_agent = type::thing('agent', $source_agent_id)");
}
if entry.session_id.is_some() {
query.push_str(", session = type::thing('session', $session_id)");
}
if entry.created_at.is_some() {
query.push_str(", created_at = <datetime>$created_at");
}
if entry.updated_at.is_some() {
query.push_str(", updated_at = <datetime>$updated_at");
}
if entry.last_activated.is_some() {
query.push_str(", last_activated = <datetime>$last_activated");
}
if entry.embedded_at.is_some() {
query.push_str(", embedded_at = <datetime>$embedded_at");
}
let mut response = with_db!(self, db, {
let mut q = db
.query(&query)
.bind(("id", id_part.to_string()))
.bind(("title", entry.title.clone()))
.bind(("body", entry.body.clone()))
.bind(("summary", entry.summary.clone()))
.bind(("file_path", entry.file_path.clone()))
// A missing content hash is written as an empty string.
.bind((
"content_hash",
entry.content_hash.clone().unwrap_or_default(),
))
.bind(("ephemeral", entry.ephemeral))
.bind(("owner", entry.owner.clone()))
.bind(("visibility", entry.visibility.clone()))
.bind(("category_id", entry.category_id.clone()))
// Type links fall back to "manual" / "primary" / "text" when unset.
.bind((
"source_type_id",
entry
.source_type_id
.clone()
.unwrap_or_else(|| "manual".to_string()),
))
.bind((
"entry_type_id",
entry
.entry_type_id
.clone()
.unwrap_or_else(|| "primary".to_string()),
))
.bind((
"content_type_id",
entry
.content_type_id
.clone()
.unwrap_or_else(|| "text".to_string()),
))
.bind(("resonance", entry.resonance))
.bind(("resonance_type", entry.resonance_type.clone()))
.bind(("activation_count", entry.activation_count))
.bind(("decay_rate", entry.decay_rate))
.bind(("anchors", entry.anchors.clone()))
.bind(("wake_phrases", entry.wake_phrases.clone()))
// Triggers are passed through the crate's normalizer before storage.
.bind((
"triggers",
crate::knowledge::normalize_triggers(entry.triggers.iter()),
))
.bind(("wake_order", entry.wake_order))
.bind(("wake_phrase", entry.wake_phrase.clone()))
.bind(("embedding", entry.embedding.clone()))
.bind(("embedding_model", entry.embedding_model.clone()))
.bind(("chunk_count", entry.chunk_count))
.bind(("format", entry.format.clone()));
// Bind exactly the optional parameters that were appended to the statement above.
if let Some(ref proj) = entry.source_project_id {
q = q.bind(("source_project_id", proj.clone()));
}
if let Some(ref agent) = entry.source_agent_id {
q = q.bind(("source_agent_id", agent.clone()));
}
if let Some(ref sess) = entry.session_id {
q = q.bind(("session_id", sess.clone()));
}
if let Some(ref created) = entry.created_at {
q = q.bind(("created_at", normalize_datetime(created)));
}
if let Some(ref updated) = entry.updated_at {
q = q.bind(("updated_at", normalize_datetime(updated)));
}
if let Some(ref activated) = entry.last_activated {
q = q.bind(("last_activated", normalize_datetime(activated)));
}
if let Some(ref embedded) = entry.embedded_at {
q = q.bind(("embedded_at", normalize_datetime(embedded)));
}
q.await.context("Failed to upsert knowledge record")
})?;
// A successful round-trip can still carry per-statement errors.
let errors = response.take_errors();
if !errors.is_empty() {
return Err(anyhow::anyhow!("SurrealDB returned errors: {:?}", errors));
}
// Tags: drop every existing edge, then recreate one per entry tag.
let mut tag_delete_response = with_db!(self, db, {
db.query("DELETE tagged_with WHERE in = $knowledge")
.bind(("knowledge", record_id.0.clone()))
.await
.context("Failed to clear existing tags")
})?;
let tag_delete_errors = tag_delete_response.take_errors();
if !tag_delete_errors.is_empty() {
return Err(anyhow::anyhow!(
"SurrealDB returned errors: {:?}",
tag_delete_errors
));
}
for tag_name in &entry.tags {
// The tag name doubles as the tag record's id.
let mut tag_response = with_db!(self, db, {
db.query("UPSERT type::thing('tag', $tag_id) SET name = $tag_name")
.bind(("tag_id", tag_name.clone()))
.bind(("tag_name", tag_name.clone()))
.await
.context("Failed to create tag")
})?;
let tag_errors = tag_response.take_errors();
if !tag_errors.is_empty() {
return Err(anyhow::anyhow!("Failed to create tag: {:?}", tag_errors));
}
let tag_id = RecordId::new("tag", tag_name);
let mut tag_edge_response = with_db!(self, db, {
db.query("RELATE $knowledge->tagged_with->$tag")
.bind(("knowledge", record_id.0.clone()))
.bind(("tag", tag_id.0.clone()))
.await
.context("Failed to create tag edge")
})?;
let tag_edge_errors = tag_edge_response.take_errors();
if !tag_edge_errors.is_empty() {
return Err(anyhow::anyhow!(
"SurrealDB returned errors: {:?}",
tag_edge_errors
));
}
}
// Applicability: same delete-then-recreate pattern as tags.
let mut app_delete_response = with_db!(self, db, {
db.query("DELETE applies_to WHERE in = $knowledge")
.bind(("knowledge", record_id.0.clone()))
.await
.context("Failed to clear existing applicability")
})?;
let app_delete_errors = app_delete_response.take_errors();
if !app_delete_errors.is_empty() {
return Err(anyhow::anyhow!(
"SurrealDB returned errors: {:?}",
app_delete_errors
));
}
for app_type in &entry.applicability {
// Each upsert rewrites the description to "Applicability: <name>".
let mut app_type_response = with_db!(self, db, {
db.query("UPSERT type::thing('applicability_type', $app_type_id) SET description = $app_type_desc")
.bind(("app_type_id", app_type.clone()))
.bind(("app_type_desc", format!("Applicability: {}", app_type)))
.await
.context("Failed to create applicability_type")
})?;
let app_type_errors = app_type_response.take_errors();
if !app_type_errors.is_empty() {
return Err(anyhow::anyhow!(
"Failed to create applicability_type: {:?}",
app_type_errors
));
}
let app_id = RecordId::new("applicability_type", app_type);
let mut app_edge_response = with_db!(self, db, {
db.query("RELATE $knowledge->applies_to->$app_type")
.bind(("knowledge", record_id.0.clone()))
.bind(("app_type", app_id.0.clone()))
.await
.context("Failed to create applicability edge")
})?;
let app_edge_errors = app_edge_response.take_errors();
if !app_edge_errors.is_empty() {
return Err(anyhow::anyhow!(
"SurrealDB returned errors: {:?}",
app_edge_errors
));
}
}
Ok(record_id)
}
/// Blocking wrapper around `get_knowledge_async`, driven on the runtime
/// returned by `Self::runtime()`.
pub fn get_knowledge(
    &self,
    id: &str,
    ctx: &crate::store::AgentContext,
) -> Result<Option<KnowledgeEntry>> {
    let lookup = self.get_knowledge_async(id, ctx);
    Self::runtime().block_on(lookup)
}
/// Loads one knowledge entry by id, subject to the caller's visibility.
///
/// `id` may be given with or without the `kn-` prefix. Returns `Ok(None)`
/// when no row matches both the id and the visibility clause derived from
/// `ctx`. Tags and applicability are fetched with two follow-up queries.
async fn get_knowledge_async(
    &self,
    id: &str,
    ctx: &crate::store::AgentContext,
) -> Result<Option<KnowledgeEntry>> {
    let bare_id = id.strip_prefix("kn-").unwrap_or(id);
    let (visibility_clause, current_agent) = Self::build_visibility_filter(ctx);
    let sql = format!(
        "SELECT {}\nFROM knowledge\nWHERE meta::id(id) = $id {}",
        Self::knowledge_select_fields(),
        visibility_clause
    );

    let mut response = with_db!(self, db, {
        let base = db.query(&sql).bind(("id", bare_id.to_string()));
        // `$current_agent` only appears in the clause when an agent was returned.
        let prepared = match current_agent {
            Some(agent) => base.bind(("current_agent", agent)),
            None => base,
        };
        prepared.await.context("Failed to query knowledge record")
    })?;

    let rows: Vec<SurrealKnowledgeRecord> = response.take(0)?;
    let Some(record) = rows.into_iter().next() else {
        return Ok(None);
    };

    let entry_id = format!("kn-{}", record.id);
    let tags = self.get_tags_for_entry_async(&entry_id).await?;
    let applicability = self.get_applicability_for_entry_async(&entry_id).await?;
    Ok(Some(record.into_knowledge_entry(tags, applicability)))
}
/// Blocking wrapper around `delete_knowledge_async`, driven on the runtime
/// returned by `Self::runtime()`.
pub fn delete_knowledge(&self, id: &str, ctx: &crate::store::AgentContext) -> Result<bool> {
    let removal = self.delete_knowledge_async(id, ctx);
    Self::runtime().block_on(removal)
}
/// Deletes a knowledge entry the caller is allowed to see.
///
/// First counts matching rows under the caller's visibility clause and
/// returns `Ok(false)` if there are none. Otherwise issues the `DELETE`
/// under the same clause and then removes the entry's embedding chunks.
/// Chunk removal is best-effort: its error, if any, is discarded.
async fn delete_knowledge_async(
    &self,
    id: &str,
    ctx: &crate::store::AgentContext,
) -> Result<bool> {
    let bare_id = id.strip_prefix("kn-").unwrap_or(id);
    let (visibility_clause, current_agent) = Self::build_visibility_filter(ctx);

    // Step 1: is there a row this caller may touch?
    let probe_sql = format!(
        "SELECT count() AS c FROM knowledge WHERE meta::id(id) = $id {} GROUP ALL",
        visibility_clause
    );
    let mut probe = with_db!(self, db, {
        let mut stmt = db.query(&probe_sql).bind(("id", bare_id.to_string()));
        if let Some(agent) = current_agent.as_ref() {
            stmt = stmt.bind(("current_agent", agent.clone()));
        }
        stmt.await
            .context("Failed to check knowledge record existence")
    })?;
    let probe_rows: Vec<serde_json::Value> = probe.take(0)?;
    let visible = probe_rows
        .first()
        .and_then(|row| row["c"].as_i64())
        .is_some_and(|matches| matches > 0);
    if !visible {
        return Ok(false);
    }

    // Step 2: delete under the same visibility restriction.
    let delete_sql = format!(
        "DELETE FROM knowledge WHERE meta::id(id) = $id {}",
        visibility_clause
    );
    let mut removal = with_db!(self, db, {
        let mut stmt = db.query(&delete_sql).bind(("id", bare_id.to_string()));
        if let Some(agent) = current_agent.as_ref() {
            stmt = stmt.bind(("current_agent", agent.clone()));
        }
        stmt.await.context("Failed to delete knowledge record")
    })?;
    let failures = removal.take_errors();
    if !failures.is_empty() {
        return Err(anyhow::anyhow!("Delete failed: {:?}", failures));
    }

    // Step 3: chunks are keyed by the prefixed id; failure here is ignored.
    let prefixed_id = format!("kn-{}", bare_id);
    let _ = self.delete_embedding_chunks_async(&prefixed_id).await;
    Ok(true)
}
/// Blocking wrapper around `search_knowledge_async`, driven on the runtime
/// returned by `Self::runtime()`.
pub fn search_knowledge(
    &self,
    query: &str,
    ctx: &crate::store::AgentContext,
    filter: &crate::store::KnowledgeFilter,
) -> Result<Vec<KnowledgeEntry>> {
    let search = self.search_knowledge_async(query, ctx, filter);
    Self::runtime().block_on(search)
}
/// Full-text search over `title`, `body` and `summary` using the `@@` operator.
///
/// The match is further restricted by the visibility clause derived from
/// `ctx` and by the resonance and category clauses derived from `filter`.
/// Each hit is converted with `value_to_knowledge_entry`.
async fn search_knowledge_async(
    &self,
    query: &str,
    ctx: &crate::store::AgentContext,
    filter: &crate::store::KnowledgeFilter,
) -> Result<Vec<KnowledgeEntry>> {
    let needle = query.to_string();
    let (visibility_clause, current_agent) = Self::build_visibility_filter(ctx);
    let sql = format!(
        "SELECT {}\nFROM knowledge\nWHERE (title @@ $query OR body @@ $query OR summary @@ $query) {} {} {}",
        Self::knowledge_select_fields(),
        visibility_clause,
        Self::build_resonance_filter(filter),
        Self::build_category_filter(filter)
    );

    let mut response = with_db!(self, db, {
        let base = db.query(&sql).bind(("query", needle));
        let prepared = match current_agent {
            Some(agent) => base.bind(("current_agent", agent)),
            None => base,
        };
        prepared.await.context("Failed to execute search query")
    })?;

    let rows: Vec<serde_json::Value> =
        response.take(0).context("Failed to parse search results")?;

    let mut hits = Vec::with_capacity(rows.len());
    for row in rows {
        let entry = self.value_to_knowledge_entry(row).await?;
        hits.push(entry);
    }
    Ok(hits)
}
/// Same as `semantic_search_knowledge_scored`, with the similarity scores
/// discarded; result order is preserved.
pub fn semantic_search_knowledge(
    &self,
    query_embedding: &[f32],
    ctx: &crate::store::AgentContext,
    filter: &crate::store::KnowledgeFilter,
    limit: usize,
) -> Result<Vec<KnowledgeEntry>> {
    let scored = self.semantic_search_knowledge_scored(query_embedding, ctx, filter, limit)?;
    let mut entries = Vec::with_capacity(scored.len());
    entries.extend(scored.into_iter().map(|pair| pair.0));
    Ok(entries)
}
/// Blocking wrapper around `semantic_search_knowledge_async`, driven on the
/// runtime returned by `Self::runtime()`.
pub fn semantic_search_knowledge_scored(
    &self,
    query_embedding: &[f32],
    ctx: &crate::store::AgentContext,
    filter: &crate::store::KnowledgeFilter,
    limit: usize,
) -> Result<Vec<(KnowledgeEntry, f32)>> {
    let search = self.semantic_search_knowledge_async(query_embedding, ctx, filter, limit);
    Self::runtime().block_on(search)
}
/// Vector search that merges entry-level and chunk-level embeddings.
///
/// Two statements run in one round-trip:
/// 1. entries with their own embedding and no chunks, filtered in SQL by
///    visibility, resonance and category;
/// 2. the top `3 * limit` rows of `embedding_chunk`, unfiltered.
///
/// Chunk hits are reduced to the best score per `entry_id`. An entry that
/// already appeared in (1) keeps the higher of the two scores. Any other
/// entry is loaded through `get_knowledge_async` (which applies visibility)
/// and then checked against the resonance and category filters in memory.
/// The merged set is sorted by score, highest first, and truncated to
/// `limit`.
///
/// # Errors
/// Fails if the query, result decoding, or any per-entry load fails.
async fn semantic_search_knowledge_async(
    &self,
    query_embedding: &[f32],
    ctx: &crate::store::AgentContext,
    filter: &crate::store::KnowledgeFilter,
    limit: usize,
) -> Result<Vec<(KnowledgeEntry, f32)>> {
    use std::collections::HashMap;

    let (visibility_clause, current_agent) = Self::build_visibility_filter(ctx);
    let resonance_clause = Self::build_resonance_filter(filter);
    let category_clause = Self::build_category_filter(filter);
    let unchunked_sql = format!(
        "SELECT {}, vector::similarity::cosine(embedding, $query_vec) AS score\n\
         FROM knowledge\n\
         WHERE embedding IS NOT NONE AND (chunk_count IS NONE OR chunk_count <= 0) {} {} {}\n\
         ORDER BY score DESC\n\
         LIMIT $limit",
        Self::knowledge_select_fields(),
        visibility_clause,
        resonance_clause,
        category_clause
    );
    let chunk_sql = "SELECT entry_id, vector::similarity::cosine(embedding, $query_vec) AS score\n\
                     FROM embedding_chunk\n\
                     ORDER BY score DESC\n\
                     LIMIT $chunk_limit";
    // Over-fetch chunks because several may belong to one entry; saturate
    // instead of overflowing on absurd limits.
    let chunk_limit = limit.saturating_mul(3);

    let mut response = with_db!(self, db, {
        let mut query_builder = db
            .query(&unchunked_sql)
            .query(chunk_sql)
            .bind(("query_vec", query_embedding.to_vec()))
            .bind(("limit", limit))
            .bind(("chunk_limit", chunk_limit));
        if let Some(agent) = current_agent.clone() {
            query_builder = query_builder.bind(("current_agent", agent));
        }
        query_builder
            .await
            .context("Failed to execute semantic search query")
    })?;
    let unchunked_results: Vec<serde_json::Value> = response
        .take(0)
        .context("Failed to parse unchunked search results")?;
    let chunk_results: Vec<serde_json::Value> = response
        .take(1)
        .context("Failed to parse chunk search results")?;

    // entry id -> (best score, entry)
    let mut scored_entries: HashMap<String, (f32, KnowledgeEntry)> =
        HashMap::with_capacity(unchunked_results.len());
    for obj in unchunked_results {
        // Read the score first so the row (embedding included) can be moved
        // into the converter instead of being deep-cloned.
        let score = obj["score"].as_f64().unwrap_or(0.0) as f32;
        let entry = self.value_to_knowledge_entry(obj).await?;
        scored_entries.insert(entry.id.clone(), (score, entry));
    }

    // Best chunk score per entry. Seed with the first observed score rather
    // than 0.0: cosine similarity can be negative and must not be clamped.
    let mut chunk_scores: HashMap<String, f32> = HashMap::new();
    for obj in &chunk_results {
        let Some(entry_id) = obj["entry_id"].as_str().filter(|id| !id.is_empty()) else {
            // A chunk without an owning entry cannot be resolved.
            continue;
        };
        let score = obj["score"].as_f64().unwrap_or(0.0) as f32;
        chunk_scores
            .entry(entry_id.to_string())
            .and_modify(|best| {
                if score > *best {
                    *best = score;
                }
            })
            .or_insert(score);
    }

    for (entry_id, score) in chunk_scores {
        if let Some((existing_score, _)) = scored_entries.get_mut(&entry_id) {
            if score > *existing_score {
                *existing_score = score;
            }
            continue;
        }

        // Visibility is enforced by the lookup; `None` means hidden or gone.
        let Some(entry) = self.get_knowledge_async(&entry_id, ctx).await? else {
            continue;
        };

        // The chunk statement is unfiltered, so apply the filters here.
        let effective = Self::compute_effective_resonance(&entry);
        if filter
            .min_resonance
            .is_some_and(|min| effective < min as f64)
        {
            continue;
        }
        if filter
            .max_resonance
            .is_some_and(|max| effective > max as f64)
        {
            continue;
        }
        if let Some(cats) = &filter.categories
            && !cats.is_empty()
            && !cats.contains(&entry.category_id)
        {
            continue;
        }
        scored_entries.insert(entry_id, (score, entry));
    }

    let mut ranked: Vec<(KnowledgeEntry, f32)> = scored_entries
        .into_values()
        .map(|(score, entry)| (entry, score))
        .collect();
    // `total_cmp` is a true total order, so the sort is well-defined for any f32.
    ranked.sort_by(|a, b| b.1.total_cmp(&a.1));
    ranked.truncate(limit);
    Ok(ranked)
}
/// Blocking wrapper around `semantic_search_entries_scored_async`, driven on
/// the runtime returned by `Self::runtime()`.
pub fn semantic_search_entries_scored(
    &self,
    query_embedding: &[f32],
    ctx: &crate::store::AgentContext,
    limit: usize,
) -> Result<Vec<(KnowledgeEntry, f32)>> {
    let search = self.semantic_search_entries_scored_async(query_embedding, ctx, limit);
    Self::runtime().block_on(search)
}
/// Entry-level vector search: ranks every visible `knowledge` row that has
/// an embedding by cosine similarity to `query_embedding`.
///
/// Only the visibility clause derived from `ctx` is applied; chunk
/// embeddings are not consulted. Rows come back in the order the database
/// returns them (`ORDER BY score DESC`), at most `limit` of them. A missing
/// or non-numeric `score` is reported as `0.0`.
async fn semantic_search_entries_scored_async(
    &self,
    query_embedding: &[f32],
    ctx: &crate::store::AgentContext,
    limit: usize,
) -> Result<Vec<(KnowledgeEntry, f32)>> {
    let (visibility_clause, current_agent) = Self::build_visibility_filter(ctx);
    let sql = format!(
        "SELECT {}, vector::similarity::cosine(embedding, $query_vec) AS score\n\
         FROM knowledge\n\
         WHERE embedding IS NOT NONE {}\n\
         ORDER BY score DESC\n\
         LIMIT $limit",
        Self::knowledge_select_fields(),
        visibility_clause,
    );

    let mut response = with_db!(self, db, {
        let base = db
            .query(&sql)
            .bind(("query_vec", query_embedding.to_vec()))
            .bind(("limit", limit));
        let prepared = match current_agent.clone() {
            Some(agent) => base.bind(("current_agent", agent)),
            None => base,
        };
        prepared
            .await
            .context("Failed to execute entry-level scored search query")
    })?;

    let rows: Vec<serde_json::Value> = response
        .take(0)
        .context("Failed to parse entry-level scored search results")?;

    let mut ranked = Vec::with_capacity(rows.len());
    for row in rows {
        // Read the score before the row is moved into the converter.
        let similarity = row["score"].as_f64().unwrap_or(0.0) as f32;
        let entry = self.value_to_knowledge_entry(row).await?;
        ranked.push((entry, similarity));
    }
    Ok(ranked)
}
pub(super) async fn value_to_knowledge_entry(
&self,
obj: serde_json::Value,
) -> Result<KnowledgeEntry> {
let id_str = obj["id"].as_str().unwrap_or_default();
let id = format!("kn-{}", id_str);
let category_id = obj["category_id"].as_str().unwrap_or_default().to_string();
let source_project_id = obj
.get("source_project_id")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
let source_agent_id = obj
.get("source_agent_id")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
let session_id = obj
.get("session_id")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
let source_type_id = obj
.get("source_type_id")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
let entry_type_id = obj
.get("entry_type_id")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
let content_type_id = obj
.get("content_type_id")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(|s| s.to_string());
let knowledge_thing = Thing::from(("knowledge", id_str));
let mut tags_response = with_db!(self, db, {
db.query("SELECT VALUE out.name FROM tagged_with WHERE in = $knowledge")
.bind(("knowledge", knowledge_thing.clone()))
.await
.context("Failed to query tags")
})?;
let tags: Vec<String> = tags_response.take(0).unwrap_or_default();
let mut app_response = with_db!(self, db, {
db.query("SELECT VALUE meta::id(out) FROM applies_to WHERE in = $knowledge")
.bind(("knowledge", knowledge_thing))
.await
.context("Failed to query applicability")
})?;
let applicability_raw: Vec<Thing> = app_response.take(0).unwrap_or_default();
let applicability: Vec<String> = applicability_raw
.into_iter()
.map(|t| t.id.to_string())
.collect();
Ok(KnowledgeEntry {
id,
category_id,
title: serde_json::from_value(obj["title"].clone()).unwrap_or_default(),
body: serde_json::from_value(obj["body"].clone()).ok(),
summary: serde_json::from_value(obj["summary"].clone()).ok(),
file_path: serde_json::from_value(obj["file_path"].clone()).ok(),
content_hash: serde_json::from_value(obj["content_hash"].clone()).ok(),
ephemeral: serde_json::from_value(obj["ephemeral"].clone()).unwrap_or(false),
created_at: serde_json::from_value(obj["created_at"].clone()).ok(),
updated_at: serde_json::from_value(obj["updated_at"].clone()).ok(),
tags,
applicability,
source_project_id,
source_agent_id,
source_type_id,
entry_type_id,
content_type_id,
session_id,
owner: serde_json::from_value(obj["owner"].clone()).ok(),
visibility: serde_json::from_value(obj["visibility"].clone())
.unwrap_or_else(|_| "public".to_string()),
resonance: serde_json::from_value(obj["resonance"].clone()).unwrap_or(0),
resonance_type: serde_json::from_value(obj["resonance_type"].clone()).ok(),
last_activated: serde_json::from_value(obj["last_activated"].clone()).ok(),
activation_count: serde_json::from_value(obj["activation_count"].clone()).unwrap_or(0),
decay_rate: serde_json::from_value(obj["decay_rate"].clone()).unwrap_or(0.0),
anchors: serde_json::from_value(obj["anchors"].clone()).unwrap_or_default(),
wake_phrases: serde_json::from_value(obj["wake_phrases"].clone()).unwrap_or_default(),
triggers: serde_json::from_value(obj["triggers"].clone()).unwrap_or_default(),
wake_order: serde_json::from_value(obj["wake_order"].clone()).ok(),
wake_phrase: serde_json::from_value(obj["wake_phrase"].clone()).ok(),
embedding: serde_json::from_value(obj["embedding"].clone()).ok(),
embedding_model: serde_json::from_value(obj["embedding_model"].clone()).ok(),
embedded_at: serde_json::from_value(obj["embedded_at"].clone()).ok(),
chunk_count: serde_json::from_value(obj["chunk_count"].clone()).unwrap_or(0),
format: serde_json::from_value(obj["format"].clone())
.unwrap_or_else(|_| "markdown".to_string()),
effective_resonance: obj.get("effective_resonance").and_then(|v| v.as_f64()),
})
}
/// Blocking wrapper around `delete_embedding_chunks_async`, driven on the
/// runtime returned by `Self::runtime()`.
pub fn delete_embedding_chunks(&self, entry_id: &str) -> Result<()> {
    let removal = self.delete_embedding_chunks_async(entry_id);
    Self::runtime().block_on(removal)
}
/// Deletes every `embedding_chunk` row whose `entry_id` equals the given
/// value (compared verbatim, no prefix handling).
///
/// # Errors
/// Fails if the query cannot be executed or the statement reports errors.
async fn delete_embedding_chunks_async(&self, entry_id: &str) -> Result<()> {
    let owner = entry_id.to_string();
    let mut response = with_db!(self, db, {
        db.query("DELETE FROM embedding_chunk WHERE entry_id = $entry_id")
            .bind(("entry_id", owner))
            .await
            .context("Failed to delete embedding chunks")
    })?;

    let failures = response.take_errors();
    if failures.is_empty() {
        Ok(())
    } else {
        Err(anyhow::anyhow!(
            "Failed to delete embedding chunks: {:?}",
            failures
        ))
    }
}
/// Blocking wrapper around `insert_embedding_chunk_async`, driven on the
/// runtime returned by `Self::runtime()`.
#[allow(clippy::too_many_arguments)]
pub fn insert_embedding_chunk(
    &self,
    entry_id: &str,
    chunk_index: usize,
    chunk_text: &str,
    token_offset: usize,
    token_count: usize,
    embedding: &[f32],
    model_id: &str,
) -> Result<()> {
    let insertion = self.insert_embedding_chunk_async(
        entry_id,
        chunk_index,
        chunk_text,
        token_offset,
        token_count,
        embedding,
        model_id,
    );
    Self::runtime().block_on(insertion)
}
/// Upserts one `embedding_chunk` row.
///
/// The row id is `"{entry_id}_{chunk_index}"`, so writing the same chunk
/// index for the same entry again overwrites the earlier row. The numeric
/// arguments are stored as `i64`.
///
/// # Errors
/// Fails if the query cannot be executed or the statement reports errors.
#[allow(clippy::too_many_arguments)]
async fn insert_embedding_chunk_async(
    &self,
    entry_id: &str,
    chunk_index: usize,
    chunk_text: &str,
    token_offset: usize,
    token_count: usize,
    embedding: &[f32],
    model_id: &str,
) -> Result<()> {
    const UPSERT_CHUNK: &str = "UPSERT type::thing('embedding_chunk', $chunk_id) SET\n\
                                entry_id = $entry_id,\n\
                                chunk_index = $chunk_index,\n\
                                chunk_text = $chunk_text,\n\
                                token_offset = $token_offset,\n\
                                token_count = $token_count,\n\
                                embedding = $embedding,\n\
                                embedding_model = $embedding_model";

    let row_id = format!("{}_{}", entry_id, chunk_index);
    let mut response = with_db!(self, db, {
        db.query(UPSERT_CHUNK)
            .bind(("chunk_id", row_id))
            .bind(("entry_id", entry_id.to_string()))
            .bind(("chunk_index", chunk_index as i64))
            .bind(("chunk_text", chunk_text.to_string()))
            .bind(("token_offset", token_offset as i64))
            .bind(("token_count", token_count as i64))
            .bind(("embedding", embedding.to_vec()))
            .bind(("embedding_model", model_id.to_string()))
            .await
            .context("Failed to insert embedding chunk")
    })?;

    let failures = response.take_errors();
    if failures.is_empty() {
        Ok(())
    } else {
        Err(anyhow::anyhow!(
            "Failed to insert embedding chunk: {:?}",
            failures
        ))
    }
}
/// Blocking wrapper around `semantic_search_chunks_async`, driven on the
/// runtime returned by `Self::runtime()`.
pub fn semantic_search_chunks(
    &self,
    query_embedding: &[f32],
    limit: usize,
) -> Result<Vec<(String, f32)>> {
    let search = self.semantic_search_chunks_async(query_embedding, limit);
    Self::runtime().block_on(search)
}
/// Ranks `embedding_chunk` rows by cosine similarity to `query_embedding`
/// and returns up to `limit` `(entry_id, score)` pairs, best first.
///
/// No visibility or other filtering is applied here. One entry may appear
/// several times (once per matching chunk). A row without a string
/// `entry_id` yields `""`, and a missing score yields `0.0`.
async fn semantic_search_chunks_async(
    &self,
    query_embedding: &[f32],
    limit: usize,
) -> Result<Vec<(String, f32)>> {
    const CHUNK_SEARCH: &str =
        "SELECT entry_id, vector::similarity::cosine(embedding, $query_vec) AS score\n\
         FROM embedding_chunk\n\
         ORDER BY score DESC\n\
         LIMIT $limit";

    let mut response = with_db!(self, db, {
        db.query(CHUNK_SEARCH)
            .bind(("query_vec", query_embedding.to_vec()))
            .bind(("limit", limit))
            .await
            .context("Failed to search embedding chunks")
    })?;
    let rows: Vec<serde_json::Value> = response
        .take(0)
        .context("Failed to parse chunk search results")?;

    let hits = rows
        .iter()
        .map(|row| {
            let owner = row["entry_id"].as_str().unwrap_or_default().to_string();
            let similarity = row["score"].as_f64().unwrap_or(0.0) as f32;
            (owner, similarity)
        })
        .collect();
    Ok(hits)
}
/// Test-only helper: runs raw SurrealQL and fails if the round-trip or any
/// statement in it reports an error.
#[cfg(test)]
pub(crate) fn test_exec(&self, sql: &str) -> Result<()> {
    let run = async {
        let mut response = with_db!(self, db, {
            db.query(sql).await.context("test_exec query failed")
        })?;
        let failures = response.take_errors();
        if failures.is_empty() {
            Ok(())
        } else {
            Err(anyhow::anyhow!(
                "test_exec statement errors: {:?}",
                failures
            ))
        }
    };
    Self::runtime().block_on(run)
}
/// Test-only helper: reads the stored `chunk_count` of a knowledge record
/// directly, bypassing the fallback applied by `knowledge_select_fields`.
///
/// Accepts the id with or without the `kn-` prefix. Returns `Ok(None)` when
/// the record does not exist, or when the column is missing, null, or not
/// representable as `i64`.
#[cfg(test)]
pub(crate) fn test_raw_chunk_count(&self, entry_id: &str) -> Result<Option<i64>> {
    let bare_id = entry_id.strip_prefix("kn-").unwrap_or(entry_id).to_string();
    Self::runtime().block_on(async {
        let mut response = with_db!(self, db, {
            db.query("SELECT chunk_count FROM type::thing('knowledge', $id)")
                .bind(("id", bare_id.clone()))
                .await
                .context("test_raw_chunk_count query failed")
        })?;
        let rows: Vec<serde_json::Value> = response
            .take(0)
            .context("test_raw_chunk_count parse failed")?;
        // `as_i64` already maps JSON null (and any non-integer) to `None`.
        let stored = rows
            .first()
            .and_then(|row| row.get("chunk_count"))
            .and_then(serde_json::Value::as_i64);
        Ok(stored)
    })
}
}