thal 0.0.1

Reactive semantic runtime — molecules, reactions, and effect actors for building LLM-backed applications as dataflow programs.
Documentation
use super::EffectActor;
use crate::runtime::{Molecule, ReactorHandle};
use crate::value::Value;
use crate::Error;
use async_trait::async_trait;

/// Effect actor that runs an external command on behalf of a request molecule.
///
/// The actor carries no state of its own; everything it needs (`cmd`, optional `args`) is read
/// from the fields of the molecule passed to `run`.
pub struct ProcessActor;

/// [`EffectActor`] implementation that executes an external command and emits its result.
#[async_trait]
impl EffectActor for ProcessActor {
    /// Returns the kind name reported by this actor: `"Process"`.
    fn kind_name(&self) -> &'static str {
        "Process"
    }

    /// Runs the command described by `request` and emits the request back with the result.
    ///
    /// Fields read from `request`:
    /// - `cmd` (required): the program to execute, as a string.
    /// - `args` (optional): a list of strings passed as arguments; absent means no arguments.
    ///
    /// Fields written on the emitted molecule (all other fields are carried over unchanged):
    /// - `stdout` / `stderr`: captured output, decoded as UTF-8 with invalid sequences replaced.
    /// - `exit_code`: the process exit code, or `-1` when no exit code is available.
    /// - `status`: the string `"Done"`.
    ///
    /// # Errors
    ///
    /// Returns an error when `cmd` is missing or is not a string, when `args` is not a list or
    /// contains a non-string element, when the command cannot be spawned or awaited, or when
    /// emitting the updated molecule through `handle` fails.
    async fn run(&self, request: Molecule, handle: ReactorHandle) -> Result<(), Error> {
        let cmd = request
            .fields
            .get("cmd")
            .ok_or_else(|| Error::Runtime("Process missing cmd".into()))?
            .as_string()?
            .to_string();

        let args: Vec<String> = match request.fields.get("args") {
            Some(Value::List(items)) => items
                .iter()
                .map(|v| v.as_string().map(String::from))
                .collect::<Result<Vec<_>, _>>()?,
            Some(other) => {
                return Err(Error::Runtime(format!(
                    "Process.args must be a list of strings, got {}",
                    other.type_name()
                )))
            }
            None => Vec::new(),
        };

        // `output()` waits for the child to exit and collects everything it wrote to
        // stdout/stderr.
        let output = tokio::process::Command::new(&cmd)
            .args(&args)
            .output()
            .await
            .map_err(|e| Error::Runtime(format!("Process spawn failed: {e}")))?;

        // Lossy decoding: non-UTF-8 output must not fail the whole effect.
        let stdout = String::from_utf8_lossy(&output.stdout).into_owned();
        let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
        // `code()` is `None` when no exit code exists (per the std docs, e.g. the process was
        // terminated by a signal on Unix); that case is reported as -1.
        let exit_code = i64::from(output.status.code().unwrap_or(-1));

        // `cmd` and `args` are owned copies, so no borrow of `request` is live here and the
        // molecule can be moved and updated in place instead of being cloned.
        let mut updated = request;
        updated.fields.insert("stdout".into(), Value::String(stdout));
        updated.fields.insert("stderr".into(), Value::String(stderr));
        updated.fields.insert("exit_code".into(), Value::Int(exit_code));
        updated
            .fields
            .insert("status".into(), Value::String("Done".into()));

        handle.emit(updated).await
    }
}