harn-vm 0.8.6

Async bytecode virtual machine for the Harn programming language
Documentation
use std::collections::BTreeMap;
use std::time::Duration;

use crate::orchestration::{
    append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
    RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
    ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
    ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH,
    ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
};
use crate::triggers::registry::TriggerBinding;

use super::types::{DispatchError, Dispatcher};
use super::uri::DispatchUri;
use super::TriggerEvent;

impl Dispatcher {
    pub(super) async fn emit_action_graph(
        &self,
        event: &TriggerEvent,
        nodes: Vec<RunActionGraphNodeRecord>,
        edges: Vec<RunActionGraphEdgeRecord>,
        extra: serde_json::Value,
    ) -> Result<(), DispatchError> {
        let mut headers = BTreeMap::new();
        headers.insert("trace_id".to_string(), event.trace_id.0.clone());
        headers.insert("event_id".to_string(), event.id.0.clone());
        let observability = RunObservabilityRecord {
            schema_version: 1,
            action_graph_nodes: nodes,
            action_graph_edges: edges,
            ..Default::default()
        };
        append_action_graph_update(
            headers,
            serde_json::json!({
                "source": "dispatcher",
                "trace_id": event.trace_id.0,
                "event_id": event.id.0,
                "observability": observability,
                "context": extra,
            }),
        )
        .await
        .map_err(DispatchError::from)
    }
}

pub(super) fn dispatch_error_label(error: &DispatchError) -> &'static str {
    match error {
        DispatchError::Denied(_) => "denied",
        DispatchError::Timeout(_) => "timeout",
        DispatchError::Waiting(_) => "waiting",
        DispatchError::Cancelled(_) => "cancelled",
        _ => "failed",
    }
}

pub(super) fn dispatch_success_outcome(
    route: &DispatchUri,
    result: &serde_json::Value,
) -> &'static str {
    match route {
        DispatchUri::Worker { .. } => "enqueued",
        DispatchUri::Persona { .. } => "recorded",
        DispatchUri::A2a { .. }
            if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
        {
            "pending"
        }
        DispatchUri::A2a { .. } => "completed",
        DispatchUri::Local { .. } => "success",
    }
}

pub(super) fn dispatch_node_id(
    route: &DispatchUri,
    binding_key: &str,
    event_id: &str,
    attempt: u32,
) -> String {
    let prefix = match route {
        DispatchUri::A2a { .. } => "a2a",
        _ => "dispatch",
    };
    format!("{prefix}:{binding_key}:{event_id}:{attempt}")
}

pub(super) fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
    match route {
        DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
        DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
        _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
    }
}

pub(super) fn dispatch_node_label(route: &DispatchUri) -> String {
    match route {
        DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
        _ => route.target_uri(),
    }
}

pub(super) fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
    match route {
        DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
        _ => None,
    }
}

pub(super) fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
    match route {
        DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
        _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
        _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
    }
}

pub(super) fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
    match status {
        crate::triggers::SignatureStatus::Verified => "verified",
        crate::triggers::SignatureStatus::Unsigned => "unsigned",
        crate::triggers::SignatureStatus::Failed { .. } => "failed",
    }
}

pub(super) fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "provider".to_string(),
        serde_json::json!(event.provider.as_str()),
    );
    metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
    metadata.insert(
        "dedupe_key".to_string(),
        serde_json::json!(event.dedupe_key),
    );
    metadata.insert(
        "signature_status".to_string(),
        serde_json::json!(signature_status_label(&event.signature_status)),
    );
    metadata
}

pub(super) fn trigger_event_persona_metadata(event: &TriggerEvent) -> BTreeMap<String, String> {
    let mut metadata = BTreeMap::new();
    metadata.insert("trigger_event_id".to_string(), event.id.0.clone());
    metadata.insert("event_id".to_string(), event.id.0.clone());
    metadata.insert("dedupe_key".to_string(), event.dedupe_key.clone());
    metadata.insert("trace_id".to_string(), event.trace_id.0.clone());
    if let Some(tenant_id) = &event.tenant_id {
        metadata.insert("tenant_id".to_string(), tenant_id.0.clone());
    }
    for (key, value) in &event.headers {
        metadata.insert(format!("header.{key}"), value.clone());
    }
    if let Ok(payload) = serde_json::to_value(&event.provider_payload) {
        collect_persona_payload_metadata("", &payload, &mut metadata);
        if let Some(raw) = payload.get("raw") {
            collect_persona_payload_metadata("", raw, &mut metadata);
        }
    }
    metadata
}

fn collect_persona_payload_metadata(
    prefix: &str,
    value: &serde_json::Value,
    metadata: &mut BTreeMap<String, String>,
) {
    let serde_json::Value::Object(object) = value else {
        return;
    };
    for (key, value) in object {
        let path = if prefix.is_empty() {
            key.clone()
        } else {
            format!("{prefix}.{key}")
        };
        match value {
            serde_json::Value::String(text) => {
                metadata.insert(path, text.clone());
            }
            serde_json::Value::Number(number) => {
                metadata.insert(path, number.to_string());
            }
            serde_json::Value::Bool(flag) => {
                metadata.insert(path, flag.to_string());
            }
            serde_json::Value::Object(_) if prefix.is_empty() => {
                collect_persona_payload_metadata(&path, value, metadata);
            }
            _ => {}
        }
    }
}

pub(super) fn dispatch_node_metadata(
    route: &DispatchUri,
    binding: &TriggerBinding,
    event: &TriggerEvent,
    attempt: u32,
) -> BTreeMap<String, serde_json::Value> {
    let mut metadata = BTreeMap::new();
    metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
    metadata.insert(
        "target_uri".to_string(),
        serde_json::json!(route.target_uri()),
    );
    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
    metadata.insert(
        "trigger_id".to_string(),
        serde_json::json!(binding.id.as_str()),
    );
    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
    if let Some(target_agent) = dispatch_target_agent(route) {
        metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
    }
    if let DispatchUri::Worker { queue } = route {
        metadata.insert("queue_name".to_string(), serde_json::json!(queue));
    }
    if let DispatchUri::Persona { name } = route {
        metadata.insert("persona".to_string(), serde_json::json!(name));
    }
    metadata
}

pub(super) fn dispatch_success_metadata(
    route: &DispatchUri,
    binding: &TriggerBinding,
    event: &TriggerEvent,
    attempt: u32,
    result: &serde_json::Value,
) -> BTreeMap<String, serde_json::Value> {
    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
    match route {
        DispatchUri::A2a { .. } => {
            if let Some(task_id) = result
                .get("task_id")
                .or_else(|| result.get("id"))
                .and_then(|value| value.as_str())
            {
                metadata.insert("task_id".to_string(), serde_json::json!(task_id));
            }
            if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
                metadata.insert("state".to_string(), serde_json::json!(state));
            }
        }
        DispatchUri::Worker { .. } => {
            if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
            {
                metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
            }
            if let Some(response_topic) = result
                .get("response_topic")
                .and_then(|value| value.as_str())
            {
                metadata.insert(
                    "response_topic".to_string(),
                    serde_json::json!(response_topic),
                );
            }
        }
        DispatchUri::Persona { .. } => {
            if let Some(receipt_id) = result.get("receipt_id").and_then(|value| value.as_str()) {
                metadata.insert("receipt_id".to_string(), serde_json::json!(receipt_id));
            }
            if let Some(status) = result.get("status").and_then(|value| value.as_str()) {
                metadata.insert("status".to_string(), serde_json::json!(status));
            }
        }
        DispatchUri::Local { .. } => {}
    }
    metadata
}

pub(super) fn dispatch_error_metadata(
    route: &DispatchUri,
    binding: &TriggerBinding,
    event: &TriggerEvent,
    attempt: u32,
    error: &DispatchError,
) -> BTreeMap<String, serde_json::Value> {
    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
    metadata
}

pub(super) fn retry_node_metadata(
    binding: &TriggerBinding,
    event: &TriggerEvent,
    attempt: u32,
    delay: Duration,
    error: &DispatchError,
) -> BTreeMap<String, serde_json::Value> {
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "trigger_id".to_string(),
        serde_json::json!(binding.id.as_str()),
    );
    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
    metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
    metadata
}