harn-cli 0.7.24

CLI for the Harn programming language — run, test, REPL, format, and lint
use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration as StdDuration;

use harn_vm::event_log::{AnyEventLog, EventLog, LogEvent, Topic};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::json;
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;

use crate::cli::OrchestratorLocalArgs;
use crate::package::{self, CollectedManifestTrigger, Manifest};

use super::role::OrchestratorRole;

pub(crate) const STATE_SNAPSHOT_FILE: &str = "orchestrator-state.json";
pub(crate) use harn_vm::{
    TRIGGER_ATTEMPTS_TOPIC, TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_CLAIMS_TOPIC,
    TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC, TRIGGER_OUTBOX_TOPIC,
};

pub(crate) struct LoadedOrchestratorContext {
    pub vm: harn_vm::Vm,
    pub event_log: Arc<AnyEventLog>,
    pub collected_triggers: Vec<CollectedManifestTrigger>,
    pub snapshot: Option<PersistedStateSnapshot>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct PersistedStateSnapshot {
    pub status: String,
    pub bind: String,
    #[serde(default)]
    pub connectors: Vec<String>,
    #[serde(default)]
    pub activations: Vec<ConnectorActivationSnapshot>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct ConnectorActivationSnapshot {
    pub provider: String,
    pub binding_count: usize,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct DispatchHandleRecord {
    pub event_id: String,
    pub binding_id: String,
    pub binding_version: u32,
    pub status: String,
    pub replay_of_event_id: Option<String>,
    pub dlq_entry_id: Option<String>,
    pub error: Option<String>,
    pub result: Option<serde_json::Value>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct DlqAttemptRecord {
    pub attempt: u32,
    pub at: String,
    pub status: String,
    pub error: Option<String>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub(crate) struct DlqEntryRecord {
    pub id: String,
    pub event_id: String,
    pub binding_id: String,
    pub binding_version: u32,
    pub provider: String,
    pub kind: String,
    pub state: String,
    pub error: String,
    pub event: harn_vm::TriggerEvent,
    pub retry_history: Vec<DlqAttemptRecord>,
}

#[derive(Clone, Debug, Serialize)]
pub(crate) struct StrandedEnvelopeRecord {
    pub inbox_offset: u64,
    pub event_id: String,
    pub trigger_id: Option<String>,
    pub binding_version: Option<u32>,
    pub provider: String,
    pub kind: String,
    pub received_at: OffsetDateTime,
    pub age: StdDuration,
}

pub(crate) async fn load_local_runtime(
    args: &OrchestratorLocalArgs,
) -> Result<LoadedOrchestratorContext, String> {
    harn_vm::reset_thread_local_state();

    let config_path = absolutize_from_cwd(&args.config)?;
    let (_manifest, manifest_dir) = load_manifest(&config_path)?;
    let state_dir = absolutize_from_cwd(&args.state_dir)?;
    std::fs::create_dir_all(&state_dir).map_err(|error| {
        format!(
            "failed to create state dir {}: {error}",
            state_dir.display()
        )
    })?;

    let mut vm =
        OrchestratorRole::SingleTenant.build_vm(&manifest_dir, &manifest_dir, &state_dir)?;
    let extensions = package::load_runtime_extensions(&config_path);
    let collected_triggers = package::collect_manifest_triggers(&mut vm, &extensions)
        .await
        .map_err(|error| format!("failed to collect manifest triggers: {error}"))?;
    package::install_collected_manifest_triggers(&collected_triggers).await?;

    let event_log = harn_vm::event_log::active_event_log()
        .ok_or_else(|| "event log was not installed during VM initialization".to_string())?;
    let snapshot = read_state_snapshot(&state_dir.join(STATE_SNAPSHOT_FILE))?;

    Ok(LoadedOrchestratorContext {
        vm,
        event_log,
        collected_triggers,
        snapshot,
    })
}

pub(crate) async fn trigger_list(
    ctx: &mut LoadedOrchestratorContext,
) -> Result<Vec<harn_vm::TriggerBindingSnapshot>, String> {
    eval_json(&mut ctx.vm, "trigger_list()").await
}

pub(crate) async fn trigger_replay(
    ctx: &mut LoadedOrchestratorContext,
    event_id: &str,
) -> Result<DispatchHandleRecord, String> {
    let event_id = serde_json::to_string(event_id).map_err(|error| error.to_string())?;
    eval_json(&mut ctx.vm, &format!("trigger_replay({event_id})")).await
}

pub(crate) async fn trigger_inspect_dlq(
    ctx: &mut LoadedOrchestratorContext,
) -> Result<Vec<DlqEntryRecord>, String> {
    eval_json(&mut ctx.vm, "trigger_inspect_dlq()").await
}

pub(crate) async fn trigger_fire(
    ctx: &mut LoadedOrchestratorContext,
    binding_id: &str,
    event: serde_json::Value,
) -> Result<DispatchHandleRecord, String> {
    let binding_id = serde_json::to_string(binding_id).map_err(|error| error.to_string())?;
    let event_json = serde_json::to_string(&event).map_err(|error| error.to_string())?;
    let event_literal = serde_json::to_string(&event_json).map_err(|error| error.to_string())?;
    eval_json(
        &mut ctx.vm,
        &format!("trigger_fire({binding_id}, json_parse({event_literal}))"),
    )
    .await
}

pub(crate) fn synthetic_event_for_binding(
    ctx: &LoadedOrchestratorContext,
    binding_id: &str,
) -> Result<serde_json::Value, String> {
    let trigger = ctx
        .collected_triggers
        .iter()
        .find(|trigger| trigger.config.id == binding_id)
        .ok_or_else(|| format!("unknown manifest binding '{binding_id}'"))?;
    let kind = trigger
        .config
        .match_
        .events
        .first()
        .cloned()
        .unwrap_or_else(|| match trigger.config.kind {
            crate::package::TriggerKind::Webhook => "webhook".to_string(),
            crate::package::TriggerKind::Cron => "cron.tick".to_string(),
            crate::package::TriggerKind::Poll => "poll".to_string(),
            crate::package::TriggerKind::Stream => "stream".to_string(),
            crate::package::TriggerKind::Predicate => "predicate".to_string(),
            crate::package::TriggerKind::A2aPush => "a2a.task.received".to_string(),
        });
    Ok(json!({
        "provider": trigger.config.provider.as_str(),
        "kind": kind,
        "headers": {
            "x-harn-binding-id": binding_id,
            "x-harn-synthetic": "true",
        }
    }))
}

pub(crate) async fn read_topic(
    log: &Arc<AnyEventLog>,
    topic_name: &str,
) -> Result<Vec<(u64, LogEvent)>, String> {
    let topic = Topic::new(topic_name).map_err(|error| error.to_string())?;
    log.read_range(&topic, None, usize::MAX)
        .await
        .map_err(|error| error.to_string())
}

pub(crate) async fn stranded_envelopes(
    log: &Arc<AnyEventLog>,
    min_age: StdDuration,
) -> Result<Vec<StrandedEnvelopeRecord>, String> {
    let envelopes = read_topic(log, TRIGGER_INBOX_ENVELOPES_TOPIC).await?;
    let legacy_inbox = read_topic(log, TRIGGER_INBOX_LEGACY_TOPIC).await?;
    let outbox = read_topic(log, TRIGGER_OUTBOX_TOPIC).await?;

    let mut any_outbox_event_ids = BTreeSet::new();
    let mut outbox_by_event_id = BTreeMap::<String, BTreeSet<String>>::new();
    for (_, event) in outbox {
        let Some(event_id) = event.headers.get("event_id").cloned() else {
            continue;
        };
        any_outbox_event_ids.insert(event_id.clone());
        if let Some(trigger_id) = event.headers.get("trigger_id").cloned() {
            outbox_by_event_id
                .entry(event_id)
                .or_default()
                .insert(trigger_id);
        }
    }

    let mut stranded = Vec::new();
    for (offset, event) in envelopes.into_iter().chain(legacy_inbox) {
        if event.kind != "event_ingested" {
            continue;
        }
        let envelope: harn_vm::triggers::dispatcher::InboxEnvelope =
            serde_json::from_value(event.payload)
                .map_err(|error| format!("failed to decode trigger inbox envelope: {error}"))?;
        let age = age_since(envelope.event.received_at);
        if age < min_age {
            continue;
        }

        let event_id = envelope.event.id.0.clone();
        let matched_outbox = if let Some(trigger_id) = envelope.trigger_id.as_ref() {
            outbox_by_event_id
                .get(&event_id)
                .is_some_and(|trigger_ids| trigger_ids.contains(trigger_id))
        } else {
            any_outbox_event_ids.contains(&event_id)
        };
        if matched_outbox {
            continue;
        }

        stranded.push(StrandedEnvelopeRecord {
            inbox_offset: offset,
            event_id,
            trigger_id: envelope.trigger_id,
            binding_version: envelope.binding_version,
            provider: envelope.event.provider.as_str().to_string(),
            kind: envelope.event.kind,
            received_at: envelope.event.received_at,
            age,
        });
    }

    stranded.sort_by(|left, right| {
        left.received_at
            .cmp(&right.received_at)
            .then(left.event_id.cmp(&right.event_id))
    });
    Ok(stranded)
}

pub(crate) async fn append_dlq_entry(
    log: &Arc<AnyEventLog>,
    entry: &DlqEntryRecord,
) -> Result<(), String> {
    let topic = Topic::new(TRIGGER_DLQ_TOPIC).map_err(|error| error.to_string())?;
    let payload = serde_json::to_value(entry).map_err(|error| error.to_string())?;
    log.append(&topic, LogEvent::new("dlq_entry", payload))
        .await
        .map(|_| ())
        .map_err(|error| error.to_string())
}

pub(crate) fn discard_dlq_entry(entry: &DlqEntryRecord) -> Result<DlqEntryRecord, String> {
    let mut next = entry.clone();
    next.state = "discarded".to_string();
    next.retry_history.push(DlqAttemptRecord {
        attempt: (next.retry_history.len() + 1) as u32,
        at: OffsetDateTime::now_utc()
            .format(&Rfc3339)
            .map_err(|error| error.to_string())?,
        status: "discarded".to_string(),
        error: None,
    });
    Ok(next)
}

pub(crate) fn print_json<T>(value: &T) -> Result<(), String>
where
    T: Serialize,
{
    let encoded = serde_json::to_string_pretty(value).map_err(|error| error.to_string())?;
    println!("{encoded}");
    Ok(())
}

fn read_state_snapshot(path: &Path) -> Result<Option<PersistedStateSnapshot>, String> {
    if !path.is_file() {
        return Ok(None);
    }
    let content = std::fs::read_to_string(path)
        .map_err(|error| format!("failed to read {}: {error}", path.display()))?;
    let snapshot = serde_json::from_str(&content)
        .map_err(|error| format!("failed to parse {}: {error}", path.display()))?;
    Ok(Some(snapshot))
}

async fn eval_json<T>(vm: &mut harn_vm::Vm, expr: &str) -> Result<T, String>
where
    T: DeserializeOwned,
{
    let source = format!("pipeline default() {{\n  return {expr}\n}}\n");
    let chunk = harn_vm::compile_source(&source)?;
    let value = vm
        .execute(&chunk)
        .await
        .map_err(|error| error.to_string())?;
    serde_json::from_value(harn_vm::llm::vm_value_to_json(&value))
        .map_err(|error| format!("failed to decode builtin result: {error}"))
}

fn load_manifest(config_path: &Path) -> Result<(Manifest, PathBuf), String> {
    if !config_path.is_file() {
        return Err(format!("manifest not found: {}", config_path.display()));
    }
    let content = std::fs::read_to_string(config_path)
        .map_err(|error| format!("failed to read {}: {error}", config_path.display()))?;
    let manifest = toml::from_str::<Manifest>(&content)
        .map_err(|error| format!("failed to parse {}: {error}", config_path.display()))?;
    let manifest_dir = config_path.parent().map(Path::to_path_buf).ok_or_else(|| {
        format!(
            "manifest has no parent directory: {}",
            config_path.display()
        )
    })?;
    Ok((manifest, manifest_dir))
}

fn absolutize_from_cwd(path: &Path) -> Result<PathBuf, String> {
    let candidate = if path.is_absolute() {
        path.to_path_buf()
    } else {
        std::env::current_dir()
            .map_err(|error| format!("failed to read current directory: {error}"))?
            .join(path)
    };
    Ok(candidate)
}

pub(super) fn format_timestamp(value: OffsetDateTime) -> String {
    value.format(&Rfc3339).unwrap_or_else(|_| value.to_string())
}

pub(super) fn format_duration(value: StdDuration) -> String {
    if value.as_secs() == 0 {
        return format!("{}ms", value.as_millis());
    }
    let seconds = value.as_secs();
    if seconds < 60 {
        return format!("{seconds}s");
    }
    if seconds < 60 * 60 {
        return format!("{}m", seconds / 60);
    }
    if seconds < 60 * 60 * 24 {
        return format!("{}h", seconds / (60 * 60));
    }
    format!("{}d", seconds / (60 * 60 * 24))
}

fn age_since(then: OffsetDateTime) -> StdDuration {
    let now = OffsetDateTime::now_utc();
    if now <= then {
        return StdDuration::ZERO;
    }
    let delta = now - then;
    StdDuration::new(
        delta.whole_seconds() as u64,
        delta.subsec_nanoseconds().try_into().unwrap_or_default(),
    )
}