thal 0.0.1

Reactive semantic runtime — molecules, reactions, and effect actors for building LLM-backed applications as dataflow programs.
Documentation
use super::EffectActor;
use crate::runtime::{Molecule, ReactorHandle};
use crate::value::Value;
use crate::Error;
use async_trait::async_trait;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};

/// Atomic question-then-answer IO for `TerminalPromptActor`. Abstracted so
/// tests can inject a deterministic source via `ScriptedPromptIo`.
///
/// The `Send + Sync + 'static` bounds let an implementation be stored as
/// `Arc<dyn PromptIo>`, which is how `TerminalPromptActor` holds it.
#[async_trait]
pub trait PromptIo: Send + Sync + 'static {
    /// Asks `question` and resolves to the answer text.
    ///
    /// Implementations are free to ignore the question (the scripted test
    /// implementation does). Any failure of the underlying source is
    /// reported as an `Error`.
    async fn prompt(&self, question: &str) -> Result<String, Error>;
}

/// Default IO: writes the question to stdout, flushes, reads one line from
/// stdin. Trailing `\n` / `\r\n` are stripped.
pub struct StdinStdoutIo;

#[async_trait]
impl PromptIo for StdinStdoutIo {
    async fn prompt(&self, question: &str) -> Result<String, Error> {
        let mut stdout = tokio::io::stdout();
        stdout
            .write_all(question.as_bytes())
            .await
            .map_err(Error::Io)?;
        if !question.ends_with(' ') {
            stdout.write_all(b" ").await.map_err(Error::Io)?;
        }
        stdout.flush().await.map_err(Error::Io)?;

        let stdin = tokio::io::stdin();
        let mut reader = BufReader::new(stdin);
        let mut line = String::new();
        reader.read_line(&mut line).await.map_err(Error::Io)?;
        if line.ends_with('\n') {
            line.pop();
        }
        if line.ends_with('\r') {
            line.pop();
        }
        Ok(line)
    }
}

/// Test IO: replays pre-recorded answers in FIFO order. Once the script
/// runs out, prompting yields an error, which surfaces as a test failure.
pub struct ScriptedPromptIo {
    answers: Mutex<VecDeque<String>>,
}

impl ScriptedPromptIo {
    /// Builds a script that hands out `answers` front to back, one per
    /// prompt.
    pub fn new(answers: Vec<String>) -> Self {
        let queue = VecDeque::from(answers);
        ScriptedPromptIo {
            answers: Mutex::new(queue),
        }
    }
}

#[async_trait]
impl PromptIo for ScriptedPromptIo {
    async fn prompt(&self, _question: &str) -> Result<String, Error> {
        self.answers.lock().pop_front().ok_or_else(|| {
            Error::Runtime("ScriptedPromptIo: script exhausted".into())
        })
    }
}

/// Effect actor that answers prompt requests through a pluggable
/// `PromptIo` backend.
pub struct TerminalPromptActor {
    io: Arc<dyn PromptIo>,
}

impl TerminalPromptActor {
    /// Actor backed by the real terminal (`StdinStdoutIo`).
    pub fn new() -> Self {
        let terminal: Arc<dyn PromptIo> = Arc::new(StdinStdoutIo);
        TerminalPromptActor { io: terminal }
    }

    /// Actor backed by a caller-supplied IO, e.g. a scripted one in tests.
    pub fn with_io(io: Arc<dyn PromptIo>) -> Self {
        TerminalPromptActor { io }
    }
}

impl Default for TerminalPromptActor {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl EffectActor for TerminalPromptActor {
    /// Kind name of this actor: `"TerminalPrompt"`.
    fn kind_name(&self) -> &'static str {
        "TerminalPrompt"
    }

    /// Asks the request's `question` through the configured `PromptIo` and
    /// emits the request back with `answer` set to the reply and `status`
    /// set to `"Done"`.
    ///
    /// # Errors
    ///
    /// * `Error::Runtime` if the request has no `question` field.
    /// * Whatever `as_string` reports for the `question` value (presumably
    ///   when it is not a string; confirm against `Value`).
    /// * Any error from the IO backend's `prompt`.
    /// * Any error returned by `handle.emit`.
    async fn run(&self, request: Molecule, handle: ReactorHandle) -> Result<(), Error> {
        // Copy the question into an owned `String` so no borrow of `request`
        // is held across the `.await` below.
        let question = request
            .fields
            .get("question")
            .ok_or_else(|| Error::Runtime("TerminalPrompt missing question".into()))?
            .as_string()?
            .to_string();

        let answer = self.io.prompt(&question).await?;

        // `request` is owned and not needed afterwards, so update it in place
        // instead of deep-copying the whole molecule first.
        let mut updated = request;
        updated
            .fields
            .insert("answer".into(), Value::String(answer));
        updated
            .fields
            .insert("status".into(), Value::String("Done".into()));
        handle.emit(updated).await
    }
}