thal 0.0.1

Reactive semantic runtime — molecules, reactions, and effect actors for building LLM-backed applications as dataflow programs.
Documentation
use super::delta::Delta;
use super::molecule::{Molecule, PrimaryKey};
use crate::syntax::ast::MergeFn;
use crate::value::{MoleculeKindId, Type, Value};
use crate::Error;
use dashmap::DashMap;
use parking_lot::RwLock;
use std::collections::BTreeMap;
use std::sync::Arc;

/// In-memory store of molecules, grouped into one [`Arrangement`] per
/// molecule kind.
pub struct MoleculeStore {
    /// Per-kind arrangements keyed by kind id. An entry is created the first
    /// time an insert for that kind is applied.
    arrangements: DashMap<MoleculeKindId, Arrangement>,
    /// Shared registry used to resolve a kind's schema (primary-key fields,
    /// merge clause) and to map kind names to ids.
    type_registry: Arc<TypeRegistry>,
}

/// Outcome of `MoleculeStore::apply` for an Insert delta. Lets the reactor
/// distinguish "first time we've seen this molecule" from "we've seen it
/// before" — useful for one-shot dispatchers like Source actors.
///
/// Retract deltas also go through `apply`; they always report `Inserted`,
/// whether or not anything was removed, so the value carries no information
/// for retractions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ApplyResult {
    /// Nothing was stored under the primary key before; the incoming
    /// molecule was stored as-is.
    Inserted,
    /// A molecule with different fields already existed under the primary
    /// key and the schema's merge clause was evaluated; its result replaced
    /// the stored molecule.
    Merged,
    /// A molecule with identical fields was already stored under the primary
    /// key (idempotent dedup); the store was left untouched.
    NoOp,
}

impl MoleculeStore {
    pub fn new(type_registry: Arc<TypeRegistry>) -> Self {
        Self {
            arrangements: DashMap::new(),
            type_registry,
        }
    }

    pub fn type_registry(&self) -> &Arc<TypeRegistry> {
        &self.type_registry
    }

    /// Apply a delta. For Insert, if a molecule already exists at the same
    /// (kind, primary_key) and the schema declares a merge function, evaluate
    /// it via `merge_eval` and store the result.
    pub fn apply<F>(&self, delta: &Delta, merge_eval: F) -> Result<ApplyResult, Error>
    where
        F: FnOnce(&MergeFn, &Molecule, &Molecule) -> Result<Molecule, Error>,
    {
        match delta {
            Delta::Insert(m) => {
                let pk = self.primary_key_of(m)?;
                let kind = m.kind;
                let schema = self
                    .type_registry
                    .schema_by_id(kind)
                    .ok_or_else(|| Error::Runtime(format!("unknown kind for {}", m.kind_name)))?;

                let arrangement = self
                    .arrangements
                    .entry(kind)
                    .or_insert_with(Arrangement::new);
                let mut by_pk = arrangement.by_pk.write();

                let (new_molecule, result) = if let Some(existing) = by_pk.get(&pk) {
                    if existing.fields == m.fields {
                        return Ok(ApplyResult::NoOp);
                    }
                    if let Some(merge) = &schema.merge {
                        (merge_eval(merge, existing, m)?, ApplyResult::Merged)
                    } else {
                        return Err(Error::Runtime(format!(
                            "primary-key conflict on {} (no merge clause)",
                            m.kind_name
                        )));
                    }
                } else {
                    (m.clone(), ApplyResult::Inserted)
                };
                by_pk.insert(pk, new_molecule);
                Ok(result)
            }
            Delta::Retract(kind, pk, _ts) => {
                if let Some(arrangement) = self.arrangements.get(kind) {
                    arrangement.by_pk.write().remove(pk);
                }
                Ok(ApplyResult::Inserted) // retraction isn't a Source-dispatch trigger
            }
        }
    }

    pub fn scan(&self, kind: MoleculeKindId) -> Vec<Molecule> {
        self.arrangements
            .get(&kind)
            .map(|a| a.by_pk.read().values().cloned().collect())
            .unwrap_or_default()
    }

    pub fn get(&self, kind: MoleculeKindId, pk: &PrimaryKey) -> Option<Molecule> {
        self.arrangements
            .get(&kind)
            .and_then(|a| a.by_pk.read().get(pk).cloned())
    }

    pub fn get_singleton(&self, kind_name: &str) -> Option<Molecule> {
        let kind = self.type_registry.id_by_name(kind_name)?;
        self.arrangements
            .get(&kind)
            .and_then(|a| a.by_pk.read().values().next().cloned())
    }

    pub fn scan_by_name(&self, kind_name: &str) -> Vec<Molecule> {
        match self.type_registry.id_by_name(kind_name) {
            Some(kind) => self.scan(kind),
            None => Vec::new(),
        }
    }

    fn primary_key_of(&self, m: &Molecule) -> Result<PrimaryKey, Error> {
        let schema = self.type_registry.schema_by_id(m.kind).ok_or_else(|| {
            Error::Runtime(format!("unknown kind for {}", m.kind_name))
        })?;
        let mut parts = Vec::with_capacity(schema.primary_key.len());
        for field in &schema.primary_key {
            let v = m.fields.get(field).cloned().ok_or_else(|| {
                Error::Runtime(format!(
                    "{} missing primary-key field {field}",
                    m.kind_name
                ))
            })?;
            parts.push(v);
        }
        Ok(PrimaryKey(parts))
    }
}

/// All stored molecules of a single kind.
pub struct Arrangement {
    /// Molecules keyed by primary key, guarded by a read/write lock. The
    /// `BTreeMap` keeps them ordered by `PrimaryKey`.
    pub by_pk: RwLock<BTreeMap<PrimaryKey, Molecule>>,
}

impl Arrangement {
    fn new() -> Self {
        Self {
            by_pk: RwLock::new(BTreeMap::new()),
        }
    }
}

/// Registry that maps molecule kind names and ids to their schemas.
///
/// All state lives behind a single read/write lock, so every method takes
/// `&self`.
#[derive(Default)]
pub struct TypeRegistry {
    /// Lock-protected registry state.
    inner: parking_lot::RwLock<TypeRegistryInner>,
}

/// Lock-protected state of a `TypeRegistry`.
#[derive(Default)]
struct TypeRegistryInner {
    /// Schema currently registered for each kind id.
    by_id: BTreeMap<MoleculeKindId, MoleculeSchema>,
    /// Kind id assigned to each schema name.
    by_name: BTreeMap<String, MoleculeKindId>,
    /// Raw value of the next id to hand out; starts at 0 and is bumped each
    /// time a previously unseen name is registered.
    next_id: u32,
}

impl TypeRegistry {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn register(&self, schema: MoleculeSchema) -> MoleculeKindId {
        let mut inner = self.inner.write();
        if let Some(existing) = inner.by_name.get(&schema.name).copied() {
            inner.by_id.insert(existing, schema);
            return existing;
        }
        let id = MoleculeKindId(inner.next_id);
        inner.next_id += 1;
        inner.by_name.insert(schema.name.clone(), id);
        inner.by_id.insert(id, schema);
        id
    }

    pub fn id_by_name(&self, name: &str) -> Option<MoleculeKindId> {
        self.inner.read().by_name.get(name).copied()
    }

    pub fn schema_by_id(&self, id: MoleculeKindId) -> Option<MoleculeSchema> {
        self.inner.read().by_id.get(&id).cloned()
    }

    pub fn schema_by_name(&self, name: &str) -> Option<MoleculeSchema> {
        let inner = self.inner.read();
        inner
            .by_name
            .get(name)
            .and_then(|id| inner.by_id.get(id).cloned())
    }
}

/// Schema of one molecule kind, as stored in the `TypeRegistry`.
#[derive(Clone, Debug)]
pub struct MoleculeSchema {
    /// Kind name; the registry uses it as the unique key when registering.
    pub name: String,
    /// Declared fields of the kind.
    pub fields: Vec<FieldSchema>,
    /// Names of the fields whose values, in this order, make up a molecule's
    /// primary key.
    pub primary_key: Vec<String>,
    /// Merge function evaluated when an insert hits an occupied primary key
    /// with different fields; `None` makes such an insert an error.
    pub merge: Option<MergeFn>,
    /// NOTE(review): presumably marks kinds that hold at most one molecule;
    /// nothing in this file reads it — confirm against callers.
    pub is_singleton: bool,
}

/// Schema of a single field within a `MoleculeSchema`.
#[derive(Clone, Debug)]
pub struct FieldSchema {
    /// Field name.
    pub name: String,
    /// Declared type of the field.
    pub ty: Type,
    /// Optional default-value expression.
    /// NOTE(review): presumably evaluated when the field is omitted — it is
    /// not used in this file; confirm where it is consumed.
    pub default: Option<crate::syntax::ast::Expr>,
}

// Empty stub with no caller visible in this file; it is the only place here
// that mentions `Value`.
// NOTE(review): presumably it exists to keep the `Value` import referenced so
// it does not trip an unused-import warning — confirm, and consider removing
// both the import and this stub.
#[allow(dead_code)]
fn _value_unused(_: &Value) {}