thal 0.0.1

Reactive semantic runtime — molecules, reactions, and effect actors for building LLM-backed applications as dataflow programs.
Documentation
use super::terminal_prompt::PromptIo;
use super::EffectActor;
use crate::llm::{LlmProvider, LlmProviderRegistry, LlmRequest};
use crate::runtime::{Molecule, ReactorHandle};
use crate::value::Value;
use crate::Error;
use async_trait::async_trait;
use std::sync::Arc;
use std::time::{Duration, Instant};

pub struct LlmActor {
    providers: Arc<LlmProviderRegistry>,
    prompt_io: Arc<dyn PromptIo>,
}

impl LlmActor {
    pub fn new(providers: Arc<LlmProviderRegistry>, prompt_io: Arc<dyn PromptIo>) -> Self {
        Self {
            providers,
            prompt_io,
        }
    }

    /// Look up a provider, polling for `grace` before giving up. Handles the
    /// startup race where an LlmCall and the matching LlmProvider config
    /// land in the delta queue together.
    async fn lookup_with_grace(
        &self,
        provider_name: &str,
        grace: Duration,
    ) -> Option<Arc<dyn LlmProvider>> {
        if let Some(p) = self.providers.get(provider_name) {
            return Some(p);
        }
        let deadline = Instant::now() + grace;
        while Instant::now() < deadline {
            tokio::time::sleep(Duration::from_millis(20)).await;
            if let Some(p) = self.providers.get(provider_name) {
                return Some(p);
            }
        }
        None
    }

    /// When an LlmCall references a provider that hasn't been registered,
    /// ask the user for a token file path, emit a corresponding `LlmProvider`
    /// molecule (which the reactor's interception path picks up and registers),
    /// then wait for the registry to be populated. Defaults to
    /// `kind: "openai_compat"` and OpenAI's base URL — Codex via Codex CLI is
    /// the named use case.
    async fn prompt_for_missing_provider(
        &self,
        provider_name: &str,
        handle: &ReactorHandle,
    ) -> Result<Arc<dyn LlmProvider>, Error> {
        let token_file = self
            .prompt_io
            .prompt(&format!(
                "LlmProvider `{provider_name}` is not registered. enter token file path: "
            ))
            .await?;
        if token_file.is_empty() {
            return Err(Error::Runtime(format!(
                "auto-registration aborted: empty token file path for provider `{provider_name}`"
            )));
        }

        // If the user pointed at a .json file, default to extracting
        // `.access_token` — Codex CLI's `~/.codex/auth.json` shape. The user
        // can declare an explicit `LlmProvider` molecule to override.
        let mut builder = Molecule::builder("LlmProvider")
            .field("name", Value::String(provider_name.to_string()))
            .field("kind", Value::String("openai_compat".into()))
            .field("base_url", Value::String("https://api.openai.com".into()))
            .field("token_file", Value::String(token_file.clone()));
        if token_file.trim_end_matches('/').ends_with(".json") {
            builder = builder.field(
                "token_jq",
                Value::String(".access_token".into()),
            );
        }
        handle.emit(builder.build()).await?;

        // Poll the registry until the reactor's interception path has
        // processed our emitted LlmProvider delta. Bounded so a misconfigured
        // setup fails loudly rather than hanging the actor forever.
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            if let Some(p) = self.providers.get(provider_name) {
                return Ok(p);
            }
            if Instant::now() > deadline {
                return Err(Error::Runtime(format!(
                    "provider `{provider_name}` did not register within 5s after auto-prompt"
                )));
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    }
}

#[async_trait]
impl EffectActor for LlmActor {
    fn kind_name(&self) -> &'static str {
        "LlmCall"
    }

    async fn run(&self, request: Molecule, handle: ReactorHandle) -> Result<(), Error> {
        let provider_name = request
            .fields
            .get("provider")
            .ok_or_else(|| Error::Runtime("LlmCall missing provider".into()))?
            .as_string()?
            .to_string();

        // Boot-tick race: an LlmCall and its LlmProvider config can both be
        // emitted in the same tick (user's Init + auto-loaded Init). The
        // LlmCall dispatch can land before the LlmProvider delta finishes
        // registering. Poll briefly before falling back to the auto-prompt.
        let provider = match self
            .lookup_with_grace(&provider_name, Duration::from_millis(500))
            .await
        {
            Some(p) => p,
            None => self
                .prompt_for_missing_provider(&provider_name, &handle)
                .await?,
        };

        let req = LlmRequest {
            model: request
                .fields
                .get("model")
                .ok_or_else(|| Error::Runtime("LlmCall missing model".into()))?
                .as_string()?
                .to_string(),
            prompt: request
                .fields
                .get("prompt")
                .ok_or_else(|| Error::Runtime("LlmCall missing prompt".into()))?
                .as_string()?
                .to_string(),
            temperature: request
                .fields
                .get("temperature")
                .ok_or_else(|| Error::Runtime("LlmCall missing temperature".into()))?
                .as_float()?,
            max_tokens: request
                .fields
                .get("max_tokens")
                .ok_or_else(|| Error::Runtime("LlmCall missing max_tokens".into()))?
                .as_int()?,
        };

        let resp = provider.call(req).await?;

        let mut updated = request.clone();
        updated
            .fields
            .insert("text".into(), Value::String(resp.text));
        updated
            .fields
            .insert("finish_reason".into(), Value::String(resp.finish_reason));
        updated
            .fields
            .insert("status".into(), Value::String("Done".into()));

        handle.emit(updated).await
    }
}