1pub(crate) mod matching;
20pub(crate) mod schema;
21pub(crate) mod vamana;
22
23use std::collections::{HashMap, HashSet};
24
25use chrono::Utc;
26use khive_fold::{GreedySelector, Selector, SelectorInput, SelectorWeights};
27use khive_runtime::{KhiveRuntime, NamespaceToken, RuntimeError};
28use khive_score::DeterministicScore;
29use khive_storage::types::{SqlStatement, SqlValue};
30use khive_types::SubstrateKind;
31use serde_json::{json, Value};
32use uuid::Uuid;
33
34use crate::knowledge::schema::{
35 AdjudicateParams, Atom, ChallengeParams, ComposeParams, DeleteAtomsParams, Domain, EditParams,
36 FoldCandidate, FoldParams, GetParams, ImportParams, IndexParams, ListParams, SearchParams,
37 Section, SectionType, SuggestParams, UpsertAtomsParams, UpsertDomainsParams,
38};
39
// Default scoring knobs (`D_` prefix).
// NOTE(review): none of the `D_*` constants are referenced in the visible part of this
// file; the per-constant notes below are inferred from the names only — confirm against
// the code that builds the search weights.

// Presumably the weight of an exact match on the name field — TODO confirm.
const D_W_EXACT_NAME: f32 = 5.0;
// Presumably the weight of a term match in the name field — TODO confirm.
const D_W_NAME: f32 = 3.0;
// Presumably the weight of a term match in the description field — TODO confirm.
const D_W_DESCRIPTION: f32 = 1.5;
// Presumably the weight of a term match in the tags field — TODO confirm.
const D_W_TAGS: f32 = 1.25;
// Presumably the weight of a term match in the content field — TODO confirm.
const D_W_CONTENT: f32 = 1.0;
// Presumably a discount applied to matches found via query expansion — TODO confirm.
const D_EXPAND_DISCOUNT: f32 = 0.35;
// Presumably the strength of a query-term coverage adjustment — TODO confirm.
const D_COVERAGE_ALPHA: f32 = 0.5;
// Presumably the weight of a two-word (bigram) match — TODO confirm.
const D_W_BIGRAM: f32 = 2.0;

// NOTE(review): not referenced in the visible part of this file; the name suggests an
// upper bound on the number of candidates considered by a search — confirm.
const CANDIDATE_POOL: usize = 2000;
// Minimum byte length (`str::len`) a query word needs to be counted as a term.
const MIN_TERM_LEN: usize = 3;
// Number of atoms handed to a single `embed_batch` call while indexing.
const EMBED_BATCH: usize = 32;
// Texts longer than this many bytes are cut at a char boundary before being embedded.
const MAX_EMBED_BYTES: usize = 32_768;
55
/// Common English function words that do not count as meaningful query terms.
static STOP_WORDS: &[&str] = &[
    "and", "are", "also", "but", "can", "did", "does", "for", "from", "had", "has", "have", "its",
    "just", "may", "not", "our", "out", "than", "that", "the", "then", "this", "was", "were",
    "will", "with",
];

/// Returns `true` when `w` is exactly one of the entries in [`STOP_WORDS`].
///
/// The comparison is case-sensitive, so callers are expected to lower-case the word first.
fn is_stop(w: &str) -> bool {
    STOP_WORDS.iter().any(|stop| *stop == w)
}
65
66fn sql_err(ctx: &str, e: impl std::fmt::Display) -> RuntimeError {
69 RuntimeError::Internal(format!("{ctx}: {e}"))
70}
71
72fn deser<T: serde::de::DeserializeOwned>(params: Value) -> Result<T, RuntimeError> {
73 serde_json::from_value(params)
74 .map_err(|e| RuntimeError::InvalidInput(format!("bad params: {e}")))
75}
76
77fn now_us() -> i64 {
80 Utc::now().timestamp_micros()
81}
82
83fn new_id() -> String {
84 Uuid::new_v4().to_string()
85}
86
87fn tags_to_json(tags: Option<&Vec<String>>) -> String {
88 match tags {
89 Some(t) => serde_json::to_string(t).unwrap_or_else(|_| "[]".into()),
90 None => "[]".to_string(),
91 }
92}
93
94fn row_str(row: &khive_storage::types::SqlRow, col: &str) -> Option<String> {
95 match row.get(col) {
96 Some(SqlValue::Text(s)) => Some(s.clone()),
97 _ => None,
98 }
99}
100
101fn row_i64(row: &khive_storage::types::SqlRow, col: &str) -> Option<i64> {
102 match row.get(col) {
103 Some(SqlValue::Integer(n)) => Some(*n),
104 _ => None,
105 }
106}
107
108fn row_bool(row: &khive_storage::types::SqlRow, col: &str) -> bool {
109 matches!(row.get(col), Some(SqlValue::Integer(1)))
110}
111
112fn atom_from_row(row: &khive_storage::types::SqlRow) -> Option<Atom> {
113 let id: Uuid = row_str(row, "id")?.parse().ok()?;
114 Some(Atom {
115 id,
116 namespace: row_str(row, "namespace")?,
117 slug: row_str(row, "slug")?,
118 name: row_str(row, "name")?,
119 description: row_str(row, "description"),
120 content: row_str(row, "content").unwrap_or_default(),
121 tags: row_str(row, "tags").unwrap_or_else(|| "[]".into()),
122 properties: row_str(row, "properties"),
123 status: row_str(row, "status"),
124 source_uri: row_str(row, "source_uri"),
125 source_type: row_str(row, "source_type"),
126 finalized: row_bool(row, "finalized"),
127 created_at: row_i64(row, "created_at").unwrap_or(0),
128 updated_at: row_i64(row, "updated_at").unwrap_or(0),
129 deleted_at: row_i64(row, "deleted_at"),
130 })
131}
132
133fn domain_from_row(row: &khive_storage::types::SqlRow) -> Option<Domain> {
134 let id: Uuid = row_str(row, "id")?.parse().ok()?;
135 Some(Domain {
136 id,
137 namespace: row_str(row, "namespace")?,
138 slug: row_str(row, "slug")?,
139 name: row_str(row, "name")?,
140 description: row_str(row, "description"),
141 tags: row_str(row, "tags").unwrap_or_else(|| "[]".into()),
142 members: row_str(row, "members").unwrap_or_else(|| "[]".into()),
143 created_at: row_i64(row, "created_at").unwrap_or(0),
144 updated_at: row_i64(row, "updated_at").unwrap_or(0),
145 deleted_at: row_i64(row, "deleted_at"),
146 })
147}
148
149fn atom_to_json(atom: &Atom) -> Value {
150 json!({
151 "id": atom.id.to_string(),
152 "namespace": atom.namespace,
153 "slug": atom.slug,
154 "name": atom.name,
155 "description": atom.description,
156 "content": atom.content,
157 "tags": serde_json::from_str::<Value>(&atom.tags).unwrap_or(Value::Array(vec![])),
158 "properties": atom.properties.as_deref().and_then(|s| serde_json::from_str::<Value>(s).ok()),
159 "status": atom.status,
160 "source_uri": atom.source_uri,
161 "source_type": atom.source_type,
162 "finalized": atom.finalized,
163 "kind": "atom",
164 "created_at": atom.created_at,
165 "updated_at": atom.updated_at,
166 })
167}
168
169fn domain_to_json(domain: &Domain) -> Value {
170 json!({
171 "id": domain.id.to_string(),
172 "namespace": domain.namespace,
173 "slug": domain.slug,
174 "name": domain.name,
175 "description": domain.description,
176 "tags": serde_json::from_str::<Value>(&domain.tags).unwrap_or(Value::Array(vec![])),
177 "members": serde_json::from_str::<Value>(&domain.members).unwrap_or(Value::Array(vec![])),
178 "kind": "domain",
179 "created_at": domain.created_at,
180 "updated_at": domain.updated_at,
181 })
182}
183
/// Stateless namespace type for the knowledge handlers.
///
/// It carries no data; the handlers are associated async functions that receive the
/// runtime, the caller's namespace token and the raw JSON parameters.
pub(crate) struct KnowledgeHandlers;
187
188impl KnowledgeHandlers {
189 pub(crate) async fn upsert_atoms(
192 runtime: &KhiveRuntime,
193 token: &NamespaceToken,
194 params: Value,
195 ) -> Result<Value, RuntimeError> {
196 let p: UpsertAtomsParams = deser(params)?;
197 if p.atoms.is_empty() {
198 return Err(RuntimeError::InvalidInput(
199 "atoms list must not be empty".into(),
200 ));
201 }
202 if p.atoms.len() > 5000 {
203 return Err(RuntimeError::InvalidInput(
204 "max 5000 atoms per request".into(),
205 ));
206 }
207
208 let ns = token.namespace().as_str().to_owned();
209 let sql = runtime.sql();
210 let now = now_us();
211 let mut created = 0usize;
212 let mut updated = 0usize;
213
214 for atom_in in &p.atoms {
215 let slug = atom_in.slug.trim().to_string();
216 if slug.is_empty() {
217 return Err(RuntimeError::InvalidInput(
218 "atom slug must not be empty".into(),
219 ));
220 }
221
222 let tags_json = tags_to_json(atom_in.tags.as_ref());
223 let content = atom_in.content.as_deref().unwrap_or("").to_string();
224 let props_json = atom_in
225 .properties
226 .as_ref()
227 .map(|v| serde_json::to_string(v).unwrap_or_default());
228 let source_uri = atom_in
229 .source_uri
230 .as_ref()
231 .map(|s| s.trim())
232 .filter(|s| !s.is_empty());
233 let source_type = atom_in
234 .source_type
235 .as_ref()
236 .map(|s| s.trim())
237 .filter(|s| !s.is_empty());
238
239 let mut reader = sql
241 .reader()
242 .await
243 .map_err(|e| sql_err("upsert_atoms reader", e))?;
244 let existing = reader
245 .query_row(SqlStatement {
246 sql: "SELECT id FROM knowledge_atoms WHERE namespace = ?1 AND slug = ?2 AND deleted_at IS NULL LIMIT 1".into(),
247 params: vec![SqlValue::Text(ns.clone()), SqlValue::Text(slug.clone())],
248 label: None,
249 })
250 .await
251 .map_err(|e| sql_err("upsert_atoms lookup", e))?;
252
253 let mut writer = sql
254 .writer()
255 .await
256 .map_err(|e| sql_err("upsert_atoms writer", e))?;
257 if let Some(row) = existing {
258 let id = row_str(&row, "id").ok_or_else(|| {
259 RuntimeError::Internal("missing id in existing atom row".into())
260 })?;
261 writer
262 .execute(SqlStatement {
263 sql: "UPDATE knowledge_atoms SET name=?1, description=?2, content=?3, tags=?4, properties=?5, source_uri=?6, source_type=?7, finalized=?8, status = CASE WHEN ?8 = 1 AND status = 'draft' THEN 'reviewed' ELSE status END, updated_at=?9 WHERE id=?10 AND namespace=?11".into(),
268 params: vec![
269 SqlValue::Text(atom_in.name.clone()),
270 atom_in.description.as_ref().map_or(SqlValue::Null, |d| SqlValue::Text(d.clone())),
271 SqlValue::Text(content.clone()),
272 SqlValue::Text(tags_json.clone()),
273 props_json.as_ref().map_or(SqlValue::Null, |p| SqlValue::Text(p.clone())),
274 source_uri.map_or(SqlValue::Null, |s| SqlValue::Text(s.to_string())),
275 source_type.map_or(SqlValue::Null, |s| SqlValue::Text(s.to_string())),
276 SqlValue::Integer(atom_in.finalized.unwrap_or(false) as i64),
277 SqlValue::Integer(now),
278 SqlValue::Text(id),
279 SqlValue::Text(ns.clone()),
280 ],
281 label: None,
282 })
283 .await
284 .map_err(|e| sql_err("upsert_atoms update", e))?;
285 updated += 1;
286 } else {
287 let id = new_id();
288 writer
289 .execute(SqlStatement {
290 sql: "INSERT INTO knowledge_atoms (id, namespace, slug, name, description, content, tags, properties, source_uri, source_type, status, finalized, created_at, updated_at) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14)".into(),
291 params: vec![
292 SqlValue::Text(id),
293 SqlValue::Text(ns.clone()),
294 SqlValue::Text(slug.clone()),
295 SqlValue::Text(atom_in.name.clone()),
296 atom_in.description.as_ref().map_or(SqlValue::Null, |d| SqlValue::Text(d.clone())),
297 SqlValue::Text(content.clone()),
298 SqlValue::Text(tags_json.clone()),
299 props_json.as_ref().map_or(SqlValue::Null, |p| SqlValue::Text(p.clone())),
300 source_uri.map_or(SqlValue::Null, |s| SqlValue::Text(s.to_string())),
301 source_type.map_or(SqlValue::Null, |s| SqlValue::Text(s.to_string())),
302 SqlValue::Text(if atom_in.finalized.unwrap_or(false) { "reviewed" } else { "draft" }.to_string()),
305 SqlValue::Integer(atom_in.finalized.unwrap_or(false) as i64),
306 SqlValue::Integer(now),
307 SqlValue::Integer(now),
308 ],
309 label: None,
310 })
311 .await
312 .map_err(|e| sql_err("upsert_atoms insert", e))?;
313 created += 1;
314 }
315 }
316
317 Ok(json!({
318 "created": created,
319 "updated": updated,
320 "total": p.atoms.len(),
321 }))
322 }
323
324 pub(crate) async fn upsert_domains(
327 runtime: &KhiveRuntime,
328 token: &NamespaceToken,
329 params: Value,
330 ) -> Result<Value, RuntimeError> {
331 let p: UpsertDomainsParams = deser(params)?;
332 if p.domains.is_empty() {
333 return Err(RuntimeError::InvalidInput(
334 "domains list must not be empty".into(),
335 ));
336 }
337
338 let ns = token.namespace().as_str().to_owned();
339 let sql = runtime.sql();
340 let now = now_us();
341 let mut created = 0usize;
342 let mut updated = 0usize;
343
344 for domain_in in &p.domains {
345 let slug = domain_in.slug.trim().to_string();
346 let name = domain_in.name.trim().to_string();
347 if slug.is_empty() {
348 return Err(RuntimeError::InvalidInput(
349 "domain slug must not be empty".into(),
350 ));
351 }
352 if name.is_empty() {
353 return Err(RuntimeError::InvalidInput(
354 "domain name must not be empty".into(),
355 ));
356 }
357
358 let mut tags: Vec<String> = domain_in.tags.clone().unwrap_or_default();
359 if !tags.iter().any(|t| t == "type:domain") {
360 tags.push("type:domain".to_string());
361 }
362 let tags_json = serde_json::to_string(&tags).unwrap_or_else(|_| "[]".into());
363 let members_json = match &domain_in.members {
364 Some(m) => serde_json::to_string(m).unwrap_or_else(|_| "[]".into()),
365 None => "[]".to_string(),
366 };
367 let properties_json = serde_json::to_string(
368 &serde_json::json!({ "members": domain_in.members.as_deref().unwrap_or(&[]) }),
369 )
370 .unwrap_or_else(|_| "{}".into());
371
372 let mut reader = sql
373 .reader()
374 .await
375 .map_err(|e| sql_err("upsert_domains reader", e))?;
376 let existing = reader
377 .query_row(SqlStatement {
378 sql: "SELECT id FROM knowledge_domains WHERE namespace = ?1 AND slug = ?2 AND deleted_at IS NULL LIMIT 1".into(),
379 params: vec![SqlValue::Text(ns.clone()), SqlValue::Text(slug.clone())],
380 label: None,
381 })
382 .await
383 .map_err(|e| sql_err("upsert_domains lookup", e))?;
384
385 let mut writer = sql
386 .writer()
387 .await
388 .map_err(|e| sql_err("upsert_domains writer", e))?;
389 if let Some(row) = existing {
390 let id = row_str(&row, "id").ok_or_else(|| {
391 RuntimeError::Internal("missing id in existing domain row".into())
392 })?;
393 writer
394 .execute(SqlStatement {
395 sql: "UPDATE knowledge_domains SET name=?1, description=?2, tags=?3, members=?4, updated_at=?5 WHERE id=?6 AND namespace=?7".into(),
396 params: vec![
397 SqlValue::Text(name.clone()),
398 domain_in.description.as_ref().map_or(SqlValue::Null, |d| SqlValue::Text(d.clone())),
399 SqlValue::Text(tags_json.clone()),
400 SqlValue::Text(members_json.clone()),
401 SqlValue::Integer(now),
402 SqlValue::Text(id.clone()),
403 SqlValue::Text(ns.clone()),
404 ],
405 label: None,
406 })
407 .await
408 .map_err(|e| sql_err("upsert_domains update", e))?;
409 writer
411 .execute(SqlStatement {
412 sql: "INSERT INTO knowledge_atoms (id, namespace, slug, name, description, content, tags, properties, status, finalized, created_at, updated_at) \
413 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,'reviewed',1,?9,?10) \
414 ON CONFLICT(namespace, slug) DO UPDATE SET name=?4, description=?5, content=?6, tags=?7, properties=?8, status='reviewed', updated_at=?10".into(),
415 params: vec![
416 SqlValue::Text(id),
417 SqlValue::Text(ns.clone()),
418 SqlValue::Text(slug.clone()),
419 SqlValue::Text(name.clone()),
420 domain_in.description.as_ref().map_or(SqlValue::Null, |d| SqlValue::Text(d.clone())),
421 SqlValue::Text(String::new()),
422 SqlValue::Text(tags_json.clone()),
423 SqlValue::Text(properties_json.clone()),
424 SqlValue::Integer(now),
425 SqlValue::Integer(now),
426 ],
427 label: None,
428 })
429 .await
430 .map_err(|e| sql_err("upsert_domains atom mirror update", e))?;
431 updated += 1;
432 } else {
433 let id = new_id();
434 writer
435 .execute(SqlStatement {
436 sql: "INSERT INTO knowledge_domains (id, namespace, slug, name, description, tags, members, created_at, updated_at) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9)".into(),
437 params: vec![
438 SqlValue::Text(id.clone()),
439 SqlValue::Text(ns.clone()),
440 SqlValue::Text(slug.clone()),
441 SqlValue::Text(name.clone()),
442 domain_in.description.as_ref().map_or(SqlValue::Null, |d| SqlValue::Text(d.clone())),
443 SqlValue::Text(tags_json.clone()),
444 SqlValue::Text(members_json.clone()),
445 SqlValue::Integer(now),
446 SqlValue::Integer(now),
447 ],
448 label: None,
449 })
450 .await
451 .map_err(|e| sql_err("upsert_domains insert", e))?;
452 writer
454 .execute(SqlStatement {
455 sql: "INSERT INTO knowledge_atoms (id, namespace, slug, name, description, content, tags, properties, status, finalized, created_at, updated_at) \
456 VALUES (?1,?2,?3,?4,?5,?6,?7,?8,'reviewed',1,?9,?10)".into(),
457 params: vec![
458 SqlValue::Text(id),
459 SqlValue::Text(ns.clone()),
460 SqlValue::Text(slug.clone()),
461 SqlValue::Text(name.clone()),
462 domain_in.description.as_ref().map_or(SqlValue::Null, |d| SqlValue::Text(d.clone())),
463 SqlValue::Text(String::new()),
464 SqlValue::Text(tags_json.clone()),
465 SqlValue::Text(properties_json.clone()),
466 SqlValue::Integer(now),
467 SqlValue::Integer(now),
468 ],
469 label: None,
470 })
471 .await
472 .map_err(|e| sql_err("upsert_domains atom mirror insert", e))?;
473 created += 1;
474 }
475 }
476
477 Ok(json!({
478 "created": created,
479 "updated": updated,
480 "total": p.domains.len(),
481 }))
482 }
483
484 pub(crate) async fn get(
487 runtime: &KhiveRuntime,
488 token: &NamespaceToken,
489 params: Value,
490 ) -> Result<Value, RuntimeError> {
491 let p: GetParams = deser(params)?;
492 let ns = token.namespace().as_str().to_owned();
493 let sql = runtime.sql();
494 let id = p.id.trim().to_string();
495
496 let is_uuid = id.parse::<Uuid>().is_ok();
498
499 let mut reader = sql.reader().await.map_err(|e| sql_err("get reader", e))?;
500
501 if is_uuid {
502 let row = reader
504 .query_row(SqlStatement {
505 sql: "SELECT * FROM knowledge_atoms WHERE id = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
506 params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.clone())],
507 label: None,
508 })
509 .await
510 .map_err(|e| sql_err("get atom by id", e))?;
511 if let Some(r) = row {
512 return atom_from_row(&r)
513 .map(|a| atom_to_json(&a))
514 .ok_or_else(|| RuntimeError::Internal("atom row parse failed".into()));
515 }
516 let row = reader
518 .query_row(SqlStatement {
519 sql: "SELECT * FROM knowledge_domains WHERE id = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
520 params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.clone())],
521 label: None,
522 })
523 .await
524 .map_err(|e| sql_err("get domain by id", e))?;
525 if let Some(r) = row {
526 return domain_from_row(&r)
527 .map(|d| domain_to_json(&d))
528 .ok_or_else(|| RuntimeError::Internal("domain row parse failed".into()));
529 }
530 }
531
532 let row = reader
535 .query_row(SqlStatement {
536 sql: "SELECT * FROM knowledge_domains WHERE namespace = ?1 AND slug = ?2 AND deleted_at IS NULL LIMIT 1".into(),
537 params: vec![SqlValue::Text(ns.clone()), SqlValue::Text(id.clone())],
538 label: None,
539 })
540 .await
541 .map_err(|e| sql_err("get domain by slug", e))?;
542 if let Some(r) = row {
543 return domain_from_row(&r)
544 .map(|d| domain_to_json(&d))
545 .ok_or_else(|| RuntimeError::Internal("domain row parse failed".into()));
546 }
547
548 let row = reader
549 .query_row(SqlStatement {
550 sql: "SELECT * FROM knowledge_atoms WHERE namespace = ?1 AND slug = ?2 AND deleted_at IS NULL LIMIT 1".into(),
551 params: vec![SqlValue::Text(ns.clone()), SqlValue::Text(id.clone())],
552 label: None,
553 })
554 .await
555 .map_err(|e| sql_err("get atom by slug", e))?;
556 if let Some(r) = row {
557 return atom_from_row(&r)
558 .map(|a| atom_to_json(&a))
559 .ok_or_else(|| RuntimeError::Internal("atom row parse failed".into()));
560 }
561
562 Err(RuntimeError::NotFound(format!(
563 "atom or domain not found: {id:?}"
564 )))
565 }
566
567 pub(crate) async fn list(
570 runtime: &KhiveRuntime,
571 token: &NamespaceToken,
572 params: Value,
573 ) -> Result<Value, RuntimeError> {
574 let p: ListParams = deser(params)?;
575 let ns = token.namespace().as_str().to_owned();
576 let sql = runtime.sql();
577 let limit = p.limit.unwrap_or(20).clamp(1, 500) as i64;
578 let offset = p.offset.unwrap_or(0) as i64;
579
580 let mut reader = sql.reader().await.map_err(|e| sql_err("list reader", e))?;
581
582 match p.kind.as_deref() {
583 Some("domain") => {
584 let rows = reader
585 .query_all(SqlStatement {
586 sql: "SELECT * FROM knowledge_domains WHERE namespace = ?1 AND deleted_at IS NULL ORDER BY created_at DESC LIMIT ?2 OFFSET ?3".into(),
587 params: vec![
588 SqlValue::Text(ns.clone()),
589 SqlValue::Integer(limit),
590 SqlValue::Integer(offset),
591 ],
592 label: None,
593 })
594 .await
595 .map_err(|e| sql_err("list domains", e))?;
596
597 let total_row = reader
598 .query_scalar(SqlStatement {
599 sql: "SELECT COUNT(*) FROM knowledge_domains WHERE namespace = ?1 AND deleted_at IS NULL".into(),
600 params: vec![SqlValue::Text(ns)],
601 label: None,
602 })
603 .await
604 .map_err(|e| sql_err("list domains count", e))?;
605 let total = match total_row {
606 Some(SqlValue::Integer(n)) => n,
607 _ => 0,
608 };
609
610 let items: Vec<Value> = rows
611 .iter()
612 .filter_map(|r| domain_from_row(r).map(|d| domain_to_json(&d)))
613 .collect();
614
615 Ok(json!({ "results": items, "total": total, "limit": limit, "offset": offset }))
616 }
617 Some("atom") | None => {
618 let requested_statuses = status_values(p.status.as_ref());
619 let (data_status_clause, data_status_params) =
620 status_sql_clause(&requested_statuses, p.exclude_status.as_deref(), 4);
621 let (count_status_clause, count_status_params) =
622 status_sql_clause(&requested_statuses, p.exclude_status.as_deref(), 2);
623
624 let sql_str = format!(
625 "SELECT * FROM knowledge_atoms WHERE namespace = ?1 AND deleted_at IS NULL AND tags NOT LIKE '%type:domain%'{} ORDER BY created_at DESC LIMIT ?2 OFFSET ?3",
626 data_status_clause
627 );
628 let count_sql = format!(
629 "SELECT COUNT(*) FROM knowledge_atoms WHERE namespace = ?1 AND deleted_at IS NULL AND tags NOT LIKE '%type:domain%'{}",
630 count_status_clause
631 );
632
633 let mut row_params = vec![
634 SqlValue::Text(ns.clone()),
635 SqlValue::Integer(limit),
636 SqlValue::Integer(offset),
637 ];
638 row_params.extend(data_status_params);
639
640 let rows = reader
641 .query_all(SqlStatement {
642 sql: sql_str,
643 params: row_params,
644 label: None,
645 })
646 .await
647 .map_err(|e| sql_err("list atoms", e))?;
648
649 let mut count_params = vec![SqlValue::Text(ns)];
650 count_params.extend(count_status_params);
651 let total_row = reader
652 .query_scalar(SqlStatement {
653 sql: count_sql,
654 params: count_params,
655 label: None,
656 })
657 .await
658 .map_err(|e| sql_err("list atoms count", e))?;
659 let total = match total_row {
660 Some(SqlValue::Integer(n)) => n,
661 _ => 0,
662 };
663
664 let items: Vec<Value> = rows
665 .iter()
666 .filter_map(|r| atom_from_row(r).map(|a| atom_to_json(&a)))
667 .collect();
668
669 Ok(json!({ "results": items, "total": total, "limit": limit, "offset": offset }))
670 }
671 Some(other) => Err(RuntimeError::InvalidInput(format!(
672 "unknown type {other:?}; valid: atom | domain"
673 ))),
674 }
675 }
676
677 pub(crate) async fn delete_atoms(
680 runtime: &KhiveRuntime,
681 token: &NamespaceToken,
682 params: Value,
683 ) -> Result<Value, RuntimeError> {
684 let p: DeleteAtomsParams = deser(params)?;
685 if p.ids.is_empty() {
686 return Err(RuntimeError::InvalidInput("ids must not be empty".into()));
687 }
688
689 let ns = token.namespace().as_str().to_owned();
690 let sql = runtime.sql();
691 let now = now_us();
692 let mut deleted = 0usize;
693
694 let mut writer = sql
695 .writer()
696 .await
697 .map_err(|e| sql_err("delete_atoms writer", e))?;
698 for id_or_slug in &p.ids {
699 let id_or_slug = id_or_slug.trim().to_string();
700 let affected = writer
702 .execute(SqlStatement {
703 sql: "UPDATE knowledge_atoms SET deleted_at = ?1 WHERE namespace = ?2 AND (id = ?3 OR slug = ?3) AND deleted_at IS NULL".into(),
704 params: vec![
705 SqlValue::Integer(now),
706 SqlValue::Text(ns.clone()),
707 SqlValue::Text(id_or_slug),
708 ],
709 label: None,
710 })
711 .await
712 .map_err(|e| sql_err("delete_atoms update", e))?;
713 deleted += affected as usize;
714 }
715
716 Ok(json!({
717 "deleted": deleted,
718 "requested": p.ids.len(),
719 }))
720 }
721
722 pub(crate) async fn stats(
725 runtime: &KhiveRuntime,
726 token: &NamespaceToken,
727 _params: Value,
728 ) -> Result<Value, RuntimeError> {
729 let ns = token.namespace().as_str().to_owned();
730 let sql = runtime.sql();
731 let mut reader = sql.reader().await.map_err(|e| sql_err("stats reader", e))?;
732
733 let atom_count = reader
734 .query_scalar(SqlStatement {
735 sql: "SELECT COUNT(*) FROM knowledge_atoms WHERE namespace = ?1 AND deleted_at IS NULL AND tags NOT LIKE '%type:domain%'".into(),
736 params: vec![SqlValue::Text(ns.clone())],
737 label: None,
738 })
739 .await
740 .map_err(|e| sql_err("stats atoms", e))?;
741
742 let domain_count = reader
743 .query_scalar(SqlStatement {
744 sql: "SELECT COUNT(*) FROM knowledge_domains WHERE namespace = ?1 AND deleted_at IS NULL".into(),
745 params: vec![SqlValue::Text(ns.clone())],
746 label: None,
747 })
748 .await
749 .map_err(|e| sql_err("stats domains", e))?;
750
751 let finalized_count = reader
752 .query_scalar(SqlStatement {
753 sql: "SELECT COUNT(*) FROM knowledge_atoms WHERE namespace = ?1 AND finalized = 1 AND deleted_at IS NULL AND tags NOT LIKE '%type:domain%'".into(),
754 params: vec![SqlValue::Text(ns.clone())],
755 label: None,
756 })
757 .await
758 .map_err(|e| sql_err("stats finalized", e))?;
759
760 let total_atoms = match atom_count {
761 Some(SqlValue::Integer(n)) => n,
762 _ => 0,
763 };
764 let total_domains = match domain_count {
765 Some(SqlValue::Integer(n)) => n,
766 _ => 0,
767 };
768 let finalized = match finalized_count {
769 Some(SqlValue::Integer(n)) => n,
770 _ => 0,
771 };
772
773 let eval_coverage = if total_atoms > 0 {
774 finalized as f64 / total_atoms as f64
775 } else {
776 0.0
777 };
778
779 let embedding_coverage =
780 compute_embedding_coverage(runtime, token, &ns, total_atoms).await?;
781
782 Ok(json!({
783 "total_atoms": total_atoms,
784 "total_domains": total_domains,
785 "total_events": 0,
786 "eval_coverage": eval_coverage,
787 "embedding_coverage": embedding_coverage,
788 "namespace": ns,
789 }))
790 }
791
792 pub(crate) async fn index(
795 runtime: &KhiveRuntime,
796 token: &NamespaceToken,
797 params: Value,
798 ann: &vamana::SharedAnn,
799 ) -> Result<Value, RuntimeError> {
800 let p: IndexParams = deser(params)?;
801 let rebuild_ann = p.rebuild_ann.unwrap_or(false);
802 let ns = token.namespace().as_str().to_owned();
803
804 if runtime.default_embedder_name().is_empty() {
806 return Ok(
807 json!({ "indexed": 0, "skipped": 0, "total": 0, "reason": "no embedding model configured" }),
808 );
809 }
810
811 let sql = runtime.sql();
812 let batch_size = p.batch_size.unwrap_or(500).clamp(1, 1000);
813 let insert_only = p.insert_only.unwrap_or(false);
814
815 let atoms: Vec<Atom> = if let Some(ref ids) = p.ids {
817 let mut out = Vec::with_capacity(ids.len());
818 let mut reader = sql.reader().await.map_err(|e| sql_err("index reader", e))?;
819 for id_or_slug in ids {
820 let row = reader
821 .query_row(SqlStatement {
822 sql: "SELECT * FROM knowledge_atoms WHERE namespace = ?1 AND (id = ?2 OR slug = ?2) AND deleted_at IS NULL LIMIT 1".into(),
823 params: vec![SqlValue::Text(ns.clone()), SqlValue::Text(id_or_slug.clone())],
824 label: None,
825 })
826 .await
827 .map_err(|e| sql_err("index atom lookup", e))?;
828 if let Some(r) = row {
829 if let Some(a) = atom_from_row(&r) {
830 out.push(a);
831 }
832 }
833 }
834 out
835 } else {
836 let mut out = Vec::new();
837 let mut offset = 0i64;
838 loop {
839 let mut reader = sql
840 .reader()
841 .await
842 .map_err(|e| sql_err("index page reader", e))?;
843 let rows = reader
844 .query_all(SqlStatement {
845 sql: "SELECT * FROM knowledge_atoms WHERE namespace = ?1 AND deleted_at IS NULL ORDER BY created_at LIMIT ?2 OFFSET ?3".into(),
846 params: vec![
847 SqlValue::Text(ns.clone()),
848 SqlValue::Integer(batch_size as i64),
849 SqlValue::Integer(offset),
850 ],
851 label: None,
852 })
853 .await
854 .map_err(|e| sql_err("index page", e))?;
855 let n = rows.len();
856 out.extend(rows.iter().filter_map(atom_from_row));
857 if n < batch_size {
858 break;
859 }
860 offset += n as i64;
861 }
862 out
863 };
864
865 let total = atoms.len();
866 let mut indexed = 0usize;
867 let mut skipped = 0usize;
868
869 let mut ann_vectors: Vec<f32> = Vec::new();
870 let mut ann_ids: Vec<Uuid> = Vec::new();
871 let mut ann_dim: usize = 0;
872
873 for chunk in atoms.chunks(EMBED_BATCH) {
874 let mut staged: Vec<(Uuid, String)> = Vec::with_capacity(chunk.len());
875 for atom in chunk {
876 let text = atom_embed_text(atom);
877 if text.trim().is_empty() {
878 skipped += 1;
879 continue;
880 }
881 staged.push((atom.id, text));
882 }
883 if staged.is_empty() {
884 continue;
885 }
886
887 let texts: Vec<String> = staged
888 .iter()
889 .map(|(_, t)| {
890 if t.len() <= MAX_EMBED_BYTES {
891 t.clone()
892 } else {
893 let mut end = MAX_EMBED_BYTES;
894 while !t.is_char_boundary(end) {
895 end -= 1;
896 }
897 t[..end].to_string()
898 }
899 })
900 .collect();
901
902 let embeddings = match runtime.embed_batch(&texts).await {
903 Ok(e) => e,
904 Err(_) => {
905 skipped += staged.len();
906 continue;
907 }
908 };
909 if embeddings.len() != staged.len() {
910 skipped += staged.len();
911 continue;
912 }
913
914 if let Ok(vectors) = runtime.vectors(token) {
915 let ns_str = token.namespace().as_str();
916 if !insert_only {
917 for (id, _) in &staged {
918 let _ = vectors.delete(*id).await;
919 }
920 }
921 for ((id, _), emb) in staged.iter().zip(embeddings.iter()) {
922 let _ = vectors
923 .insert(
924 *id,
925 SubstrateKind::Entity,
926 ns_str,
927 "knowledge.atom",
928 vec![emb.clone()],
929 )
930 .await;
931 }
932 }
933
934 if rebuild_ann {
935 for ((id, _), emb) in staged.iter().zip(embeddings.iter()) {
936 if ann_dim == 0 {
937 ann_dim = emb.len();
938 }
939 if emb.len() == ann_dim {
940 ann_ids.push(*id);
941 ann_vectors.extend_from_slice(emb);
942 }
943 }
944 }
945
946 indexed += staged.len();
947 }
948
949 if indexed > 0 {
953 vamana::invalidate_snapshot(runtime, &ns).await;
954 *ann.index.write().await = None;
955 }
956
957 let mut ann_count: Option<usize> = None;
958 let is_full_corpus = p.ids.is_none();
959 if rebuild_ann && is_full_corpus && !ann_vectors.is_empty() && ann_dim > 0 {
960 match vamana::AnnBridge::build(ann_vectors, ann_dim, ann_ids) {
961 Ok(bridge) => {
962 ann_count = Some(bridge.num_vectors());
963 let model_name = runtime.default_embedder_name();
964 if let Some(fp) = vamana::compute_fingerprint(runtime, token, model_name).await
965 {
966 if let Err(e) =
967 vamana::persist_snapshot(runtime, &ns, model_name, &bridge, fp).await
968 {
969 tracing::error!(error = %e, "failed to persist Vamana snapshot");
970 }
971 }
972 let mut guard = ann.index.write().await;
973 *guard = Some(bridge);
974 }
975 Err(e) => {
976 tracing::warn!(error = %e, "failed to build Vamana ANN index");
977 }
978 }
979 }
980
981 Ok(json!({
982 "indexed": indexed,
983 "skipped": skipped,
984 "total": total,
985 "ann_vectors": ann_count,
986 }))
987 }
988
989 pub(crate) async fn fold(
992 _runtime: &KhiveRuntime,
993 _token: &NamespaceToken,
994 params: Value,
995 ) -> Result<Value, RuntimeError> {
996 let p: FoldParams = deser(params)?;
997
998 if p.candidates.is_empty() {
999 return Ok(json!({
1000 "selected": [],
1001 "total_size": 0,
1002 "budget": p.budget,
1003 "selected_count": 0,
1004 }));
1005 }
1006
1007 let inputs: Vec<SelectorInput<FoldCandidate>> = p
1008 .candidates
1009 .iter()
1010 .cloned()
1011 .map(|c| SelectorInput {
1012 id: c.id.clone(),
1013 score: c.score,
1014 size: c.size,
1015 category: c.category.clone(),
1016 information_gain: c.information_gain,
1017 content: c,
1018 })
1019 .collect();
1020
1021 let weights = SelectorWeights {
1022 min_score: p.min_score.unwrap_or(0.0),
1023 category_weights: p.category_weights.unwrap_or_default().into_iter().collect(),
1024 diversity_bias: p.diversity_bias.unwrap_or(0.0),
1025 epistemic_weight: p.epistemic_weight.unwrap_or(0.0),
1026 };
1027
1028 let output = GreedySelector
1029 .select(inputs, p.budget, &weights)
1030 .map_err(|e| RuntimeError::Internal(format!("fold selector: {e}")))?;
1031
1032 let selected: Vec<Value> = output
1033 .selected
1034 .iter()
1035 .map(|item| {
1036 json!({
1037 "id": item.id,
1038 "score": item.score,
1039 "size": item.size,
1040 "content": item.content.content,
1041 "category": item.content.category,
1042 })
1043 })
1044 .collect();
1045
1046 Ok(json!({
1047 "selected": selected,
1048 "total_size": output.total_size,
1049 "budget": p.budget,
1050 "selected_count": output.selected.len(),
1051 }))
1052 }
1053
1054 pub(crate) async fn search(
1057 runtime: &KhiveRuntime,
1058 token: &NamespaceToken,
1059 params: Value,
1060 ann: &vamana::SharedAnn,
1061 ) -> Result<Value, RuntimeError> {
1062 let p: SearchParams = deser(params)?;
1063 let raw_query = p.query.trim().to_string();
1064 if raw_query.is_empty() {
1065 return Err(RuntimeError::InvalidInput("query must not be empty".into()));
1066 }
1067
1068 let limit = p.limit.unwrap_or(10).clamp(1, 100);
1069 let min_score = p.min_score.unwrap_or(0.0) as f32;
1070 let w = Weights::from_opts(&p);
1071 let type_filter = p.kind.as_deref();
1072 let do_decompose = p.decompose.unwrap_or(false);
1073 let decompose_threshold = p.decompose_threshold.unwrap_or(4);
1074 let intersection_bonus = p.intersection_bonus.unwrap_or(0.25) as f32;
1075 let requested_rerank = p.rerank.unwrap_or(true);
1076 let do_rerank = requested_rerank && !runtime.default_embedder_name().is_empty();
1077 let rerank_alpha = p.rerank_alpha.unwrap_or(0.7) as f32;
1078 let fetch_limit = if do_rerank { limit * 3 } else { limit }.min(100);
1079
1080 let non_stop_count = raw_query
1081 .split_whitespace()
1082 .filter(|w| w.len() >= MIN_TERM_LEN && !is_stop(&w.to_lowercase()))
1083 .count();
1084
1085 let ns = token.namespace().as_str().to_owned();
1086 let requested_statuses = status_values(p.status.as_ref());
1087 let include_deprecated = explicitly_requested_status(&requested_statuses, "deprecated");
1088
1089 let ctx = SearchCtx {
1090 runtime,
1091 ns: &ns,
1092 role: p.role.as_deref(),
1093 type_filter,
1094 min_score,
1095 w: &w,
1096 fetch_limit,
1097 statuses: &requested_statuses,
1098 exclude_status: p.exclude_status.as_deref(),
1099 };
1100
1101 let mut hits = if do_decompose && non_stop_count >= decompose_threshold {
1102 search_decomposed(&ctx, &raw_query, intersection_bonus).await?
1103 } else {
1104 search_core(&ctx, &raw_query).await?
1105 };
1106
1107 vamana::ensure_ann_background(runtime, token, ann);
1110
1111 let ann_guard = ann.index.read().await;
1113 if let Some(ref bridge) = *ann_guard {
1114 if let Ok(query_emb) = runtime.embed(&raw_query).await {
1115 let ann_k = fetch_limit.max(20);
1116 let ann_hits = bridge.search(&query_emb, ann_k);
1117 if !ann_hits.is_empty() {
1118 fuse_ann_hits(&mut hits, &ann_hits, min_score);
1119 hydrate_empty_hits(runtime, &ns, &mut hits).await;
1120 }
1121 }
1122 }
1123 drop(ann_guard);
1124
1125 if do_rerank && !hits.is_empty() {
1126 rerank_with_embeddings(runtime, &raw_query, &mut hits, rerank_alpha).await?;
1127 }
1128
1129 apply_status_multipliers(&mut hits, include_deprecated);
1130 hits.truncate(limit);
1131
1132 let results: Vec<Value> = hits
1133 .iter()
1134 .map(|h| {
1135 json!({
1136 "id": h.id,
1137 "slug": h.slug,
1138 "name": h.name,
1139 "description": h.description,
1140 "tags": h.tags,
1141 "status": h.status,
1142 "finalized": h.finalized,
1143 "kind": if h.is_domain { "domain" } else { "atom" },
1144 "score": h.score,
1145 })
1146 })
1147 .collect();
1148 let count = results.len();
1149
1150 Ok(json!({
1151 "status": "ok",
1152 "data": { "results": results, "count": count },
1153 }))
1154 }
1155
1156 pub(crate) async fn suggest(
1159 runtime: &KhiveRuntime,
1160 token: &NamespaceToken,
1161 params: Value,
1162 ann: &vamana::SharedAnn,
1163 ) -> Result<Value, RuntimeError> {
1164 let p: SuggestParams = deser(params)?;
1165 let raw_query = p.query.trim().to_string();
1166 if raw_query.is_empty() {
1167 return Err(RuntimeError::InvalidInput("query must not be empty".into()));
1168 }
1169 let limit = p.limit.unwrap_or(8).clamp(1, 100);
1170 let ns = token.namespace().as_str().to_owned();
1171
1172 let ctx = SearchCtx {
1173 runtime,
1174 ns: &ns,
1175 role: p.role.as_deref(),
1176 type_filter: Some("domain"),
1177 min_score: 0.0,
1178 w: &Weights::default(),
1179 fetch_limit: limit * 3,
1180 statuses: &[],
1181 exclude_status: None,
1182 };
1183
1184 let mut hits = search_core(&ctx, &raw_query).await?;
1185
1186 vamana::ensure_ann_background(runtime, token, ann);
1187 let ann_guard = ann.index.read().await;
1188 if let Some(ref bridge) = *ann_guard {
1189 if let Ok(query_emb) = runtime.embed(&raw_query).await {
1190 let ann_k = (limit * 3).max(20);
1191 let ann_hits = bridge.search(&query_emb, ann_k);
1192 if !ann_hits.is_empty() {
1193 fuse_ann_hits(&mut hits, &ann_hits, 0.0);
1194 hydrate_empty_hits(runtime, &ns, &mut hits).await;
1195 }
1196 }
1197 }
1198 drop(ann_guard);
1199
1200 rerank_with_embeddings(runtime, &raw_query, &mut hits, 0.7).await?;
1201
1202 hits.retain(|h| h.is_domain);
1203 hits.truncate(limit);
1204
1205 let results: Vec<Value> = hits
1206 .iter()
1207 .map(|h| json!({ "id": h.id, "name": h.name, "score": h.score }))
1208 .collect();
1209 let count = results.len();
1210
1211 Ok(json!({
1212 "status": "ok",
1213 "data": { "results": results, "count": count },
1214 }))
1215 }
1216
1217 pub(crate) async fn compose(
1220 runtime: &KhiveRuntime,
1221 token: &NamespaceToken,
1222 params: Value,
1223 ) -> Result<Value, RuntimeError> {
1224 let p: ComposeParams = deser(params)?;
1225 let raw_query = p.query.trim().to_string();
1226 if raw_query.is_empty() {
1227 return Err(RuntimeError::InvalidInput("query must not be empty".into()));
1228 }
1229
1230 let domain_ids: Vec<String> = p
1231 .domain_ids
1232 .unwrap_or_default()
1233 .into_iter()
1234 .filter(|s| !s.trim().is_empty())
1235 .collect();
1236 let atom_ids: Vec<String> = p
1237 .atom_ids
1238 .unwrap_or_default()
1239 .into_iter()
1240 .filter(|s| !s.trim().is_empty())
1241 .collect();
1242
1243 if domain_ids.is_empty() && atom_ids.is_empty() {
1244 return Err(RuntimeError::InvalidInput(
1245 "domain_ids or atom_ids must be provided".into(),
1246 ));
1247 }
1248
1249 let ns = token.namespace().as_str().to_owned();
1250
1251 let mut resolved_domains: Vec<Domain> = Vec::new();
1252 let mut member_slugs: Vec<String> = Vec::new();
1253
1254 for id in &domain_ids {
1255 let domain = load_domain_by_id_or_slug(runtime, &ns, id).await?;
1256 let members = parse_domain_members(&domain)?;
1257 member_slugs.extend(members);
1258 resolved_domains.push(domain);
1259 }
1260
1261 let mut seen_ids: HashSet<String> = HashSet::new();
1262 let mut ordered_atoms: Vec<Atom> = Vec::new();
1263
1264 for slug in &member_slugs {
1265 let atom = load_atom_by_id_or_slug(runtime, &ns, slug).await?;
1266 if seen_ids.insert(atom.id.to_string()) {
1267 ordered_atoms.push(atom);
1268 }
1269 }
1270 for id in &atom_ids {
1271 let atom = load_atom_by_id_or_slug(runtime, &ns, id).await?;
1272 if seen_ids.insert(atom.id.to_string()) {
1273 ordered_atoms.push(atom);
1274 }
1275 }
1276
1277 if ordered_atoms.is_empty() {
1278 return Ok(json!({
1279 "status": "ok",
1280 "data": {
1281 "query": raw_query,
1282 "markdown": "# Knowledge Briefing\n\nNo atoms found.",
1283 "domains": [],
1284 "atoms": [],
1285 "count": 0,
1286 },
1287 }));
1288 }
1289
1290 let mut items: Vec<ScoredTextItem> = ordered_atoms
1291 .iter()
1292 .map(|a| ScoredTextItem {
1293 id: a.id.to_string(),
1294 slug: a.slug.clone(),
1295 name: a.name.clone(),
1296 text: atom_embed_text(a),
1297 score: 1.0,
1298 })
1299 .collect();
1300
1301 rerank_text_items(runtime, &raw_query, &mut items).await?;
1302
1303 let sorted_atoms: Vec<(&Atom, f32)> = items
1304 .iter()
1305 .filter_map(|item| {
1306 ordered_atoms
1307 .iter()
1308 .find(|a| a.id.to_string() == item.id)
1309 .map(|a| (a, item.score))
1310 })
1311 .collect();
1312
1313 let markdown = format_compose_markdown(&raw_query, &resolved_domains, &sorted_atoms);
1314
1315 let atom_json: Vec<Value> = items
1316 .iter()
1317 .map(|item| {
1318 json!({
1319 "id": item.id,
1320 "slug": item.slug,
1321 "name": item.name,
1322 "score": item.score,
1323 })
1324 })
1325 .collect();
1326
1327 let domain_json: Vec<Value> = resolved_domains
1328 .iter()
1329 .map(|d| json!({ "id": d.id.to_string(), "slug": d.slug, "name": d.name }))
1330 .collect();
1331
1332 let count = atom_json.len();
1333
1334 Ok(json!({
1335 "status": "ok",
1336 "data": {
1337 "query": raw_query,
1338 "markdown": markdown,
1339 "domains": domain_json,
1340 "atoms": atom_json,
1341 "count": count,
1342 },
1343 }))
1344 }
1345}
1346
/// Tunable weights for lexical scoring (see `score_candidate` and `compute_idf`).
struct Weights {
    /// Flat bonus added when the candidate name contains the whole query.
    w_exact_name: f32,
    /// Multiplier for the term score of the name field.
    w_name: f32,
    /// Multiplier for the term score of the description field.
    w_description: f32,
    /// Multiplier for the term score of the tags field.
    w_tags: f32,
    /// Multiplier for the term score of the content field.
    w_content: f32,
    /// Factor applied to the IDF of expanded (non-original) terms.
    expand_discount: f32,
    /// Exponent applied to term coverage; values `<= 0` disable the coverage factor.
    coverage_alpha: f32,
    /// Multiplier for the summed bigram bonus across all fields.
    w_bigram: f32,
}
1359
impl Default for Weights {
    /// Returns the weights built from the module-level `D_*` default constants.
    fn default() -> Self {
        Self {
            w_exact_name: D_W_EXACT_NAME,
            w_name: D_W_NAME,
            w_description: D_W_DESCRIPTION,
            w_tags: D_W_TAGS,
            w_content: D_W_CONTENT,
            expand_discount: D_EXPAND_DISCOUNT,
            coverage_alpha: D_COVERAGE_ALPHA,
            w_bigram: D_W_BIGRAM,
        }
    }
}
1374
1375impl Weights {
1376 fn from_opts(opts: &SearchParams) -> Self {
1377 let w = opts.weights.as_ref();
1378 Self {
1379 w_exact_name: w
1380 .and_then(|w| w.w_exact_name)
1381 .map_or(D_W_EXACT_NAME, |v| v as f32),
1382 w_name: w.and_then(|w| w.w_name).map_or(D_W_NAME, |v| v as f32),
1383 w_description: w
1384 .and_then(|w| w.w_description)
1385 .map_or(D_W_DESCRIPTION, |v| v as f32),
1386 w_tags: w.and_then(|w| w.w_tags).map_or(D_W_TAGS, |v| v as f32),
1387 w_content: w
1388 .and_then(|w| w.w_content)
1389 .map_or(D_W_CONTENT, |v| v as f32),
1390 expand_discount: w
1391 .and_then(|w| w.expand_discount)
1392 .map_or(D_EXPAND_DISCOUNT, |v| v as f32),
1393 coverage_alpha: w
1394 .and_then(|w| w.coverage_alpha)
1395 .map_or(D_COVERAGE_ALPHA, |v| v as f32),
1396 w_bigram: w.and_then(|w| w.w_bigram).map_or(D_W_BIGRAM, |v| v as f32),
1397 }
1398 }
1399}
1400
/// A search result row together with its current relevance score.
struct ScoredHit {
    /// Record id as a string (ANN-only hits use the UUID's string form).
    id: String,
    /// Record slug; an empty slug marks an ANN placeholder that still needs
    /// hydration (see `fuse_ann_hits` / `hydrate_empty_hits`).
    slug: String,
    /// Display name.
    name: String,
    /// Optional description text.
    description: Option<String>,
    /// Raw tags text; `hydrate_empty_hits` parses it as a JSON string array.
    tags: Option<String>,
    /// Value of the record's `finalized` flag (always `false` for hydrated domains).
    finalized: bool,
    /// Whether the hit is reported with kind `"domain"` instead of `"atom"`.
    is_domain: bool,
    /// Review status, if any; drives `status_multiplier`.
    status: Option<String>,
    /// Relevance score; its scale changes as the hit moves through fusion,
    /// rerank and status weighting.
    score: f32,
}
1414
/// Rank constant `k` passed to `khive_fusion::reciprocal_rank_fusion` and used
/// by `normalize_rrf_score` to derive the score ceiling.
const RRF_K: usize = 60;
1418
/// Scales a raw reciprocal-rank-fusion score into `[0.0, 1.0]`.
///
/// The ceiling used for scaling is `source_count / (k + 1)`, i.e. every
/// contributing source granting `1 / (k + 1)`. With no sources the result is
/// `0.0`; values outside the range are clamped.
fn normalize_rrf_score(raw: f32, source_count: usize, k: usize) -> f32 {
    match source_count {
        0 => 0.0,
        sources => {
            let ceiling = sources as f32 / (k as f32 + 1.0);
            let scaled = raw / ceiling;
            scaled.clamp(0.0, 1.0)
        }
    }
}
1426
1427fn fuse_ann_hits(fts_hits: &mut Vec<ScoredHit>, ann_hits: &[(Uuid, f32)], min_score: f32) {
1428 let drained: Vec<ScoredHit> = std::mem::take(fts_hits);
1429
1430 let fts_source: Vec<(String, DeterministicScore)> = drained
1431 .iter()
1432 .map(|hit| (hit.id.clone(), DeterministicScore::from_f32(hit.score)))
1433 .collect();
1434 let mut by_id: HashMap<String, ScoredHit> = drained
1435 .into_iter()
1436 .map(|hit| (hit.id.clone(), hit))
1437 .collect();
1438 let ann_source: Vec<(String, DeterministicScore)> = ann_hits
1439 .iter()
1440 .map(|(uuid, score)| (uuid.to_string(), DeterministicScore::from_f32(*score)))
1441 .collect();
1442
1443 let source_count = usize::from(!fts_source.is_empty()) + usize::from(!ann_source.is_empty());
1444 let fused = khive_fusion::reciprocal_rank_fusion(vec![fts_source, ann_source], RRF_K);
1445
1446 for (id, fused_score) in fused {
1447 let raw_score = fused_score.to_f64() as f32;
1448 let score = normalize_rrf_score(raw_score, source_count, RRF_K);
1449 if score < min_score {
1450 continue;
1451 }
1452
1453 if let Some(mut hit) = by_id.remove(&id) {
1454 hit.score = score;
1455 fts_hits.push(hit);
1456 } else {
1457 fts_hits.push(ScoredHit {
1458 id,
1459 slug: String::new(),
1460 name: String::new(),
1461 description: None,
1462 tags: None,
1463 finalized: false,
1464 is_domain: false,
1465 status: None,
1466 score,
1467 });
1468 }
1469 }
1470}
1471
1472async fn hydrate_empty_hits(runtime: &KhiveRuntime, ns: &str, hits: &mut Vec<ScoredHit>) {
1473 let ids: Vec<String> = hits
1474 .iter()
1475 .filter(|hit| hit.slug.is_empty())
1476 .map(|hit| hit.id.clone())
1477 .collect();
1478 if ids.is_empty() {
1479 return;
1480 }
1481
1482 let sql = runtime.sql();
1483 let mut reader = match sql.reader().await {
1484 Ok(r) => r,
1485 Err(_) => return,
1486 };
1487
1488 let placeholders = ids
1489 .iter()
1490 .enumerate()
1491 .map(|(i, _)| format!("?{}", i + 2))
1492 .collect::<Vec<_>>()
1493 .join(",");
1494 let mut params = vec![SqlValue::Text(ns.to_owned())];
1495 params.extend(ids.iter().cloned().map(SqlValue::Text));
1496
1497 let atom_rows = reader
1498 .query_all(SqlStatement {
1499 sql: format!(
1500 "SELECT id, slug, name, description, tags, finalized, status FROM knowledge_atoms WHERE namespace = ?1 AND id IN ({placeholders}) AND deleted_at IS NULL"
1501 ),
1502 params,
1503 label: None,
1504 })
1505 .await
1506 .unwrap_or_default();
1507
1508 let mut atom_rows_by_id: HashMap<String, khive_storage::types::SqlRow> = HashMap::new();
1509 for row in atom_rows {
1510 if let Some(id) = row_str(&row, "id") {
1511 atom_rows_by_id.insert(id, row);
1512 }
1513 }
1514
1515 for hit in hits.iter_mut().filter(|hit| hit.slug.is_empty()) {
1516 if let Some(row) = atom_rows_by_id.get(&hit.id) {
1517 hit.slug = row_str(row, "slug").unwrap_or_default();
1518 hit.name = row_str(row, "name").unwrap_or_default();
1519 hit.description = row_str(row, "description");
1520 hit.tags = row_str(row, "tags");
1521 hit.finalized = row_bool(row, "finalized");
1522 hit.status = row_str(row, "status");
1523 let tags_arr: Vec<String> = hit
1524 .tags
1525 .as_deref()
1526 .and_then(|tags| serde_json::from_str(tags).ok())
1527 .unwrap_or_default();
1528 hit.is_domain = tags_arr.iter().any(|t| t == "type:domain");
1529 }
1530 }
1531
1532 let missing_ids: Vec<String> = hits
1533 .iter()
1534 .filter(|hit| hit.slug.is_empty())
1535 .map(|hit| hit.id.clone())
1536 .collect();
1537 if missing_ids.is_empty() {
1538 return;
1539 }
1540
1541 let placeholders = missing_ids
1542 .iter()
1543 .enumerate()
1544 .map(|(i, _)| format!("?{}", i + 2))
1545 .collect::<Vec<_>>()
1546 .join(",");
1547 let mut params = vec![SqlValue::Text(ns.to_owned())];
1548 params.extend(missing_ids.iter().cloned().map(SqlValue::Text));
1549
1550 let domain_rows = reader
1551 .query_all(SqlStatement {
1552 sql: format!(
1553 "SELECT id, slug, name, description, tags FROM knowledge_domains WHERE namespace = ?1 AND id IN ({placeholders}) AND deleted_at IS NULL"
1554 ),
1555 params,
1556 label: None,
1557 })
1558 .await
1559 .unwrap_or_default();
1560
1561 let mut domain_rows_by_id: HashMap<String, khive_storage::types::SqlRow> = HashMap::new();
1562 for row in domain_rows {
1563 if let Some(id) = row_str(&row, "id") {
1564 domain_rows_by_id.insert(id, row);
1565 }
1566 }
1567
1568 for hit in hits.iter_mut().filter(|hit| hit.slug.is_empty()) {
1569 if let Some(row) = domain_rows_by_id.get(&hit.id) {
1570 hit.slug = row_str(row, "slug").unwrap_or_default();
1571 hit.name = row_str(row, "name").unwrap_or_default();
1572 hit.description = row_str(row, "description");
1573 hit.tags = row_str(row, "tags");
1574 hit.finalized = false;
1575 hit.is_domain = true;
1576 }
1577 }
1578
1579 hits.retain(|hit| !hit.slug.is_empty());
1580}
1581
1582fn status_values(value: Option<&Value>) -> Vec<String> {
1585 match value {
1586 Some(Value::String(s)) => {
1587 let s = s.trim();
1588 if s.is_empty() {
1589 Vec::new()
1590 } else {
1591 vec![s.to_string()]
1592 }
1593 }
1594 Some(Value::Array(items)) => items
1595 .iter()
1596 .filter_map(Value::as_str)
1597 .map(str::trim)
1598 .filter(|s| !s.is_empty())
1599 .map(str::to_string)
1600 .collect(),
1601 _ => Vec::new(),
1602 }
1603}
1604
1605fn status_sql_clause(
1606 statuses: &[String],
1607 exclude_status: Option<&str>,
1608 first_param: usize,
1609) -> (String, Vec<SqlValue>) {
1610 if !statuses.is_empty() {
1611 let placeholders = statuses
1612 .iter()
1613 .enumerate()
1614 .map(|(i, _)| format!("?{}", first_param + i))
1615 .collect::<Vec<_>>()
1616 .join(",");
1617 let clause = if statuses.len() == 1 {
1618 format!(" AND status = ?{first_param}")
1619 } else {
1620 format!(" AND status IN ({placeholders})")
1621 };
1622 let params = statuses.iter().cloned().map(SqlValue::Text).collect();
1623 return (clause, params);
1624 }
1625
1626 if let Some(status) = exclude_status.map(str::trim).filter(|s| !s.is_empty()) {
1627 return (
1628 format!(" AND (status IS NULL OR status != ?{first_param})"),
1629 vec![SqlValue::Text(status.to_string())],
1630 );
1631 }
1632
1633 (
1634 " AND (status IS NULL OR status != 'deprecated')".to_string(),
1635 Vec::new(),
1636 )
1637}
1638
/// Returns `true` when `status` appears verbatim (case-sensitive) in `statuses`.
fn explicitly_requested_status(statuses: &[String], status: &str) -> bool {
    statuses
        .iter()
        .map(String::as_str)
        .any(|requested| requested == status)
}
1642
/// Score multiplier for a review status.
///
/// `verified` boosts (1.2), `draft` dampens (0.8), `deprecated` zeroes the
/// score. `reviewed`, a missing status and any unrecognised status are neutral (1.0).
fn status_multiplier(status: Option<&str>) -> f32 {
    match status {
        Some("verified") => 1.2,
        Some("draft") => 0.8,
        Some("deprecated") => 0.0,
        // Covers "reviewed", `None` (treated as reviewed) and unknown values.
        _ => 1.0,
    }
}
1652
1653fn apply_status_multipliers(hits: &mut Vec<ScoredHit>, include_deprecated: bool) {
1654 hits.retain_mut(|hit| {
1655 let multiplier = status_multiplier(hit.status.as_deref());
1656 hit.score = (hit.score / (hit.score + 1.0) * multiplier).clamp(0.0, 1.0);
1660 include_deprecated || multiplier > 0.0
1661 });
1662 hits.sort_by(|a, b| {
1663 b.score
1664 .partial_cmp(&a.score)
1665 .unwrap_or(std::cmp::Ordering::Equal)
1666 .then_with(|| a.slug.cmp(&b.slug))
1667 });
1668}
1669
/// An atom prepared for lexical scoring: raw display fields plus tokenized fields.
struct Candidate {
    /// Atom id as a string.
    id: String,
    /// Atom slug.
    slug: String,
    /// Untokenized name, used for the whole-query name bonus and for output.
    name_raw: String,
    /// Untokenized description, copied to the resulting hit.
    description_raw: Option<String>,
    /// Display form of the tags, copied to the resulting hit.
    tags_raw: Option<String>,
    /// Review status, copied to the resulting hit.
    status_raw: Option<String>,
    /// The atom's `finalized` flag.
    finalized: bool,
    /// Whether the atom carries the `type:domain` tag.
    is_domain: bool,
    /// Tokens of the name.
    name: Vec<String>,
    /// Tokens of the description (empty when there is none).
    description: Vec<String>,
    /// Tokens of the tags display string.
    tags: Vec<String>,
    /// Tokens of the content.
    content: Vec<String>,
}
1686
1687fn load_candidates_from_atoms(atoms: &[Atom], type_filter: Option<&str>) -> Vec<Candidate> {
1688 let want_domain = type_filter == Some("domain");
1689 let want_atom = type_filter == Some("atom");
1690
1691 atoms
1692 .iter()
1693 .filter_map(|atom| {
1694 let tags_str = atom.tags_display();
1695 let is_domain = {
1696 let tags_arr: Vec<String> = serde_json::from_str(&atom.tags).unwrap_or_default();
1697 tags_arr.iter().any(|t| t == "type:domain")
1698 };
1699 if (want_domain && !is_domain) || (want_atom && is_domain) {
1700 return None;
1701 }
1702 Some(Candidate {
1703 id: atom.id.to_string(),
1704 slug: atom.slug.clone(),
1705 name_raw: atom.name.clone(),
1706 description_raw: atom.description.clone(),
1707 tags_raw: Some(tags_str.clone()),
1708 status_raw: atom.status.clone(),
1709 finalized: atom.finalized,
1710 is_domain,
1711 name: matching::tokenize_field(&atom.name),
1712 description: atom
1713 .description
1714 .as_deref()
1715 .map(matching::tokenize_field)
1716 .unwrap_or_default(),
1717 tags: matching::tokenize_field(&tags_str),
1718 content: matching::tokenize_field(&atom.content),
1719 })
1720 })
1721 .collect()
1722}
1723
1724fn compute_idf(
1727 candidates: &[Candidate],
1728 terms: &[String],
1729 expanded: &HashSet<String>,
1730 discount: f32,
1731) -> HashMap<String, f32> {
1732 let n = candidates.len() as f32;
1733 let mut df: HashMap<String, usize> = terms.iter().map(|t| (t.clone(), 0)).collect();
1734 for cand in candidates {
1735 for term in terms {
1736 if matching::has_in_tokens(&cand.content, term)
1737 || matching::has_in_tokens(&cand.name, term)
1738 || matching::has_in_tokens(&cand.description, term)
1739 || matching::has_in_tokens(&cand.tags, term)
1740 {
1741 if let Some(d) = df.get_mut(term) {
1742 *d += 1;
1743 }
1744 }
1745 }
1746 }
1747 df.into_iter()
1748 .map(|(term, d)| {
1749 let raw = (n / (d as f32 + 1.0)).ln().max(0.1);
1750 let idf = if expanded.contains(&term) {
1751 raw * discount
1752 } else {
1753 raw
1754 };
1755 (term, idf)
1756 })
1757 .collect()
1758}
1759
1760fn score_field(tokens: &[String], terms: &[String], idf: &HashMap<String, f32>) -> f32 {
1761 let mut score = 0.0;
1762 for term in terms {
1763 let count = matching::count_in_tokens(tokens, term);
1764 if count > 0 {
1765 let tf = 1.0 + (count as f32).ln();
1766 score += tf * idf.get(term).copied().unwrap_or(1.0);
1767 }
1768 }
1769 score
1770}
1771
1772fn bigram_bonus_field(tokens: &[String], query_order: &[String]) -> f32 {
1773 if query_order.len() < 2 {
1774 return 0.0;
1775 }
1776 let filtered: Vec<&str> = tokens
1777 .iter()
1778 .filter(|t| !is_stop(t))
1779 .map(|t| t.as_str())
1780 .collect();
1781 let mut bonus = 0.0f32;
1782 for window in query_order.windows(2) {
1783 let (a, b) = (window[0].as_str(), window[1].as_str());
1784 for w in filtered.windows(2) {
1785 if w[0] == a && w[1] == b {
1786 bonus += 1.0;
1787 break;
1788 }
1789 }
1790 }
1791 bonus
1792}
1793
/// Returns `bonus` when the name contains the whole trimmed query, ignoring case.
///
/// Despite the name this is a substring test, not an equality test. A query
/// that is blank after trimming never earns the bonus.
fn exact_name_bonus(name: &str, raw_query: &str, bonus: f32) -> f32 {
    let needle = raw_query.trim().to_lowercase();
    if needle.is_empty() {
        return 0.0;
    }
    let haystack = name.to_lowercase();
    if haystack.contains(needle.as_str()) {
        bonus
    } else {
        0.0
    }
}
1802
1803fn score_candidate(
1804 cand: &Candidate,
1805 terms: &[String],
1806 original_terms: &[String],
1807 query_order: &[String],
1808 idf: &HashMap<String, f32>,
1809 raw_query: &str,
1810 w: &Weights,
1811) -> f32 {
1812 let bigrams = bigram_bonus_field(&cand.name, query_order)
1813 + bigram_bonus_field(&cand.description, query_order)
1814 + bigram_bonus_field(&cand.tags, query_order)
1815 + bigram_bonus_field(&cand.content, query_order);
1816
1817 let base = exact_name_bonus(&cand.name_raw, raw_query, w.w_exact_name)
1818 + w.w_name * score_field(&cand.name, terms, idf)
1819 + w.w_description * score_field(&cand.description, terms, idf)
1820 + w.w_tags * score_field(&cand.tags, terms, idf)
1821 + w.w_content * score_field(&cand.content, terms, idf)
1822 + w.w_bigram * bigrams;
1823
1824 if w.coverage_alpha > 0.0 && !original_terms.is_empty() {
1825 let matched = original_terms
1829 .iter()
1830 .filter(|orig| {
1831 let has_exact = matching::has_in_tokens(&cand.name, orig)
1834 || matching::has_in_tokens(&cand.description, orig)
1835 || matching::has_in_tokens(&cand.tags, orig)
1836 || matching::has_in_tokens(&cand.content, orig);
1837 if has_exact {
1838 return true;
1839 }
1840 terms.iter().filter(|t| *t != *orig).any(|exp| {
1842 matching::has_in_tokens(&cand.name, exp)
1843 || matching::has_in_tokens(&cand.description, exp)
1844 || matching::has_in_tokens(&cand.tags, exp)
1845 || matching::has_in_tokens(&cand.content, exp)
1846 })
1847 })
1848 .count();
1849 let coverage = matched as f32 / original_terms.len() as f32;
1850 base * coverage.powf(w.coverage_alpha)
1851 } else {
1852 base
1853 }
1854}
1855
/// Adds simple singular/plural variants to `terms` and reports which were added.
///
/// For every term:
/// * a term of at least three bytes not ending in `s` gains a `+s` variant;
/// * a term longer than four bytes ending in `ies` gains a `...y` variant;
/// * otherwise a term longer than three bytes ending in `s` (but not `ss`)
///   gains a variant with the trailing `s` removed.
///
/// On return `terms` is sorted and de-duplicated and holds the originals plus
/// all variants. The returned set holds only the terms that were not in the
/// input.
fn expand_terms(terms: &mut Vec<String>) -> HashSet<String> {
    let originals: HashSet<String> = terms.iter().cloned().collect();

    let mut variants: Vec<String> = Vec::new();
    for term in terms.iter() {
        let ends_in_s = term.ends_with('s');
        if !ends_in_s && term.len() >= 3 {
            variants.push(format!("{term}s"));
        }
        match term.strip_suffix("ies") {
            Some(stem) if term.len() > 4 => variants.push(format!("{stem}y")),
            _ => {
                if ends_in_s && !term.ends_with("ss") && term.len() > 3 {
                    // The trailing `s` is one ASCII byte, so this slice is on a char boundary.
                    variants.push(term[..term.len() - 1].to_string());
                }
            }
        }
    }

    terms.extend(variants);
    terms.sort();
    terms.dedup();

    let mut added = HashSet::new();
    for term in terms.iter() {
        if !originals.contains(term) {
            added.insert(term.clone());
        }
    }
    added
}
1883
/// Wraps the query in double quotes so it is passed to FTS5 as one string,
/// doubling any embedded double quote.
fn quote_fts5_phrase(raw_query: &str) -> String {
    let mut phrase = String::with_capacity(raw_query.len() + 2);
    phrase.push('"');
    for ch in raw_query.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            phrase.push('"');
        }
        phrase.push(ch);
    }
    phrase.push('"');
    phrase
}
1890
/// Loads the atoms that lexical scoring should consider for a query.
///
/// The whole query is first matched as one quoted phrase against the
/// `fts_knowledge` index (at most `fetch_limit` ids), and the matching atoms
/// are loaded subject to the status filter. When the phrase matches nothing,
/// the function falls back to the `CANDIDATE_POOL` most recently created atoms
/// in the namespace, filtered by status and by `type_filter`.
///
/// `type_filter` is applied only on the fallback path, where any value other
/// than `"domain"` keeps non-domain atoms.
///
/// # Errors
/// Returns `RuntimeError::Internal` (via `sql_err`) when a reader cannot be
/// obtained or any query fails.
async fn fetch_fts_candidates(
    runtime: &KhiveRuntime,
    ns: &str,
    raw_query: &str,
    type_filter: Option<&str>,
    statuses: &[String],
    exclude_status: Option<&str>,
    fetch_limit: usize,
) -> Result<Vec<Atom>, RuntimeError> {
    let sql = runtime.sql();
    let mut reader = sql
        .reader()
        .await
        .map_err(|e| sql_err("search fts reader", e))?;

    // Quote the query so user input is never interpreted as FTS5 query syntax.
    let match_expr = quote_fts5_phrase(raw_query);
    let fts_rows = reader
        .query_all(SqlStatement {
            sql: "SELECT id FROM fts_knowledge WHERE fts_knowledge MATCH ?1 AND namespace = ?2 LIMIT ?3".into(),
            params: vec![
                SqlValue::Text(match_expr),
                SqlValue::Text(ns.to_owned()),
                SqlValue::Integer(fetch_limit as i64),
            ],
            label: None,
        })
        .await
        .map_err(|e| sql_err("search fts query", e))?;

    if fts_rows.is_empty() {
        // Fallback: no phrase match, so scan the most recent atoms instead.
        // `?1` = namespace and `?2` = limit, hence status values start at `?3`.
        let (status_clause, status_params) = status_sql_clause(statuses, exclude_status, 3);
        let sql_str = format!(
            "SELECT * FROM knowledge_atoms WHERE namespace = ?1 AND deleted_at IS NULL{} ORDER BY created_at DESC LIMIT ?2",
            status_clause
        );
        let mut params = vec![
            SqlValue::Text(ns.to_owned()),
            SqlValue::Integer(CANDIDATE_POOL as i64),
        ];
        params.extend(status_params);

        let rows = reader
            .query_all(SqlStatement {
                sql: sql_str,
                params,
                label: None,
            })
            .await
            .map_err(|e| sql_err("search full scan", e))?;

        // Rows that `atom_from_row` cannot convert are skipped.
        let mut atoms: Vec<Atom> = rows.iter().filter_map(atom_from_row).collect();
        if let Some(filt) = type_filter {
            let want_domain = filt == "domain";
            atoms.retain(|a| {
                let tags_arr: Vec<String> = serde_json::from_str(&a.tags).unwrap_or_default();
                let is_domain = tags_arr.iter().any(|t| t == "type:domain");
                if want_domain {
                    is_domain
                } else {
                    !is_domain
                }
            });
        }
        return Ok(atoms);
    }

    // `?1` is the namespace; the matched ids occupy `?2..`.
    let ids: Vec<String> = fts_rows.iter().filter_map(|r| row_str(r, "id")).collect();
    let placeholders: String = ids
        .iter()
        .enumerate()
        .map(|(i, _)| format!("?{}", i + 2))
        .collect::<Vec<_>>()
        .join(",");

    // Status placeholders follow directly after the id placeholders.
    let (status_clause, status_params) = status_sql_clause(statuses, exclude_status, ids.len() + 2);
    let mut params: Vec<SqlValue> = vec![SqlValue::Text(ns.to_owned())];
    params.extend(ids.iter().map(|id| SqlValue::Text(id.clone())));
    params.extend(status_params);

    let rows = reader
        .query_all(SqlStatement {
            sql: format!(
                "SELECT * FROM knowledge_atoms WHERE namespace = ?1 AND id IN ({placeholders}) AND deleted_at IS NULL{status_clause}"
            ),
            params,
            label: None,
        })
        .await
        .map_err(|e| sql_err("search load atoms", e))?;

    Ok(rows.iter().filter_map(atom_from_row).collect())
}
1987
/// Borrowed, per-request settings shared by `search_core` and `search_decomposed`.
struct SearchCtx<'a> {
    /// Runtime used for storage access.
    runtime: &'a KhiveRuntime,
    /// Namespace every query is scoped to.
    ns: &'a str,
    /// Optional role text; when non-blank it is prepended to the query for scoring.
    role: Option<&'a str>,
    /// Optional kind filter (`"domain"` or `"atom"`).
    type_filter: Option<&'a str>,
    /// Candidates must score strictly above this value to be returned.
    min_score: f32,
    /// Scoring weights.
    w: &'a Weights,
    /// Maximum number of hits to return.
    fetch_limit: usize,
    /// Statuses to match; empty means no explicit status filter.
    statuses: &'a [String],
    /// Status to exclude when `statuses` is empty.
    exclude_status: Option<&'a str>,
}
2001
2002async fn search_core(ctx: &SearchCtx<'_>, query: &str) -> Result<Vec<ScoredHit>, RuntimeError> {
2005 let runtime = ctx.runtime;
2006 let ns = ctx.ns;
2007 let role = ctx.role;
2008 let type_filter = ctx.type_filter;
2009 let min_score = ctx.min_score;
2010 let w = ctx.w;
2011 let fetch_limit = ctx.fetch_limit;
2012 let raw_query = query.trim().to_string();
2013 if raw_query.is_empty() {
2014 return Ok(Vec::new());
2015 }
2016
2017 let scored_query = match role {
2018 Some(r) if !r.trim().is_empty() => format!("{} {}", r.trim(), raw_query),
2019 _ => raw_query.clone(),
2020 };
2021
2022 let (terms, original_terms, query_order, expanded) = {
2023 let raw_tokens: Vec<String> = matching::tokenize_field(&scored_query)
2024 .into_iter()
2025 .filter(|w| w.len() >= MIN_TERM_LEN && !is_stop(w))
2026 .collect();
2027 let mut seen = HashSet::new();
2028 let qo: Vec<String> = raw_tokens
2029 .iter()
2030 .filter(|w| seen.insert(w.as_str()))
2031 .cloned()
2032 .collect();
2033 let mut t = raw_tokens;
2034 t.sort();
2035 t.dedup();
2036 let originals = t.clone();
2037 let exp = expand_terms(&mut t);
2038 (t, originals, qo, exp)
2039 };
2040 let terms_only_exact = terms.is_empty();
2043
2044 let atoms = fetch_fts_candidates(
2045 runtime,
2046 ns,
2047 &raw_query,
2048 type_filter,
2049 ctx.statuses,
2050 ctx.exclude_status,
2051 CANDIDATE_POOL,
2052 )
2053 .await?;
2054 if atoms.is_empty() {
2055 return Ok(Vec::new());
2056 }
2057
2058 let candidates = load_candidates_from_atoms(&atoms, type_filter);
2059 if candidates.is_empty() {
2060 return Ok(Vec::new());
2061 }
2062
2063 let idf = compute_idf(&candidates, &terms, &expanded, w.expand_discount);
2064 let mut scored: Vec<(f32, &Candidate)> = candidates
2065 .iter()
2066 .filter_map(|cand| {
2067 let base = if terms_only_exact {
2068 exact_name_bonus(&cand.name_raw, &raw_query, w.w_exact_name)
2071 } else {
2072 score_candidate(
2073 cand,
2074 &terms,
2075 &original_terms,
2076 &query_order,
2077 &idf,
2078 &raw_query,
2079 w,
2080 )
2081 };
2082 (base > min_score).then_some((base, cand))
2083 })
2084 .collect();
2085 scored.sort_by(|a, b| {
2086 b.0.partial_cmp(&a.0)
2087 .unwrap_or(std::cmp::Ordering::Equal)
2088 .then_with(|| a.1.slug.cmp(&b.1.slug))
2089 });
2090
2091 Ok(scored
2092 .into_iter()
2093 .take(fetch_limit)
2094 .map(|(score, cand)| ScoredHit {
2095 id: cand.id.clone(),
2096 slug: cand.slug.clone(),
2097 name: cand.name_raw.clone(),
2098 description: cand.description_raw.clone(),
2099 tags: cand.tags_raw.clone(),
2100 status: cand.status_raw.clone(),
2101 finalized: cand.finalized,
2102 is_domain: cand.is_domain,
2103 score,
2104 })
2105 .collect())
2106}
2107
2108async fn search_decomposed(
2111 ctx: &SearchCtx<'_>,
2112 query: &str,
2113 intersection_bonus: f32,
2114) -> Result<Vec<ScoredHit>, RuntimeError> {
2115 let non_stop: Vec<&str> = query
2116 .split_whitespace()
2117 .filter(|w| w.len() >= MIN_TERM_LEN && !is_stop(&w.to_lowercase()))
2118 .collect();
2119
2120 let mid = non_stop.len() / 2;
2121 let sub_q1: String = non_stop[..mid].join(" ");
2122 let sub_q2: String = non_stop[mid..].join(" ");
2123 let sub_limit = ctx.fetch_limit.min(50);
2124
2125 let full = search_core(ctx, query).await?;
2126 let sub_ctx1 = SearchCtx {
2127 runtime: ctx.runtime,
2128 ns: ctx.ns,
2129 role: None,
2130 type_filter: ctx.type_filter,
2131 min_score: 0.0,
2132 w: ctx.w,
2133 fetch_limit: sub_limit,
2134 statuses: ctx.statuses,
2135 exclude_status: ctx.exclude_status,
2136 };
2137 let s1 = search_core(&sub_ctx1, &sub_q1).await?;
2138 let s2 = search_core(&sub_ctx1, &sub_q2).await?;
2139
2140 let mut scores: HashMap<String, f32> = HashMap::new();
2141 let mut data: HashMap<String, ScoredHit> = HashMap::new();
2142
2143 for hit in full {
2144 scores.insert(hit.id.clone(), hit.score);
2145 data.insert(hit.id.clone(), hit);
2146 }
2147
2148 let mut sub_counts: HashMap<String, u32> = HashMap::new();
2149 for hits in [s1, s2] {
2150 let mut seen: HashSet<String> = HashSet::new();
2151 for hit in hits {
2152 if !seen.insert(hit.id.clone()) {
2153 continue;
2154 }
2155 *sub_counts.entry(hit.id.clone()).or_default() += 1;
2156 if !data.contains_key(&hit.id) {
2157 scores.insert(hit.id.clone(), hit.score * 0.3);
2158 data.insert(hit.id.clone(), hit);
2159 }
2160 }
2161 }
2162
2163 for (id, count) in &sub_counts {
2164 if *count >= 2 {
2165 if let Some(s) = scores.get_mut(id) {
2166 *s *= 1.0 + intersection_bonus * (*count as f32 - 1.0);
2167 }
2168 }
2169 }
2170
2171 let mut ranked: Vec<ScoredHit> = data
2172 .into_values()
2173 .map(|mut h| {
2174 if let Some(&s) = scores.get(&h.id) {
2175 h.score = s;
2176 }
2177 h
2178 })
2179 .collect();
2180 ranked.sort_by(|a, b| {
2181 b.score
2182 .partial_cmp(&a.score)
2183 .unwrap_or(std::cmp::Ordering::Equal)
2184 .then_with(|| a.slug.cmp(&b.slug))
2185 });
2186 ranked.truncate(ctx.fetch_limit);
2187 Ok(ranked)
2188}
2189
2190async fn embed_cosine_scores(
2195 runtime: &KhiveRuntime,
2196 query: &str,
2197 candidate_texts: &[String],
2198) -> Option<Vec<f32>> {
2199 if runtime.default_embedder_name().is_empty() || candidate_texts.is_empty() {
2200 return None;
2201 }
2202 let mut texts = Vec::with_capacity(candidate_texts.len() + 1);
2203 texts.push(query.to_string());
2204 texts.extend_from_slice(candidate_texts);
2205 let embeddings = runtime.embed_batch(&texts).await.ok()?;
2206 if embeddings.len() != texts.len() {
2207 return None;
2208 }
2209 let query_emb = &embeddings[0];
2210 Some(
2211 embeddings[1..]
2212 .iter()
2213 .map(|emb| cosine_similarity(query_emb, emb))
2214 .collect(),
2215 )
2216}
2217
2218async fn rerank_with_embeddings(
2219 runtime: &KhiveRuntime,
2220 query: &str,
2221 hits: &mut [ScoredHit],
2222 alpha: f32,
2223) -> Result<(), RuntimeError> {
2224 if hits.is_empty() {
2225 return Ok(());
2226 }
2227 let texts: Vec<String> = hits
2228 .iter()
2229 .map(|h| format!("{} {}", h.name, h.description.as_deref().unwrap_or("")))
2230 .collect();
2231 if let Some(cosines) = embed_cosine_scores(runtime, query, &texts).await {
2232 let max_tfidf = hits
2233 .iter()
2234 .map(|h| h.score)
2235 .fold(0.0f32, f32::max)
2236 .max(1e-6);
2237 for (hit, cos) in hits.iter_mut().zip(cosines.iter()) {
2238 let norm_tfidf = hit.score / max_tfidf;
2239 hit.score = alpha * norm_tfidf + (1.0 - alpha) * cos.max(0.0);
2240 }
2241 hits.sort_by(|a, b| {
2242 b.score
2243 .partial_cmp(&a.score)
2244 .unwrap_or(std::cmp::Ordering::Equal)
2245 .then_with(|| a.slug.cmp(&b.slug))
2246 });
2247 }
2248 Ok(())
2249}
2250
/// Cosine similarity of two vectors.
///
/// Returns `0.0` when the vectors are empty, differ in length, or when the
/// product of their norms is below `1e-8`.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.is_empty() || a.len() != b.len() {
        return 0.0;
    }

    let (dot, sq_a, sq_b) = a.iter().zip(b).fold(
        (0.0f32, 0.0f32, 0.0f32),
        |(dot, sq_a, sq_b), (&x, &y)| (dot + x * y, sq_a + x * x, sq_b + y * y),
    );

    let magnitude = sq_a.sqrt() * sq_b.sqrt();
    if magnitude < 1e-8 {
        0.0
    } else {
        dot / magnitude
    }
}
2270
2271async fn compute_embedding_coverage(
2274 runtime: &KhiveRuntime,
2275 token: &NamespaceToken,
2276 ns: &str,
2277 total_atoms: i64,
2278) -> Result<f64, RuntimeError> {
2279 if total_atoms <= 0 || runtime.default_embedder_name().is_empty() {
2280 return Ok(0.0);
2281 }
2282
2283 match runtime.vectors(token) {
2284 Ok(_) => {}
2285 Err(RuntimeError::Unconfigured(_)) => return Ok(0.0),
2286 Err(e) => return Err(e),
2287 }
2288
2289 let model = runtime.default_embedder_name().to_owned();
2290 let table_name = format!("vec_{}", vamana::sanitize_model_key(&model));
2291 let sql = runtime.sql();
2292 let mut reader = sql
2293 .reader()
2294 .await
2295 .map_err(|e| sql_err("stats embedding coverage reader", e))?;
2296
2297 let count = reader
2298 .query_scalar(SqlStatement {
2299 sql: format!(
2300 "SELECT COUNT(DISTINCT a.id) \
2301 FROM knowledge_atoms a \
2302 WHERE a.namespace = ?1 \
2303 AND a.deleted_at IS NULL \
2304 AND a.tags NOT LIKE '%type:domain%' \
2305 AND a.id IN ( \
2306 SELECT v.subject_id FROM {table_name} v \
2307 WHERE v.namespace = ?1 \
2308 AND v.embedding_model = ?2 \
2309 AND v.field = 'knowledge.atom' \
2310 )"
2311 ),
2312 params: vec![SqlValue::Text(ns.to_owned()), SqlValue::Text(model.clone())],
2313 label: Some("knowledge_stats_embedding_coverage".into()),
2314 })
2315 .await
2316 .map_err(|e| sql_err("stats embedding coverage", e))?;
2317
2318 let atoms_with_vector = match count {
2319 Some(SqlValue::Integer(n)) => n,
2320 Some(other) => {
2321 return Err(RuntimeError::Internal(format!(
2322 "stats embedding coverage returned non-integer count: {other:?}"
2323 )));
2324 }
2325 None => 0,
2326 };
2327
2328 Ok(atoms_with_vector as f64 / total_atoms as f64)
2329}
2330
/// A text-bearing candidate that can be re-ranked by embedding similarity
/// (see `rerank_text_items`).
struct ScoredTextItem {
    /// Identifier of the underlying record.
    id: String,
    /// Slug of the underlying record; the tie-breaker when scores are equal.
    slug: String,
    /// Name of the underlying record.
    name: String,
    /// The text that is embedded and compared against the query.
    text: String,
    /// Current ranking score; replaced by the clamped cosine similarity on re-rank.
    score: f32,
}
2340
2341async fn load_domain_by_id_or_slug(
2342 runtime: &KhiveRuntime,
2343 ns: &str,
2344 id_or_slug: &str,
2345) -> Result<Domain, RuntimeError> {
2346 let sql = runtime.sql();
2347 let mut reader = sql
2348 .reader()
2349 .await
2350 .map_err(|e| sql_err("compose domain reader", e))?;
2351 let id = id_or_slug.trim().to_string();
2352 let row = if id.parse::<Uuid>().is_ok() {
2353 reader
2354 .query_row(SqlStatement {
2355 sql: "SELECT * FROM knowledge_domains WHERE id = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2356 params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.to_owned())],
2357 label: None,
2358 })
2359 .await
2360 .map_err(|e| sql_err("compose domain by id", e))?
2361 } else {
2362 reader
2363 .query_row(SqlStatement {
2364 sql: "SELECT * FROM knowledge_domains WHERE slug = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2365 params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.to_owned())],
2366 label: None,
2367 })
2368 .await
2369 .map_err(|e| sql_err("compose domain by slug", e))?
2370 };
2371 row.and_then(|r| domain_from_row(&r))
2372 .ok_or_else(|| RuntimeError::NotFound(format!("domain not found: {id:?}")))
2373}
2374
2375async fn load_atom_by_id_or_slug(
2376 runtime: &KhiveRuntime,
2377 ns: &str,
2378 id_or_slug: &str,
2379) -> Result<Atom, RuntimeError> {
2380 let sql = runtime.sql();
2381 let mut reader = sql
2382 .reader()
2383 .await
2384 .map_err(|e| sql_err("compose atom reader", e))?;
2385 let id = id_or_slug.trim().to_string();
2386 let row = if id.parse::<Uuid>().is_ok() {
2387 reader
2388 .query_row(SqlStatement {
2389 sql: "SELECT * FROM knowledge_atoms WHERE id = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2390 params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.to_owned())],
2391 label: None,
2392 })
2393 .await
2394 .map_err(|e| sql_err("compose atom by id", e))?
2395 } else {
2396 reader
2397 .query_row(SqlStatement {
2398 sql: "SELECT * FROM knowledge_atoms WHERE slug = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2399 params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.to_owned())],
2400 label: None,
2401 })
2402 .await
2403 .map_err(|e| sql_err("compose atom by slug", e))?
2404 };
2405 row.and_then(|r| atom_from_row(&r))
2406 .ok_or_else(|| RuntimeError::NotFound(format!("atom not found: {id:?}")))
2407}
2408
2409fn parse_domain_members(domain: &Domain) -> Result<Vec<String>, RuntimeError> {
2410 if domain.members.is_empty() || domain.members == "[]" {
2411 return Ok(Vec::new());
2412 }
2413 serde_json::from_str::<Vec<String>>(&domain.members).map_err(|e| {
2414 RuntimeError::Internal(format!(
2415 "domain {:?} has invalid members JSON: {e}",
2416 domain.slug
2417 ))
2418 })
2419}
2420
2421async fn rerank_text_items(
2422 runtime: &KhiveRuntime,
2423 query: &str,
2424 items: &mut [ScoredTextItem],
2425) -> Result<(), RuntimeError> {
2426 if items.is_empty() {
2427 return Ok(());
2428 }
2429 let texts: Vec<String> = items.iter().map(|item| item.text.clone()).collect();
2430 if let Some(cosines) = embed_cosine_scores(runtime, query, &texts).await {
2431 for (item, cos) in items.iter_mut().zip(cosines.iter()) {
2432 item.score = cos.max(0.0);
2433 }
2434 items.sort_by(|a, b| {
2435 b.score
2436 .partial_cmp(&a.score)
2437 .unwrap_or(std::cmp::Ordering::Equal)
2438 .then_with(|| a.slug.cmp(&b.slug))
2439 });
2440 }
2441 Ok(())
2442}
2443
2444fn format_compose_markdown(query: &str, domains: &[Domain], atoms: &[(&Atom, f32)]) -> String {
2445 let mut out = String::from("# Knowledge Briefing\n\n");
2446 out.push_str(&format!("Query: {query}\n"));
2447 for (atom, score) in atoms {
2448 out.push_str(&format!("\n## {}\n\n", atom.name));
2449 out.push_str(&format!("Source: {}\n", atom.slug));
2450 out.push_str(&format!("Score: {:.4}\n", score));
2451 if let Some(ref desc) = atom.description {
2452 if !desc.is_empty() {
2453 out.push('\n');
2454 out.push_str(desc);
2455 out.push('\n');
2456 }
2457 }
2458 if !atom.content.is_empty() {
2459 out.push('\n');
2460 out.push_str(&atom.content);
2461 out.push('\n');
2462 }
2463 }
2464 if !domains.is_empty() {
2465 out.push_str("\n---\n\nDomains: ");
2466 let names: Vec<&str> = domains.iter().map(|d| d.name.as_str()).collect();
2467 out.push_str(&names.join(", "));
2468 out.push('\n');
2469 }
2470 out
2471}
2472
2473fn atom_embed_text(atom: &Atom) -> String {
2476 let mut parts: Vec<&str> = Vec::with_capacity(3);
2477 if !atom.name.is_empty() {
2478 parts.push(&atom.name);
2479 }
2480 if let Some(ref desc) = atom.description {
2481 if !desc.is_empty() {
2482 parts.push(desc.as_str());
2483 }
2484 }
2485 if !atom.content.is_empty() {
2486 parts.push(&atom.content);
2487 }
2488 parts.join("\n\n")
2489}
2490
2491#[allow(dead_code)]
2494fn section_from_row(row: &khive_storage::types::SqlRow) -> Option<Section> {
2495 let id: Uuid = row_str(row, "id")?.parse().ok()?;
2496 let st_str = row_str(row, "section_type")?;
2497 let section_type = SectionType::from_str_loose(&st_str)?;
2498 Some(Section {
2499 id,
2500 atom_id: row_str(row, "atom_id")?,
2501 namespace: row_str(row, "namespace")?,
2502 section_type,
2503 heading: row_str(row, "heading").unwrap_or_default(),
2504 content: row_str(row, "content").unwrap_or_default(),
2505 tokens: row_i64(row, "tokens").unwrap_or(0),
2506 sort_order: row_i64(row, "sort_order").unwrap_or(0),
2507 created_at: row_i64(row, "created_at").unwrap_or(0),
2508 updated_at: row_i64(row, "updated_at").unwrap_or(0),
2509 })
2510}
2511
2512#[allow(dead_code)]
2513fn section_to_json(s: &Section) -> Value {
2514 json!({
2515 "id": s.id.to_string(),
2516 "atom_id": s.atom_id,
2517 "namespace": s.namespace,
2518 "section_type": s.section_type.as_str(),
2519 "heading": s.heading,
2520 "content": s.content,
2521 "tokens": s.tokens,
2522 "sort_order": s.sort_order,
2523 "created_at": s.created_at,
2524 "updated_at": s.updated_at,
2525 })
2526}
2527
/// Approximates a token count as the number of whitespace-separated words.
fn count_tokens(text: &str) -> i64 {
    let words = text.split_whitespace().count();
    words as i64
}
2532
2533fn parse_section_type(s: &str) -> Result<SectionType, RuntimeError> {
2536 SectionType::from_str_loose(s).ok_or_else(|| {
2537 RuntimeError::InvalidInput(format!(
2538 "unknown section_type {s:?}; valid values: {}",
2539 SectionType::NAMES.join(", ")
2540 ))
2541 })
2542}
2543
2544async fn resolve_atom_id(
2545 runtime: &KhiveRuntime,
2546 ns: &str,
2547 id_or_slug: &str,
2548) -> Result<String, RuntimeError> {
2549 let sql = runtime.sql();
2550 let mut reader = sql
2551 .reader()
2552 .await
2553 .map_err(|e| sql_err("resolve_atom_id reader", e))?;
2554 let id = id_or_slug.trim().to_string();
2555 let row = if id.parse::<Uuid>().is_ok() {
2556 reader
2557 .query_row(SqlStatement {
2558 sql: "SELECT id FROM knowledge_atoms WHERE id = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2559 params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.to_owned())],
2560 label: None,
2561 })
2562 .await
2563 .map_err(|e| sql_err("resolve_atom_id by id", e))?
2564 } else {
2565 reader
2566 .query_row(SqlStatement {
2567 sql: "SELECT id FROM knowledge_atoms WHERE slug = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2568 params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.to_owned())],
2569 label: None,
2570 })
2571 .await
2572 .map_err(|e| sql_err("resolve_atom_id by slug", e))?
2573 };
2574 row.and_then(|r| row_str(&r, "id"))
2575 .ok_or_else(|| RuntimeError::NotFound(format!("atom not found: {id:?}")))
2576}
2577
2578impl KnowledgeHandlers {
2579 pub(crate) async fn edit(
2587 runtime: &KhiveRuntime,
2588 token: &NamespaceToken,
2589 params: Value,
2590 ) -> Result<Value, RuntimeError> {
2591 let p: EditParams = deser(params)?;
2592 if p.sections.is_empty() {
2593 return Err(RuntimeError::InvalidInput(
2594 "sections must not be empty".into(),
2595 ));
2596 }
2597
2598 let ns = token.namespace().as_str().to_owned();
2599 let sql = runtime.sql();
2600
2601 let atom_id = {
2603 let mut reader = sql
2604 .reader()
2605 .await
2606 .map_err(|e| sql_err("edit atom reader", e))?;
2607 let id = p.id.trim().to_string();
2608 let row = if id.parse::<Uuid>().is_ok() {
2609 reader
2610 .query_row(SqlStatement {
2611 sql: "SELECT id FROM knowledge_atoms WHERE id = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2612 params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.clone())],
2613 label: None,
2614 })
2615 .await
2616 .map_err(|e| sql_err("edit atom lookup by id", e))?
2617 } else {
2618 reader
2619 .query_row(SqlStatement {
2620 sql: "SELECT id FROM knowledge_atoms WHERE slug = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2621 params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.clone())],
2622 label: None,
2623 })
2624 .await
2625 .map_err(|e| sql_err("edit atom lookup by slug", e))?
2626 };
2627 row.and_then(|r| row_str(&r, "id"))
2628 .ok_or_else(|| RuntimeError::NotFound(format!("atom not found: {:?}", p.id)))?
2629 };
2630
2631 let now = now_us();
2632 let mut upserted = 0usize;
2633 let mut section_results: Vec<Value> = Vec::with_capacity(p.sections.len());
2634
2635 for su in &p.sections {
2636 let stype = parse_section_type(&su.section_type)?;
2637 let heading = su.heading.as_deref().unwrap_or(stype.as_str()).to_string();
2638 let tokens = count_tokens(&su.content);
2639 let sort_order = su.sort_order.unwrap_or_else(|| {
2640 SectionType::ALL
2641 .iter()
2642 .position(|&t| t == stype)
2643 .unwrap_or(9) as i64
2644 });
2645
2646 let mut reader = sql
2648 .reader()
2649 .await
2650 .map_err(|e| sql_err("edit section reader", e))?;
2651 let existing_section = reader
2652 .query_row(SqlStatement {
2653 sql: "SELECT id, status FROM knowledge_sections WHERE atom_id = ?1 AND section_type = ?2 LIMIT 1".into(),
2654 params: vec![
2655 SqlValue::Text(atom_id.clone()),
2656 SqlValue::Text(stype.as_str().to_string()),
2657 ],
2658 label: None,
2659 })
2660 .await
2661 .map_err(|e| sql_err("edit section lookup", e))?;
2662
2663 let was_verified = existing_section
2664 .as_ref()
2665 .and_then(|r| row_str(r, "status"))
2666 .as_deref()
2667 == Some("verified");
2668 let section_id = existing_section
2669 .as_ref()
2670 .and_then(|r| row_str(r, "id"))
2671 .unwrap_or_else(new_id);
2672
2673 let mut writer = sql
2674 .writer()
2675 .await
2676 .map_err(|e| sql_err("edit section writer", e))?;
2677 writer
2678 .execute(SqlStatement {
2679 sql: "INSERT INTO knowledge_sections \
2680 (id, atom_id, namespace, section_type, heading, content, tokens, sort_order, created_at, updated_at) \
2681 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) \
2682 ON CONFLICT(atom_id, section_type) DO UPDATE SET \
2683 heading=excluded.heading, \
2684 content=excluded.content, \
2685 tokens=excluded.tokens, \
2686 sort_order=excluded.sort_order, \
2687 embedding=NULL, \
2688 updated_at=excluded.updated_at"
2689 .into(),
2690 params: vec![
2691 SqlValue::Text(section_id.clone()),
2692 SqlValue::Text(atom_id.clone()),
2693 SqlValue::Text(ns.clone()),
2694 SqlValue::Text(stype.as_str().to_string()),
2695 SqlValue::Text(heading.clone()),
2696 SqlValue::Text(su.content.clone()),
2697 SqlValue::Integer(tokens),
2698 SqlValue::Integer(sort_order),
2699 SqlValue::Integer(now),
2700 SqlValue::Integer(now),
2701 ],
2702 label: None,
2703 })
2704 .await
2705 .map_err(|e| sql_err("edit section upsert", e))?;
2706
2707 if was_verified {
2708 writer
2709 .execute(SqlStatement {
2710 sql: "UPDATE knowledge_sections SET status='reviewed' WHERE atom_id=?1 AND section_type=?2 AND status='verified'".into(),
2711 params: vec![
2712 SqlValue::Text(atom_id.clone()),
2713 SqlValue::Text(stype.as_str().to_string()),
2714 ],
2715 label: None,
2716 })
2717 .await
2718 .map_err(|e| sql_err("edit section status transition", e))?;
2719 }
2720
2721 upserted += 1;
2722 section_results.push(json!({
2723 "id": section_id,
2724 "atom_id": atom_id,
2725 "section_type": stype.as_str(),
2726 "heading": heading,
2727 "tokens": tokens,
2728 }));
2729 }
2730
2731 Ok(json!({
2732 "atom_id": atom_id,
2733 "upserted": upserted,
2734 "sections": section_results,
2735 }))
2736 }
2737
2738 pub(crate) async fn import(
2751 runtime: &KhiveRuntime,
2752 token: &NamespaceToken,
2753 params: Value,
2754 ) -> Result<Value, RuntimeError> {
2755 let p: ImportParams = deser(params)?;
2756 let path_str = p.path.trim().to_string();
2757 if path_str.is_empty() {
2758 return Err(RuntimeError::InvalidInput("path must not be empty".into()));
2759 }
2760
2761 let chunk_strategy = p
2762 .chunk_strategy
2763 .as_deref()
2764 .unwrap_or("section")
2765 .to_ascii_lowercase();
2766 if !["section", "atom"].contains(&chunk_strategy.as_str()) {
2767 return Err(RuntimeError::InvalidInput(format!(
2768 "unknown chunk_strategy {:?}; valid: section | atom",
2769 chunk_strategy
2770 )));
2771 }
2772 let format = p.format.as_deref().unwrap_or("atlas_md");
2773 if format != "atlas_md" {
2774 return Err(RuntimeError::InvalidInput(format!(
2775 "unknown format {format:?}; only \"atlas_md\" is supported"
2776 )));
2777 }
2778
2779 let md_path = std::path::Path::new(&path_str);
2780 if !md_path.exists() {
2781 return Err(RuntimeError::NotFound(format!(
2782 "path does not exist: {path_str:?}"
2783 )));
2784 }
2785
2786 let files: Vec<std::path::PathBuf> = if md_path.is_file() {
2788 vec![md_path.to_path_buf()]
2789 } else if md_path.is_dir() {
2790 let mut v = Vec::new();
2791 collect_md_files(md_path, &mut v);
2792 v
2793 } else {
2794 return Err(RuntimeError::InvalidInput(format!(
2795 "path is not a file or directory: {path_str:?}"
2796 )));
2797 };
2798
2799 if files.is_empty() {
2800 return Ok(json!({
2801 "imported_atoms": 0,
2802 "imported_sections": 0,
2803 "files_processed": 0,
2804 }));
2805 }
2806
2807 let mut imported_atoms = 0usize;
2808 let mut imported_sections = 0usize;
2809
2810 for file in &files {
2811 let content = std::fs::read_to_string(file)
2812 .map_err(|e| RuntimeError::Internal(format!("failed to read {:?}: {e}", file)))?;
2813
2814 let stem = file
2815 .file_stem()
2816 .and_then(|s| s.to_str())
2817 .unwrap_or("unknown");
2818 let slug = to_slug(stem);
2819
2820 let (atom_name, atom_body, sections) = parse_atlas_md(&content);
2821 let name = if atom_name.is_empty() {
2822 slug.replace('-', " ")
2823 } else {
2824 atom_name
2825 };
2826
2827 let atlas_id = extract_atlas_id(&content);
2829 let citation_count = sections
2830 .iter()
2831 .filter(|(stype, _, _)| *stype == SectionType::References)
2832 .map(|(_, _, body)| body.lines().filter(|line| !line.trim().is_empty()).count())
2833 .sum::<usize>();
2834 let source_uri = atlas_id.as_ref().map(|id| format!("atlas:{id}"));
2835 let source_type = if citation_count > 0 {
2836 "paper"
2837 } else {
2838 "imported"
2839 };
2840 let mut properties = serde_json::Map::new();
2841 if let Some(ref id) = atlas_id {
2842 properties.insert("atlas_id".to_string(), Value::String(id.clone()));
2843 }
2844
2845 let upsert_params = serde_json::json!({
2846 "atoms": [{
2847 "slug": slug,
2848 "name": name,
2849 "content": atom_body,
2850 "properties": Value::Object(properties),
2851 "source_uri": source_uri,
2852 "source_type": source_type,
2853 }]
2854 });
2855 KnowledgeHandlers::upsert_atoms(runtime, token, upsert_params).await?;
2856 imported_atoms += 1;
2857
2858 if chunk_strategy == "section" && !sections.is_empty() {
2860 let section_updates: Vec<Value> = sections
2861 .iter()
2862 .map(|(stype, heading, body)| {
2863 json!({
2864 "section_type": stype.as_str(),
2865 "heading": heading,
2866 "content": body,
2867 })
2868 })
2869 .collect();
2870 let edit_params = json!({
2871 "id": slug,
2872 "sections": section_updates,
2873 });
2874 let result = KnowledgeHandlers::edit(runtime, token, edit_params).await?;
2875 if let Some(n) = result.get("upserted").and_then(|v| v.as_u64()) {
2876 imported_sections += n as usize;
2877 }
2878 }
2879 }
2880
2881 Ok(json!({
2882 "imported_atoms": imported_atoms,
2883 "imported_sections": imported_sections,
2884 "files_processed": files.len(),
2885 }))
2886 }
2887
2888 pub(crate) async fn challenge(
2891 runtime: &KhiveRuntime,
2892 token: &NamespaceToken,
2893 params: Value,
2894 ) -> Result<Value, RuntimeError> {
2895 let p: ChallengeParams = deser(params)?;
2896 let ns = token.namespace().as_str().to_owned();
2897 let sql = runtime.sql();
2898
2899 let atom_id = resolve_atom_id(runtime, &ns, &p.atom_id).await?;
2900 let stype = parse_section_type(&p.section_type)?;
2901
2902 let mut writer = sql
2903 .writer()
2904 .await
2905 .map_err(|e| sql_err("challenge writer", e))?;
2906
2907 let affected = writer
2908 .execute(SqlStatement {
2909 sql: "UPDATE knowledge_sections SET status='disputed' WHERE atom_id=?1 AND section_type=?2 AND status NOT IN ('disputed','deprecated')".into(),
2910 params: vec![
2911 SqlValue::Text(atom_id.clone()),
2912 SqlValue::Text(stype.as_str().to_string()),
2913 ],
2914 label: None,
2915 })
2916 .await
2917 .map_err(|e| sql_err("challenge section status", e))?;
2918
2919 if affected == 0 {
2920 return Err(RuntimeError::InvalidInput(
2921 "section not found, already disputed, or deprecated".into(),
2922 ));
2923 }
2924
2925 writer
2926 .execute(SqlStatement {
2927 sql: "UPDATE knowledge_atoms SET properties=json_set(coalesce(properties,'{}'),'$.dispute_count',coalesce(json_extract(properties,'$.dispute_count'),0)+1) WHERE id=?1 AND namespace=?2".into(),
2928 params: vec![
2929 SqlValue::Text(atom_id.clone()),
2930 SqlValue::Text(ns.clone()),
2931 ],
2932 label: None,
2933 })
2934 .await
2935 .map_err(|e| sql_err("challenge dispute_count increment", e))?;
2936
2937 Ok(json!({
2938 "atom_id": atom_id,
2939 "section_type": stype.as_str(),
2940 "reason": p.reason,
2941 }))
2942 }
2943
2944 pub(crate) async fn adjudicate(
2947 runtime: &KhiveRuntime,
2948 token: &NamespaceToken,
2949 params: Value,
2950 ) -> Result<Value, RuntimeError> {
2951 let p: AdjudicateParams = deser(params)?;
2952 let ns = token.namespace().as_str().to_owned();
2953 let sql = runtime.sql();
2954
2955 let resolution = p.resolution.trim().to_ascii_lowercase();
2956 if resolution != "accept" && resolution != "reject" {
2957 return Err(RuntimeError::InvalidInput(
2958 "resolution must be \"accept\" or \"reject\"".into(),
2959 ));
2960 }
2961
2962 let atom_id = resolve_atom_id(runtime, &ns, &p.atom_id).await?;
2963 let stype = parse_section_type(&p.section_type)?;
2964
2965 let new_status = if resolution == "accept" {
2966 "verified"
2967 } else {
2968 "reviewed"
2969 };
2970
2971 let mut writer = sql
2972 .writer()
2973 .await
2974 .map_err(|e| sql_err("adjudicate writer", e))?;
2975
2976 let affected = writer
2977 .execute(SqlStatement {
2978 sql: format!(
2979 "UPDATE knowledge_sections SET status='{new_status}' WHERE atom_id=?1 AND section_type=?2 AND status='disputed'"
2980 ),
2981 params: vec![
2982 SqlValue::Text(atom_id.clone()),
2983 SqlValue::Text(stype.as_str().to_string()),
2984 ],
2985 label: None,
2986 })
2987 .await
2988 .map_err(|e| sql_err("adjudicate section status", e))?;
2989
2990 if affected == 0 {
2991 return Err(RuntimeError::InvalidInput(
2992 "section not found or not in disputed state".into(),
2993 ));
2994 }
2995
2996 writer
2997 .execute(SqlStatement {
2998 sql: "UPDATE knowledge_atoms SET properties=json_set(coalesce(properties,'{}'),'$.dispute_count',CASE WHEN coalesce(json_extract(properties,'$.dispute_count'),0) > 0 THEN coalesce(json_extract(properties,'$.dispute_count'),0)-1 ELSE 0 END) WHERE id=?1 AND namespace=?2".into(),
2999 params: vec![
3000 SqlValue::Text(atom_id.clone()),
3001 SqlValue::Text(ns.clone()),
3002 ],
3003 label: None,
3004 })
3005 .await
3006 .map_err(|e| sql_err("adjudicate dispute_count decrement", e))?;
3007
3008 Ok(json!({
3009 "atom_id": atom_id,
3010 "section_type": stype.as_str(),
3011 "resolution": resolution,
3012 "new_status": new_status,
3013 }))
3014 }
3015}
3016
/// Recursively appends every file with the exact extension `md` under `dir`
/// to `out`.
///
/// Traversal is best-effort: directories that cannot be read and entries that
/// fail to resolve are skipped silently. Entries are visited in sorted path
/// order within each directory, because `read_dir` yields them in a platform
/// and filesystem dependent order; sorting makes the result (and anything
/// derived from it, such as import order) reproducible.
fn collect_md_files(dir: &std::path::Path, out: &mut Vec<std::path::PathBuf>) {
    let mut paths: Vec<std::path::PathBuf> = match std::fs::read_dir(dir) {
        Ok(entries) => entries.flatten().map(|entry| entry.path()).collect(),
        Err(_) => return,
    };
    paths.sort();

    for path in paths {
        if path.is_dir() {
            collect_md_files(&path, out);
        } else if path.extension().and_then(|ext| ext.to_str()) == Some("md") {
            out.push(path);
        }
    }
}
3032
/// Turns a file stem into a slug: ASCII letters are lower-cased, every run of
/// characters other than ASCII letters and digits collapses into a single `-`,
/// and leading or trailing dashes are dropped. The result can be empty.
fn to_slug(stem: &str) -> String {
    let mut slug = String::with_capacity(stem.len());
    // Set when at least one separator character was seen since the last kept char.
    let mut pending_dash = false;

    for ch in stem.chars() {
        let lowered = ch.to_ascii_lowercase();
        if lowered.is_ascii_alphanumeric() {
            if pending_dash && !slug.is_empty() {
                slug.push('-');
            }
            pending_dash = false;
            slug.push(lowered);
        } else {
            pending_dash = true;
        }
    }
    slug
}
3053
/// Finds an atlas identifier in the first 32 lines of `content`.
///
/// A line matches when, after trimming, it starts with `atlas_id:` or
/// `atlas-id:`. The first matching line with a non-empty trimmed value wins;
/// returns `None` when there is no such line.
fn extract_atlas_id(content: &str) -> Option<String> {
    for line in content.lines().take(32) {
        let trimmed = line.trim();
        let raw_value = match trimmed.strip_prefix("atlas_id:") {
            Some(rest) => rest,
            None => match trimmed.strip_prefix("atlas-id:") {
                Some(rest) => rest,
                None => continue,
            },
        };
        let value = raw_value.trim();
        if !value.is_empty() {
            return Some(value.to_string());
        }
    }
    None
}
3087
3088fn parse_atlas_md(content: &str) -> (String, String, Vec<(SectionType, String, String)>) {
3089 let mut name = String::new();
3090 let mut pre_body = String::new();
3091 let mut sections: Vec<(SectionType, String, String)> = Vec::new();
3092
3093 let mut in_pre = true;
3095 let mut current_heading: Option<(SectionType, String)> = None;
3096 let mut current_body = String::new();
3097
3098 for line in content.lines() {
3099 if let Some(rest) = line.strip_prefix("# ") {
3100 if name.is_empty() {
3101 name = rest.trim().to_string();
3103 in_pre = true;
3104 }
3105 continue;
3106 }
3107 if let Some(rest) = line.strip_prefix("## ") {
3108 if let Some((stype, heading)) = current_heading.take() {
3110 sections.push((stype, heading, current_body.trim_end().to_string()));
3111 current_body.clear();
3112 } else if in_pre {
3113 pre_body = current_body.trim_end().to_string();
3115 current_body.clear();
3116 in_pre = false;
3117 }
3118 let heading_text = rest.trim().to_string();
3119 let stype = SectionType::from_str_loose(&heading_text).unwrap_or(SectionType::Other);
3120 current_heading = Some((stype, heading_text));
3121 continue;
3122 }
3123 current_body.push_str(line);
3125 current_body.push('\n');
3126 }
3127
3128 if let Some((stype, heading)) = current_heading {
3130 sections.push((stype, heading, current_body.trim_end().to_string()));
3131 } else {
3132 pre_body = current_body.trim_end().to_string();
3133 }
3134
3135 (name, pre_body, sections)
3136}
3137
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that every score lies within `[0, 1]`.
    fn assert_unit_range(scores: &[f32]) {
        for &s in scores {
            assert!((0.0..=1.0).contains(&s), "score out of range: {s}");
        }
    }

    /// Indices of `values`, ordered from the highest value to the lowest.
    fn descending_order(values: &[f32]) -> Vec<usize> {
        let mut order: Vec<usize> = (0..values.len()).collect();
        order.sort_by(|&x, &y| values[y].partial_cmp(&values[x]).unwrap());
        order
    }

    /// Normalized RRF scores stay within `[0, 1]`, grow with the raw score up
    /// to the theoretical maximum, saturate at `1.0` above it, and never
    /// reorder results.
    #[test]
    fn normalize_rrf_score_is_bounded_and_monotonic() {
        let k = RRF_K;

        // One contributing source: the best possible raw score is 1 / (k + 1).
        let max_single = 1.0f32 / (k as f32 + 1.0);
        let mut normed_single: Vec<f32> = Vec::new();
        for raw in [
            max_single * 0.25,
            max_single * 0.5,
            max_single,
            max_single * 1.5,
        ] {
            normed_single.push(normalize_rrf_score(raw, 1, k));
        }
        assert_unit_range(&normed_single);
        assert!(normed_single[0] < normed_single[1]);
        assert!(normed_single[1] < normed_single[2]);
        assert_eq!(normed_single[3], 1.0);

        // Two contributing sources: the best possible raw score doubles.
        let max_two = 2.0f32 / (k as f32 + 1.0);
        let mut normed_two: Vec<f32> = Vec::new();
        for raw in [max_two * 0.25, max_two * 0.75, max_two, max_two * 2.0] {
            normed_two.push(normalize_rrf_score(raw, 2, k));
        }
        assert_unit_range(&normed_two);
        assert!(normed_two[0] < normed_two[1]);
        assert!(normed_two[1] < normed_two[2]);
        assert_eq!(normed_two[3], 1.0);

        // Ranking by normalized score must match ranking by raw score.
        let raw = [0.001f32, 0.005, 0.010, 0.015];
        let normed: Vec<f32> = raw.iter().map(|&r| normalize_rrf_score(r, 1, k)).collect();
        let raw_order = descending_order(&raw);
        let norm_order = descending_order(&normed);
        assert_eq!(
            raw_order, norm_order,
            "normalization must not invert ranking"
        );
    }

    /// With no contributing sources the normalized score is defined as zero.
    #[test]
    fn normalize_rrf_score_zero_source_count_returns_zero() {
        assert_eq!(normalize_rrf_score(0.5, 0, RRF_K), 0.0);
    }
}