Skip to main content

fathomdb_engine/admin/
introspection.rs

1//! Pack H: admin introspection APIs.
2//!
3//! Read-side aggregation surfaces that let callers detect per-kind
4//! vector / FTS configuration drift. fathomdb deliberately has no
5//! client-side "expected kinds" registry — these methods expose what
6//! has actually been configured in the database so that callers can
7//! cross-reference against their own kind list.
8
9use std::collections::BTreeMap;
10
11use rusqlite::OptionalExtension;
12use serde::Serialize;
13
14use crate::EngineError;
15
16use super::AdminService;
17
18/// Static install/build surface: feature flags, presets, and versions.
19///
20/// Pure function — does NOT touch the database. Intended for
21/// `admin.capabilities()` to let clients assert what the running binary
22/// supports without opening a connection.
23#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
24pub struct Capabilities {
25    /// `sqlite-vec` feature compiled in.
26    pub sqlite_vec: bool,
27    /// FTS tokenizer preset names (matches the first column of
28    /// [`crate::TOKENIZER_PRESETS`]).
29    pub fts_tokenizers: Vec<String>,
30    /// Known embedder slots. `"builtin"` is always present; its
31    /// `available` flag reflects the `default-embedder` feature.
32    pub embedders: BTreeMap<String, EmbedderCapability>,
33    /// Latest schema version this binary knows how to apply.
34    pub schema_version: u32,
35    /// `CARGO_PKG_VERSION` of the `fathomdb-engine` crate at build time.
36    pub fathomdb_version: String,
37}
38
39/// Per-embedder capability entry on [`Capabilities::embedders`].
40#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
41pub struct EmbedderCapability {
42    /// True if this embedder is compiled in and could be constructed by
43    /// the engine at `open()` time.
44    pub available: bool,
45    /// Model identity the embedder reports (populated only when
46    /// `available`). e.g. `"BAAI/bge-small-en-v1.5"`.
47    pub model_identity: Option<String>,
48    /// Vector dimension the embedder produces.
49    pub dimensions: Option<usize>,
50    /// Maximum tokens per single embed call.
51    pub max_tokens: Option<usize>,
52}
53
54/// Snapshot of the runtime configuration that drives vector / FTS
55/// projection behaviour.
56#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
57pub struct CurrentConfig {
58    /// Currently active embedding profile row, if any.
59    pub active_embedding_profile: Option<EmbeddingProfileSummary>,
60    /// All rows in `vector_index_schemas`, keyed by `kind`.
61    pub vec_kinds: BTreeMap<String, VecKindConfig>,
62    /// All FTS profiles (from `projection_profiles` where facet='fts'),
63    /// keyed by `kind`.
64    pub fts_kinds: BTreeMap<String, FtsKindConfig>,
65    /// Bulk counts across `vector_projection_work`.
66    pub work_queue: WorkQueueSummary,
67}
68
69/// Slim projection of `vector_embedding_profiles` WHERE active=1.
70#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
71pub struct EmbeddingProfileSummary {
72    pub profile_id: i64,
73    pub model_identity: String,
74    pub model_version: Option<String>,
75    pub dimensions: i64,
76    pub normalization_policy: Option<String>,
77    pub max_tokens: Option<i64>,
78    pub activated_at: Option<i64>,
79}
80
81/// Per-kind vector index configuration (one row of `vector_index_schemas`).
82#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
83pub struct VecKindConfig {
84    pub kind: String,
85    pub enabled: bool,
86    pub source_mode: String,
87    pub state: String,
88    pub last_error: Option<String>,
89    pub last_completed_at: Option<i64>,
90    pub updated_at: i64,
91}
92
93/// Slim per-kind FTS view — enough for a drift check.
94#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
95pub struct FtsKindConfig {
96    pub kind: String,
97    pub tokenizer: String,
98    pub property_schema_present: bool,
99}
100
101/// Aggregated counts across `vector_projection_work`.
102#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize)]
103pub struct WorkQueueSummary {
104    pub pending_incremental: u64,
105    pub pending_backfill: u64,
106    pub inflight: u64,
107    pub failed: u64,
108    pub discarded: u64,
109}
110
111/// Per-kind view produced by [`AdminService::describe_kind`].
112#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
113pub struct KindDescription {
114    pub kind: String,
115    pub vec: Option<VecKindConfig>,
116    pub fts: Option<FtsKindConfig>,
117    /// Count of canonical chunks belonging to active nodes of this kind.
118    pub chunk_count: u64,
119    /// Row count in `vec_<kind>` if the table exists, else `None`.
120    pub vec_rows: Option<u64>,
121    /// Active embedding profile identity, for convenience.
122    pub embedding_identity: Option<String>,
123}
124
125impl AdminService {
126    /// Return the static install/build surface. Does not open the DB.
127    #[must_use]
128    pub fn capabilities() -> Capabilities {
129        let fts_tokenizers: Vec<String> = super::TOKENIZER_PRESETS
130            .iter()
131            .map(|(name, _)| (*name).to_owned())
132            .collect();
133
134        let mut embedders: BTreeMap<String, EmbedderCapability> = BTreeMap::new();
135        embedders.insert("builtin".to_owned(), builtin_embedder_capability());
136
137        let schema_version = fathomdb_schema::SchemaManager::new().current_version().0;
138
139        Capabilities {
140            sqlite_vec: cfg!(feature = "sqlite-vec"),
141            fts_tokenizers,
142            embedders,
143            schema_version,
144            fathomdb_version: env!("CARGO_PKG_VERSION").to_owned(),
145        }
146    }
147
148    /// Return a snapshot of runtime configuration: active embedding
149    /// profile, all `vector_index_schemas` rows, all FTS profiles, and
150    /// aggregate work-queue counts.
151    ///
152    /// Aggregates only — all underlying tables are already individually
153    /// queryable via other admin methods. Single read transaction.
154    ///
155    /// # Errors
156    /// Returns [`EngineError`] on database failure.
157    pub fn current_config(&self) -> Result<CurrentConfig, EngineError> {
158        let conn = self.connect()?;
159
160        let active_embedding_profile = conn
161            .query_row(
162                "SELECT profile_id, model_identity, model_version, dimensions, \
163                        normalization_policy, max_tokens, activated_at \
164                 FROM vector_embedding_profiles WHERE active = 1",
165                [],
166                |row| {
167                    Ok(EmbeddingProfileSummary {
168                        profile_id: row.get(0)?,
169                        model_identity: row.get(1)?,
170                        model_version: row.get(2)?,
171                        dimensions: row.get(3)?,
172                        normalization_policy: row.get(4)?,
173                        max_tokens: row.get(5)?,
174                        activated_at: row.get(6)?,
175                    })
176                },
177            )
178            .optional()?;
179
180        let mut vec_kinds: BTreeMap<String, VecKindConfig> = BTreeMap::new();
181        {
182            let mut stmt = conn.prepare(
183                "SELECT kind, enabled, source_mode, state, last_error, last_completed_at, updated_at \
184                 FROM vector_index_schemas ORDER BY kind",
185            )?;
186            let rows = stmt.query_map([], |row| {
187                Ok(VecKindConfig {
188                    kind: row.get(0)?,
189                    enabled: row.get::<_, i64>(1)? == 1,
190                    source_mode: row.get(2)?,
191                    state: row.get(3)?,
192                    last_error: row.get(4)?,
193                    last_completed_at: row.get(5)?,
194                    updated_at: row.get(6)?,
195                })
196            })?;
197            for r in rows {
198                let v = r?;
199                vec_kinds.insert(v.kind.clone(), v);
200            }
201        }
202
203        let mut fts_kinds: BTreeMap<String, FtsKindConfig> = BTreeMap::new();
204        {
205            let mut stmt = conn.prepare(
206                "SELECT kind, json_extract(config_json, '$.tokenizer') \
207                 FROM projection_profiles WHERE facet = 'fts' ORDER BY kind",
208            )?;
209            let rows = stmt.query_map([], |row| {
210                Ok((
211                    row.get::<_, String>(0)?,
212                    row.get::<_, Option<String>>(1)?.unwrap_or_default(),
213                ))
214            })?;
215            for r in rows {
216                let (kind, tokenizer) = r?;
217                let property_schema_present: bool = conn
218                    .query_row(
219                        "SELECT 1 FROM fts_property_schemas WHERE kind = ?1",
220                        rusqlite::params![kind],
221                        |_| Ok(true),
222                    )
223                    .optional()?
224                    .unwrap_or(false);
225                fts_kinds.insert(
226                    kind.clone(),
227                    FtsKindConfig {
228                        kind,
229                        tokenizer,
230                        property_schema_present,
231                    },
232                );
233            }
234        }
235
236        let work_queue = aggregate_work_queue(&conn)?;
237
238        Ok(CurrentConfig {
239            active_embedding_profile,
240            vec_kinds,
241            fts_kinds,
242            work_queue,
243        })
244    }
245
246    /// Return a per-kind view: vector config, FTS config, chunk count,
247    /// and vec-row count (if the per-kind vec table exists).
248    ///
249    /// # Errors
250    /// Returns [`EngineError`] on database failure.
251    pub fn describe_kind(&self, kind: &str) -> Result<KindDescription, EngineError> {
252        let conn = self.connect()?;
253
254        let vec: Option<VecKindConfig> = conn
255            .query_row(
256                "SELECT kind, enabled, source_mode, state, last_error, last_completed_at, updated_at \
257                 FROM vector_index_schemas WHERE kind = ?1",
258                rusqlite::params![kind],
259                |row| {
260                    Ok(VecKindConfig {
261                        kind: row.get(0)?,
262                        enabled: row.get::<_, i64>(1)? == 1,
263                        source_mode: row.get(2)?,
264                        state: row.get(3)?,
265                        last_error: row.get(4)?,
266                        last_completed_at: row.get(5)?,
267                        updated_at: row.get(6)?,
268                    })
269                },
270            )
271            .optional()?;
272
273        let fts: Option<FtsKindConfig> = conn
274            .query_row(
275                "SELECT kind, json_extract(config_json, '$.tokenizer') \
276                 FROM projection_profiles WHERE kind = ?1 AND facet = 'fts'",
277                rusqlite::params![kind],
278                |row| {
279                    Ok((
280                        row.get::<_, String>(0)?,
281                        row.get::<_, Option<String>>(1)?.unwrap_or_default(),
282                    ))
283                },
284            )
285            .optional()?
286            .map(|(kind, tokenizer)| {
287                let property_schema_present = conn
288                    .query_row(
289                        "SELECT 1 FROM fts_property_schemas WHERE kind = ?1",
290                        rusqlite::params![&kind],
291                        |_| Ok(true),
292                    )
293                    .optional()
294                    .ok()
295                    .flatten()
296                    .is_some();
297                FtsKindConfig {
298                    kind,
299                    tokenizer,
300                    property_schema_present,
301                }
302            });
303
304        let chunk_count: u64 = conn
305            .query_row(
306                "SELECT count(*) FROM chunks c \
307                 JOIN nodes n ON n.logical_id = c.node_logical_id AND n.superseded_at IS NULL \
308                 WHERE n.kind = ?1",
309                rusqlite::params![kind],
310                |row| row.get::<_, i64>(0),
311            )
312            .map_or(0, i64::cast_unsigned);
313
314        let table_name = fathomdb_schema::vec_kind_table_name(kind);
315        let vec_rows: Option<u64> = table_exists(&conn, &table_name)?
316            .then(|| -> Result<u64, EngineError> {
317                Ok(conn
318                    .query_row(&format!("SELECT count(*) FROM {table_name}"), [], |row| {
319                        row.get::<_, i64>(0)
320                    })
321                    .map(i64::cast_unsigned)?)
322            })
323            .transpose()?;
324
325        let embedding_identity = conn
326            .query_row(
327                "SELECT model_identity FROM vector_embedding_profiles WHERE active = 1",
328                [],
329                |row| row.get::<_, String>(0),
330            )
331            .optional()?;
332
333        Ok(KindDescription {
334            kind: kind.to_owned(),
335            vec,
336            fts,
337            chunk_count,
338            vec_rows,
339            embedding_identity,
340        })
341    }
342}
343
344fn aggregate_work_queue(conn: &rusqlite::Connection) -> Result<WorkQueueSummary, EngineError> {
345    let mut summary = WorkQueueSummary::default();
346    let mut stmt = conn.prepare(
347        "SELECT state, \
348                SUM(CASE WHEN priority >= 1000 THEN 1 ELSE 0 END), \
349                SUM(CASE WHEN priority <  1000 THEN 1 ELSE 0 END), \
350                COUNT(*) \
351         FROM vector_projection_work GROUP BY state",
352    )?;
353    let rows = stmt.query_map([], |row| {
354        Ok((
355            row.get::<_, String>(0)?,
356            row.get::<_, Option<i64>>(1)?.unwrap_or(0),
357            row.get::<_, Option<i64>>(2)?.unwrap_or(0),
358            row.get::<_, i64>(3)?,
359        ))
360    })?;
361    for r in rows {
362        let (state, incr, back, total) = r?;
363        let total_u = i64::cast_unsigned(total);
364        match state.as_str() {
365            "pending" => {
366                summary.pending_incremental = i64::cast_unsigned(incr);
367                summary.pending_backfill = i64::cast_unsigned(back);
368            }
369            "inflight" => summary.inflight = total_u,
370            "failed" => summary.failed = total_u,
371            "discarded" => summary.discarded = total_u,
372            _ => {}
373        }
374    }
375    Ok(summary)
376}
377
378fn table_exists(conn: &rusqlite::Connection, name: &str) -> Result<bool, EngineError> {
379    let exists: Option<i64> = conn
380        .query_row(
381            "SELECT 1 FROM sqlite_master WHERE type IN ('table','view') AND name = ?1",
382            rusqlite::params![name],
383            |row| row.get(0),
384        )
385        .optional()?;
386    Ok(exists.is_some())
387}
388
/// Describe the `"builtin"` embedder slot when the `default-embedder`
/// feature is compiled in.
///
/// Constructs a `BuiltinBgeSmallEmbedder` and reports the identity,
/// dimension, and token limit it exposes through `BatchEmbedder`.
#[cfg(feature = "default-embedder")]
fn builtin_embedder_capability() -> EmbedderCapability {
    use crate::embedder::{BatchEmbedder, BuiltinBgeSmallEmbedder};

    let builtin = BuiltinBgeSmallEmbedder::new();
    let identity = BatchEmbedder::identity(&builtin);
    let model_identity = Some(identity.model_identity);
    let dimensions = Some(identity.dimension);
    let max_tokens = Some(BatchEmbedder::max_tokens(&builtin));

    EmbedderCapability {
        available: true,
        model_identity,
        dimensions,
        max_tokens,
    }
}
401
402#[cfg(not(feature = "default-embedder"))]
403fn builtin_embedder_capability() -> EmbedderCapability {
404    EmbedderCapability {
405        available: false,
406        model_identity: None,
407        dimensions: None,
408        max_tokens: None,
409    }
410}