kitt_score 0.1.0

Decision engine at the core of Project KITT — in-memory stateful matching with pluggable scoring backends.
Documentation
//! The `Engine<T>` — holds the location table, schema, config; implements
//! the three ingest methods.

use crate::engine::compiled_scorer::CompiledScorer;
use crate::engine::config::EngineConfig;
use crate::engine::ingest::{Ingested, Outcome};
use crate::errors::IngestErr;
use crate::event::{ActionIngest, KindRef, StateUpdate, Trigger};
use crate::location::action_entry::ActionEntry;
use crate::location::table::{LocationTable, ReloadErr};
use crate::location::LocationDef;
use crate::metrics::EngineMetrics;
use crate::scoring::{Candidate, ScoreResult, ScorerSpec, VectorMetric};
use crate::{ActionId, LocId, MetricsSnapshot, Schema};
use std::sync::atomic::Ordering;
use std::sync::Arc;

/// Core decision engine.
///
/// Owns the shared schema, the location table, the engine configuration and
/// the metrics counters. Events are fed in through `ingest_update`
/// (`StateUpdate`), `ingest_action` (`ActionIngest`) and `ingest_trigger`
/// (`Trigger`).
pub struct Engine<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Schema the engine was built with; exposed through [`Engine::schema`].
    pub(crate) schema: Arc<Schema>,
    /// Per-location state, plus the `reload_in_progress` flag that every
    /// ingest method checks before touching a location.
    pub(crate) table: Arc<LocationTable<T>>,
    /// Engine configuration (clock, decider, optional embedding slot).
    pub(crate) config: EngineConfig,
    /// Counters and latency recorder updated by the ingest methods.
    pub(crate) metrics: Arc<EngineMetrics>,
}

impl<T: Clone + Send + Sync + 'static> Engine<T> {
    /// Returns a reference to the schema this engine was built with.
    #[must_use]
    pub const fn schema(&self) -> &Arc<Schema> {
        &self.schema
    }

    /// Returns the total number of locations currently in the table.
    #[must_use]
    pub fn location_count(&self) -> usize {
        self.table.total_locations()
    }

    /// Replaces the entire location table atomically from an iterator of defs.
    ///
    /// # Errors
    ///
    /// Returns [`ReloadErr`] if any def contains an unresolvable reference attribute.
    pub fn reload_all(&self, defs: impl IntoIterator<Item = LocationDef>) -> Result<(), ReloadErr> {
        self.table.reload_all(defs)
    }

    /// Inserts or replaces a single location in the table.
    ///
    /// # Errors
    ///
    /// Returns [`ReloadErr`] if any reference attribute on the def cannot be
    /// resolved through the schema.
    pub fn upsert_location(&self, def: &LocationDef) -> Result<(), ReloadErr> {
        self.table.upsert(def)
    }

    /// Removes a location by ID. Returns `true` if the location was present.
    pub fn remove_location(&self, id: LocId) -> bool {
        self.table.remove(id)
    }

    /// Applies a `StateUpdate` event to the named location.
    ///
    /// Returns:
    /// - `Ingested::ReloadInProgress` while the table's reload flag is set;
    /// - `Ingested::Rejected(IngestErr::UnknownLocation(..))` if the location
    ///   is not in the table;
    /// - `Ingested::Rejected(IngestErr::UnknownKind(..))` if the location
    ///   state's `apply_update` reports failure;
    /// - `Ingested::Updated` otherwise, after incrementing the `updates` counter.
    ///
    /// Passing by value is correct — `StateUpdate` contains a `SmallVec` with
    /// inline storage; taking ownership avoids an extra copy.
    #[allow(clippy::needless_pass_by_value)]
    pub fn ingest_update(&self, e: StateUpdate<'_>) -> Ingested<T> {
        if self.table.reload_in_progress.load(Ordering::Acquire) {
            return Ingested::ReloadInProgress;
        }
        let Some(state_arc) = self.table.get(e.location) else {
            return Ingested::Rejected(IngestErr::UnknownLocation(e.location));
        };
        let ok = state_arc.lock().apply_update(e.kind, &e.attrs);
        if ok {
            self.metrics.updates.fetch_add(1, Ordering::Relaxed);
            Ingested::Updated
        } else {
            Ingested::Rejected(IngestErr::UnknownKind(describe_kind(&e.kind)))
        }
    }

    /// Registers a new action at a location.
    ///
    /// The scorer is compiled from the `ScorerSpec` at registration time.
    /// - Predicate sources are parsed and then type-checked against the schema.
    /// - Vector targets must be non-empty and at most `MAX_EMBEDDING_DIM` long,
    ///   and the engine must have an embedding slot configured.
    ///
    /// Any build failure is returned as
    /// `Ingested::Rejected(IngestErr::ScorerBuild(..))`, and nothing is
    /// registered.
    ///
    /// The entry is inserted so that the location's actions stay sorted by
    /// `end` ascending, after any existing entries with an equal `end`. No
    /// check is made for an existing entry with the same `action_id`:
    /// registering an id twice stores two entries.
    ///
    /// Passing `ActionIngest` by value is correct — it carries an owned
    /// payload `T` and an owned `ScorerSpec`.
    pub fn ingest_action(&self, e: ActionIngest<'_, T>) -> Ingested<T> {
        if self.table.reload_in_progress.load(Ordering::Acquire) {
            return Ingested::ReloadInProgress;
        }
        let Some(state_arc) = self.table.get(e.location) else {
            return Ingested::Rejected(IngestErr::UnknownLocation(e.location));
        };

        // Compile the scorer spec into an owned `CompiledScorer`.
        let scorer: CompiledScorer = match &e.scorer {
            ScorerSpec::Predicate(src) => {
                let ast =
                    match crate::scoring::backends::predicate::parser::Parser::new(src).parse() {
                        Ok(a) => a,
                        Err(err) => {
                            return Ingested::Rejected(IngestErr::ScorerBuild(
                                crate::scoring::BuildErr::Parse(err),
                            ))
                        }
                    };
                let program = match crate::scoring::backends::predicate::typecheck::compile(
                    &ast,
                    &self.schema,
                ) {
                    Ok(p) => p,
                    Err(err) => return Ingested::Rejected(IngestErr::ScorerBuild(err)),
                };
                CompiledScorer::Predicate(program)
            }
            ScorerSpec::Vector { target, metric } => {
                if target.is_empty() {
                    return Ingested::Rejected(IngestErr::ScorerBuild(
                        crate::scoring::BuildErr::Vector("empty target vector".into()),
                    ));
                }
                if target.len() > crate::schema::attr::MAX_EMBEDDING_DIM {
                    return Ingested::Rejected(IngestErr::ScorerBuild(
                        crate::scoring::BuildErr::Vector(format!(
                            "target vector dim {} exceeds MAX_EMBEDDING_DIM ({})",
                            target.len(),
                            crate::schema::attr::MAX_EMBEDDING_DIM,
                        )),
                    ));
                }
                if self.config.embedding_slot.is_none() {
                    return Ingested::Rejected(IngestErr::ScorerBuild(
                        crate::scoring::BuildErr::Vector(
                            "vector scorer used but engine has no embedding slot configured".into(),
                        ),
                    ));
                }
                CompiledScorer::VectorLinear {
                    target: target.to_vec().into_boxed_slice(),
                    metric: *metric,
                }
            }
        };

        let entry = ActionEntry {
            action_id: e.action_id.clone(),
            start: e.start,
            end: e.end,
            priority: e.priority,
            scorer,
            payload: e.payload,
            post: e.post,
        };

        let mut st = state_arc.lock();
        // Keep entries sorted by end ascending so expire() can drain from the front.
        let idx = st.actions.partition_point(|a| a.end <= entry.end);
        st.actions.insert(idx, entry);
        drop(st);
        self.metrics.registrations.fetch_add(1, Ordering::Relaxed);
        Ingested::Registered(e.action_id)
    }

    /// Fires a trigger: expires stale actions, scores all live ones, and
    /// returns the winning payload (with post-closure applied if present).
    ///
    /// An action is scored only if its `start` is not after the clock's `now`.
    ///
    /// Returns:
    /// - `Ingested::ReloadInProgress` while the table's reload flag is set;
    /// - `Ingested::Rejected(IngestErr::UnknownLocation(..))` if the location
    ///   is not in the table;
    /// - `Ingested::NoWinner` if nothing remains after expiry, nothing has
    ///   started yet, or the configured decider picks no candidate;
    /// - `Ingested::Decided(..)` carrying the winner's id, score and payload.
    ///
    /// For every trigger that reaches a known location, the `triggers` counter
    /// is incremented and the decide latency recorded, whatever the outcome.
    /// The post-closure, if any, runs while the location's lock is held.
    // Hot path kept in one function for branch-prediction friendliness.
    #[allow(clippy::too_many_lines)]
    // NOTE(review): `Trigger` is taken by value, presumably for symmetry with
    // the other ingest methods.
    #[allow(clippy::needless_pass_by_value)]
    pub fn ingest_trigger(&self, e: Trigger<'_>) -> Ingested<T> {
        let t0 = std::time::Instant::now();

        if self.table.reload_in_progress.load(Ordering::Acquire) {
            return Ingested::ReloadInProgress;
        }
        let Some(state_arc) = self.table.get(e.location) else {
            return Ingested::Rejected(IngestErr::UnknownLocation(e.location));
        };

        // Counts the trigger and records its latency. Called exactly once on
        // every return path below, whether or not a winner is produced.
        let record_trigger = || {
            let elapsed_ns = u64::try_from(t0.elapsed().as_nanos()).unwrap_or(u64::MAX);
            self.metrics.triggers.fetch_add(1, Ordering::Relaxed);
            self.metrics.record_decide_latency_ns(elapsed_ns);
        };

        let now = self.config.clock.now();
        let mut st = state_arc.lock();
        st.expire(now);

        if st.actions.is_empty() {
            record_trigger();
            return Ingested::NoWinner;
        }

        // Borrow arr_buf before view() so both can coexist as immutable borrows
        // of separate fields.
        let arr_buf_ref: &[u8] = &st.arr_buf;
        let view = st.view();

        // `cands[i]` was scored from `st.actions[live[i]]`; both vectors are
        // pushed in lockstep. Keeping the entry index lets the winner be fetched
        // directly. A lookup by action id would cost a second O(n) scan. It
        // could also return the wrong entry when the same id has been
        // registered more than once, possibly one whose `start` is still in
        // the future.
        let mut cands: smallvec::SmallVec<[Candidate; 16]> = smallvec::SmallVec::new();
        let mut live: smallvec::SmallVec<[usize; 16]> = smallvec::SmallVec::new();
        for (entry_idx, a) in st.actions.iter().enumerate() {
            // Not active yet.
            if a.start > now {
                continue;
            }
            let s = match &a.scorer {
                CompiledScorer::Predicate(prog) => {
                    crate::scoring::backends::predicate::vm::run(prog, &view)
                }
                CompiledScorer::VectorLinear { target, metric } => {
                    // embedding_slot is validated at registration; if somehow None here,
                    // score as NEG_INFINITY rather than panic.
                    self.config
                        .embedding_slot
                        .map_or(f32::NEG_INFINITY, |(k, attr)| {
                            view.read_f32_arr_in(arr_buf_ref, k, attr).map_or(
                                f32::NEG_INFINITY,
                                |v| match metric {
                                    VectorMetric::Dot => {
                                        crate::scoring::vector::linear::dot(v, target)
                                    }
                                    VectorMetric::Cosine => {
                                        crate::scoring::vector::linear::cosine(v, target)
                                    }
                                },
                            )
                        })
                }
            };
            cands.push(Candidate {
                action_id: a.action_id.clone(),
                score: ScoreResult {
                    priority: a.priority,
                    score: s,
                },
            });
            live.push(entry_idx);
        }
        if cands.is_empty() {
            record_trigger();
            return Ingested::NoWinner;
        }

        let Some(idx) = self.config.decider.decide(&cands) else {
            record_trigger();
            return Ingested::NoWinner;
        };

        let entry_idx = live[idx];
        // Debug builds only: searching from the tracked position must hit the
        // winning id immediately, i.e. `cands` and `live` stayed in lockstep.
        debug_assert!(
            find_by_action_id(&st.actions[entry_idx..], &cands[idx].action_id) == 0,
            "candidate and entry index vectors out of lockstep"
        );
        let winning = &st.actions[entry_idx];
        // The state has not been mutated since `view` was built, so the same
        // view is handed to the post-closure.
        let payload = winning.post.as_ref().map_or_else(
            || winning.payload.clone(),
            |f| f(&winning.payload, &view),
        );
        drop(st);

        record_trigger();

        Ingested::Decided(Outcome {
            action_id: cands[idx].action_id.clone(),
            score: cands[idx].score,
            payload,
        })
    }

    /// Returns a point-in-time snapshot of engine metrics.
    ///
    /// Counters are read with `Ordering::Relaxed`; the snapshot is inherently
    /// racy across atoms, which is acceptable for observability data.
    #[must_use]
    pub fn metrics(&self) -> MetricsSnapshot {
        self.metrics.snapshot()
    }
}

impl<T> Engine<T>
where
    T: Clone + Send + Sync + 'static,
{
    /// Starts constructing an engine.
    ///
    /// Shorthand for [`crate::engine::builder::EngineBuilder::new`].
    #[must_use]
    pub fn builder() -> crate::engine::builder::EngineBuilder<T> {
        crate::engine::builder::EngineBuilder::<T>::new()
    }
}

/// Renders an update kind as text for `IngestErr::UnknownKind`.
///
/// A `Name` is copied verbatim; an `Id` is shown with its `Debug` formatting.
fn describe_kind(k: &KindRef<'_>) -> String {
    match k {
        KindRef::Name(name) => String::from(*name),
        KindRef::Id(id) => format!("{id:?}"),
    }
}

/// Returns the position of the first entry in `entries` whose `action_id`
/// equals `id`.
///
/// # Panics
///
/// Panics with "winner must be in the scored set" if no entry carries `id`.
/// This is a crate-internal invariant: callers only pass ids they obtained
/// from entries in the same table.
// A missing id is an internal invariant violation, not a recoverable error.
#[allow(clippy::expect_used)]
fn find_by_action_id<T>(entries: &[ActionEntry<T>], id: &ActionId) -> usize {
    let found = entries
        .iter()
        .enumerate()
        .find_map(|(pos, entry)| (entry.action_id == *id).then_some(pos));
    found.expect("winner must be in the scored set")
}

#[doc(hidden)]
impl<T: Clone + Send + Sync + 'static> Engine<T> {
    /// Test-only helper — do NOT call in production. Sets the `reload_in_progress` flag.
    #[doc(hidden)]
    pub fn __test_set_reload(&self, v: bool) {
        self.table
            .reload_in_progress
            .store(v, std::sync::atomic::Ordering::Release);
    }
}