Skip to main content

khive_pack_knowledge/knowledge/
mod.rs

1//! Knowledge corpus handlers — atoms, domains, TF-IDF search, fold, index.
2//!
3//! Atoms and domains are stored in dedicated `knowledge_atoms` /
4//! `knowledge_domains` tables (V19 migration) separate from the notes/entities
5//! substrate. This lets the knowledge corpus scale to hundreds of thousands of
6//! items without polluting the general-purpose store.
7//!
8//! Verbs implemented here:
9//! - `upsert_atoms`  — bulk insert/update atoms by slug
10//! - `upsert_domains` — bulk insert/update domains (named atom groups)
11//! - `knowledge.get`    — fetch one atom or domain by ID or slug
12//! - `knowledge.list`   — paginated listing
13//! - `delete_atoms`     — soft-delete by ID or slug
14//! - `stats`            — corpus statistics
15//! - `index`            — backfill embeddings + FTS for atoms
16//! - `fold`             — budget-constrained knapsack selection
17//! - `knowledge.search` — TF-IDF + embedding rerank (default; opt out with rerank=false)
18
19pub(crate) mod matching;
20pub(crate) mod schema;
21pub(crate) mod vamana;
22
23use std::collections::{HashMap, HashSet};
24
25use chrono::Utc;
26use khive_fold::{GreedySelector, Selector, SelectorInput, SelectorWeights};
27use khive_runtime::{KhiveRuntime, NamespaceToken, RuntimeError};
28use khive_score::DeterministicScore;
29use khive_storage::types::{SqlStatement, SqlValue};
30use khive_types::SubstrateKind;
31use serde_json::{json, Value};
32use uuid::Uuid;
33
34use crate::knowledge::schema::{
35    AdjudicateParams, Atom, ChallengeParams, ComposeParams, DeleteAtomsParams, Domain, EditParams,
36    FoldCandidate, FoldParams, GetParams, ImportParams, IndexParams, ListParams, SearchParams,
37    Section, SectionType, SuggestParams, UpsertAtomsParams, UpsertDomainsParams,
38};
39
40// ─── TF-IDF weight defaults ───────────────────────────────────────────────────
41
// NOTE(review): the scoring code that reads these defaults is outside this view. The
// per-constant notes below are inferred from the names only — confirm against the scorer.

/// Default weight for an exact name match (presumably query == atom name).
const D_W_EXACT_NAME: f32 = 5.0;
/// Default weight for a term hit in the name field (presumed).
const D_W_NAME: f32 = 3.0;
/// Default weight for a term hit in the description field (presumed).
const D_W_DESCRIPTION: f32 = 1.5;
/// Default weight for a term hit in the tags field (presumed).
const D_W_TAGS: f32 = 1.25;
/// Default weight for a term hit in the content field (presumed).
const D_W_CONTENT: f32 = 1.0;
/// Default discount, presumably applied to expanded (non-literal) query terms — TODO confirm.
const D_EXPAND_DISCOUNT: f32 = 0.35;
/// Default coverage exponent/blend factor — exact role not visible here; TODO confirm.
const D_COVERAGE_ALPHA: f32 = 0.5;
/// Default weight for a bigram match (presumed).
const D_W_BIGRAM: f32 = 2.0;

/// Presumably the maximum number of candidates considered per search — TODO confirm.
const CANDIDATE_POOL: usize = 2000;
/// Presumably the minimum length of a term kept by the tokenizer — TODO confirm.
const MIN_TERM_LEN: usize = 3;
/// Presumably the number of texts sent to the embedder per batch — TODO confirm.
const EMBED_BATCH: usize = 32;
/// Presumably the byte cap on text handed to the embedder — TODO confirm.
const MAX_EMBED_BYTES: usize = 32_768;
55
/// Lower-case words that [`is_stop`] treats as stop words.
static STOP_WORDS: &[&str] = &[
    "and", "are", "also", "but", "can", "did", "does", "for", "from", "had", "has", "have", "its",
    "just", "may", "not", "our", "out", "than", "that", "the", "then", "this", "was", "were",
    "will", "with",
];

/// Returns `true` when `w` is exactly (case-sensitively) one of [`STOP_WORDS`].
fn is_stop(w: &str) -> bool {
    STOP_WORDS.iter().any(|&stop| stop == w)
}
65
66// ─── runtime error helpers ───────────────────────────────────────────────────
67
68fn sql_err(ctx: &str, e: impl std::fmt::Display) -> RuntimeError {
69    RuntimeError::Internal(format!("{ctx}: {e}"))
70}
71
72fn deser<T: serde::de::DeserializeOwned>(params: Value) -> Result<T, RuntimeError> {
73    serde_json::from_value(params)
74        .map_err(|e| RuntimeError::InvalidInput(format!("bad params: {e}")))
75}
76
77// ─── SQL helpers ─────────────────────────────────────────────────────────────
78
79fn now_us() -> i64 {
80    Utc::now().timestamp_micros()
81}
82
83fn new_id() -> String {
84    Uuid::new_v4().to_string()
85}
86
87fn tags_to_json(tags: Option<&Vec<String>>) -> String {
88    match tags {
89        Some(t) => serde_json::to_string(t).unwrap_or_else(|_| "[]".into()),
90        None => "[]".to_string(),
91    }
92}
93
94fn row_str(row: &khive_storage::types::SqlRow, col: &str) -> Option<String> {
95    match row.get(col) {
96        Some(SqlValue::Text(s)) => Some(s.clone()),
97        _ => None,
98    }
99}
100
101fn row_i64(row: &khive_storage::types::SqlRow, col: &str) -> Option<i64> {
102    match row.get(col) {
103        Some(SqlValue::Integer(n)) => Some(*n),
104        _ => None,
105    }
106}
107
108fn row_bool(row: &khive_storage::types::SqlRow, col: &str) -> bool {
109    matches!(row.get(col), Some(SqlValue::Integer(1)))
110}
111
112fn atom_from_row(row: &khive_storage::types::SqlRow) -> Option<Atom> {
113    let id: Uuid = row_str(row, "id")?.parse().ok()?;
114    Some(Atom {
115        id,
116        namespace: row_str(row, "namespace")?,
117        slug: row_str(row, "slug")?,
118        name: row_str(row, "name")?,
119        description: row_str(row, "description"),
120        content: row_str(row, "content").unwrap_or_default(),
121        tags: row_str(row, "tags").unwrap_or_else(|| "[]".into()),
122        properties: row_str(row, "properties"),
123        status: row_str(row, "status"),
124        source_uri: row_str(row, "source_uri"),
125        source_type: row_str(row, "source_type"),
126        finalized: row_bool(row, "finalized"),
127        created_at: row_i64(row, "created_at").unwrap_or(0),
128        updated_at: row_i64(row, "updated_at").unwrap_or(0),
129        deleted_at: row_i64(row, "deleted_at"),
130    })
131}
132
133fn domain_from_row(row: &khive_storage::types::SqlRow) -> Option<Domain> {
134    let id: Uuid = row_str(row, "id")?.parse().ok()?;
135    Some(Domain {
136        id,
137        namespace: row_str(row, "namespace")?,
138        slug: row_str(row, "slug")?,
139        name: row_str(row, "name")?,
140        description: row_str(row, "description"),
141        tags: row_str(row, "tags").unwrap_or_else(|| "[]".into()),
142        members: row_str(row, "members").unwrap_or_else(|| "[]".into()),
143        created_at: row_i64(row, "created_at").unwrap_or(0),
144        updated_at: row_i64(row, "updated_at").unwrap_or(0),
145        deleted_at: row_i64(row, "deleted_at"),
146    })
147}
148
149fn atom_to_json(atom: &Atom) -> Value {
150    json!({
151        "id": atom.id.to_string(),
152        "namespace": atom.namespace,
153        "slug": atom.slug,
154        "name": atom.name,
155        "description": atom.description,
156        "content": atom.content,
157        "tags": serde_json::from_str::<Value>(&atom.tags).unwrap_or(Value::Array(vec![])),
158        "properties": atom.properties.as_deref().and_then(|s| serde_json::from_str::<Value>(s).ok()),
159        "status": atom.status,
160        "source_uri": atom.source_uri,
161        "source_type": atom.source_type,
162        "finalized": atom.finalized,
163        "kind": "atom",
164        "created_at": atom.created_at,
165        "updated_at": atom.updated_at,
166    })
167}
168
169fn domain_to_json(domain: &Domain) -> Value {
170    json!({
171        "id": domain.id.to_string(),
172        "namespace": domain.namespace,
173        "slug": domain.slug,
174        "name": domain.name,
175        "description": domain.description,
176        "tags": serde_json::from_str::<Value>(&domain.tags).unwrap_or(Value::Array(vec![])),
177        "members": serde_json::from_str::<Value>(&domain.members).unwrap_or(Value::Array(vec![])),
178        "kind": "domain",
179        "created_at": domain.created_at,
180        "updated_at": domain.updated_at,
181    })
182}
183
184// ─── public handler entry points ─────────────────────────────────────────────
185
/// Stateless holder for the knowledge-corpus handlers.
///
/// Carries no data; each handler is an associated function taking the runtime, a namespace
/// token, and JSON params.
pub(crate) struct KnowledgeHandlers;
187
188impl KnowledgeHandlers {
189    // ── upsert_atoms ──────────────────────────────────────────────────────────
190
191    pub(crate) async fn upsert_atoms(
192        runtime: &KhiveRuntime,
193        token: &NamespaceToken,
194        params: Value,
195    ) -> Result<Value, RuntimeError> {
196        let p: UpsertAtomsParams = deser(params)?;
197        if p.atoms.is_empty() {
198            return Err(RuntimeError::InvalidInput(
199                "atoms list must not be empty".into(),
200            ));
201        }
202        if p.atoms.len() > 5000 {
203            return Err(RuntimeError::InvalidInput(
204                "max 5000 atoms per request".into(),
205            ));
206        }
207
208        let ns = token.namespace().as_str().to_owned();
209        let sql = runtime.sql();
210        let now = now_us();
211        let mut created = 0usize;
212        let mut updated = 0usize;
213
214        for atom_in in &p.atoms {
215            let slug = atom_in.slug.trim().to_string();
216            if slug.is_empty() {
217                return Err(RuntimeError::InvalidInput(
218                    "atom slug must not be empty".into(),
219                ));
220            }
221
222            let tags_json = tags_to_json(atom_in.tags.as_ref());
223            let content = atom_in.content.as_deref().unwrap_or("").to_string();
224            let props_json = atom_in
225                .properties
226                .as_ref()
227                .map(|v| serde_json::to_string(v).unwrap_or_default());
228            let source_uri = atom_in
229                .source_uri
230                .as_ref()
231                .map(|s| s.trim())
232                .filter(|s| !s.is_empty());
233            let source_type = atom_in
234                .source_type
235                .as_ref()
236                .map(|s| s.trim())
237                .filter(|s| !s.is_empty());
238
239            // Check if slug already exists.
240            let mut reader = sql
241                .reader()
242                .await
243                .map_err(|e| sql_err("upsert_atoms reader", e))?;
244            let existing = reader
245                .query_row(SqlStatement {
246                    sql: "SELECT id FROM knowledge_atoms WHERE namespace = ?1 AND slug = ?2 AND deleted_at IS NULL LIMIT 1".into(),
247                    params: vec![SqlValue::Text(ns.clone()), SqlValue::Text(slug.clone())],
248                    label: None,
249                })
250                .await
251                .map_err(|e| sql_err("upsert_atoms lookup", e))?;
252
253            let mut writer = sql
254                .writer()
255                .await
256                .map_err(|e| sql_err("upsert_atoms writer", e))?;
257            if let Some(row) = existing {
258                let id = row_str(&row, "id").ok_or_else(|| {
259                    RuntimeError::Internal("missing id in existing atom row".into())
260                })?;
261                writer
262                    .execute(SqlStatement {
263                        // Promote draft -> reviewed when this upsert finalizes the atom, mirroring
264                        // the V22 backfill (finalized=1 => reviewed). Never demote an already
265                        // reviewed/verified row, and leave status untouched when not finalizing
266                        // (codex #527 round 2). ?8 (finalized) is reused in the CASE.
267                        sql: "UPDATE knowledge_atoms SET name=?1, description=?2, content=?3, tags=?4, properties=?5, source_uri=?6, source_type=?7, finalized=?8, status = CASE WHEN ?8 = 1 AND status = 'draft' THEN 'reviewed' ELSE status END, updated_at=?9 WHERE id=?10 AND namespace=?11".into(),
268                        params: vec![
269                            SqlValue::Text(atom_in.name.clone()),
270                            atom_in.description.as_ref().map_or(SqlValue::Null, |d| SqlValue::Text(d.clone())),
271                            SqlValue::Text(content.clone()),
272                            SqlValue::Text(tags_json.clone()),
273                            props_json.as_ref().map_or(SqlValue::Null, |p| SqlValue::Text(p.clone())),
274                            source_uri.map_or(SqlValue::Null, |s| SqlValue::Text(s.to_string())),
275                            source_type.map_or(SqlValue::Null, |s| SqlValue::Text(s.to_string())),
276                            SqlValue::Integer(atom_in.finalized.unwrap_or(false) as i64),
277                            SqlValue::Integer(now),
278                            SqlValue::Text(id),
279                            SqlValue::Text(ns.clone()),
280                        ],
281                        label: None,
282                    })
283                    .await
284                    .map_err(|e| sql_err("upsert_atoms update", e))?;
285                updated += 1;
286            } else {
287                let id = new_id();
288                writer
289                    .execute(SqlStatement {
290                        sql: "INSERT INTO knowledge_atoms (id, namespace, slug, name, description, content, tags, properties, source_uri, source_type, status, finalized, created_at, updated_at) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14)".into(),
291                        params: vec![
292                            SqlValue::Text(id),
293                            SqlValue::Text(ns.clone()),
294                            SqlValue::Text(slug.clone()),
295                            SqlValue::Text(atom_in.name.clone()),
296                            atom_in.description.as_ref().map_or(SqlValue::Null, |d| SqlValue::Text(d.clone())),
297                            SqlValue::Text(content.clone()),
298                            SqlValue::Text(tags_json.clone()),
299                            props_json.as_ref().map_or(SqlValue::Null, |p| SqlValue::Text(p.clone())),
300                            source_uri.map_or(SqlValue::Null, |s| SqlValue::Text(s.to_string())),
301                            source_type.map_or(SqlValue::Null, |s| SqlValue::Text(s.to_string())),
302                            // status mirrors the lifecycle backfill (finalized => reviewed) so a
303                            // freshly-finalized atom is never left at the 'draft' default (codex #527).
304                            SqlValue::Text(if atom_in.finalized.unwrap_or(false) { "reviewed" } else { "draft" }.to_string()),
305                            SqlValue::Integer(atom_in.finalized.unwrap_or(false) as i64),
306                            SqlValue::Integer(now),
307                            SqlValue::Integer(now),
308                        ],
309                        label: None,
310                    })
311                    .await
312                    .map_err(|e| sql_err("upsert_atoms insert", e))?;
313                created += 1;
314            }
315        }
316
317        Ok(json!({
318            "created": created,
319            "updated": updated,
320            "total": p.atoms.len(),
321        }))
322    }
323
324    // ── upsert_domains ────────────────────────────────────────────────────────
325
326    pub(crate) async fn upsert_domains(
327        runtime: &KhiveRuntime,
328        token: &NamespaceToken,
329        params: Value,
330    ) -> Result<Value, RuntimeError> {
331        let p: UpsertDomainsParams = deser(params)?;
332        if p.domains.is_empty() {
333            return Err(RuntimeError::InvalidInput(
334                "domains list must not be empty".into(),
335            ));
336        }
337
338        let ns = token.namespace().as_str().to_owned();
339        let sql = runtime.sql();
340        let now = now_us();
341        let mut created = 0usize;
342        let mut updated = 0usize;
343
344        for domain_in in &p.domains {
345            let slug = domain_in.slug.trim().to_string();
346            let name = domain_in.name.trim().to_string();
347            if slug.is_empty() {
348                return Err(RuntimeError::InvalidInput(
349                    "domain slug must not be empty".into(),
350                ));
351            }
352            if name.is_empty() {
353                return Err(RuntimeError::InvalidInput(
354                    "domain name must not be empty".into(),
355                ));
356            }
357
358            let mut tags: Vec<String> = domain_in.tags.clone().unwrap_or_default();
359            if !tags.iter().any(|t| t == "type:domain") {
360                tags.push("type:domain".to_string());
361            }
362            let tags_json = serde_json::to_string(&tags).unwrap_or_else(|_| "[]".into());
363            let members_json = match &domain_in.members {
364                Some(m) => serde_json::to_string(m).unwrap_or_else(|_| "[]".into()),
365                None => "[]".to_string(),
366            };
367            let properties_json = serde_json::to_string(
368                &serde_json::json!({ "members": domain_in.members.as_deref().unwrap_or(&[]) }),
369            )
370            .unwrap_or_else(|_| "{}".into());
371
372            let mut reader = sql
373                .reader()
374                .await
375                .map_err(|e| sql_err("upsert_domains reader", e))?;
376            let existing = reader
377                .query_row(SqlStatement {
378                    sql: "SELECT id FROM knowledge_domains WHERE namespace = ?1 AND slug = ?2 AND deleted_at IS NULL LIMIT 1".into(),
379                    params: vec![SqlValue::Text(ns.clone()), SqlValue::Text(slug.clone())],
380                    label: None,
381                })
382                .await
383                .map_err(|e| sql_err("upsert_domains lookup", e))?;
384
385            let mut writer = sql
386                .writer()
387                .await
388                .map_err(|e| sql_err("upsert_domains writer", e))?;
389            if let Some(row) = existing {
390                let id = row_str(&row, "id").ok_or_else(|| {
391                    RuntimeError::Internal("missing id in existing domain row".into())
392                })?;
393                writer
394                    .execute(SqlStatement {
395                        sql: "UPDATE knowledge_domains SET name=?1, description=?2, tags=?3, members=?4, updated_at=?5 WHERE id=?6 AND namespace=?7".into(),
396                        params: vec![
397                            SqlValue::Text(name.clone()),
398                            domain_in.description.as_ref().map_or(SqlValue::Null, |d| SqlValue::Text(d.clone())),
399                            SqlValue::Text(tags_json.clone()),
400                            SqlValue::Text(members_json.clone()),
401                            SqlValue::Integer(now),
402                            SqlValue::Text(id.clone()),
403                            SqlValue::Text(ns.clone()),
404                        ],
405                        label: None,
406                    })
407                    .await
408                    .map_err(|e| sql_err("upsert_domains update", e))?;
409                // Dual-write: sync the mirror atom in knowledge_atoms for FTS.
410                writer
411                    .execute(SqlStatement {
412                        sql: "INSERT INTO knowledge_atoms (id, namespace, slug, name, description, content, tags, properties, status, finalized, created_at, updated_at) \
413                              VALUES (?1,?2,?3,?4,?5,?6,?7,?8,'reviewed',1,?9,?10) \
414                              ON CONFLICT(namespace, slug) DO UPDATE SET name=?4, description=?5, content=?6, tags=?7, properties=?8, status='reviewed', updated_at=?10".into(),
415                        params: vec![
416                            SqlValue::Text(id),
417                            SqlValue::Text(ns.clone()),
418                            SqlValue::Text(slug.clone()),
419                            SqlValue::Text(name.clone()),
420                            domain_in.description.as_ref().map_or(SqlValue::Null, |d| SqlValue::Text(d.clone())),
421                            SqlValue::Text(String::new()),
422                            SqlValue::Text(tags_json.clone()),
423                            SqlValue::Text(properties_json.clone()),
424                            SqlValue::Integer(now),
425                            SqlValue::Integer(now),
426                        ],
427                        label: None,
428                    })
429                    .await
430                    .map_err(|e| sql_err("upsert_domains atom mirror update", e))?;
431                updated += 1;
432            } else {
433                let id = new_id();
434                writer
435                    .execute(SqlStatement {
436                        sql: "INSERT INTO knowledge_domains (id, namespace, slug, name, description, tags, members, created_at, updated_at) VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9)".into(),
437                        params: vec![
438                            SqlValue::Text(id.clone()),
439                            SqlValue::Text(ns.clone()),
440                            SqlValue::Text(slug.clone()),
441                            SqlValue::Text(name.clone()),
442                            domain_in.description.as_ref().map_or(SqlValue::Null, |d| SqlValue::Text(d.clone())),
443                            SqlValue::Text(tags_json.clone()),
444                            SqlValue::Text(members_json.clone()),
445                            SqlValue::Integer(now),
446                            SqlValue::Integer(now),
447                        ],
448                        label: None,
449                    })
450                    .await
451                    .map_err(|e| sql_err("upsert_domains insert", e))?;
452                // Dual-write: mirror atom in knowledge_atoms for FTS indexing.
453                writer
454                    .execute(SqlStatement {
455                        sql: "INSERT INTO knowledge_atoms (id, namespace, slug, name, description, content, tags, properties, status, finalized, created_at, updated_at) \
456                              VALUES (?1,?2,?3,?4,?5,?6,?7,?8,'reviewed',1,?9,?10)".into(),
457                        params: vec![
458                            SqlValue::Text(id),
459                            SqlValue::Text(ns.clone()),
460                            SqlValue::Text(slug.clone()),
461                            SqlValue::Text(name.clone()),
462                            domain_in.description.as_ref().map_or(SqlValue::Null, |d| SqlValue::Text(d.clone())),
463                            SqlValue::Text(String::new()),
464                            SqlValue::Text(tags_json.clone()),
465                            SqlValue::Text(properties_json.clone()),
466                            SqlValue::Integer(now),
467                            SqlValue::Integer(now),
468                        ],
469                        label: None,
470                    })
471                    .await
472                    .map_err(|e| sql_err("upsert_domains atom mirror insert", e))?;
473                created += 1;
474            }
475        }
476
477        Ok(json!({
478            "created": created,
479            "updated": updated,
480            "total": p.domains.len(),
481        }))
482    }
483
484    // ── get ───────────────────────────────────────────────────────────────────
485
486    pub(crate) async fn get(
487        runtime: &KhiveRuntime,
488        token: &NamespaceToken,
489        params: Value,
490    ) -> Result<Value, RuntimeError> {
491        let p: GetParams = deser(params)?;
492        let ns = token.namespace().as_str().to_owned();
493        let sql = runtime.sql();
494        let id = p.id.trim().to_string();
495
496        // Try as UUID → atoms first, then domains.
497        let is_uuid = id.parse::<Uuid>().is_ok();
498
499        let mut reader = sql.reader().await.map_err(|e| sql_err("get reader", e))?;
500
501        if is_uuid {
502            // Lookup by UUID in atoms.
503            let row = reader
504                .query_row(SqlStatement {
505                    sql: "SELECT * FROM knowledge_atoms WHERE id = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
506                    params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.clone())],
507                    label: None,
508                })
509                .await
510                .map_err(|e| sql_err("get atom by id", e))?;
511            if let Some(r) = row {
512                return atom_from_row(&r)
513                    .map(|a| atom_to_json(&a))
514                    .ok_or_else(|| RuntimeError::Internal("atom row parse failed".into()));
515            }
516            // Try domains.
517            let row = reader
518                .query_row(SqlStatement {
519                    sql: "SELECT * FROM knowledge_domains WHERE id = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
520                    params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.clone())],
521                    label: None,
522                })
523                .await
524                .map_err(|e| sql_err("get domain by id", e))?;
525            if let Some(r) = row {
526                return domain_from_row(&r)
527                    .map(|d| domain_to_json(&d))
528                    .ok_or_else(|| RuntimeError::Internal("domain row parse failed".into()));
529            }
530        }
531
532        // Lookup by slug — domains first (authoritative for members),
533        // then atoms.
534        let row = reader
535            .query_row(SqlStatement {
536                sql: "SELECT * FROM knowledge_domains WHERE namespace = ?1 AND slug = ?2 AND deleted_at IS NULL LIMIT 1".into(),
537                params: vec![SqlValue::Text(ns.clone()), SqlValue::Text(id.clone())],
538                label: None,
539            })
540            .await
541            .map_err(|e| sql_err("get domain by slug", e))?;
542        if let Some(r) = row {
543            return domain_from_row(&r)
544                .map(|d| domain_to_json(&d))
545                .ok_or_else(|| RuntimeError::Internal("domain row parse failed".into()));
546        }
547
548        let row = reader
549            .query_row(SqlStatement {
550                sql: "SELECT * FROM knowledge_atoms WHERE namespace = ?1 AND slug = ?2 AND deleted_at IS NULL LIMIT 1".into(),
551                params: vec![SqlValue::Text(ns.clone()), SqlValue::Text(id.clone())],
552                label: None,
553            })
554            .await
555            .map_err(|e| sql_err("get atom by slug", e))?;
556        if let Some(r) = row {
557            return atom_from_row(&r)
558                .map(|a| atom_to_json(&a))
559                .ok_or_else(|| RuntimeError::Internal("atom row parse failed".into()));
560        }
561
562        Err(RuntimeError::NotFound(format!(
563            "atom or domain not found: {id:?}"
564        )))
565    }
566
567    // ── list ─────────────────────────────────────────────────────────────────
568
569    pub(crate) async fn list(
570        runtime: &KhiveRuntime,
571        token: &NamespaceToken,
572        params: Value,
573    ) -> Result<Value, RuntimeError> {
574        let p: ListParams = deser(params)?;
575        let ns = token.namespace().as_str().to_owned();
576        let sql = runtime.sql();
577        let limit = p.limit.unwrap_or(20).clamp(1, 500) as i64;
578        let offset = p.offset.unwrap_or(0) as i64;
579
580        let mut reader = sql.reader().await.map_err(|e| sql_err("list reader", e))?;
581
582        match p.kind.as_deref() {
583            Some("domain") => {
584                let rows = reader
585                    .query_all(SqlStatement {
586                        sql: "SELECT * FROM knowledge_domains WHERE namespace = ?1 AND deleted_at IS NULL ORDER BY created_at DESC LIMIT ?2 OFFSET ?3".into(),
587                        params: vec![
588                            SqlValue::Text(ns.clone()),
589                            SqlValue::Integer(limit),
590                            SqlValue::Integer(offset),
591                        ],
592                        label: None,
593                    })
594                    .await
595                    .map_err(|e| sql_err("list domains", e))?;
596
597                let total_row = reader
598                    .query_scalar(SqlStatement {
599                        sql: "SELECT COUNT(*) FROM knowledge_domains WHERE namespace = ?1 AND deleted_at IS NULL".into(),
600                        params: vec![SqlValue::Text(ns)],
601                        label: None,
602                    })
603                    .await
604                    .map_err(|e| sql_err("list domains count", e))?;
605                let total = match total_row {
606                    Some(SqlValue::Integer(n)) => n,
607                    _ => 0,
608                };
609
610                let items: Vec<Value> = rows
611                    .iter()
612                    .filter_map(|r| domain_from_row(r).map(|d| domain_to_json(&d)))
613                    .collect();
614
615                Ok(json!({ "results": items, "total": total, "limit": limit, "offset": offset }))
616            }
617            Some("atom") | None => {
618                let requested_statuses = status_values(p.status.as_ref());
619                let (data_status_clause, data_status_params) =
620                    status_sql_clause(&requested_statuses, p.exclude_status.as_deref(), 4);
621                let (count_status_clause, count_status_params) =
622                    status_sql_clause(&requested_statuses, p.exclude_status.as_deref(), 2);
623
624                let sql_str = format!(
625                    "SELECT * FROM knowledge_atoms WHERE namespace = ?1 AND deleted_at IS NULL AND tags NOT LIKE '%type:domain%'{} ORDER BY created_at DESC LIMIT ?2 OFFSET ?3",
626                    data_status_clause
627                );
628                let count_sql = format!(
629                    "SELECT COUNT(*) FROM knowledge_atoms WHERE namespace = ?1 AND deleted_at IS NULL AND tags NOT LIKE '%type:domain%'{}",
630                    count_status_clause
631                );
632
633                let mut row_params = vec![
634                    SqlValue::Text(ns.clone()),
635                    SqlValue::Integer(limit),
636                    SqlValue::Integer(offset),
637                ];
638                row_params.extend(data_status_params);
639
640                let rows = reader
641                    .query_all(SqlStatement {
642                        sql: sql_str,
643                        params: row_params,
644                        label: None,
645                    })
646                    .await
647                    .map_err(|e| sql_err("list atoms", e))?;
648
649                let mut count_params = vec![SqlValue::Text(ns)];
650                count_params.extend(count_status_params);
651                let total_row = reader
652                    .query_scalar(SqlStatement {
653                        sql: count_sql,
654                        params: count_params,
655                        label: None,
656                    })
657                    .await
658                    .map_err(|e| sql_err("list atoms count", e))?;
659                let total = match total_row {
660                    Some(SqlValue::Integer(n)) => n,
661                    _ => 0,
662                };
663
664                let items: Vec<Value> = rows
665                    .iter()
666                    .filter_map(|r| atom_from_row(r).map(|a| atom_to_json(&a)))
667                    .collect();
668
669                Ok(json!({ "results": items, "total": total, "limit": limit, "offset": offset }))
670            }
671            Some(other) => Err(RuntimeError::InvalidInput(format!(
672                "unknown type {other:?}; valid: atom | domain"
673            ))),
674        }
675    }
676
677    // ── delete_atoms ──────────────────────────────────────────────────────────
678
679    pub(crate) async fn delete_atoms(
680        runtime: &KhiveRuntime,
681        token: &NamespaceToken,
682        params: Value,
683    ) -> Result<Value, RuntimeError> {
684        let p: DeleteAtomsParams = deser(params)?;
685        if p.ids.is_empty() {
686            return Err(RuntimeError::InvalidInput("ids must not be empty".into()));
687        }
688
689        let ns = token.namespace().as_str().to_owned();
690        let sql = runtime.sql();
691        let now = now_us();
692        let mut deleted = 0usize;
693
694        let mut writer = sql
695            .writer()
696            .await
697            .map_err(|e| sql_err("delete_atoms writer", e))?;
698        for id_or_slug in &p.ids {
699            let id_or_slug = id_or_slug.trim().to_string();
700            // Soft-delete by id or slug.
701            let affected = writer
702                .execute(SqlStatement {
703                    sql: "UPDATE knowledge_atoms SET deleted_at = ?1 WHERE namespace = ?2 AND (id = ?3 OR slug = ?3) AND deleted_at IS NULL".into(),
704                    params: vec![
705                        SqlValue::Integer(now),
706                        SqlValue::Text(ns.clone()),
707                        SqlValue::Text(id_or_slug),
708                    ],
709                    label: None,
710                })
711                .await
712                .map_err(|e| sql_err("delete_atoms update", e))?;
713            deleted += affected as usize;
714        }
715
716        Ok(json!({
717            "deleted": deleted,
718            "requested": p.ids.len(),
719        }))
720    }
721
722    // ── stats ─────────────────────────────────────────────────────────────────
723
724    pub(crate) async fn stats(
725        runtime: &KhiveRuntime,
726        token: &NamespaceToken,
727        _params: Value,
728    ) -> Result<Value, RuntimeError> {
729        let ns = token.namespace().as_str().to_owned();
730        let sql = runtime.sql();
731        let mut reader = sql.reader().await.map_err(|e| sql_err("stats reader", e))?;
732
733        let atom_count = reader
734            .query_scalar(SqlStatement {
735                sql: "SELECT COUNT(*) FROM knowledge_atoms WHERE namespace = ?1 AND deleted_at IS NULL AND tags NOT LIKE '%type:domain%'".into(),
736                params: vec![SqlValue::Text(ns.clone())],
737                label: None,
738            })
739            .await
740            .map_err(|e| sql_err("stats atoms", e))?;
741
742        let domain_count = reader
743            .query_scalar(SqlStatement {
744                sql: "SELECT COUNT(*) FROM knowledge_domains WHERE namespace = ?1 AND deleted_at IS NULL".into(),
745                params: vec![SqlValue::Text(ns.clone())],
746                label: None,
747            })
748            .await
749            .map_err(|e| sql_err("stats domains", e))?;
750
751        let finalized_count = reader
752            .query_scalar(SqlStatement {
753                sql: "SELECT COUNT(*) FROM knowledge_atoms WHERE namespace = ?1 AND finalized = 1 AND deleted_at IS NULL AND tags NOT LIKE '%type:domain%'".into(),
754                params: vec![SqlValue::Text(ns.clone())],
755                label: None,
756            })
757            .await
758            .map_err(|e| sql_err("stats finalized", e))?;
759
760        let total_atoms = match atom_count {
761            Some(SqlValue::Integer(n)) => n,
762            _ => 0,
763        };
764        let total_domains = match domain_count {
765            Some(SqlValue::Integer(n)) => n,
766            _ => 0,
767        };
768        let finalized = match finalized_count {
769            Some(SqlValue::Integer(n)) => n,
770            _ => 0,
771        };
772
773        let eval_coverage = if total_atoms > 0 {
774            finalized as f64 / total_atoms as f64
775        } else {
776            0.0
777        };
778
779        let embedding_coverage =
780            compute_embedding_coverage(runtime, token, &ns, total_atoms).await?;
781
782        Ok(json!({
783            "total_atoms": total_atoms,
784            "total_domains": total_domains,
785            "total_events": 0,
786            "eval_coverage": eval_coverage,
787            "embedding_coverage": embedding_coverage,
788            "namespace": ns,
789        }))
790    }
791
792    // ── index ─────────────────────────────────────────────────────────────────
793
794    pub(crate) async fn index(
795        runtime: &KhiveRuntime,
796        token: &NamespaceToken,
797        params: Value,
798        ann: &vamana::SharedAnn,
799    ) -> Result<Value, RuntimeError> {
800        let p: IndexParams = deser(params)?;
801        let rebuild_ann = p.rebuild_ann.unwrap_or(false);
802        let ns = token.namespace().as_str().to_owned();
803
804        // If no embedder is configured, return immediately — nothing to index.
805        if runtime.default_embedder_name().is_empty() {
806            return Ok(
807                json!({ "indexed": 0, "skipped": 0, "total": 0, "reason": "no embedding model configured" }),
808            );
809        }
810
811        let sql = runtime.sql();
812        let batch_size = p.batch_size.unwrap_or(500).clamp(1, 1000);
813        let insert_only = p.insert_only.unwrap_or(false);
814
815        // Resolve which atoms to index.
816        let atoms: Vec<Atom> = if let Some(ref ids) = p.ids {
817            let mut out = Vec::with_capacity(ids.len());
818            let mut reader = sql.reader().await.map_err(|e| sql_err("index reader", e))?;
819            for id_or_slug in ids {
820                let row = reader
821                    .query_row(SqlStatement {
822                        sql: "SELECT * FROM knowledge_atoms WHERE namespace = ?1 AND (id = ?2 OR slug = ?2) AND deleted_at IS NULL LIMIT 1".into(),
823                        params: vec![SqlValue::Text(ns.clone()), SqlValue::Text(id_or_slug.clone())],
824                        label: None,
825                    })
826                    .await
827                    .map_err(|e| sql_err("index atom lookup", e))?;
828                if let Some(r) = row {
829                    if let Some(a) = atom_from_row(&r) {
830                        out.push(a);
831                    }
832                }
833            }
834            out
835        } else {
836            let mut out = Vec::new();
837            let mut offset = 0i64;
838            loop {
839                let mut reader = sql
840                    .reader()
841                    .await
842                    .map_err(|e| sql_err("index page reader", e))?;
843                let rows = reader
844                    .query_all(SqlStatement {
845                        sql: "SELECT * FROM knowledge_atoms WHERE namespace = ?1 AND deleted_at IS NULL ORDER BY created_at LIMIT ?2 OFFSET ?3".into(),
846                        params: vec![
847                            SqlValue::Text(ns.clone()),
848                            SqlValue::Integer(batch_size as i64),
849                            SqlValue::Integer(offset),
850                        ],
851                        label: None,
852                    })
853                    .await
854                    .map_err(|e| sql_err("index page", e))?;
855                let n = rows.len();
856                out.extend(rows.iter().filter_map(atom_from_row));
857                if n < batch_size {
858                    break;
859                }
860                offset += n as i64;
861            }
862            out
863        };
864
865        let total = atoms.len();
866        let mut indexed = 0usize;
867        let mut skipped = 0usize;
868
869        let mut ann_vectors: Vec<f32> = Vec::new();
870        let mut ann_ids: Vec<Uuid> = Vec::new();
871        let mut ann_dim: usize = 0;
872
873        for chunk in atoms.chunks(EMBED_BATCH) {
874            let mut staged: Vec<(Uuid, String)> = Vec::with_capacity(chunk.len());
875            for atom in chunk {
876                let text = atom_embed_text(atom);
877                if text.trim().is_empty() {
878                    skipped += 1;
879                    continue;
880                }
881                staged.push((atom.id, text));
882            }
883            if staged.is_empty() {
884                continue;
885            }
886
887            let texts: Vec<String> = staged
888                .iter()
889                .map(|(_, t)| {
890                    if t.len() <= MAX_EMBED_BYTES {
891                        t.clone()
892                    } else {
893                        let mut end = MAX_EMBED_BYTES;
894                        while !t.is_char_boundary(end) {
895                            end -= 1;
896                        }
897                        t[..end].to_string()
898                    }
899                })
900                .collect();
901
902            let embeddings = match runtime.embed_batch(&texts).await {
903                Ok(e) => e,
904                Err(_) => {
905                    skipped += staged.len();
906                    continue;
907                }
908            };
909            if embeddings.len() != staged.len() {
910                skipped += staged.len();
911                continue;
912            }
913
914            if let Ok(vectors) = runtime.vectors(token) {
915                let ns_str = token.namespace().as_str();
916                if !insert_only {
917                    for (id, _) in &staged {
918                        let _ = vectors.delete(*id).await;
919                    }
920                }
921                for ((id, _), emb) in staged.iter().zip(embeddings.iter()) {
922                    let _ = vectors
923                        .insert(
924                            *id,
925                            SubstrateKind::Entity,
926                            ns_str,
927                            "knowledge.atom",
928                            vec![emb.clone()],
929                        )
930                        .await;
931                }
932            }
933
934            if rebuild_ann {
935                for ((id, _), emb) in staged.iter().zip(embeddings.iter()) {
936                    if ann_dim == 0 {
937                        ann_dim = emb.len();
938                    }
939                    if emb.len() == ann_dim {
940                        ann_ids.push(*id);
941                        ann_vectors.extend_from_slice(emb);
942                    }
943                }
944            }
945
946            indexed += staged.len();
947        }
948
949        // Any vector write invalidates the existing snapshot — the corpus has changed.
950        // If rebuild_ann=true a fresh snapshot is persisted below; if false, ensure_ann
951        // rebuilds lazily on the next search call.
952        if indexed > 0 {
953            vamana::invalidate_snapshot(runtime, &ns).await;
954            *ann.index.write().await = None;
955        }
956
957        let mut ann_count: Option<usize> = None;
958        let is_full_corpus = p.ids.is_none();
959        if rebuild_ann && is_full_corpus && !ann_vectors.is_empty() && ann_dim > 0 {
960            match vamana::AnnBridge::build(ann_vectors, ann_dim, ann_ids) {
961                Ok(bridge) => {
962                    ann_count = Some(bridge.num_vectors());
963                    let model_name = runtime.default_embedder_name();
964                    if let Some(fp) = vamana::compute_fingerprint(runtime, token, model_name).await
965                    {
966                        if let Err(e) =
967                            vamana::persist_snapshot(runtime, &ns, model_name, &bridge, fp).await
968                        {
969                            tracing::error!(error = %e, "failed to persist Vamana snapshot");
970                        }
971                    }
972                    let mut guard = ann.index.write().await;
973                    *guard = Some(bridge);
974                }
975                Err(e) => {
976                    tracing::warn!(error = %e, "failed to build Vamana ANN index");
977                }
978            }
979        }
980
981        Ok(json!({
982            "indexed": indexed,
983            "skipped": skipped,
984            "total": total,
985            "ann_vectors": ann_count,
986        }))
987    }
988
989    // ── fold ─────────────────────────────────────────────────────────────────
990
991    pub(crate) async fn fold(
992        _runtime: &KhiveRuntime,
993        _token: &NamespaceToken,
994        params: Value,
995    ) -> Result<Value, RuntimeError> {
996        let p: FoldParams = deser(params)?;
997
998        if p.candidates.is_empty() {
999            return Ok(json!({
1000                "selected": [],
1001                "total_size": 0,
1002                "budget": p.budget,
1003                "selected_count": 0,
1004            }));
1005        }
1006
1007        let inputs: Vec<SelectorInput<FoldCandidate>> = p
1008            .candidates
1009            .iter()
1010            .cloned()
1011            .map(|c| SelectorInput {
1012                id: c.id.clone(),
1013                score: c.score,
1014                size: c.size,
1015                category: c.category.clone(),
1016                information_gain: c.information_gain,
1017                content: c,
1018            })
1019            .collect();
1020
1021        let weights = SelectorWeights {
1022            min_score: p.min_score.unwrap_or(0.0),
1023            category_weights: p.category_weights.unwrap_or_default().into_iter().collect(),
1024            diversity_bias: p.diversity_bias.unwrap_or(0.0),
1025            epistemic_weight: p.epistemic_weight.unwrap_or(0.0),
1026        };
1027
1028        let output = GreedySelector
1029            .select(inputs, p.budget, &weights)
1030            .map_err(|e| RuntimeError::Internal(format!("fold selector: {e}")))?;
1031
1032        let selected: Vec<Value> = output
1033            .selected
1034            .iter()
1035            .map(|item| {
1036                json!({
1037                    "id": item.id,
1038                    "score": item.score,
1039                    "size": item.size,
1040                    "content": item.content.content,
1041                    "category": item.content.category,
1042                })
1043            })
1044            .collect();
1045
1046        Ok(json!({
1047            "selected": selected,
1048            "total_size": output.total_size,
1049            "budget": p.budget,
1050            "selected_count": output.selected.len(),
1051        }))
1052    }
1053
1054    // ── search ────────────────────────────────────────────────────────────────
1055
1056    pub(crate) async fn search(
1057        runtime: &KhiveRuntime,
1058        token: &NamespaceToken,
1059        params: Value,
1060        ann: &vamana::SharedAnn,
1061    ) -> Result<Value, RuntimeError> {
1062        let p: SearchParams = deser(params)?;
1063        let raw_query = p.query.trim().to_string();
1064        if raw_query.is_empty() {
1065            return Err(RuntimeError::InvalidInput("query must not be empty".into()));
1066        }
1067
1068        let limit = p.limit.unwrap_or(10).clamp(1, 100);
1069        let min_score = p.min_score.unwrap_or(0.0) as f32;
1070        let w = Weights::from_opts(&p);
1071        let type_filter = p.kind.as_deref();
1072        let do_decompose = p.decompose.unwrap_or(false);
1073        let decompose_threshold = p.decompose_threshold.unwrap_or(4);
1074        let intersection_bonus = p.intersection_bonus.unwrap_or(0.25) as f32;
1075        let requested_rerank = p.rerank.unwrap_or(true);
1076        let do_rerank = requested_rerank && !runtime.default_embedder_name().is_empty();
1077        let rerank_alpha = p.rerank_alpha.unwrap_or(0.7) as f32;
1078        let fetch_limit = if do_rerank { limit * 3 } else { limit }.min(100);
1079
1080        let non_stop_count = raw_query
1081            .split_whitespace()
1082            .filter(|w| w.len() >= MIN_TERM_LEN && !is_stop(&w.to_lowercase()))
1083            .count();
1084
1085        let ns = token.namespace().as_str().to_owned();
1086        let requested_statuses = status_values(p.status.as_ref());
1087        let include_deprecated = explicitly_requested_status(&requested_statuses, "deprecated");
1088
1089        let ctx = SearchCtx {
1090            runtime,
1091            ns: &ns,
1092            role: p.role.as_deref(),
1093            type_filter,
1094            min_score,
1095            w: &w,
1096            fetch_limit,
1097            statuses: &requested_statuses,
1098            exclude_status: p.exclude_status.as_deref(),
1099        };
1100
1101        let mut hits = if do_decompose && non_stop_count >= decompose_threshold {
1102            search_decomposed(&ctx, &raw_query, intersection_bonus).await?
1103        } else {
1104            search_core(&ctx, &raw_query).await?
1105        };
1106
1107        // Trigger a fire-once background warm (ADR-049): never block search on the
1108        // ANN rebuild. The fusion below only runs if the index is already populated.
1109        vamana::ensure_ann_background(runtime, token, ann);
1110
1111        // ANN parallel signal: embed query, search Vamana, fuse via RRF
1112        let ann_guard = ann.index.read().await;
1113        if let Some(ref bridge) = *ann_guard {
1114            if let Ok(query_emb) = runtime.embed(&raw_query).await {
1115                let ann_k = fetch_limit.max(20);
1116                let ann_hits = bridge.search(&query_emb, ann_k);
1117                if !ann_hits.is_empty() {
1118                    fuse_ann_hits(&mut hits, &ann_hits, min_score);
1119                    hydrate_empty_hits(runtime, &ns, &mut hits).await;
1120                }
1121            }
1122        }
1123        drop(ann_guard);
1124
1125        if do_rerank && !hits.is_empty() {
1126            rerank_with_embeddings(runtime, &raw_query, &mut hits, rerank_alpha).await?;
1127        }
1128
1129        apply_status_multipliers(&mut hits, include_deprecated);
1130        hits.truncate(limit);
1131
1132        let results: Vec<Value> = hits
1133            .iter()
1134            .map(|h| {
1135                json!({
1136                    "id": h.id,
1137                    "slug": h.slug,
1138                    "name": h.name,
1139                    "description": h.description,
1140                    "tags": h.tags,
1141                    "status": h.status,
1142                    "finalized": h.finalized,
1143                    "kind": if h.is_domain { "domain" } else { "atom" },
1144                    "score": h.score,
1145                })
1146            })
1147            .collect();
1148        let count = results.len();
1149
1150        Ok(json!({
1151            "status": "ok",
1152            "data": { "results": results, "count": count },
1153        }))
1154    }
1155
1156    // ── suggest ───────────────────────────────────────────────────────────────
1157
1158    pub(crate) async fn suggest(
1159        runtime: &KhiveRuntime,
1160        token: &NamespaceToken,
1161        params: Value,
1162        ann: &vamana::SharedAnn,
1163    ) -> Result<Value, RuntimeError> {
1164        let p: SuggestParams = deser(params)?;
1165        let raw_query = p.query.trim().to_string();
1166        if raw_query.is_empty() {
1167            return Err(RuntimeError::InvalidInput("query must not be empty".into()));
1168        }
1169        let limit = p.limit.unwrap_or(8).clamp(1, 100);
1170        let ns = token.namespace().as_str().to_owned();
1171
1172        let ctx = SearchCtx {
1173            runtime,
1174            ns: &ns,
1175            role: p.role.as_deref(),
1176            type_filter: Some("domain"),
1177            min_score: 0.0,
1178            w: &Weights::default(),
1179            fetch_limit: limit * 3,
1180            statuses: &[],
1181            exclude_status: None,
1182        };
1183
1184        let mut hits = search_core(&ctx, &raw_query).await?;
1185
1186        vamana::ensure_ann_background(runtime, token, ann);
1187        let ann_guard = ann.index.read().await;
1188        if let Some(ref bridge) = *ann_guard {
1189            if let Ok(query_emb) = runtime.embed(&raw_query).await {
1190                let ann_k = (limit * 3).max(20);
1191                let ann_hits = bridge.search(&query_emb, ann_k);
1192                if !ann_hits.is_empty() {
1193                    fuse_ann_hits(&mut hits, &ann_hits, 0.0);
1194                    hydrate_empty_hits(runtime, &ns, &mut hits).await;
1195                }
1196            }
1197        }
1198        drop(ann_guard);
1199
1200        rerank_with_embeddings(runtime, &raw_query, &mut hits, 0.7).await?;
1201
1202        hits.retain(|h| h.is_domain);
1203        hits.truncate(limit);
1204
1205        let results: Vec<Value> = hits
1206            .iter()
1207            .map(|h| json!({ "id": h.id, "name": h.name, "score": h.score }))
1208            .collect();
1209        let count = results.len();
1210
1211        Ok(json!({
1212            "status": "ok",
1213            "data": { "results": results, "count": count },
1214        }))
1215    }
1216
1217    // ── compose ───────────────────────────────────────────────────────────────
1218
1219    pub(crate) async fn compose(
1220        runtime: &KhiveRuntime,
1221        token: &NamespaceToken,
1222        params: Value,
1223    ) -> Result<Value, RuntimeError> {
1224        let p: ComposeParams = deser(params)?;
1225        let raw_query = p.query.trim().to_string();
1226        if raw_query.is_empty() {
1227            return Err(RuntimeError::InvalidInput("query must not be empty".into()));
1228        }
1229
1230        let domain_ids: Vec<String> = p
1231            .domain_ids
1232            .unwrap_or_default()
1233            .into_iter()
1234            .filter(|s| !s.trim().is_empty())
1235            .collect();
1236        let atom_ids: Vec<String> = p
1237            .atom_ids
1238            .unwrap_or_default()
1239            .into_iter()
1240            .filter(|s| !s.trim().is_empty())
1241            .collect();
1242
1243        if domain_ids.is_empty() && atom_ids.is_empty() {
1244            return Err(RuntimeError::InvalidInput(
1245                "domain_ids or atom_ids must be provided".into(),
1246            ));
1247        }
1248
1249        let ns = token.namespace().as_str().to_owned();
1250
1251        let mut resolved_domains: Vec<Domain> = Vec::new();
1252        let mut member_slugs: Vec<String> = Vec::new();
1253
1254        for id in &domain_ids {
1255            let domain = load_domain_by_id_or_slug(runtime, &ns, id).await?;
1256            let members = parse_domain_members(&domain)?;
1257            member_slugs.extend(members);
1258            resolved_domains.push(domain);
1259        }
1260
1261        let mut seen_ids: HashSet<String> = HashSet::new();
1262        let mut ordered_atoms: Vec<Atom> = Vec::new();
1263
1264        for slug in &member_slugs {
1265            let atom = load_atom_by_id_or_slug(runtime, &ns, slug).await?;
1266            if seen_ids.insert(atom.id.to_string()) {
1267                ordered_atoms.push(atom);
1268            }
1269        }
1270        for id in &atom_ids {
1271            let atom = load_atom_by_id_or_slug(runtime, &ns, id).await?;
1272            if seen_ids.insert(atom.id.to_string()) {
1273                ordered_atoms.push(atom);
1274            }
1275        }
1276
1277        if ordered_atoms.is_empty() {
1278            return Ok(json!({
1279                "status": "ok",
1280                "data": {
1281                    "query": raw_query,
1282                    "markdown": "# Knowledge Briefing\n\nNo atoms found.",
1283                    "domains": [],
1284                    "atoms": [],
1285                    "count": 0,
1286                },
1287            }));
1288        }
1289
1290        let mut items: Vec<ScoredTextItem> = ordered_atoms
1291            .iter()
1292            .map(|a| ScoredTextItem {
1293                id: a.id.to_string(),
1294                slug: a.slug.clone(),
1295                name: a.name.clone(),
1296                text: atom_embed_text(a),
1297                score: 1.0,
1298            })
1299            .collect();
1300
1301        rerank_text_items(runtime, &raw_query, &mut items).await?;
1302
1303        let sorted_atoms: Vec<(&Atom, f32)> = items
1304            .iter()
1305            .filter_map(|item| {
1306                ordered_atoms
1307                    .iter()
1308                    .find(|a| a.id.to_string() == item.id)
1309                    .map(|a| (a, item.score))
1310            })
1311            .collect();
1312
1313        let markdown = format_compose_markdown(&raw_query, &resolved_domains, &sorted_atoms);
1314
1315        let atom_json: Vec<Value> = items
1316            .iter()
1317            .map(|item| {
1318                json!({
1319                    "id": item.id,
1320                    "slug": item.slug,
1321                    "name": item.name,
1322                    "score": item.score,
1323                })
1324            })
1325            .collect();
1326
1327        let domain_json: Vec<Value> = resolved_domains
1328            .iter()
1329            .map(|d| json!({ "id": d.id.to_string(), "slug": d.slug, "name": d.name }))
1330            .collect();
1331
1332        let count = atom_json.len();
1333
1334        Ok(json!({
1335            "status": "ok",
1336            "data": {
1337                "query": raw_query,
1338                "markdown": markdown,
1339                "domains": domain_json,
1340                "atoms": atom_json,
1341                "count": count,
1342            },
1343        }))
1344    }
1345}
1346
1347// ─── TF-IDF weight container ─────────────────────────────────────────────────
1348
/// Tunable TF-IDF scoring weights used by the lexical search.
///
/// Defaults come from the module-level `D_*` constants; `from_opts` lets a
/// request override individual fields.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Weights {
    /// Weight for an exact name match.
    w_exact_name: f32,
    /// Weight for a term match in the name.
    w_name: f32,
    /// Weight for a term match in the description.
    w_description: f32,
    /// Weight for a term match in the tags.
    w_tags: f32,
    /// Weight for a term match in the content.
    w_content: f32,
    /// Discount for expanded terms — NOTE(review): exact semantics live in the scorer; confirm.
    expand_discount: f32,
    /// Coverage coefficient — NOTE(review): exact semantics live in the scorer; confirm.
    coverage_alpha: f32,
    /// Weight for bigram matches.
    w_bigram: f32,
}
1359
1360impl Default for Weights {
1361    fn default() -> Self {
1362        Self {
1363            w_exact_name: D_W_EXACT_NAME,
1364            w_name: D_W_NAME,
1365            w_description: D_W_DESCRIPTION,
1366            w_tags: D_W_TAGS,
1367            w_content: D_W_CONTENT,
1368            expand_discount: D_EXPAND_DISCOUNT,
1369            coverage_alpha: D_COVERAGE_ALPHA,
1370            w_bigram: D_W_BIGRAM,
1371        }
1372    }
1373}
1374
1375impl Weights {
1376    fn from_opts(opts: &SearchParams) -> Self {
1377        let w = opts.weights.as_ref();
1378        Self {
1379            w_exact_name: w
1380                .and_then(|w| w.w_exact_name)
1381                .map_or(D_W_EXACT_NAME, |v| v as f32),
1382            w_name: w.and_then(|w| w.w_name).map_or(D_W_NAME, |v| v as f32),
1383            w_description: w
1384                .and_then(|w| w.w_description)
1385                .map_or(D_W_DESCRIPTION, |v| v as f32),
1386            w_tags: w.and_then(|w| w.w_tags).map_or(D_W_TAGS, |v| v as f32),
1387            w_content: w
1388                .and_then(|w| w.w_content)
1389                .map_or(D_W_CONTENT, |v| v as f32),
1390            expand_discount: w
1391                .and_then(|w| w.expand_discount)
1392                .map_or(D_EXPAND_DISCOUNT, |v| v as f32),
1393            coverage_alpha: w
1394                .and_then(|w| w.coverage_alpha)
1395                .map_or(D_COVERAGE_ALPHA, |v| v as f32),
1396            w_bigram: w.and_then(|w| w.w_bigram).map_or(D_W_BIGRAM, |v| v as f32),
1397        }
1398    }
1399}
1400
1401// ─── scored hit (internal) ────────────────────────────────────────────────────
1402
/// One search result as it moves through fusion, reranking and output.
///
/// Hits that come only from the ANN index start out with just `id` and
/// `score` filled in (empty `slug`/`name`) until `hydrate_empty_hits` loads
/// the remaining fields.
#[derive(Debug, Clone)]
struct ScoredHit {
    /// Row id as a string; ANN-only hits carry the stringified UUID.
    id: String,
    /// Slug of the atom or domain; empty while the hit is un-hydrated.
    slug: String,
    /// Display name; empty while the hit is un-hydrated.
    name: String,
    /// Optional description column.
    description: Option<String>,
    /// Raw `tags` column, when present.
    tags: Option<String>,
    /// Value of the `finalized` column; `false` for domains.
    finalized: bool,
    /// Whether the hit is a domain rather than an atom.
    is_domain: bool,
    /// Optional `status` column.
    status: Option<String>,
    /// Current relevance score.
    score: f32,
}
1414
1415// ─── ANN fusion (symmetric RRF) ─────────────────────────────────────────────
1416
/// Rank constant `k` handed to reciprocal-rank fusion and to score normalization.
const RRF_K: usize = 60;

/// Rescale a raw fused score into `[0.0, 1.0]`.
///
/// The raw value is divided by `source_count / (k + 1)` — presumably the score
/// of an item ranked first by every contributing source — and clamped.
/// With no contributing sources the result is `0.0`.
fn normalize_rrf_score(raw: f32, source_count: usize, k: usize) -> f32 {
    match source_count {
        0 => 0.0,
        sources => {
            let ceiling = sources as f32 / (k as f32 + 1.0);
            let scaled = raw / ceiling;
            scaled.clamp(0.0, 1.0)
        }
    }
}
1426
1427fn fuse_ann_hits(fts_hits: &mut Vec<ScoredHit>, ann_hits: &[(Uuid, f32)], min_score: f32) {
1428    let drained: Vec<ScoredHit> = std::mem::take(fts_hits);
1429
1430    let fts_source: Vec<(String, DeterministicScore)> = drained
1431        .iter()
1432        .map(|hit| (hit.id.clone(), DeterministicScore::from_f32(hit.score)))
1433        .collect();
1434    let mut by_id: HashMap<String, ScoredHit> = drained
1435        .into_iter()
1436        .map(|hit| (hit.id.clone(), hit))
1437        .collect();
1438    let ann_source: Vec<(String, DeterministicScore)> = ann_hits
1439        .iter()
1440        .map(|(uuid, score)| (uuid.to_string(), DeterministicScore::from_f32(*score)))
1441        .collect();
1442
1443    let source_count = usize::from(!fts_source.is_empty()) + usize::from(!ann_source.is_empty());
1444    let fused = khive_fusion::reciprocal_rank_fusion(vec![fts_source, ann_source], RRF_K);
1445
1446    for (id, fused_score) in fused {
1447        let raw_score = fused_score.to_f64() as f32;
1448        let score = normalize_rrf_score(raw_score, source_count, RRF_K);
1449        if score < min_score {
1450            continue;
1451        }
1452
1453        if let Some(mut hit) = by_id.remove(&id) {
1454            hit.score = score;
1455            fts_hits.push(hit);
1456        } else {
1457            fts_hits.push(ScoredHit {
1458                id,
1459                slug: String::new(),
1460                name: String::new(),
1461                description: None,
1462                tags: None,
1463                finalized: false,
1464                is_domain: false,
1465                status: None,
1466                score,
1467            });
1468        }
1469    }
1470}
1471
1472async fn hydrate_empty_hits(runtime: &KhiveRuntime, ns: &str, hits: &mut Vec<ScoredHit>) {
1473    let ids: Vec<String> = hits
1474        .iter()
1475        .filter(|hit| hit.slug.is_empty())
1476        .map(|hit| hit.id.clone())
1477        .collect();
1478    if ids.is_empty() {
1479        return;
1480    }
1481
1482    let sql = runtime.sql();
1483    let mut reader = match sql.reader().await {
1484        Ok(r) => r,
1485        Err(_) => return,
1486    };
1487
1488    let placeholders = ids
1489        .iter()
1490        .enumerate()
1491        .map(|(i, _)| format!("?{}", i + 2))
1492        .collect::<Vec<_>>()
1493        .join(",");
1494    let mut params = vec![SqlValue::Text(ns.to_owned())];
1495    params.extend(ids.iter().cloned().map(SqlValue::Text));
1496
1497    let atom_rows = reader
1498        .query_all(SqlStatement {
1499            sql: format!(
1500                "SELECT id, slug, name, description, tags, finalized, status FROM knowledge_atoms WHERE namespace = ?1 AND id IN ({placeholders}) AND deleted_at IS NULL"
1501            ),
1502            params,
1503            label: None,
1504        })
1505        .await
1506        .unwrap_or_default();
1507
1508    let mut atom_rows_by_id: HashMap<String, khive_storage::types::SqlRow> = HashMap::new();
1509    for row in atom_rows {
1510        if let Some(id) = row_str(&row, "id") {
1511            atom_rows_by_id.insert(id, row);
1512        }
1513    }
1514
1515    for hit in hits.iter_mut().filter(|hit| hit.slug.is_empty()) {
1516        if let Some(row) = atom_rows_by_id.get(&hit.id) {
1517            hit.slug = row_str(row, "slug").unwrap_or_default();
1518            hit.name = row_str(row, "name").unwrap_or_default();
1519            hit.description = row_str(row, "description");
1520            hit.tags = row_str(row, "tags");
1521            hit.finalized = row_bool(row, "finalized");
1522            hit.status = row_str(row, "status");
1523            let tags_arr: Vec<String> = hit
1524                .tags
1525                .as_deref()
1526                .and_then(|tags| serde_json::from_str(tags).ok())
1527                .unwrap_or_default();
1528            hit.is_domain = tags_arr.iter().any(|t| t == "type:domain");
1529        }
1530    }
1531
1532    let missing_ids: Vec<String> = hits
1533        .iter()
1534        .filter(|hit| hit.slug.is_empty())
1535        .map(|hit| hit.id.clone())
1536        .collect();
1537    if missing_ids.is_empty() {
1538        return;
1539    }
1540
1541    let placeholders = missing_ids
1542        .iter()
1543        .enumerate()
1544        .map(|(i, _)| format!("?{}", i + 2))
1545        .collect::<Vec<_>>()
1546        .join(",");
1547    let mut params = vec![SqlValue::Text(ns.to_owned())];
1548    params.extend(missing_ids.iter().cloned().map(SqlValue::Text));
1549
1550    let domain_rows = reader
1551        .query_all(SqlStatement {
1552            sql: format!(
1553                "SELECT id, slug, name, description, tags FROM knowledge_domains WHERE namespace = ?1 AND id IN ({placeholders}) AND deleted_at IS NULL"
1554            ),
1555            params,
1556            label: None,
1557        })
1558        .await
1559        .unwrap_or_default();
1560
1561    let mut domain_rows_by_id: HashMap<String, khive_storage::types::SqlRow> = HashMap::new();
1562    for row in domain_rows {
1563        if let Some(id) = row_str(&row, "id") {
1564            domain_rows_by_id.insert(id, row);
1565        }
1566    }
1567
1568    for hit in hits.iter_mut().filter(|hit| hit.slug.is_empty()) {
1569        if let Some(row) = domain_rows_by_id.get(&hit.id) {
1570            hit.slug = row_str(row, "slug").unwrap_or_default();
1571            hit.name = row_str(row, "name").unwrap_or_default();
1572            hit.description = row_str(row, "description");
1573            hit.tags = row_str(row, "tags");
1574            hit.finalized = false;
1575            hit.is_domain = true;
1576        }
1577    }
1578
1579    hits.retain(|hit| !hit.slug.is_empty());
1580}
1581
1582// ─── status helpers (W5) ─────────────────────────────────────────────────────
1583
1584fn status_values(value: Option<&Value>) -> Vec<String> {
1585    match value {
1586        Some(Value::String(s)) => {
1587            let s = s.trim();
1588            if s.is_empty() {
1589                Vec::new()
1590            } else {
1591                vec![s.to_string()]
1592            }
1593        }
1594        Some(Value::Array(items)) => items
1595            .iter()
1596            .filter_map(Value::as_str)
1597            .map(str::trim)
1598            .filter(|s| !s.is_empty())
1599            .map(str::to_string)
1600            .collect(),
1601        _ => Vec::new(),
1602    }
1603}
1604
1605fn status_sql_clause(
1606    statuses: &[String],
1607    exclude_status: Option<&str>,
1608    first_param: usize,
1609) -> (String, Vec<SqlValue>) {
1610    if !statuses.is_empty() {
1611        let placeholders = statuses
1612            .iter()
1613            .enumerate()
1614            .map(|(i, _)| format!("?{}", first_param + i))
1615            .collect::<Vec<_>>()
1616            .join(",");
1617        let clause = if statuses.len() == 1 {
1618            format!(" AND status = ?{first_param}")
1619        } else {
1620            format!(" AND status IN ({placeholders})")
1621        };
1622        let params = statuses.iter().cloned().map(SqlValue::Text).collect();
1623        return (clause, params);
1624    }
1625
1626    if let Some(status) = exclude_status.map(str::trim).filter(|s| !s.is_empty()) {
1627        return (
1628            format!(" AND (status IS NULL OR status != ?{first_param})"),
1629            vec![SqlValue::Text(status.to_string())],
1630        );
1631    }
1632
1633    (
1634        " AND (status IS NULL OR status != 'deprecated')".to_string(),
1635        Vec::new(),
1636    )
1637}
1638
/// Returns `true` when `status` appears verbatim (case-sensitive) in `statuses`.
fn explicitly_requested_status(statuses: &[String], status: &str) -> bool {
    for requested in statuses {
        if requested.as_str() == status {
            return true;
        }
    }
    false
}
1642
/// Maps an atom status to the factor applied to its search score.
///
/// `verified` is boosted (1.2), `draft` is damped (0.8) and `deprecated` is
/// zeroed out. A missing status, `reviewed`, and any unrecognised value are
/// all neutral (1.0).
fn status_multiplier(status: Option<&str>) -> f32 {
    match status {
        Some("verified") => 1.2,
        Some("draft") => 0.8,
        Some("deprecated") => 0.0,
        // `None`, "reviewed" and unknown statuses share the neutral factor.
        _ => 1.0,
    }
}
1652
1653fn apply_status_multipliers(hits: &mut Vec<ScoredHit>, include_deprecated: bool) {
1654    hits.retain_mut(|hit| {
1655        let multiplier = status_multiplier(hit.status.as_deref());
1656        // Squash raw score to (0,1) via monotonic s/(s+1) before applying the status
1657        // multiplier so that TF-IDF scores > 1 don't saturate ranking. RRF-normalized
1658        // scores (already ≤ 1) are squashed at most to 0.5, preserving relative order.
1659        hit.score = (hit.score / (hit.score + 1.0) * multiplier).clamp(0.0, 1.0);
1660        include_deprecated || multiplier > 0.0
1661    });
1662    hits.sort_by(|a, b| {
1663        b.score
1664            .partial_cmp(&a.score)
1665            .unwrap_or(std::cmp::Ordering::Equal)
1666            .then_with(|| a.slug.cmp(&b.slug))
1667    });
1668}
1669
1670// ─── candidate (tokenized) ───────────────────────────────────────────────────
1671
/// A searchable item with both its display values and its pre-tokenized fields.
///
/// The `*_raw` fields carry the untouched values that are copied into search
/// results; the token vectors are what the TF-IDF scorer works on.
struct Candidate {
    /// Identifier rendered as a string.
    id: String,
    /// Slug, also used as the ranking tie-breaker.
    slug: String,
    /// Name exactly as stored (used for the exact-name bonus and for display).
    name_raw: String,
    /// Description exactly as stored, if any.
    description_raw: Option<String>,
    /// Tags in their display form, if any.
    tags_raw: Option<String>,
    /// Status exactly as stored, if any.
    status_raw: Option<String>,
    /// Whether the source item is marked finalized.
    finalized: bool,
    /// Whether the item represents a domain rather than a plain atom.
    is_domain: bool,
    /// Tokens of the name.
    name: Vec<String>,
    /// Tokens of the description (empty when there is none).
    description: Vec<String>,
    /// Tokens of the tags.
    tags: Vec<String>,
    /// Tokens of the content body.
    content: Vec<String>,
}
1686
1687fn load_candidates_from_atoms(atoms: &[Atom], type_filter: Option<&str>) -> Vec<Candidate> {
1688    let want_domain = type_filter == Some("domain");
1689    let want_atom = type_filter == Some("atom");
1690
1691    atoms
1692        .iter()
1693        .filter_map(|atom| {
1694            let tags_str = atom.tags_display();
1695            let is_domain = {
1696                let tags_arr: Vec<String> = serde_json::from_str(&atom.tags).unwrap_or_default();
1697                tags_arr.iter().any(|t| t == "type:domain")
1698            };
1699            if (want_domain && !is_domain) || (want_atom && is_domain) {
1700                return None;
1701            }
1702            Some(Candidate {
1703                id: atom.id.to_string(),
1704                slug: atom.slug.clone(),
1705                name_raw: atom.name.clone(),
1706                description_raw: atom.description.clone(),
1707                tags_raw: Some(tags_str.clone()),
1708                status_raw: atom.status.clone(),
1709                finalized: atom.finalized,
1710                is_domain,
1711                name: matching::tokenize_field(&atom.name),
1712                description: atom
1713                    .description
1714                    .as_deref()
1715                    .map(matching::tokenize_field)
1716                    .unwrap_or_default(),
1717                tags: matching::tokenize_field(&tags_str),
1718                content: matching::tokenize_field(&atom.content),
1719            })
1720        })
1721        .collect()
1722}
1723
1724// ─── IDF computation ──────────────────────────────────────────────────────────
1725
1726fn compute_idf(
1727    candidates: &[Candidate],
1728    terms: &[String],
1729    expanded: &HashSet<String>,
1730    discount: f32,
1731) -> HashMap<String, f32> {
1732    let n = candidates.len() as f32;
1733    let mut df: HashMap<String, usize> = terms.iter().map(|t| (t.clone(), 0)).collect();
1734    for cand in candidates {
1735        for term in terms {
1736            if matching::has_in_tokens(&cand.content, term)
1737                || matching::has_in_tokens(&cand.name, term)
1738                || matching::has_in_tokens(&cand.description, term)
1739                || matching::has_in_tokens(&cand.tags, term)
1740            {
1741                if let Some(d) = df.get_mut(term) {
1742                    *d += 1;
1743                }
1744            }
1745        }
1746    }
1747    df.into_iter()
1748        .map(|(term, d)| {
1749            let raw = (n / (d as f32 + 1.0)).ln().max(0.1);
1750            let idf = if expanded.contains(&term) {
1751                raw * discount
1752            } else {
1753                raw
1754            };
1755            (term, idf)
1756        })
1757        .collect()
1758}
1759
1760fn score_field(tokens: &[String], terms: &[String], idf: &HashMap<String, f32>) -> f32 {
1761    let mut score = 0.0;
1762    for term in terms {
1763        let count = matching::count_in_tokens(tokens, term);
1764        if count > 0 {
1765            let tf = 1.0 + (count as f32).ln();
1766            score += tf * idf.get(term).copied().unwrap_or(1.0);
1767        }
1768    }
1769    score
1770}
1771
1772fn bigram_bonus_field(tokens: &[String], query_order: &[String]) -> f32 {
1773    if query_order.len() < 2 {
1774        return 0.0;
1775    }
1776    let filtered: Vec<&str> = tokens
1777        .iter()
1778        .filter(|t| !is_stop(t))
1779        .map(|t| t.as_str())
1780        .collect();
1781    let mut bonus = 0.0f32;
1782    for window in query_order.windows(2) {
1783        let (a, b) = (window[0].as_str(), window[1].as_str());
1784        for w in filtered.windows(2) {
1785            if w[0] == a && w[1] == b {
1786                bonus += 1.0;
1787                break;
1788            }
1789        }
1790    }
1791    bonus
1792}
1793
/// Returns `bonus` when the trimmed, lower-cased query occurs as a substring of the
/// lower-cased name, and `0.0` otherwise (including for a blank query).
fn exact_name_bonus(name: &str, raw_query: &str, bonus: f32) -> f32 {
    let needle = raw_query.trim().to_lowercase();
    if needle.is_empty() {
        return 0.0;
    }
    match name.to_lowercase().contains(needle.as_str()) {
        true => bonus,
        false => 0.0,
    }
}
1802
1803fn score_candidate(
1804    cand: &Candidate,
1805    terms: &[String],
1806    original_terms: &[String],
1807    query_order: &[String],
1808    idf: &HashMap<String, f32>,
1809    raw_query: &str,
1810    w: &Weights,
1811) -> f32 {
1812    let bigrams = bigram_bonus_field(&cand.name, query_order)
1813        + bigram_bonus_field(&cand.description, query_order)
1814        + bigram_bonus_field(&cand.tags, query_order)
1815        + bigram_bonus_field(&cand.content, query_order);
1816
1817    let base = exact_name_bonus(&cand.name_raw, raw_query, w.w_exact_name)
1818        + w.w_name * score_field(&cand.name, terms, idf)
1819        + w.w_description * score_field(&cand.description, terms, idf)
1820        + w.w_tags * score_field(&cand.tags, terms, idf)
1821        + w.w_content * score_field(&cand.content, terms, idf)
1822        + w.w_bigram * bigrams;
1823
1824    if w.coverage_alpha > 0.0 && !original_terms.is_empty() {
1825        // For each original query term, check whether it OR any of its expanded
1826        // variants matches the candidate. This ensures that "agents" → "agent"
1827        // expansion still earns coverage credit.
1828        let matched = original_terms
1829            .iter()
1830            .filter(|orig| {
1831                // Check the original term or any term in `terms` that starts with it
1832                // (expansion variants share the stem with the original).
1833                let has_exact = matching::has_in_tokens(&cand.name, orig)
1834                    || matching::has_in_tokens(&cand.description, orig)
1835                    || matching::has_in_tokens(&cand.tags, orig)
1836                    || matching::has_in_tokens(&cand.content, orig);
1837                if has_exact {
1838                    return true;
1839                }
1840                // Check if any expansion of this original matches.
1841                terms.iter().filter(|t| *t != *orig).any(|exp| {
1842                    matching::has_in_tokens(&cand.name, exp)
1843                        || matching::has_in_tokens(&cand.description, exp)
1844                        || matching::has_in_tokens(&cand.tags, exp)
1845                        || matching::has_in_tokens(&cand.content, exp)
1846                })
1847            })
1848            .count();
1849        let coverage = matched as f32 / original_terms.len() as f32;
1850        base * coverage.powf(w.coverage_alpha)
1851    } else {
1852        base
1853    }
1854}
1855
/// Adds naive singular/plural variants to `terms` in place and returns the set of
/// terms that were newly introduced.
///
/// For every input term:
/// * terms of at least 3 bytes not ending in `s` gain a `…s` plural;
/// * terms longer than 4 bytes ending in `ies` gain a `…y` singular;
/// * otherwise terms longer than 3 bytes ending in a single `s` (not `ss`) gain
///   the form with that `s` dropped.
///
/// On return `terms` is sorted and de-duplicated. The returned set holds only
/// those entries that were not part of the input.
fn expand_terms(terms: &mut Vec<String>) -> HashSet<String> {
    let originals: HashSet<String> = terms.iter().cloned().collect();

    let mut variants: Vec<String> = Vec::new();
    for term in terms.iter() {
        let ends_in_s = term.ends_with('s');

        if !ends_in_s && term.len() >= 3 {
            variants.push(format!("{term}s"));
        }

        // The length guards below already guarantee that every derived singular
        // is at least 3 bytes long.
        let ies_stem = if term.len() > 4 {
            term.strip_suffix("ies")
        } else {
            None
        };
        if let Some(stem) = ies_stem {
            variants.push(format!("{stem}y"));
        } else if ends_in_s && !term.ends_with("ss") && term.len() > 3 {
            variants.push(term[..term.len() - 1].to_string());
        }
    }

    terms.extend(variants);
    terms.sort();
    terms.dedup();

    let mut added = HashSet::new();
    for term in terms.iter() {
        if !originals.contains(term.as_str()) {
            added.insert(term.clone());
        }
    }
    added
}
1883
1884// ─── FTS5 phrase quoting ─────────────────────────────────────────────────────
1885
/// Wraps the raw query in double quotes, doubling every embedded double quote,
/// so the whole query is passed to FTS5 `MATCH` as a single quoted string.
fn quote_fts5_phrase(raw_query: &str) -> String {
    let mut phrase = String::with_capacity(raw_query.len() + 2);
    phrase.push('"');
    for ch in raw_query.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            phrase.push('"');
        }
        phrase.push(ch);
    }
    phrase.push('"');
    phrase
}
1890
1891// ─── FTS5 candidate pool fetch ────────────────────────────────────────────────
1892
1893async fn fetch_fts_candidates(
1894    runtime: &KhiveRuntime,
1895    ns: &str,
1896    raw_query: &str,
1897    type_filter: Option<&str>,
1898    statuses: &[String],
1899    exclude_status: Option<&str>,
1900    fetch_limit: usize,
1901) -> Result<Vec<Atom>, RuntimeError> {
1902    let sql = runtime.sql();
1903    let mut reader = sql
1904        .reader()
1905        .await
1906        .map_err(|e| sql_err("search fts reader", e))?;
1907
1908    // Use the FTS5 virtual table to get candidate atom IDs quickly.
1909    let match_expr = quote_fts5_phrase(raw_query);
1910    let fts_rows = reader
1911        .query_all(SqlStatement {
1912            sql: "SELECT id FROM fts_knowledge WHERE fts_knowledge MATCH ?1 AND namespace = ?2 LIMIT ?3".into(),
1913            params: vec![
1914                SqlValue::Text(match_expr),
1915                SqlValue::Text(ns.to_owned()),
1916                SqlValue::Integer(fetch_limit as i64),
1917            ],
1918            label: None,
1919        })
1920        .await
1921        .map_err(|e| sql_err("search fts query", e))?;
1922
1923    if fts_rows.is_empty() {
1924        // FTS returned nothing — fall back to full scan (small corpora) capped at CANDIDATE_POOL.
1925        let (status_clause, status_params) = status_sql_clause(statuses, exclude_status, 3);
1926        let sql_str = format!(
1927            "SELECT * FROM knowledge_atoms WHERE namespace = ?1 AND deleted_at IS NULL{} ORDER BY created_at DESC LIMIT ?2",
1928            status_clause
1929        );
1930        let mut params = vec![
1931            SqlValue::Text(ns.to_owned()),
1932            SqlValue::Integer(CANDIDATE_POOL as i64),
1933        ];
1934        params.extend(status_params);
1935
1936        let rows = reader
1937            .query_all(SqlStatement {
1938                sql: sql_str,
1939                params,
1940                label: None,
1941            })
1942            .await
1943            .map_err(|e| sql_err("search full scan", e))?;
1944
1945        let mut atoms: Vec<Atom> = rows.iter().filter_map(atom_from_row).collect();
1946        if let Some(filt) = type_filter {
1947            let want_domain = filt == "domain";
1948            atoms.retain(|a| {
1949                let tags_arr: Vec<String> = serde_json::from_str(&a.tags).unwrap_or_default();
1950                let is_domain = tags_arr.iter().any(|t| t == "type:domain");
1951                if want_domain {
1952                    is_domain
1953                } else {
1954                    !is_domain
1955                }
1956            });
1957        }
1958        return Ok(atoms);
1959    }
1960
1961    let ids: Vec<String> = fts_rows.iter().filter_map(|r| row_str(r, "id")).collect();
1962    let placeholders: String = ids
1963        .iter()
1964        .enumerate()
1965        .map(|(i, _)| format!("?{}", i + 2))
1966        .collect::<Vec<_>>()
1967        .join(",");
1968
1969    let (status_clause, status_params) = status_sql_clause(statuses, exclude_status, ids.len() + 2);
1970    let mut params: Vec<SqlValue> = vec![SqlValue::Text(ns.to_owned())];
1971    params.extend(ids.iter().map(|id| SqlValue::Text(id.clone())));
1972    params.extend(status_params);
1973
1974    let rows = reader
1975        .query_all(SqlStatement {
1976            sql: format!(
1977                "SELECT * FROM knowledge_atoms WHERE namespace = ?1 AND id IN ({placeholders}) AND deleted_at IS NULL{status_clause}"
1978            ),
1979            params,
1980            label: None,
1981        })
1982        .await
1983        .map_err(|e| sql_err("search load atoms", e))?;
1984
1985    Ok(rows.iter().filter_map(atom_from_row).collect())
1986}
1987
1988// ─── search context (groups args to stay under clippy limit) ─────────────────
1989
1990struct SearchCtx<'a> {
1991    runtime: &'a KhiveRuntime,
1992    ns: &'a str,
1993    role: Option<&'a str>,
1994    type_filter: Option<&'a str>,
1995    min_score: f32,
1996    w: &'a Weights,
1997    fetch_limit: usize,
1998    statuses: &'a [String],
1999    exclude_status: Option<&'a str>,
2000}
2001
2002// ─── core single-pass search ──────────────────────────────────────────────────
2003
2004async fn search_core(ctx: &SearchCtx<'_>, query: &str) -> Result<Vec<ScoredHit>, RuntimeError> {
2005    let runtime = ctx.runtime;
2006    let ns = ctx.ns;
2007    let role = ctx.role;
2008    let type_filter = ctx.type_filter;
2009    let min_score = ctx.min_score;
2010    let w = ctx.w;
2011    let fetch_limit = ctx.fetch_limit;
2012    let raw_query = query.trim().to_string();
2013    if raw_query.is_empty() {
2014        return Ok(Vec::new());
2015    }
2016
2017    let scored_query = match role {
2018        Some(r) if !r.trim().is_empty() => format!("{} {}", r.trim(), raw_query),
2019        _ => raw_query.clone(),
2020    };
2021
2022    let (terms, original_terms, query_order, expanded) = {
2023        let raw_tokens: Vec<String> = matching::tokenize_field(&scored_query)
2024            .into_iter()
2025            .filter(|w| w.len() >= MIN_TERM_LEN && !is_stop(w))
2026            .collect();
2027        let mut seen = HashSet::new();
2028        let qo: Vec<String> = raw_tokens
2029            .iter()
2030            .filter(|w| seen.insert(w.as_str()))
2031            .cloned()
2032            .collect();
2033        let mut t = raw_tokens;
2034        t.sort();
2035        t.dedup();
2036        let originals = t.clone();
2037        let exp = expand_terms(&mut t);
2038        (t, originals, qo, exp)
2039    };
2040    // When all query tokens are shorter than MIN_TERM_LEN (e.g. "RAG", "GQA", "LoRA"),
2041    // fall through to exact-name-bonus-only scoring rather than returning early.
2042    let terms_only_exact = terms.is_empty();
2043
2044    let atoms = fetch_fts_candidates(
2045        runtime,
2046        ns,
2047        &raw_query,
2048        type_filter,
2049        ctx.statuses,
2050        ctx.exclude_status,
2051        CANDIDATE_POOL,
2052    )
2053    .await?;
2054    if atoms.is_empty() {
2055        return Ok(Vec::new());
2056    }
2057
2058    let candidates = load_candidates_from_atoms(&atoms, type_filter);
2059    if candidates.is_empty() {
2060        return Ok(Vec::new());
2061    }
2062
2063    let idf = compute_idf(&candidates, &terms, &expanded, w.expand_discount);
2064    let mut scored: Vec<(f32, &Candidate)> = candidates
2065        .iter()
2066        .filter_map(|cand| {
2067            let base = if terms_only_exact {
2068                // All query terms were sub-MIN_TERM_LEN (short acronyms).
2069                // Score only via exact_name_bonus so e.g. "LoRA" or "RAG" match their atom.
2070                exact_name_bonus(&cand.name_raw, &raw_query, w.w_exact_name)
2071            } else {
2072                score_candidate(
2073                    cand,
2074                    &terms,
2075                    &original_terms,
2076                    &query_order,
2077                    &idf,
2078                    &raw_query,
2079                    w,
2080                )
2081            };
2082            (base > min_score).then_some((base, cand))
2083        })
2084        .collect();
2085    scored.sort_by(|a, b| {
2086        b.0.partial_cmp(&a.0)
2087            .unwrap_or(std::cmp::Ordering::Equal)
2088            .then_with(|| a.1.slug.cmp(&b.1.slug))
2089    });
2090
2091    Ok(scored
2092        .into_iter()
2093        .take(fetch_limit)
2094        .map(|(score, cand)| ScoredHit {
2095            id: cand.id.clone(),
2096            slug: cand.slug.clone(),
2097            name: cand.name_raw.clone(),
2098            description: cand.description_raw.clone(),
2099            tags: cand.tags_raw.clone(),
2100            status: cand.status_raw.clone(),
2101            finalized: cand.finalized,
2102            is_domain: cand.is_domain,
2103            score,
2104        })
2105        .collect())
2106}
2107
2108// ─── decomposed search ───────────────────────────────────────────────────────
2109
2110async fn search_decomposed(
2111    ctx: &SearchCtx<'_>,
2112    query: &str,
2113    intersection_bonus: f32,
2114) -> Result<Vec<ScoredHit>, RuntimeError> {
2115    let non_stop: Vec<&str> = query
2116        .split_whitespace()
2117        .filter(|w| w.len() >= MIN_TERM_LEN && !is_stop(&w.to_lowercase()))
2118        .collect();
2119
2120    let mid = non_stop.len() / 2;
2121    let sub_q1: String = non_stop[..mid].join(" ");
2122    let sub_q2: String = non_stop[mid..].join(" ");
2123    let sub_limit = ctx.fetch_limit.min(50);
2124
2125    let full = search_core(ctx, query).await?;
2126    let sub_ctx1 = SearchCtx {
2127        runtime: ctx.runtime,
2128        ns: ctx.ns,
2129        role: None,
2130        type_filter: ctx.type_filter,
2131        min_score: 0.0,
2132        w: ctx.w,
2133        fetch_limit: sub_limit,
2134        statuses: ctx.statuses,
2135        exclude_status: ctx.exclude_status,
2136    };
2137    let s1 = search_core(&sub_ctx1, &sub_q1).await?;
2138    let s2 = search_core(&sub_ctx1, &sub_q2).await?;
2139
2140    let mut scores: HashMap<String, f32> = HashMap::new();
2141    let mut data: HashMap<String, ScoredHit> = HashMap::new();
2142
2143    for hit in full {
2144        scores.insert(hit.id.clone(), hit.score);
2145        data.insert(hit.id.clone(), hit);
2146    }
2147
2148    let mut sub_counts: HashMap<String, u32> = HashMap::new();
2149    for hits in [s1, s2] {
2150        let mut seen: HashSet<String> = HashSet::new();
2151        for hit in hits {
2152            if !seen.insert(hit.id.clone()) {
2153                continue;
2154            }
2155            *sub_counts.entry(hit.id.clone()).or_default() += 1;
2156            if !data.contains_key(&hit.id) {
2157                scores.insert(hit.id.clone(), hit.score * 0.3);
2158                data.insert(hit.id.clone(), hit);
2159            }
2160        }
2161    }
2162
2163    for (id, count) in &sub_counts {
2164        if *count >= 2 {
2165            if let Some(s) = scores.get_mut(id) {
2166                *s *= 1.0 + intersection_bonus * (*count as f32 - 1.0);
2167            }
2168        }
2169    }
2170
2171    let mut ranked: Vec<ScoredHit> = data
2172        .into_values()
2173        .map(|mut h| {
2174            if let Some(&s) = scores.get(&h.id) {
2175                h.score = s;
2176            }
2177            h
2178        })
2179        .collect();
2180    ranked.sort_by(|a, b| {
2181        b.score
2182            .partial_cmp(&a.score)
2183            .unwrap_or(std::cmp::Ordering::Equal)
2184            .then_with(|| a.slug.cmp(&b.slug))
2185    });
2186    ranked.truncate(ctx.fetch_limit);
2187    Ok(ranked)
2188}
2189
2190// ─── embedding rerank ────────────────────────────────────────────────────────
2191
2192// Shared embed+cosine core: embeds [query] + candidate texts and returns one
2193// cosine score per candidate, or None if the embedder is absent or fails.
2194async fn embed_cosine_scores(
2195    runtime: &KhiveRuntime,
2196    query: &str,
2197    candidate_texts: &[String],
2198) -> Option<Vec<f32>> {
2199    if runtime.default_embedder_name().is_empty() || candidate_texts.is_empty() {
2200        return None;
2201    }
2202    let mut texts = Vec::with_capacity(candidate_texts.len() + 1);
2203    texts.push(query.to_string());
2204    texts.extend_from_slice(candidate_texts);
2205    let embeddings = runtime.embed_batch(&texts).await.ok()?;
2206    if embeddings.len() != texts.len() {
2207        return None;
2208    }
2209    let query_emb = &embeddings[0];
2210    Some(
2211        embeddings[1..]
2212            .iter()
2213            .map(|emb| cosine_similarity(query_emb, emb))
2214            .collect(),
2215    )
2216}
2217
2218async fn rerank_with_embeddings(
2219    runtime: &KhiveRuntime,
2220    query: &str,
2221    hits: &mut [ScoredHit],
2222    alpha: f32,
2223) -> Result<(), RuntimeError> {
2224    if hits.is_empty() {
2225        return Ok(());
2226    }
2227    let texts: Vec<String> = hits
2228        .iter()
2229        .map(|h| format!("{} {}", h.name, h.description.as_deref().unwrap_or("")))
2230        .collect();
2231    if let Some(cosines) = embed_cosine_scores(runtime, query, &texts).await {
2232        let max_tfidf = hits
2233            .iter()
2234            .map(|h| h.score)
2235            .fold(0.0f32, f32::max)
2236            .max(1e-6);
2237        for (hit, cos) in hits.iter_mut().zip(cosines.iter()) {
2238            let norm_tfidf = hit.score / max_tfidf;
2239            hit.score = alpha * norm_tfidf + (1.0 - alpha) * cos.max(0.0);
2240        }
2241        hits.sort_by(|a, b| {
2242            b.score
2243                .partial_cmp(&a.score)
2244                .unwrap_or(std::cmp::Ordering::Equal)
2245                .then_with(|| a.slug.cmp(&b.slug))
2246        });
2247    }
2248    Ok(())
2249}
2250
/// Cosine similarity of two vectors.
///
/// Returns `0.0` when the slices are empty, differ in length, or when the product
/// of their norms is below `1e-8` (e.g. a zero vector).
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    if a.is_empty() || a.len() != b.len() {
        return 0.0;
    }

    // Single pass accumulating the dot product and both squared norms.
    let (dot, sq_a, sq_b) = a.iter().zip(b.iter()).fold(
        (0.0f32, 0.0f32, 0.0f32),
        |(dot, sq_a, sq_b), (&x, &y)| (dot + x * y, sq_a + x * x, sq_b + y * y),
    );

    let denom = sq_a.sqrt() * sq_b.sqrt();
    if denom < 1e-8 {
        return 0.0;
    }
    dot / denom
}
2270
2271// ─── embedding coverage ───────────────────────────────────────────────────────
2272
2273async fn compute_embedding_coverage(
2274    runtime: &KhiveRuntime,
2275    token: &NamespaceToken,
2276    ns: &str,
2277    total_atoms: i64,
2278) -> Result<f64, RuntimeError> {
2279    if total_atoms <= 0 || runtime.default_embedder_name().is_empty() {
2280        return Ok(0.0);
2281    }
2282
2283    match runtime.vectors(token) {
2284        Ok(_) => {}
2285        Err(RuntimeError::Unconfigured(_)) => return Ok(0.0),
2286        Err(e) => return Err(e),
2287    }
2288
2289    let model = runtime.default_embedder_name().to_owned();
2290    let table_name = format!("vec_{}", vamana::sanitize_model_key(&model));
2291    let sql = runtime.sql();
2292    let mut reader = sql
2293        .reader()
2294        .await
2295        .map_err(|e| sql_err("stats embedding coverage reader", e))?;
2296
2297    let count = reader
2298        .query_scalar(SqlStatement {
2299            sql: format!(
2300                "SELECT COUNT(DISTINCT a.id) \
2301                 FROM knowledge_atoms a \
2302                 WHERE a.namespace = ?1 \
2303                   AND a.deleted_at IS NULL \
2304                   AND a.tags NOT LIKE '%type:domain%' \
2305                   AND a.id IN ( \
2306                       SELECT v.subject_id FROM {table_name} v \
2307                       WHERE v.namespace = ?1 \
2308                         AND v.embedding_model = ?2 \
2309                         AND v.field = 'knowledge.atom' \
2310                   )"
2311            ),
2312            params: vec![SqlValue::Text(ns.to_owned()), SqlValue::Text(model.clone())],
2313            label: Some("knowledge_stats_embedding_coverage".into()),
2314        })
2315        .await
2316        .map_err(|e| sql_err("stats embedding coverage", e))?;
2317
2318    let atoms_with_vector = match count {
2319        Some(SqlValue::Integer(n)) => n,
2320        Some(other) => {
2321            return Err(RuntimeError::Internal(format!(
2322                "stats embedding coverage returned non-integer count: {other:?}"
2323            )));
2324        }
2325        None => 0,
2326    };
2327
2328    Ok(atoms_with_vector as f64 / total_atoms as f64)
2329}
2330
2331// ─── compose helpers ──────────────────────────────────────────────────────────
2332
/// A piece of text with identifying metadata and a relevance score, as ranked by
/// `rerank_text_items`.
struct ScoredTextItem {
    /// Identifier of the underlying item.
    id: String,
    /// Slug of the underlying item; used as the ranking tie-breaker.
    slug: String,
    /// Display name.
    name: String,
    /// Text that is embedded and compared against the query.
    text: String,
    /// Current relevance score.
    score: f32,
}
2340
2341async fn load_domain_by_id_or_slug(
2342    runtime: &KhiveRuntime,
2343    ns: &str,
2344    id_or_slug: &str,
2345) -> Result<Domain, RuntimeError> {
2346    let sql = runtime.sql();
2347    let mut reader = sql
2348        .reader()
2349        .await
2350        .map_err(|e| sql_err("compose domain reader", e))?;
2351    let id = id_or_slug.trim().to_string();
2352    let row = if id.parse::<Uuid>().is_ok() {
2353        reader
2354            .query_row(SqlStatement {
2355                sql: "SELECT * FROM knowledge_domains WHERE id = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2356                params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.to_owned())],
2357                label: None,
2358            })
2359            .await
2360            .map_err(|e| sql_err("compose domain by id", e))?
2361    } else {
2362        reader
2363            .query_row(SqlStatement {
2364                sql: "SELECT * FROM knowledge_domains WHERE slug = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2365                params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.to_owned())],
2366                label: None,
2367            })
2368            .await
2369            .map_err(|e| sql_err("compose domain by slug", e))?
2370    };
2371    row.and_then(|r| domain_from_row(&r))
2372        .ok_or_else(|| RuntimeError::NotFound(format!("domain not found: {id:?}")))
2373}
2374
2375async fn load_atom_by_id_or_slug(
2376    runtime: &KhiveRuntime,
2377    ns: &str,
2378    id_or_slug: &str,
2379) -> Result<Atom, RuntimeError> {
2380    let sql = runtime.sql();
2381    let mut reader = sql
2382        .reader()
2383        .await
2384        .map_err(|e| sql_err("compose atom reader", e))?;
2385    let id = id_or_slug.trim().to_string();
2386    let row = if id.parse::<Uuid>().is_ok() {
2387        reader
2388            .query_row(SqlStatement {
2389                sql: "SELECT * FROM knowledge_atoms WHERE id = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2390                params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.to_owned())],
2391                label: None,
2392            })
2393            .await
2394            .map_err(|e| sql_err("compose atom by id", e))?
2395    } else {
2396        reader
2397            .query_row(SqlStatement {
2398                sql: "SELECT * FROM knowledge_atoms WHERE slug = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2399                params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.to_owned())],
2400                label: None,
2401            })
2402            .await
2403            .map_err(|e| sql_err("compose atom by slug", e))?
2404    };
2405    row.and_then(|r| atom_from_row(&r))
2406        .ok_or_else(|| RuntimeError::NotFound(format!("atom not found: {id:?}")))
2407}
2408
2409fn parse_domain_members(domain: &Domain) -> Result<Vec<String>, RuntimeError> {
2410    if domain.members.is_empty() || domain.members == "[]" {
2411        return Ok(Vec::new());
2412    }
2413    serde_json::from_str::<Vec<String>>(&domain.members).map_err(|e| {
2414        RuntimeError::Internal(format!(
2415            "domain {:?} has invalid members JSON: {e}",
2416            domain.slug
2417        ))
2418    })
2419}
2420
2421async fn rerank_text_items(
2422    runtime: &KhiveRuntime,
2423    query: &str,
2424    items: &mut [ScoredTextItem],
2425) -> Result<(), RuntimeError> {
2426    if items.is_empty() {
2427        return Ok(());
2428    }
2429    let texts: Vec<String> = items.iter().map(|item| item.text.clone()).collect();
2430    if let Some(cosines) = embed_cosine_scores(runtime, query, &texts).await {
2431        for (item, cos) in items.iter_mut().zip(cosines.iter()) {
2432            item.score = cos.max(0.0);
2433        }
2434        items.sort_by(|a, b| {
2435            b.score
2436                .partial_cmp(&a.score)
2437                .unwrap_or(std::cmp::Ordering::Equal)
2438                .then_with(|| a.slug.cmp(&b.slug))
2439        });
2440    }
2441    Ok(())
2442}
2443
2444fn format_compose_markdown(query: &str, domains: &[Domain], atoms: &[(&Atom, f32)]) -> String {
2445    let mut out = String::from("# Knowledge Briefing\n\n");
2446    out.push_str(&format!("Query: {query}\n"));
2447    for (atom, score) in atoms {
2448        out.push_str(&format!("\n## {}\n\n", atom.name));
2449        out.push_str(&format!("Source: {}\n", atom.slug));
2450        out.push_str(&format!("Score: {:.4}\n", score));
2451        if let Some(ref desc) = atom.description {
2452            if !desc.is_empty() {
2453                out.push('\n');
2454                out.push_str(desc);
2455                out.push('\n');
2456            }
2457        }
2458        if !atom.content.is_empty() {
2459            out.push('\n');
2460            out.push_str(&atom.content);
2461            out.push('\n');
2462        }
2463    }
2464    if !domains.is_empty() {
2465        out.push_str("\n---\n\nDomains: ");
2466        let names: Vec<&str> = domains.iter().map(|d| d.name.as_str()).collect();
2467        out.push_str(&names.join(", "));
2468        out.push('\n');
2469    }
2470    out
2471}
2472
2473// ─── embed text helper ────────────────────────────────────────────────────────
2474
2475fn atom_embed_text(atom: &Atom) -> String {
2476    let mut parts: Vec<&str> = Vec::with_capacity(3);
2477    if !atom.name.is_empty() {
2478        parts.push(&atom.name);
2479    }
2480    if let Some(ref desc) = atom.description {
2481        if !desc.is_empty() {
2482            parts.push(desc.as_str());
2483        }
2484    }
2485    if !atom.content.is_empty() {
2486        parts.push(&atom.content);
2487    }
2488    parts.join("\n\n")
2489}
2490
2491// ─── section helpers ──────────────────────────────────────────────────────────
2492
2493#[allow(dead_code)]
2494fn section_from_row(row: &khive_storage::types::SqlRow) -> Option<Section> {
2495    let id: Uuid = row_str(row, "id")?.parse().ok()?;
2496    let st_str = row_str(row, "section_type")?;
2497    let section_type = SectionType::from_str_loose(&st_str)?;
2498    Some(Section {
2499        id,
2500        atom_id: row_str(row, "atom_id")?,
2501        namespace: row_str(row, "namespace")?,
2502        section_type,
2503        heading: row_str(row, "heading").unwrap_or_default(),
2504        content: row_str(row, "content").unwrap_or_default(),
2505        tokens: row_i64(row, "tokens").unwrap_or(0),
2506        sort_order: row_i64(row, "sort_order").unwrap_or(0),
2507        created_at: row_i64(row, "created_at").unwrap_or(0),
2508        updated_at: row_i64(row, "updated_at").unwrap_or(0),
2509    })
2510}
2511
2512#[allow(dead_code)]
2513fn section_to_json(s: &Section) -> Value {
2514    json!({
2515        "id": s.id.to_string(),
2516        "atom_id": s.atom_id,
2517        "namespace": s.namespace,
2518        "section_type": s.section_type.as_str(),
2519        "heading": s.heading,
2520        "content": s.content,
2521        "tokens": s.tokens,
2522        "sort_order": s.sort_order,
2523        "created_at": s.created_at,
2524        "updated_at": s.updated_at,
2525    })
2526}
2527
/// Approximate token count: the number of whitespace-separated words in `text`.
fn count_tokens(text: &str) -> i64 {
    let words = text.split_whitespace();
    words.fold(0i64, |total, _word| total + 1)
}
2532
2533/// Parse a SectionUpdate's `section_type` field into a `SectionType` enum,
2534/// returning a descriptive error on unknown values.
2535fn parse_section_type(s: &str) -> Result<SectionType, RuntimeError> {
2536    SectionType::from_str_loose(s).ok_or_else(|| {
2537        RuntimeError::InvalidInput(format!(
2538            "unknown section_type {s:?}; valid values: {}",
2539            SectionType::NAMES.join(", ")
2540        ))
2541    })
2542}
2543
2544async fn resolve_atom_id(
2545    runtime: &KhiveRuntime,
2546    ns: &str,
2547    id_or_slug: &str,
2548) -> Result<String, RuntimeError> {
2549    let sql = runtime.sql();
2550    let mut reader = sql
2551        .reader()
2552        .await
2553        .map_err(|e| sql_err("resolve_atom_id reader", e))?;
2554    let id = id_or_slug.trim().to_string();
2555    let row = if id.parse::<Uuid>().is_ok() {
2556        reader
2557            .query_row(SqlStatement {
2558                sql: "SELECT id FROM knowledge_atoms WHERE id = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2559                params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.to_owned())],
2560                label: None,
2561            })
2562            .await
2563            .map_err(|e| sql_err("resolve_atom_id by id", e))?
2564    } else {
2565        reader
2566            .query_row(SqlStatement {
2567                sql: "SELECT id FROM knowledge_atoms WHERE slug = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2568                params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.to_owned())],
2569                label: None,
2570            })
2571            .await
2572            .map_err(|e| sql_err("resolve_atom_id by slug", e))?
2573    };
2574    row.and_then(|r| row_str(&r, "id"))
2575        .ok_or_else(|| RuntimeError::NotFound(format!("atom not found: {id:?}")))
2576}
2577
2578impl KnowledgeHandlers {
2579    // ── edit ─────────────────────────────────────────────────────────────────
2580
2581    /// Upsert sections for a knowledge atom without touching sibling sections.
2582    ///
2583    /// Each (atom_id, section_type) pair is upserted atomically using SQLite's
2584    /// `INSERT OR REPLACE` semantics backed by the UNIQUE(atom_id, section_type)
2585    /// constraint. Sections not named in the call are left untouched.
2586    pub(crate) async fn edit(
2587        runtime: &KhiveRuntime,
2588        token: &NamespaceToken,
2589        params: Value,
2590    ) -> Result<Value, RuntimeError> {
2591        let p: EditParams = deser(params)?;
2592        if p.sections.is_empty() {
2593            return Err(RuntimeError::InvalidInput(
2594                "sections must not be empty".into(),
2595            ));
2596        }
2597
2598        let ns = token.namespace().as_str().to_owned();
2599        let sql = runtime.sql();
2600
2601        // Resolve the atom (by UUID or slug).
2602        let atom_id = {
2603            let mut reader = sql
2604                .reader()
2605                .await
2606                .map_err(|e| sql_err("edit atom reader", e))?;
2607            let id = p.id.trim().to_string();
2608            let row = if id.parse::<Uuid>().is_ok() {
2609                reader
2610                    .query_row(SqlStatement {
2611                        sql: "SELECT id FROM knowledge_atoms WHERE id = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2612                        params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.clone())],
2613                        label: None,
2614                    })
2615                    .await
2616                    .map_err(|e| sql_err("edit atom lookup by id", e))?
2617            } else {
2618                reader
2619                    .query_row(SqlStatement {
2620                        sql: "SELECT id FROM knowledge_atoms WHERE slug = ?1 AND namespace = ?2 AND deleted_at IS NULL LIMIT 1".into(),
2621                        params: vec![SqlValue::Text(id.clone()), SqlValue::Text(ns.clone())],
2622                        label: None,
2623                    })
2624                    .await
2625                    .map_err(|e| sql_err("edit atom lookup by slug", e))?
2626            };
2627            row.and_then(|r| row_str(&r, "id"))
2628                .ok_or_else(|| RuntimeError::NotFound(format!("atom not found: {:?}", p.id)))?
2629        };
2630
2631        let now = now_us();
2632        let mut upserted = 0usize;
2633        let mut section_results: Vec<Value> = Vec::with_capacity(p.sections.len());
2634
2635        for su in &p.sections {
2636            let stype = parse_section_type(&su.section_type)?;
2637            let heading = su.heading.as_deref().unwrap_or(stype.as_str()).to_string();
2638            let tokens = count_tokens(&su.content);
2639            let sort_order = su.sort_order.unwrap_or_else(|| {
2640                SectionType::ALL
2641                    .iter()
2642                    .position(|&t| t == stype)
2643                    .unwrap_or(9) as i64
2644            });
2645
2646            // Fetch the existing section id (and status) if it exists (for stable IDs on re-edit).
2647            let mut reader = sql
2648                .reader()
2649                .await
2650                .map_err(|e| sql_err("edit section reader", e))?;
2651            let existing_section = reader
2652                .query_row(SqlStatement {
2653                    sql: "SELECT id, status FROM knowledge_sections WHERE atom_id = ?1 AND section_type = ?2 LIMIT 1".into(),
2654                    params: vec![
2655                        SqlValue::Text(atom_id.clone()),
2656                        SqlValue::Text(stype.as_str().to_string()),
2657                    ],
2658                    label: None,
2659                })
2660                .await
2661                .map_err(|e| sql_err("edit section lookup", e))?;
2662
2663            let was_verified = existing_section
2664                .as_ref()
2665                .and_then(|r| row_str(r, "status"))
2666                .as_deref()
2667                == Some("verified");
2668            let section_id = existing_section
2669                .as_ref()
2670                .and_then(|r| row_str(r, "id"))
2671                .unwrap_or_else(new_id);
2672
2673            let mut writer = sql
2674                .writer()
2675                .await
2676                .map_err(|e| sql_err("edit section writer", e))?;
2677            writer
2678                .execute(SqlStatement {
2679                    sql: "INSERT INTO knowledge_sections \
2680                          (id, atom_id, namespace, section_type, heading, content, tokens, sort_order, created_at, updated_at) \
2681                          VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) \
2682                          ON CONFLICT(atom_id, section_type) DO UPDATE SET \
2683                            heading=excluded.heading, \
2684                            content=excluded.content, \
2685                            tokens=excluded.tokens, \
2686                            sort_order=excluded.sort_order, \
2687                            embedding=NULL, \
2688                            updated_at=excluded.updated_at"
2689                        .into(),
2690                    params: vec![
2691                        SqlValue::Text(section_id.clone()),
2692                        SqlValue::Text(atom_id.clone()),
2693                        SqlValue::Text(ns.clone()),
2694                        SqlValue::Text(stype.as_str().to_string()),
2695                        SqlValue::Text(heading.clone()),
2696                        SqlValue::Text(su.content.clone()),
2697                        SqlValue::Integer(tokens),
2698                        SqlValue::Integer(sort_order),
2699                        SqlValue::Integer(now),
2700                        SqlValue::Integer(now),
2701                    ],
2702                    label: None,
2703                })
2704                .await
2705                .map_err(|e| sql_err("edit section upsert", e))?;
2706
2707            if was_verified {
2708                writer
2709                    .execute(SqlStatement {
2710                        sql: "UPDATE knowledge_sections SET status='reviewed' WHERE atom_id=?1 AND section_type=?2 AND status='verified'".into(),
2711                        params: vec![
2712                            SqlValue::Text(atom_id.clone()),
2713                            SqlValue::Text(stype.as_str().to_string()),
2714                        ],
2715                        label: None,
2716                    })
2717                    .await
2718                    .map_err(|e| sql_err("edit section status transition", e))?;
2719            }
2720
2721            upserted += 1;
2722            section_results.push(json!({
2723                "id": section_id,
2724                "atom_id": atom_id,
2725                "section_type": stype.as_str(),
2726                "heading": heading,
2727                "tokens": tokens,
2728            }));
2729        }
2730
2731        Ok(json!({
2732            "atom_id": atom_id,
2733            "upserted": upserted,
2734            "sections": section_results,
2735        }))
2736    }
2737
2738    // ── import ────────────────────────────────────────────────────────────────
2739
2740    /// Ingest a markdown file (or directory of markdown files) into the knowledge corpus.
2741    ///
2742    /// Parses the markdown into section-typed atoms using the atlas heading normalization
2743    /// map in `SectionType::from_str_loose`. Each `## Heading` creates one section of
2744    /// the detected type; content before the first `##` heading becomes the atom's
2745    /// `content` field (flat body).
2746    ///
2747    /// The atom slug is derived from the file stem (lower-kebab). If an atom with that
2748    /// slug already exists it is updated (upsert semantics). Sections are upserted
2749    /// individually, so re-importing a file only changes sections whose content changed.
2750    pub(crate) async fn import(
2751        runtime: &KhiveRuntime,
2752        token: &NamespaceToken,
2753        params: Value,
2754    ) -> Result<Value, RuntimeError> {
2755        let p: ImportParams = deser(params)?;
2756        let path_str = p.path.trim().to_string();
2757        if path_str.is_empty() {
2758            return Err(RuntimeError::InvalidInput("path must not be empty".into()));
2759        }
2760
2761        let chunk_strategy = p
2762            .chunk_strategy
2763            .as_deref()
2764            .unwrap_or("section")
2765            .to_ascii_lowercase();
2766        if !["section", "atom"].contains(&chunk_strategy.as_str()) {
2767            return Err(RuntimeError::InvalidInput(format!(
2768                "unknown chunk_strategy {:?}; valid: section | atom",
2769                chunk_strategy
2770            )));
2771        }
2772        let format = p.format.as_deref().unwrap_or("atlas_md");
2773        if format != "atlas_md" {
2774            return Err(RuntimeError::InvalidInput(format!(
2775                "unknown format {format:?}; only \"atlas_md\" is supported"
2776            )));
2777        }
2778
2779        let md_path = std::path::Path::new(&path_str);
2780        if !md_path.exists() {
2781            return Err(RuntimeError::NotFound(format!(
2782                "path does not exist: {path_str:?}"
2783            )));
2784        }
2785
2786        // Collect markdown files to import.
2787        let files: Vec<std::path::PathBuf> = if md_path.is_file() {
2788            vec![md_path.to_path_buf()]
2789        } else if md_path.is_dir() {
2790            let mut v = Vec::new();
2791            collect_md_files(md_path, &mut v);
2792            v
2793        } else {
2794            return Err(RuntimeError::InvalidInput(format!(
2795                "path is not a file or directory: {path_str:?}"
2796            )));
2797        };
2798
2799        if files.is_empty() {
2800            return Ok(json!({
2801                "imported_atoms": 0,
2802                "imported_sections": 0,
2803                "files_processed": 0,
2804            }));
2805        }
2806
2807        let mut imported_atoms = 0usize;
2808        let mut imported_sections = 0usize;
2809
2810        for file in &files {
2811            let content = std::fs::read_to_string(file)
2812                .map_err(|e| RuntimeError::Internal(format!("failed to read {:?}: {e}", file)))?;
2813
2814            let stem = file
2815                .file_stem()
2816                .and_then(|s| s.to_str())
2817                .unwrap_or("unknown");
2818            let slug = to_slug(stem);
2819
2820            let (atom_name, atom_body, sections) = parse_atlas_md(&content);
2821            let name = if atom_name.is_empty() {
2822                slug.replace('-', " ")
2823            } else {
2824                atom_name
2825            };
2826
2827            // Upsert the atom.
2828            let atlas_id = extract_atlas_id(&content);
2829            let citation_count = sections
2830                .iter()
2831                .filter(|(stype, _, _)| *stype == SectionType::References)
2832                .map(|(_, _, body)| body.lines().filter(|line| !line.trim().is_empty()).count())
2833                .sum::<usize>();
2834            let source_uri = atlas_id.as_ref().map(|id| format!("atlas:{id}"));
2835            let source_type = if citation_count > 0 {
2836                "paper"
2837            } else {
2838                "imported"
2839            };
2840            let mut properties = serde_json::Map::new();
2841            if let Some(ref id) = atlas_id {
2842                properties.insert("atlas_id".to_string(), Value::String(id.clone()));
2843            }
2844
2845            let upsert_params = serde_json::json!({
2846                "atoms": [{
2847                    "slug": slug,
2848                    "name": name,
2849                    "content": atom_body,
2850                    "properties": Value::Object(properties),
2851                    "source_uri": source_uri,
2852                    "source_type": source_type,
2853                }]
2854            });
2855            KnowledgeHandlers::upsert_atoms(runtime, token, upsert_params).await?;
2856            imported_atoms += 1;
2857
2858            // Upsert sections (if chunk_strategy == "section").
2859            if chunk_strategy == "section" && !sections.is_empty() {
2860                let section_updates: Vec<Value> = sections
2861                    .iter()
2862                    .map(|(stype, heading, body)| {
2863                        json!({
2864                            "section_type": stype.as_str(),
2865                            "heading": heading,
2866                            "content": body,
2867                        })
2868                    })
2869                    .collect();
2870                let edit_params = json!({
2871                    "id": slug,
2872                    "sections": section_updates,
2873                });
2874                let result = KnowledgeHandlers::edit(runtime, token, edit_params).await?;
2875                if let Some(n) = result.get("upserted").and_then(|v| v.as_u64()) {
2876                    imported_sections += n as usize;
2877                }
2878            }
2879        }
2880
2881        Ok(json!({
2882            "imported_atoms": imported_atoms,
2883            "imported_sections": imported_sections,
2884            "files_processed": files.len(),
2885        }))
2886    }
2887
2888    // ── challenge ─────────────────────────────────────────────────────────────
2889
2890    pub(crate) async fn challenge(
2891        runtime: &KhiveRuntime,
2892        token: &NamespaceToken,
2893        params: Value,
2894    ) -> Result<Value, RuntimeError> {
2895        let p: ChallengeParams = deser(params)?;
2896        let ns = token.namespace().as_str().to_owned();
2897        let sql = runtime.sql();
2898
2899        let atom_id = resolve_atom_id(runtime, &ns, &p.atom_id).await?;
2900        let stype = parse_section_type(&p.section_type)?;
2901
2902        let mut writer = sql
2903            .writer()
2904            .await
2905            .map_err(|e| sql_err("challenge writer", e))?;
2906
2907        let affected = writer
2908            .execute(SqlStatement {
2909                sql: "UPDATE knowledge_sections SET status='disputed' WHERE atom_id=?1 AND section_type=?2 AND status NOT IN ('disputed','deprecated')".into(),
2910                params: vec![
2911                    SqlValue::Text(atom_id.clone()),
2912                    SqlValue::Text(stype.as_str().to_string()),
2913                ],
2914                label: None,
2915            })
2916            .await
2917            .map_err(|e| sql_err("challenge section status", e))?;
2918
2919        if affected == 0 {
2920            return Err(RuntimeError::InvalidInput(
2921                "section not found, already disputed, or deprecated".into(),
2922            ));
2923        }
2924
2925        writer
2926            .execute(SqlStatement {
2927                sql: "UPDATE knowledge_atoms SET properties=json_set(coalesce(properties,'{}'),'$.dispute_count',coalesce(json_extract(properties,'$.dispute_count'),0)+1) WHERE id=?1 AND namespace=?2".into(),
2928                params: vec![
2929                    SqlValue::Text(atom_id.clone()),
2930                    SqlValue::Text(ns.clone()),
2931                ],
2932                label: None,
2933            })
2934            .await
2935            .map_err(|e| sql_err("challenge dispute_count increment", e))?;
2936
2937        Ok(json!({
2938            "atom_id": atom_id,
2939            "section_type": stype.as_str(),
2940            "reason": p.reason,
2941        }))
2942    }
2943
2944    // ── adjudicate ────────────────────────────────────────────────────────────
2945
2946    pub(crate) async fn adjudicate(
2947        runtime: &KhiveRuntime,
2948        token: &NamespaceToken,
2949        params: Value,
2950    ) -> Result<Value, RuntimeError> {
2951        let p: AdjudicateParams = deser(params)?;
2952        let ns = token.namespace().as_str().to_owned();
2953        let sql = runtime.sql();
2954
2955        let resolution = p.resolution.trim().to_ascii_lowercase();
2956        if resolution != "accept" && resolution != "reject" {
2957            return Err(RuntimeError::InvalidInput(
2958                "resolution must be \"accept\" or \"reject\"".into(),
2959            ));
2960        }
2961
2962        let atom_id = resolve_atom_id(runtime, &ns, &p.atom_id).await?;
2963        let stype = parse_section_type(&p.section_type)?;
2964
2965        let new_status = if resolution == "accept" {
2966            "verified"
2967        } else {
2968            "reviewed"
2969        };
2970
2971        let mut writer = sql
2972            .writer()
2973            .await
2974            .map_err(|e| sql_err("adjudicate writer", e))?;
2975
2976        let affected = writer
2977            .execute(SqlStatement {
2978                sql: format!(
2979                    "UPDATE knowledge_sections SET status='{new_status}' WHERE atom_id=?1 AND section_type=?2 AND status='disputed'"
2980                ),
2981                params: vec![
2982                    SqlValue::Text(atom_id.clone()),
2983                    SqlValue::Text(stype.as_str().to_string()),
2984                ],
2985                label: None,
2986            })
2987            .await
2988            .map_err(|e| sql_err("adjudicate section status", e))?;
2989
2990        if affected == 0 {
2991            return Err(RuntimeError::InvalidInput(
2992                "section not found or not in disputed state".into(),
2993            ));
2994        }
2995
2996        writer
2997            .execute(SqlStatement {
2998                sql: "UPDATE knowledge_atoms SET properties=json_set(coalesce(properties,'{}'),'$.dispute_count',CASE WHEN coalesce(json_extract(properties,'$.dispute_count'),0) > 0 THEN coalesce(json_extract(properties,'$.dispute_count'),0)-1 ELSE 0 END) WHERE id=?1 AND namespace=?2".into(),
2999                params: vec![
3000                    SqlValue::Text(atom_id.clone()),
3001                    SqlValue::Text(ns.clone()),
3002                ],
3003                label: None,
3004            })
3005            .await
3006            .map_err(|e| sql_err("adjudicate dispute_count decrement", e))?;
3007
3008        Ok(json!({
3009            "atom_id": atom_id,
3010            "section_type": stype.as_str(),
3011            "resolution": resolution,
3012            "new_status": new_status,
3013        }))
3014    }
3015}
3016
3017// ─── markdown parsing helpers ─────────────────────────────────────────────────
3018
/// Collect all `.md` files recursively under `dir`, appending them to `out`.
///
/// Entries of each directory are visited in sorted path order, so the result
/// is deterministic regardless of the order the filesystem returns entries
/// in. Traversal is depth-first: a subdirectory's files are appended at the
/// position of that subdirectory among its sorted siblings.
///
/// Collection is best-effort: directories that cannot be read and entries
/// that fail to load are skipped silently. Only the exact lowercase extension
/// `md` is matched.
fn collect_md_files(dir: &std::path::Path, out: &mut Vec<std::path::PathBuf>) {
    if let Ok(entries) = std::fs::read_dir(dir) {
        let mut paths: Vec<std::path::PathBuf> =
            entries.flatten().map(|entry| entry.path()).collect();
        // `read_dir` order is platform-dependent; sort for reproducible imports.
        paths.sort();
        for path in paths {
            if path.is_dir() {
                collect_md_files(&path, out);
            } else if path.extension().and_then(|e| e.to_str()) == Some("md") {
                out.push(path);
            }
        }
    }
}
3032
/// Convert a file stem to a URL-safe slug.
///
/// ASCII letters are lower-cased and ASCII digits are kept. Every other
/// character (spaces, underscores, punctuation, non-ASCII text, and hyphens
/// themselves) acts as a separator. Runs of separators collapse to a single
/// hyphen, and leading/trailing separators are dropped. A stem with no ASCII
/// alphanumerics yields an empty string.
fn to_slug(stem: &str) -> String {
    let mut slug = String::with_capacity(stem.len());
    // Set when at least one separator has been seen since the last kept character.
    let mut separator_pending = false;
    for ch in stem.chars() {
        if ch.is_ascii_alphanumeric() {
            // Emit the hyphen lazily so it only appears between two kept runs.
            if separator_pending && !slug.is_empty() {
                slug.push('-');
            }
            separator_pending = false;
            slug.push(ch.to_ascii_lowercase());
        } else {
            separator_pending = true;
        }
    }
    slug
}
3053
3054/// Parse atlas-format markdown into (title, pre-section body, sections).
3055///
3056/// Atlas markdown structure:
3057/// ```text
3058/// # Title
3059///
3060/// Optional introductory text that becomes the atom body.
3061///
3062/// ## Section Heading
3063///
3064/// Section content...
3065///
3066/// ## Another Section
3067///
3068/// More content...
3069/// ```
3070///
3071/// Returns `(name, atom_body, Vec<(SectionType, heading, content)>)`.
3072/// `name` is the `# Title` text (empty if absent).
3073/// `atom_body` is text before the first `##` heading.
3074/// Each tuple in the vec is `(SectionType, heading_text, body_text)`.
3075/// Headings that don't map to a `SectionType` are classified as `Other`.
3076fn extract_atlas_id(content: &str) -> Option<String> {
3077    content.lines().take(32).find_map(|line| {
3078        let trimmed = line.trim();
3079        trimmed
3080            .strip_prefix("atlas_id:")
3081            .or_else(|| trimmed.strip_prefix("atlas-id:"))
3082            .map(str::trim)
3083            .filter(|s| !s.is_empty())
3084            .map(str::to_string)
3085    })
3086}
3087
3088fn parse_atlas_md(content: &str) -> (String, String, Vec<(SectionType, String, String)>) {
3089    let mut name = String::new();
3090    let mut pre_body = String::new();
3091    let mut sections: Vec<(SectionType, String, String)> = Vec::new();
3092
3093    // State: None = pre-first-heading, Some(idx) = inside section at index
3094    let mut in_pre = true;
3095    let mut current_heading: Option<(SectionType, String)> = None;
3096    let mut current_body = String::new();
3097
3098    for line in content.lines() {
3099        if let Some(rest) = line.strip_prefix("# ") {
3100            if name.is_empty() {
3101                // Document title.
3102                name = rest.trim().to_string();
3103                in_pre = true;
3104            }
3105            continue;
3106        }
3107        if let Some(rest) = line.strip_prefix("## ") {
3108            // Save the previous section (if any).
3109            if let Some((stype, heading)) = current_heading.take() {
3110                sections.push((stype, heading, current_body.trim_end().to_string()));
3111                current_body.clear();
3112            } else if in_pre {
3113                // The pre-section body ends here.
3114                pre_body = current_body.trim_end().to_string();
3115                current_body.clear();
3116                in_pre = false;
3117            }
3118            let heading_text = rest.trim().to_string();
3119            let stype = SectionType::from_str_loose(&heading_text).unwrap_or(SectionType::Other);
3120            current_heading = Some((stype, heading_text));
3121            continue;
3122        }
3123        // Accumulate content.
3124        current_body.push_str(line);
3125        current_body.push('\n');
3126    }
3127
3128    // Flush the last section or pre-body.
3129    if let Some((stype, heading)) = current_heading {
3130        sections.push((stype, heading, current_body.trim_end().to_string()));
3131    } else {
3132        pre_body = current_body.trim_end().to_string();
3133    }
3134
3135    (name, pre_body, sections)
3136}
3137
#[cfg(test)]
mod tests {
    use super::*;

    /// Indices of `values`, ordered from the largest value to the smallest.
    fn descending_indices(values: &[f32]) -> Vec<usize> {
        let mut order: Vec<usize> = (0..values.len()).collect();
        order.sort_by(|&a, &b| values[b].partial_cmp(&values[a]).unwrap());
        order
    }

    /// Shared checks for a four-step ladder of normalized scores whose last
    /// input lies above the theoretical maximum.
    fn assert_bounded_monotonic_then_clamped(normed: &[f32]) {
        // All in [0,1].
        for &s in normed {
            assert!((0.0..=1.0).contains(&s), "score out of range: {s}");
        }
        // Strictly increasing up to the theoretical max.
        assert!(normed[0] < normed[1]);
        assert!(normed[1] < normed[2]);
        // Clamped at 1.0 above the theoretical max.
        assert_eq!(normed[3], 1.0);
    }

    // #523: RRF normalization must be monotonic and bounded.
    #[test]
    fn normalize_rrf_score_is_bounded_and_monotonic() {
        let k = RRF_K;

        // Single source: theoretical max = 1/(k+1).
        let max_single = 1.0f32 / (k as f32 + 1.0);
        let mut normed_single: Vec<f32> = Vec::new();
        for raw in [
            max_single * 0.25,
            max_single * 0.5,
            max_single,
            max_single * 1.5,
        ]
        .iter()
        {
            normed_single.push(normalize_rrf_score(*raw, 1, k));
        }
        assert_bounded_monotonic_then_clamped(&normed_single);

        // Two sources: theoretical max = 2/(k+1).
        let max_two = 2.0f32 / (k as f32 + 1.0);
        let mut normed_two: Vec<f32> = Vec::new();
        for raw in [max_two * 0.25, max_two * 0.75, max_two, max_two * 2.0].iter() {
            normed_two.push(normalize_rrf_score(*raw, 2, k));
        }
        assert_bounded_monotonic_then_clamped(&normed_two);

        // Raw order equals normalized order for unequal inputs (no rank inversion).
        let raw = [0.001f32, 0.005, 0.010, 0.015];
        let normed: Vec<f32> = raw.iter().map(|&r| normalize_rrf_score(r, 1, k)).collect();
        assert_eq!(
            descending_indices(&raw),
            descending_indices(&normed),
            "normalization must not invert ranking"
        );
    }

    #[test]
    fn normalize_rrf_score_zero_source_count_returns_zero() {
        assert_eq!(normalize_rrf_score(0.5, 0, RRF_K), 0.0);
    }
}