thal 0.0.1

Reactive semantic runtime — molecules, reactions, and effect actors for building LLM-backed applications as dataflow programs.
Documentation
pub mod llm;
pub mod process;
pub mod reedline_io;
pub mod spinner;
pub mod terminal_prompt;
pub mod terminal_write;
pub mod timer;

use crate::llm::LlmProviderRegistry;
use crate::runtime::{Molecule, ReactorHandle};
use crate::value::MoleculeKindId;
use crate::Error;
use async_trait::async_trait;
use dashmap::DashMap;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

#[async_trait]
pub trait SourceActor: Send + Sync + 'static {
    fn kind_name(&self) -> &'static str;

    async fn run(
        &self,
        config_molecule: Molecule,
        handle: ReactorHandle,
        cancel: CancellationToken,
    ) -> Result<(), Error>;
}

#[async_trait]
pub trait EffectActor: Send + Sync + 'static {
    fn kind_name(&self) -> &'static str;

    async fn run(&self, request: Molecule, handle: ReactorHandle) -> Result<(), Error>;
}

pub struct ActorRegistry {
    pub sources_by_id: DashMap<MoleculeKindId, Arc<dyn SourceActor>>,
    pub effects_by_id: DashMap<MoleculeKindId, Arc<dyn EffectActor>>,
    pub sources_by_name: DashMap<String, Arc<dyn SourceActor>>,
    pub effects_by_name: DashMap<String, Arc<dyn EffectActor>>,
    pub llm_providers: Arc<LlmProviderRegistry>,
}

impl Default for ActorRegistry {
    fn default() -> Self {
        Self {
            sources_by_id: DashMap::new(),
            effects_by_id: DashMap::new(),
            sources_by_name: DashMap::new(),
            effects_by_name: DashMap::new(),
            llm_providers: Arc::new(LlmProviderRegistry::new()),
        }
    }
}

impl ActorRegistry {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_builtins() -> Self {
        // Default to the rich reedline-backed input. Falls back gracefully
        // when stdin isn't a TTY (piped input still works for tests / scripts).
        Self::with_prompt_io(Arc::new(reedline_io::ReedlineIo::new()))
    }

    /// Construct with the basic stdin/stdout PromptIo. Useful when reedline's
    /// terminal control conflicts with whatever's hosting the binary
    /// (e.g., embedded test harnesses).
    pub fn with_basic_io() -> Self {
        Self::with_prompt_io(Arc::new(terminal_prompt::StdinStdoutIo))
    }

    /// Construct the builtin registry with a caller-supplied `PromptIo`.
    /// The same `PromptIo` is shared by `TerminalPromptActor` *and* by
    /// `LlmActor`'s missing-provider auto-prompt path, so a single test
    /// override (e.g. `ScriptedPromptIo`) covers both.
    pub fn with_prompt_io(prompt_io: Arc<dyn terminal_prompt::PromptIo>) -> Self {
        let llm_providers = Arc::new(LlmProviderRegistry::with_builtins());
        let registry = Self {
            sources_by_id: DashMap::new(),
            effects_by_id: DashMap::new(),
            sources_by_name: DashMap::new(),
            effects_by_name: DashMap::new(),
            llm_providers: llm_providers.clone(),
        };
        registry.register_source(Arc::new(timer::TimerActor));
        registry.register_effect(Arc::new(terminal_write::TerminalWriteActor));
        registry.register_effect(Arc::new(terminal_prompt::TerminalPromptActor::with_io(
            prompt_io.clone(),
        )));
        registry.register_effect(Arc::new(process::ProcessActor));
        registry.register_effect(Arc::new(spinner::SpinnerActor::new()));
        registry.register_effect(Arc::new(llm::LlmActor::new(llm_providers, prompt_io)));
        registry
    }

    pub fn register_source(&self, actor: Arc<dyn SourceActor>) {
        self.sources_by_name
            .insert(actor.kind_name().to_string(), actor);
    }

    pub fn register_effect(&self, actor: Arc<dyn EffectActor>) {
        self.effects_by_name
            .insert(actor.kind_name().to_string(), actor);
    }
}