harn-vm 0.8.4

Async bytecode virtual machine for the Harn programming language
Documentation
use std::collections::BTreeMap;
use std::rc::Rc;

use futures::StreamExt;

use crate::event_log::{
    active_event_log, install_memory_for_current_thread, EventLog, LogEvent, Topic,
};
use crate::llm::vm_value_to_json;
use crate::value::{VmError, VmStream, VmValue};
use crate::vm::Vm;

const EVENT_LOG_QUEUE_DEPTH: usize = 128;

pub(crate) fn register_event_log_builtins(vm: &mut Vm) {
    register_event_log_namespace(vm);

    vm.register_async_builtin("event_log.emit", |args| async move {
        let topic = parse_topic(args.first(), "event_log.emit")?;
        let kind = required_string(args.get(1), "event_log.emit", "kind")?;
        let payload = args
            .get(2)
            .map(vm_value_to_json)
            .unwrap_or(serde_json::Value::Null);
        let headers = parse_headers(args.get(3), "event_log.emit")?;
        let id = ensure_event_log()
            .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
            .await
            .map_err(log_error)?;
        Ok(VmValue::Int(id as i64))
    });

    vm.register_async_builtin("event_log.latest", |args| async move {
        let topic = parse_topic(args.first(), "event_log.latest")?;
        let latest = ensure_event_log().latest(&topic).await.map_err(log_error)?;
        Ok(latest
            .map(|id| VmValue::Int(id as i64))
            .unwrap_or(VmValue::Nil))
    });

    vm.register_async_builtin("event_log.subscribe", |args| async move {
        let options = parse_subscribe_options(&args)?;
        let log = ensure_event_log();
        let mut events = log
            .clone()
            .subscribe(&options.topic, options.from_cursor)
            .await
            .map_err(log_error)?;
        let topic_name = options.topic.as_str().to_string();
        let (tx, rx) = tokio::sync::mpsc::channel::<Result<VmValue, VmError>>(1);

        tokio::task::spawn_local(async move {
            while let Some(next) = events.next().await {
                let value = match next {
                    Ok((event_id, event)) => Ok(event_to_value(&topic_name, event_id, event)),
                    Err(error) => Err(log_error(error)),
                };
                if tx.send(value).await.is_err() {
                    return;
                }
            }
        });

        Ok(VmValue::Stream(VmStream {
            done: Rc::new(std::cell::Cell::new(false)),
            receiver: Rc::new(tokio::sync::Mutex::new(rx)),
            cancel: None,
        }))
    });
}

fn register_event_log_namespace(vm: &mut Vm) {
    let names = ["emit", "latest", "subscribe"];
    vm.set_global(
        "event_log",
        VmValue::Dict(Rc::new(
            std::iter::once((
                "_namespace".to_string(),
                VmValue::String(Rc::from("event_log")),
            ))
            .chain(names.into_iter().map(|name| {
                (
                    name.to_string(),
                    VmValue::BuiltinRef(Rc::from(format!("event_log.{name}"))),
                )
            }))
            .collect::<BTreeMap<_, _>>(),
        )),
    );
}

struct SubscribeOptions {
    topic: Topic,
    from_cursor: Option<u64>,
}

fn parse_subscribe_options(args: &[VmValue]) -> Result<SubscribeOptions, VmError> {
    match args.first() {
        Some(VmValue::Dict(options)) => {
            let topic = parse_topic(options.get("topic"), "event_log.subscribe")?;
            let from_cursor = parse_cursor(
                options
                    .get("from_cursor")
                    .or_else(|| options.get("cursor"))
                    .or_else(|| options.get("from")),
                "event_log.subscribe",
            )?;
            Ok(SubscribeOptions { topic, from_cursor })
        }
        other => Ok(SubscribeOptions {
            topic: parse_topic(other, "event_log.subscribe")?,
            from_cursor: parse_cursor(args.get(1), "event_log.subscribe")?,
        }),
    }
}

fn ensure_event_log() -> std::sync::Arc<crate::event_log::AnyEventLog> {
    active_event_log().unwrap_or_else(|| install_memory_for_current_thread(EVENT_LOG_QUEUE_DEPTH))
}

fn parse_topic(value: Option<&VmValue>, builtin: &str) -> Result<Topic, VmError> {
    let raw = required_string(value, builtin, "topic")?;
    Topic::new(raw).map_err(log_error)
}

fn parse_cursor(value: Option<&VmValue>, builtin: &str) -> Result<Option<u64>, VmError> {
    match value {
        None | Some(VmValue::Nil) => Ok(None),
        Some(VmValue::Int(n)) if *n >= 0 => Ok(Some(*n as u64)),
        Some(other) => Err(VmError::TypeError(format!(
            "{builtin}: from_cursor must be a non-negative int or nil, got {}",
            other.type_name()
        ))),
    }
}

fn required_string(value: Option<&VmValue>, builtin: &str, name: &str) -> Result<String, VmError> {
    match value {
        Some(VmValue::String(value)) => Ok(value.to_string()),
        Some(other) => Err(VmError::TypeError(format!(
            "{builtin}: {name} must be a string, got {}",
            other.type_name()
        ))),
        None => Err(VmError::TypeError(format!("{builtin}: missing {name}"))),
    }
}

fn parse_headers(
    value: Option<&VmValue>,
    builtin: &str,
) -> Result<BTreeMap<String, String>, VmError> {
    match value {
        None | Some(VmValue::Nil) => Ok(BTreeMap::new()),
        Some(VmValue::Dict(dict)) => {
            let mut out = BTreeMap::new();
            for (key, value) in dict.iter() {
                match value {
                    VmValue::String(value) => {
                        out.insert(key.clone(), value.to_string());
                    }
                    other => {
                        return Err(VmError::TypeError(format!(
                            "{builtin}: header '{key}' must be a string, got {}",
                            other.type_name()
                        )))
                    }
                }
            }
            Ok(out)
        }
        Some(other) => Err(VmError::TypeError(format!(
            "{builtin}: headers must be a dict, got {}",
            other.type_name()
        ))),
    }
}

fn event_to_value(topic: &str, event_id: u64, event: LogEvent) -> VmValue {
    crate::stdlib::json_to_vm_value(&serde_json::json!({
        "id": event_id,
        "cursor": event_id,
        "topic": topic,
        "kind": event.kind,
        "payload": event.payload,
        "headers": event.headers,
        "occurred_at_ms": event.occurred_at_ms,
    }))
}

fn log_error(error: crate::event_log::LogError) -> VmError {
    VmError::Runtime(format!("event_log: {error}"))
}