harn-cli 0.7.23

CLI for the Harn programming language — run, test, REPL, format, and lint
use std::collections::BTreeSet;
use std::sync::Arc;

use crate::cli::OrchestratorQueueArgs;

use super::common::{
    format_duration, format_timestamp, load_local_runtime, read_topic, stranded_envelopes,
    TRIGGER_ATTEMPTS_TOPIC, TRIGGER_INBOX_CLAIMS_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC,
    TRIGGER_INBOX_LEGACY_TOPIC, TRIGGER_OUTBOX_TOPIC,
};

pub(super) async fn run(args: OrchestratorQueueArgs) -> Result<(), String> {
    let ctx = load_local_runtime(&args.local).await?;
    let dispatcher = harn_vm::snapshot_dispatcher_stats();
    let outbox = read_topic(&ctx.event_log, TRIGGER_OUTBOX_TOPIC).await?;
    let attempts = read_topic(&ctx.event_log, TRIGGER_ATTEMPTS_TOPIC).await?;
    let claim_events = read_topic(&ctx.event_log, TRIGGER_INBOX_CLAIMS_TOPIC).await?;
    let envelope_events = read_topic(&ctx.event_log, TRIGGER_INBOX_ENVELOPES_TOPIC).await?;
    let legacy_inbox_events = read_topic(&ctx.event_log, TRIGGER_INBOX_LEGACY_TOPIC).await?;
    let stranded = stranded_envelopes(&ctx.event_log, std::time::Duration::ZERO).await?;

    let mut started = BTreeSet::new();
    let mut finished = BTreeSet::new();
    for (_, event) in &outbox {
        let Some(key) = dispatch_key(&event.headers) else {
            continue;
        };
        match event.kind.as_str() {
            "dispatch_started" => {
                started.insert(key);
            }
            "dispatch_succeeded" | "dispatch_failed" => {
                finished.insert(key);
            }
            _ => {}
        }
    }
    let in_flight: Vec<_> = started.difference(&finished).cloned().collect();

    let mut scheduled = BTreeSet::new();
    for (_, event) in &attempts {
        if event.kind != "retry_scheduled" {
            continue;
        }
        if let Some(key) = dispatch_key(&event.headers) {
            scheduled.insert(key);
        }
    }
    let pending_retries: Vec<_> = scheduled.difference(&started).cloned().collect();

    let inbox_metrics = Arc::new(harn_vm::MetricsRegistry::default());
    let _inbox = harn_vm::triggers::InboxIndex::new(ctx.event_log.clone(), inbox_metrics.clone())
        .await
        .map_err(|error| error.to_string())?;
    let inbox_snapshot = inbox_metrics.snapshot();
    let inbox_claims_written = claim_events
        .iter()
        .chain(legacy_inbox_events.iter())
        .filter(|(_, event)| event.kind == "dedupe_claim")
        .count();
    let inbox_envelopes_written = envelope_events
        .iter()
        .chain(legacy_inbox_events.iter())
        .filter(|(_, event)| event.kind == "event_ingested")
        .count();

    println!("Queue:");
    println!("- dispatcher_in_flight={}", dispatcher.in_flight);
    println!(
        "- dispatcher_retry_queue_depth={}",
        dispatcher.retry_queue_depth
    );
    println!("- inferred_in_flight={}", in_flight.len());
    println!("- inferred_pending_retries={}", pending_retries.len());
    println!("- stranded_envelopes={}", stranded.len());
    println!("- inbox_claims_written={}", inbox_claims_written);
    println!("- inbox_envelopes_written={}", inbox_envelopes_written);
    println!(
        "- inbox_duplicates_rejected={}",
        inbox_snapshot.inbox_duplicates_rejected
    );
    println!(
        "- inbox_fast_path_hits={}",
        inbox_snapshot.inbox_fast_path_hits
    );
    println!("- inbox_durable_hits={}", inbox_snapshot.inbox_durable_hits);
    println!(
        "- inbox_expired_entries={}",
        inbox_snapshot.inbox_expired_entries
    );
    println!(
        "- inbox_active_entries={}",
        inbox_snapshot.inbox_active_entries
    );

    println!();
    println!("In-flight dispatches:");
    if in_flight.is_empty() {
        println!("- none");
    } else {
        for key in in_flight {
            println!("- {key}");
        }
    }

    println!();
    println!("Pending retries:");
    if pending_retries.is_empty() {
        println!("- none");
    } else {
        for key in pending_retries {
            println!("- {key}");
        }
    }

    println!();
    println!("Stranded envelopes:");
    if stranded.is_empty() {
        println!("- none");
    } else {
        for envelope in stranded {
            println!(
                "- event_id={} trigger_id={} binding_version={} provider={} kind={} age={} received_at={} inbox_offset={}",
                envelope.event_id,
                envelope.trigger_id.as_deref().unwrap_or("-"),
                envelope
                    .binding_version
                    .map(|value| value.to_string())
                    .unwrap_or_else(|| "-".to_string()),
                envelope.provider,
                envelope.kind,
                format_duration(envelope.age),
                format_timestamp(envelope.received_at),
                envelope.inbox_offset,
            );
        }
    }

    Ok(())
}

fn dispatch_key(headers: &std::collections::BTreeMap<String, String>) -> Option<String> {
    let binding_key = headers.get("binding_key")?;
    let event_id = headers.get("event_id")?;
    let attempt = headers.get("attempt")?;
    Some(format!("{binding_key}:{event_id}:{attempt}"))
}