use crate::engine::compiled_scorer::CompiledScorer;
use crate::engine::config::EngineConfig;
use crate::engine::ingest::{Ingested, Outcome};
use crate::errors::IngestErr;
use crate::event::{ActionIngest, KindRef, StateUpdate, Trigger};
use crate::location::action_entry::ActionEntry;
use crate::location::table::{LocationTable, ReloadErr};
use crate::location::LocationDef;
use crate::metrics::EngineMetrics;
use crate::scoring::{Candidate, ScoreResult, ScorerSpec, VectorMetric};
use crate::{ActionId, LocId, MetricsSnapshot, Schema};
use std::sync::atomic::Ordering;
use std::sync::Arc;
/// Decision engine over a table of locations.
///
/// Bundles the shared schema, the per-location state table, the engine
/// configuration and the metrics counters that the `ingest_*` methods read
/// and update. `T` is presumably the payload type carried by registered
/// actions (it parameterises `LocationTable`, `ActionIngest` and `Ingested`).
pub struct Engine<T: Clone + Send + Sync + 'static> {
/// Schema handed to the predicate type-checker when an action's scorer is compiled.
pub(crate) schema: Arc<Schema>,
/// Location lookup table; also carries the `reload_in_progress` flag that
/// every ingest call checks first.
pub(crate) table: Arc<LocationTable<T>>,
/// Engine configuration; the ingest paths read `clock`, `decider` and
/// `embedding_slot` from it.
pub(crate) config: EngineConfig,
/// Counters and latency recording, exposed to callers through `Engine::metrics`.
pub(crate) metrics: Arc<EngineMetrics>,
}
impl<T: Clone + Send + Sync + 'static> Engine<T> {
/// Returns a reference to the engine's shared [`Schema`] handle.
#[must_use]
pub const fn schema(&self) -> &Arc<Schema> {
&self.schema
}
/// Returns the number of locations, as reported by the table's `total_locations()`.
#[must_use]
pub fn location_count(&self) -> usize {
self.table.total_locations()
}
/// Forwards `defs` to the location table's `reload_all`.
///
/// NOTE(review): presumably replaces the full set of location definitions;
/// confirm against `LocationTable::reload_all`.
///
/// # Errors
///
/// Returns whatever [`ReloadErr`] the table reports.
pub fn reload_all(&self, defs: impl IntoIterator<Item = LocationDef>) -> Result<(), ReloadErr> {
self.table.reload_all(defs)
}
/// Forwards a single location definition to the location table's `upsert`.
///
/// # Errors
///
/// Returns whatever [`ReloadErr`] the table reports.
pub fn upsert_location(&self, def: &LocationDef) -> Result<(), ReloadErr> {
self.table.upsert(def)
}
/// Asks the location table to remove location `id` and returns the table's answer.
///
/// NOTE(review): the `bool` presumably means "a location was removed";
/// confirm against `LocationTable::remove`.
pub fn remove_location(&self, id: LocId) -> bool {
self.table.remove(id)
}
/// Applies a state update to the location it addresses.
///
/// Outcomes:
/// - [`Ingested::ReloadInProgress`] while the table's reload flag is set;
/// - [`Ingested::Rejected`] with `UnknownLocation` when the table has no such location;
/// - [`Ingested::Rejected`] with `UnknownKind` when the location's `apply_update`
///   returns `false`;
/// - [`Ingested::Updated`] otherwise, after bumping the `updates` counter.
#[allow(clippy::needless_pass_by_value)]
pub fn ingest_update(&self, e: StateUpdate<'_>) -> Ingested<T> {
    let reloading = self.table.reload_in_progress.load(Ordering::Acquire);
    if reloading {
        return Ingested::ReloadInProgress;
    }

    let Some(location_state) = self.table.get(e.location) else {
        return Ingested::Rejected(IngestErr::UnknownLocation(e.location));
    };

    // The lock guard is a temporary of this statement, so the location is
    // already unlocked by the time the counter is touched.
    let applied = location_state.lock().apply_update(e.kind, &e.attrs);
    if !applied {
        return Ingested::Rejected(IngestErr::UnknownKind(describe_kind(&e.kind)));
    }

    self.metrics.updates.fetch_add(1, Ordering::Relaxed);
    Ingested::Updated
}
pub fn ingest_action(&self, e: ActionIngest<'_, T>) -> Ingested<T> {
if self.table.reload_in_progress.load(Ordering::Acquire) {
return Ingested::ReloadInProgress;
}
let Some(state_arc) = self.table.get(e.location) else {
return Ingested::Rejected(IngestErr::UnknownLocation(e.location));
};
let scorer: CompiledScorer = match &e.scorer {
ScorerSpec::Predicate(src) => {
let ast =
match crate::scoring::backends::predicate::parser::Parser::new(src).parse() {
Ok(a) => a,
Err(err) => {
return Ingested::Rejected(IngestErr::ScorerBuild(
crate::scoring::BuildErr::Parse(err),
))
}
};
let program = match crate::scoring::backends::predicate::typecheck::compile(
&ast,
&self.schema,
) {
Ok(p) => p,
Err(err) => return Ingested::Rejected(IngestErr::ScorerBuild(err)),
};
CompiledScorer::Predicate(program)
}
ScorerSpec::Vector { target, metric } => {
if target.is_empty() {
return Ingested::Rejected(IngestErr::ScorerBuild(
crate::scoring::BuildErr::Vector("empty target vector".into()),
));
}
if target.len() > crate::schema::attr::MAX_EMBEDDING_DIM {
return Ingested::Rejected(IngestErr::ScorerBuild(
crate::scoring::BuildErr::Vector(format!(
"target vector dim {} exceeds MAX_EMBEDDING_DIM ({})",
target.len(),
crate::schema::attr::MAX_EMBEDDING_DIM,
)),
));
}
if self.config.embedding_slot.is_none() {
return Ingested::Rejected(IngestErr::ScorerBuild(
crate::scoring::BuildErr::Vector(
"vector scorer used but engine has no embedding slot configured".into(),
),
));
}
CompiledScorer::VectorLinear {
target: target.to_vec().into_boxed_slice(),
metric: *metric,
}
}
};
let entry = ActionEntry {
action_id: e.action_id.clone(),
start: e.start,
end: e.end,
priority: e.priority,
scorer,
payload: e.payload,
post: e.post,
};
let mut st = state_arc.lock();
let idx = st.actions.partition_point(|a| a.end <= entry.end);
st.actions.insert(idx, entry);
drop(st);
self.metrics.registrations.fetch_add(1, Ordering::Relaxed);
Ingested::Registered(e.action_id)
}
/// Decides which registered action wins a trigger at `e.location`.
///
/// Steps, all performed while holding the location's lock:
/// 1. expire entries against the configured clock's current time;
/// 2. score every remaining entry whose `start` is not after `now`:
///    predicate scorers run on the VM against the location view, vector
///    scorers compare the array read from the configured embedding slot with
///    the target (scoring `f32::NEG_INFINITY` when no slot is configured or
///    the array cannot be read);
/// 3. hand the candidates to the configured decider;
/// 4. build the outcome payload, through the winner's `post` hook if it has one.
///
/// Outcomes:
/// - [`Ingested::ReloadInProgress`] while the table's reload flag is set;
/// - [`Ingested::Rejected`] with `UnknownLocation` when the table has no such location;
/// - [`Ingested::NoWinner`] when no entry is left after expiry, none has
///   started yet, or the decider picks nothing;
/// - [`Ingested::Decided`] with the winner's id, score and payload.
///
/// The `triggers` counter and the decide latency are recorded for the last
/// two outcomes only.
///
/// # Panics
///
/// Panics if the decider returns an index outside the candidate list.
#[allow(clippy::too_many_lines)]
#[allow(clippy::needless_pass_by_value)]
pub fn ingest_trigger(&self, e: Trigger<'_>) -> Ingested<T> {
    let t0 = std::time::Instant::now();
    if self.table.reload_in_progress.load(Ordering::Acquire) {
        return Ingested::ReloadInProgress;
    }
    let Some(state_arc) = self.table.get(e.location) else {
        return Ingested::Rejected(IngestErr::UnknownLocation(e.location));
    };

    // Single place for the bookkeeping shared by every outcome past this point.
    let record_decision = || {
        let elapsed_ns = u64::try_from(t0.elapsed().as_nanos()).unwrap_or(u64::MAX);
        self.metrics.triggers.fetch_add(1, Ordering::Relaxed);
        self.metrics.record_decide_latency_ns(elapsed_ns);
    };

    let now = self.config.clock.now();
    let mut st = state_arc.lock();
    st.expire(now);
    if st.actions.is_empty() {
        record_decision();
        return Ingested::NoWinner;
    }

    let arr_buf_ref: &[u8] = &st.arr_buf;
    let view = st.view();
    let mut cands: smallvec::SmallVec<[Candidate; 16]> = smallvec::SmallVec::new();
    // `sources[i]` is the position in `st.actions` of the entry behind `cands[i]`.
    let mut sources: smallvec::SmallVec<[usize; 16]> = smallvec::SmallVec::new();
    for (pos, a) in st.actions.iter().enumerate() {
        // Not yet active: leave it out of this decision.
        if a.start > now {
            continue;
        }
        let s = match &a.scorer {
            CompiledScorer::Predicate(prog) => {
                crate::scoring::backends::predicate::vm::run(prog, &view)
            }
            CompiledScorer::VectorLinear { target, metric } => {
                self.config
                    .embedding_slot
                    .map_or(f32::NEG_INFINITY, |(k, attr)| {
                        view.read_f32_arr_in(arr_buf_ref, k, attr).map_or(
                            f32::NEG_INFINITY,
                            |v| match metric {
                                VectorMetric::Dot => {
                                    crate::scoring::vector::linear::dot(v, target)
                                }
                                VectorMetric::Cosine => {
                                    crate::scoring::vector::linear::cosine(v, target)
                                }
                            },
                        )
                    })
            }
        };
        cands.push(Candidate {
            action_id: a.action_id.clone(),
            score: ScoreResult {
                priority: a.priority,
                score: s,
            },
        });
        sources.push(pos);
    }
    if cands.is_empty() {
        record_decision();
        return Ingested::NoWinner;
    }

    let Some(idx) = self.config.decider.decide(&cands) else {
        record_decision();
        return Ingested::NoWinner;
    };

    // Resolve the winner through the index recorded while scoring. Searching
    // `st.actions` by id instead could land on a different entry when two
    // registrations share an id (registration does not de-duplicate ids),
    // including one that was skipped above because it has not started yet.
    let winning = &st.actions[sources[idx]];
    let payload = winning.post.as_ref().map_or_else(
        || winning.payload.clone(),
        |f| f(&winning.payload, &st.view()),
    );
    // Release the location before touching metrics and building the outcome.
    drop(st);

    record_decision();
    Ingested::Decided(Outcome {
        action_id: cands[idx].action_id.clone(),
        score: cands[idx].score,
        payload,
    })
}
/// Returns a [`MetricsSnapshot`] produced by the engine metrics' `snapshot()`.
#[must_use]
pub fn metrics(&self) -> MetricsSnapshot {
self.metrics.snapshot()
}
}
impl<T: Clone + Send + Sync + 'static> Engine<T> {
/// Returns a fresh `EngineBuilder` created with `EngineBuilder::new()`.
#[must_use]
pub fn builder() -> crate::engine::builder::EngineBuilder<T> {
crate::engine::builder::EngineBuilder::new()
}
}
/// Renders a kind reference as text for an `UnknownKind` rejection.
///
/// A name is copied verbatim; an id is rendered through its `Debug` formatting.
fn describe_kind(k: &KindRef<'_>) -> String {
    match k {
        KindRef::Name(name) => (*name).to_owned(),
        KindRef::Id(raw_id) => format!("{raw_id:?}"),
    }
}
/// Returns the position of the first entry in `entries` whose `action_id` equals `id`.
///
/// # Panics
///
/// Panics when no entry carries `id`.
#[allow(clippy::expect_used)]
fn find_by_action_id<T>(entries: &[ActionEntry<T>], id: &ActionId) -> usize {
    let first_match = entries
        .iter()
        .enumerate()
        .find_map(|(pos, entry)| (&entry.action_id == id).then_some(pos));
    first_match.expect("winner must be in the scored set")
}
#[doc(hidden)]
impl<T: Clone + Send + Sync + 'static> Engine<T> {
#[doc(hidden)]
pub fn __test_set_reload(&self, v: bool) {
self.table
.reload_in_progress
.store(v, std::sync::atomic::Ordering::Release);
}
}