pub mod delta;
pub mod eval;
pub mod molecule;
pub mod store;
pub use delta::{Delta, LogicalTime};
pub use molecule::{Molecule, MoleculeBuilder, PrimaryKey};
pub use store::{Arrangement, MoleculeStore};
use crate::actors::ActorRegistry;
use crate::value::Value;
use crate::verify::VerifiedProgram;
use crate::Error;
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// Reaction engine state: the molecule store, the verified program, the actor
/// registry, and both ends of the delta channel that `run` drains.
pub struct Reactor {
/// Shared molecule store; cloned out to callers by `store()`.
store: Arc<MoleculeStore>,
/// Verified program supplying the type registry and the reaction tables.
program: Arc<VerifiedProgram>,
/// Source/effect actors plus the LLM provider registry.
actor_registry: Arc<ActorRegistry>,
/// Sending half of the delta channel; cloned into every `ReactorHandle`.
delta_tx: mpsc::Sender<Delta>,
/// Receiving half of the delta channel; consumed by `run`.
delta_rx: mpsc::Receiver<Delta>,
/// Monotonic counter used by `run` to stamp a `LogicalTime` on each insert.
clock: AtomicU64,
}
impl Reactor {
/// Creates a reactor for `program` backed by the built-in actor registry.
pub fn new(program: VerifiedProgram) -> Self {
    let builtins = ActorRegistry::with_builtins();
    Self::with_actors(program, builtins)
}
/// Creates a reactor for `program` using the supplied actor registry.
///
/// Every effect and source actor registered by name whose name resolves in
/// the program's type registry is additionally indexed by its type id.
/// Names that do not resolve are left untouched.
pub fn with_actors(program: VerifiedProgram, actor_registry: ActorRegistry) -> Self {
    let types = program.type_registry().clone();
    let actors = Arc::new(actor_registry);

    // Mirror name-keyed effect actors under their resolved type id.
    actors.effects_by_name.iter().for_each(|named| {
        if let Some(kind) = types.id_by_name(named.key()) {
            actors.effects_by_id.insert(kind, named.value().clone());
        }
    });

    // Same for source actors.
    actors.sources_by_name.iter().for_each(|named| {
        if let Some(kind) = types.id_by_name(named.key()) {
            actors.sources_by_id.insert(kind, named.value().clone());
        }
    });

    let (delta_tx, delta_rx) = mpsc::channel(1024);
    Self {
        store: Arc::new(MoleculeStore::new(types)),
        program: Arc::new(program),
        actor_registry: actors,
        delta_tx,
        delta_rx,
        clock: AtomicU64::new(0),
    }
}
/// Returns a new handle that feeds deltas into this reactor's channel.
pub fn handle(&self) -> ReactorHandle {
    let delta_tx = self.delta_tx.clone();
    ReactorHandle { delta_tx }
}
/// Returns another shared reference to the reactor's molecule store.
pub fn store(&self) -> Arc<MoleculeStore> {
    Arc::clone(&self.store)
}
pub fn llm_providers(&self) -> &Arc<crate::llm::LlmProviderRegistry> {
&self.actor_registry.llm_providers
}
/// Drives the reactor until the delta channel yields `None`.
///
/// Enqueues a `Boot` molecule first, then for every received delta:
/// 1. inserts whose kind id is the `u32::MAX` placeholder are resolved by
///    `kind_name`, and every insert is stamped with the next logical time;
/// 2. `LlmProvider` inserts are registered as providers and never reach the
///    store;
/// 3. the delta is applied to the store (merges go through `eval::eval_merge`);
/// 4. for inserts, source actors are dispatched on a fresh insertion, effect
///    actors are dispatched, and matching reactions are evaluated.
///
/// # Errors
/// Returns the first error from building the boot molecule, resolving a kind
/// name, registering an LLM provider, applying a delta, or evaluating a
/// reaction. A failure to enqueue the boot molecule is only logged.
pub async fn run(mut self) -> Result<(), Error> {
    let registry = self.program.type_registry().clone();
    // NOTE(review): this token is handed to every source actor but is never
    // cancelled in this function — confirm whether sources should be stopped
    // when `run` returns.
    let cancel = CancellationToken::new();

    let boot = build_boot_molecule(&registry)?;
    if let Err(e) = self.delta_tx.send(Delta::Insert(boot)).await {
        tracing::error!(error = %e, "failed to enqueue Boot molecule");
    }

    while let Some(mut delta) = self.delta_rx.recv().await {
        if let Delta::Insert(ref mut m) = delta {
            // `u32::MAX` is the "kind not yet resolved" placeholder: look the
            // real id up by name before the molecule touches the store.
            if m.kind.0 == u32::MAX {
                let id = registry.id_by_name(&m.kind_name).ok_or_else(|| {
                    Error::Runtime(format!("unknown molecule kind {}", m.kind_name))
                })?;
                m.kind = id;
            }
            m.ts = LogicalTime(self.clock.fetch_add(1, Ordering::Relaxed));

            // Provider declarations configure the runtime; they are consumed
            // here and are not stored or matched against reactions.
            if m.kind_name == "LlmProvider" {
                self.register_llm_provider_from_molecule(m)?;
                continue;
            }
        }

        let merge_registry = registry.clone();
        let result = self.store.apply(&delta, |merge, old, new| {
            eval::eval_merge(merge, old, new, &merge_registry)
        })?;

        if let Delta::Insert(ref m) = delta {
            // Sources start only for genuinely new molecules; effects and
            // reactions run for every applied insert.
            if matches!(result, store::ApplyResult::Inserted) {
                self.maybe_dispatch_source(m, cancel.clone());
            }
            self.maybe_dispatch_effect(m);
            self.evaluate_reactions_for_delta(&delta, &registry).await?;
        }
    }
    Ok(())
}
fn maybe_dispatch_source(&self, m: &Molecule, cancel: CancellationToken) {
let actor = match self
.actor_registry
.sources_by_id
.get(&m.kind)
.map(|r| r.value().clone())
{
Some(a) => a,
None => return,
};
let molecule = m.clone();
let kind_name = m.kind_name.clone();
let handle = self.handle();
tokio::spawn(async move {
if let Err(e) = actor.run(molecule, handle, cancel).await {
tracing::error!(error = %e, kind = %kind_name, "source actor failed");
}
});
}
fn maybe_dispatch_effect(&self, m: &Molecule) {
let actor = match self
.actor_registry
.effects_by_id
.get(&m.kind)
.map(|r| r.value().clone())
{
Some(a) => a,
None => return,
};
let is_pending = match m.fields.get("status") {
None => true,
Some(Value::String(s)) => s == "Pending",
_ => false,
};
if !is_pending {
return;
}
let molecule = m.clone();
let kind_name = m.kind_name.clone();
let handle = self.handle();
tokio::spawn(async move {
if let Err(e) = actor.run(molecule, handle).await {
tracing::error!(error = %e, kind = %kind_name, "effect actor failed");
}
});
}
async fn evaluate_reactions_for_delta(
&self,
delta: &Delta,
registry: &Arc<store::TypeRegistry>,
) -> Result<(), Error> {
let m = match delta {
Delta::Insert(m) => m,
Delta::Retract(_, _, _) => return Ok(()),
};
let triggers = self.program.reactions_for(m.kind).to_vec();
'next_reaction: for (ridx, pin) in triggers {
let reaction = &self.program.reactions()[ridx];
let mut candidates: Vec<(String, Vec<Molecule>)> =
Vec::with_capacity(reaction.when.len().saturating_sub(1));
for (i, pat) in reaction.when.iter().enumerate() {
if i == pin {
continue;
}
let kind = registry.id_by_name(&pat.molecule_name).ok_or_else(|| {
Error::Runtime(format!("unknown molecule {}", pat.molecule_name))
})?;
let bucket = self.store.scan(kind);
if bucket.is_empty() {
continue 'next_reaction;
}
candidates.push((pat.binding.clone(), bucket));
}
for combo in cartesian_product(&candidates) {
let mut ctx = eval::EvalCtx::default()
.bind(&reaction.when[pin].binding, m.clone());
for (name, molecule) in combo {
ctx = ctx.bind(name, molecule);
}
self.evaluate_reaction_body(reaction, ctx, registry).await?;
}
}
let rollup_triggers = self.program.rollup_reactions_for(m.kind).to_vec();
'next_rollup_reaction: for ridx in rollup_triggers {
let reaction = &self.program.reactions()[ridx];
let mut candidates: Vec<(String, Vec<Molecule>)> =
Vec::with_capacity(reaction.when.len());
for pat in &reaction.when {
let kind = registry.id_by_name(&pat.molecule_name).ok_or_else(|| {
Error::Runtime(format!("unknown molecule {}", pat.molecule_name))
})?;
let bucket = self.store.scan(kind);
if bucket.is_empty() {
continue 'next_rollup_reaction;
}
candidates.push((pat.binding.clone(), bucket));
}
for combo in cartesian_product(&candidates) {
let mut ctx = eval::EvalCtx::default();
for (name, molecule) in combo {
ctx = ctx.bind(name, molecule);
}
self.evaluate_reaction_body(reaction, ctx, registry).await?;
}
}
Ok(())
}
/// Runs one reaction against a fully bound evaluation context.
///
/// Steps, in order:
/// 1. if the reaction has a rollup, every stored molecule of the rollup kind
///    is tested against the rollup predicate (temporarily bound under the
///    rollup binding) and the matches are bound as a group;
/// 2. if the reaction has a `where` clause, it must evaluate to `true`;
/// 3. each `emit` is evaluated and sent to the delta channel as an insert.
///
/// # Errors
/// Fails on an unknown rollup molecule, an evaluation error, or a predicate
/// or `where` clause that does not produce a `Bool`. A closed delta channel
/// is only logged.
async fn evaluate_reaction_body(
    &self,
    reaction: &crate::syntax::ast::ReactionDecl,
    mut ctx: eval::EvalCtx,
    registry: &Arc<store::TypeRegistry>,
) -> Result<(), Error> {
    if let Some(rollup) = &reaction.rollup {
        let kind = registry
            .id_by_name(&rollup.molecule_name)
            .ok_or_else(|| {
                Error::Runtime(format!("unknown rollup molecule {}", rollup.molecule_name))
            })?;

        let mut group: Vec<Molecule> = Vec::new();
        for cand in self.store.scan(kind) {
            // Shadow the rollup binding with the candidate for the duration of
            // the predicate, remembering whatever was bound before.
            let shadowed = ctx.bindings.insert(rollup.binding.clone(), cand.clone());
            let verdict = eval::eval_expr(&rollup.predicate, &ctx)?;
            let keep = match verdict {
                Value::Bool(b) => b,
                other => {
                    return Err(Error::Runtime(format!(
                        "rollup predicate in reaction `{}` returned {}, expected Bool",
                        reaction.name,
                        other.type_name()
                    )))
                }
            };
            // Put the context back exactly as it was.
            match shadowed {
                Some(prev) => {
                    ctx.bindings.insert(rollup.binding.clone(), prev);
                }
                None => {
                    ctx.bindings.remove(&rollup.binding);
                }
            }
            if keep {
                group.push(cand);
            }
        }
        ctx = ctx.bind_group(&rollup.binding, group);
    }

    let passes = match &reaction.where_clause {
        None => true,
        Some(guard) => match eval::eval_expr(guard, &ctx)? {
            Value::Bool(flag) => flag,
            other => {
                return Err(Error::Runtime(format!(
                    "where clause in reaction `{}` returned {}, expected Bool",
                    reaction.name,
                    other.type_name()
                )))
            }
        },
    };
    if !passes {
        return Ok(());
    }

    // NOTE(review): this awaits a send on the reactor's own bounded channel
    // while the consumer loop is inside this call; if the queue is full the
    // send would presumably never complete — confirm capacity assumptions.
    for item in reaction.emit.iter() {
        let produced = eval::eval_emit(item, &ctx, registry)?;
        if let Err(e) = self.delta_tx.send(Delta::Insert(produced)).await {
            tracing::warn!(error = %e, "delta channel closed");
        }
    }
    Ok(())
}
}
/// Builds every combination that picks one item from each named list.
///
/// Combinations keep the order of `lists`; earlier lists vary slowest. Each
/// element of a combination is paired with the name of the list it came from.
/// An empty `lists` yields a single empty combination, and any empty inner
/// list yields no combinations at all.
fn cartesian_product<T: Clone>(lists: &[(String, Vec<T>)]) -> Vec<Vec<(String, T)>> {
    lists
        .iter()
        .fold(vec![Vec::new()], |partials, (label, choices)| {
            partials
                .iter()
                .flat_map(|partial| {
                    choices.iter().map(move |choice| {
                        let mut extended = partial.clone();
                        extended.push((label.clone(), choice.clone()));
                        extended
                    })
                })
                .collect()
        })
}
/// Cloneable handle for submitting molecules to a reactor from outside its
/// run loop; it wraps a sender for the reactor's delta channel.
#[derive(Clone)]
pub struct ReactorHandle {
/// Sending half of the reactor's delta channel.
delta_tx: mpsc::Sender<Delta>,
}
impl ReactorHandle {
pub async fn emit(&self, m: Molecule) -> Result<(), Error> {
self.delta_tx
.send(Delta::Insert(m))
.await
.map_err(|_| Error::ChannelClosed)
}
}
impl Reactor {
fn register_llm_provider_from_molecule(&self, m: &Molecule) -> Result<(), Error> {
use crate::llm::{CodexResponsesProvider, MockLlmProvider, TokenFileProvider};
let name = m
.fields
.get("name")
.ok_or_else(|| Error::Runtime("LlmProvider missing name".into()))?
.as_string()?
.to_string();
let kind = m
.fields
.get("kind")
.map(|v| v.as_string().map(|s| s.to_string()))
.transpose()?
.unwrap_or_else(|| "openai_compat".to_string());
let optional_string = |key: &str| -> Result<Option<String>, Error> {
match m.fields.get(key) {
Some(Value::String(s)) => Ok(Some(s.clone())),
Some(Value::Null) | None => Ok(None),
Some(other) => Err(Error::Runtime(format!(
"LlmProvider.{key} must be String or null, got {}",
other.type_name()
))),
}
};
let provider: std::sync::Arc<dyn crate::llm::LlmProvider> = match kind.as_str() {
"mock" => std::sync::Arc::new(MockLlmProvider),
"openai_compat" => {
let base_url = optional_string("base_url")?.ok_or_else(|| {
Error::Runtime(format!(
"LlmProvider `{name}` (kind: openai_compat) requires base_url"
))
})?;
let token_file = optional_string("token_file")?.ok_or_else(|| {
Error::Runtime(format!(
"LlmProvider `{name}` (kind: openai_compat) requires token_file"
))
})?;
let token_jq = optional_string("token_jq")?;
let mut provider = TokenFileProvider::new(
name.clone(),
base_url,
crate::llm::expand_user_path(&token_file),
);
if let Some(jq) = token_jq {
provider = provider.with_json_path(jq);
}
std::sync::Arc::new(provider)
}
"codex_responses" => {
let base_url = optional_string("base_url")?.ok_or_else(|| {
Error::Runtime(format!(
"LlmProvider `{name}` (kind: codex_responses) requires base_url"
))
})?;
let token_file = optional_string("token_file")?.ok_or_else(|| {
Error::Runtime(format!(
"LlmProvider `{name}` (kind: codex_responses) requires token_file"
))
})?;
let token_jq = optional_string("token_jq")?;
let mut provider = CodexResponsesProvider::new(
name.clone(),
base_url,
crate::llm::expand_user_path(&token_file),
);
if let Some(jq) = token_jq {
provider = provider.with_json_path(jq);
}
std::sync::Arc::new(provider)
}
other => {
return Err(Error::Runtime(format!(
"LlmProvider `{name}` has unknown kind `{other}` (supported: mock, openai_compat)"
)))
}
};
self.actor_registry.llm_providers.register(provider);
tracing::info!(provider = %name, kind = %kind, "registered LlmProvider via startup molecule");
Ok(())
}
}
fn build_boot_molecule(registry: &Arc<store::TypeRegistry>) -> Result<Molecule, Error> {
let kind = registry
.id_by_name("Boot")
.ok_or_else(|| Error::Runtime("Boot molecule schema missing".into()))?;
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as i64)
.unwrap_or(0);
let mut fields: BTreeMap<String, Value> = BTreeMap::new();
fields.insert("ts".into(), Value::Timestamp(now));
Ok(Molecule {
kind,
kind_name: "Boot".into(),
fields,
ts: LogicalTime(0),
})
}