harn-cli 0.7.25

CLI for the Harn programming language — run, test, REPL, format, and lint
use crate::cli::OrchestratorDlqArgs;
use serde::Serialize;

use super::common::{
    append_dlq_entry, discard_dlq_entry, load_local_runtime, print_json, trigger_inspect_dlq,
    trigger_replay,
};

#[derive(Debug, Serialize)]
struct DlqListPayload {
    dispatcher_dlq_depth: u64,
    pending_entries: usize,
    entries: Vec<super::common::DlqEntryRecord>,
}

#[derive(Debug, Serialize)]
struct DlqReplayPayload {
    entry_id: String,
    handle: super::common::DispatchHandleRecord,
}

#[derive(Debug, Serialize)]
struct DlqDiscardPayload {
    entry: super::common::DlqEntryRecord,
}

pub(super) async fn run(args: OrchestratorDlqArgs) -> Result<(), String> {
    let mut ctx = load_local_runtime(&args.local).await?;
    let entries = trigger_inspect_dlq(&mut ctx).await?;

    if let Some(entry_id) = args.replay.as_deref() {
        let entry = entries
            .iter()
            .find(|entry| entry.id == entry_id)
            .ok_or_else(|| format!("unknown pending DLQ entry '{entry_id}'"))?;
        let handle = trigger_replay(&mut ctx, &entry.event_id).await?;
        if args.json {
            return print_json(&DlqReplayPayload {
                entry_id: entry.id.clone(),
                handle,
            });
        }
        println!("DLQ replay:");
        println!("- entry_id={}", entry.id);
        println!("- event_id={}", handle.event_id);
        println!("- status={}", handle.status);
        println!("- error={}", handle.error.as_deref().unwrap_or("-"));
        println!(
            "- replay_of_event_id={}",
            handle.replay_of_event_id.as_deref().unwrap_or("-")
        );
        return Ok(());
    }

    if let Some(entry_id) = args.discard.as_deref() {
        let entry = entries
            .iter()
            .find(|entry| entry.id == entry_id)
            .ok_or_else(|| format!("unknown pending DLQ entry '{entry_id}'"))?;
        let discarded = discard_dlq_entry(entry)?;
        append_dlq_entry(&ctx.event_log, &discarded).await?;
        if args.json {
            return print_json(&DlqDiscardPayload { entry: discarded });
        }
        println!("DLQ entry discarded:");
        println!("- entry_id={}", discarded.id);
        println!("- event_id={}", discarded.event_id);
        println!("- state={}", discarded.state);
        return Ok(());
    }

    let stats = harn_vm::snapshot_dispatcher_stats();
    if args.json {
        return print_json(&DlqListPayload {
            dispatcher_dlq_depth: stats.dlq_depth,
            pending_entries: entries.len(),
            entries,
        });
    }
    println!("DLQ:");
    println!("- dispatcher_dlq_depth={}", stats.dlq_depth);
    println!("- pending_entries={}", entries.len());
    if entries.is_empty() {
        println!("- none");
        return Ok(());
    }
    for entry in entries {
        println!(
            "- {} binding={} event={} error={}",
            entry.id, entry.binding_id, entry.event_id, entry.error
        );
    }
    Ok(())
}