harn-cli 0.7.26

CLI for the Harn programming language — run, test, REPL, format, and lint
use serde::Serialize;

use crate::cli::TriggerCancelArgs;
use crate::commands::trigger::ops::{
    append_bulk_cancel_requests, append_operation_audit, build_operation_audit,
    install_trigger_runtime, load_bulk_targets, load_targets_for_event_id,
    workspace_root_and_event_log, BulkTriggerTarget, ProgressReporter, RateLimiter,
};

#[derive(Clone, Debug, Serialize)]
struct CancelItem {
    event_id: String,
    binding_id: String,
    binding_version: u32,
    binding_key: String,
    latest_status: String,
    status: String,
    cancellable: bool,
}

#[derive(Clone, Debug, Serialize)]
struct CancelReport {
    operation: String,
    dry_run: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    filter: Option<String>,
    matched_count: usize,
    requested_count: usize,
    skipped_count: usize,
    #[serde(skip_serializing_if = "Option::is_none")]
    audit_id: Option<String>,
    items: Vec<CancelItem>,
}

pub(crate) async fn run(args: TriggerCancelArgs) -> Result<(), String> {
    let (workspace_root, event_log) = workspace_root_and_event_log()?;
    install_trigger_runtime(&workspace_root).await?;

    let (targets, normalized_filter) = match (args.event_id.as_deref(), args.where_expr.as_deref())
    {
        (Some(event_id), None) => (
            load_targets_for_event_id(&event_log, event_id, None).await?,
            None,
        ),
        (None, Some(where_expr)) => {
            let (targets, normalized_filter) =
                load_bulk_targets(&event_log, where_expr, None).await?;
            (targets, Some(normalized_filter))
        }
        _ => return Err("expected either an event id or --where".to_string()),
    };

    let report = cancel_targets(
        &event_log,
        targets,
        normalized_filter,
        args.dry_run,
        args.progress,
        args.rate_limit,
    )
    .await?;
    println!(
        "{}",
        serde_json::to_string_pretty(&report)
            .map_err(|error| format!("failed to encode cancel report: {error}"))?
    );
    Ok(())
}

async fn cancel_targets(
    event_log: &std::sync::Arc<harn_vm::event_log::AnyEventLog>,
    targets: Vec<BulkTriggerTarget>,
    normalized_filter: Option<String>,
    dry_run: bool,
    progress: bool,
    rate_limit: Option<f64>,
) -> Result<CancelReport, String> {
    let matched_count = targets.len();
    let mut requested_count = 0;
    let mut skipped_count = 0;
    let mut items = Vec::new();
    let mut limiter = RateLimiter::new(rate_limit);
    let mut progress_reporter = ProgressReporter::new(progress, "cancel", matched_count);

    if !dry_run {
        let mut request_targets = Vec::new();
        for target in &targets {
            limiter.wait().await;
            let (status, should_request) = if target.cancellable {
                requested_count += 1;
                ("requested".to_string(), true)
            } else {
                skipped_count += 1;
                ("not_cancellable".to_string(), false)
            };
            if should_request {
                request_targets.push(target.clone());
            }
            progress_reporter.update(status.as_str());
            items.push(CancelItem {
                event_id: target.event_id.clone(),
                binding_id: target.binding_id.clone(),
                binding_version: target.binding_version,
                binding_key: target.binding_key.clone(),
                latest_status: target.latest_status.clone(),
                status,
                cancellable: target.cancellable,
            });
        }
        let audit = build_operation_audit(
            "cancel",
            false,
            normalized_filter.clone(),
            rate_limit,
            matched_count,
            requested_count,
            skipped_count,
            &targets,
        );
        append_bulk_cancel_requests(
            event_log,
            &audit.id,
            audit.requested_by.clone(),
            &request_targets,
        )
        .await?;
        append_operation_audit(event_log, &audit).await?;
        return Ok(CancelReport {
            operation: "cancel".to_string(),
            dry_run: false,
            filter: normalized_filter,
            matched_count,
            requested_count,
            skipped_count,
            audit_id: Some(audit.id),
            items,
        });
    }

    for target in &targets {
        let status = if target.cancellable {
            "dry_run"
        } else {
            skipped_count += 1;
            "not_cancellable"
        };
        progress_reporter.update(status);
        items.push(CancelItem {
            event_id: target.event_id.clone(),
            binding_id: target.binding_id.clone(),
            binding_version: target.binding_version,
            binding_key: target.binding_key.clone(),
            latest_status: target.latest_status.clone(),
            status: status.to_string(),
            cancellable: target.cancellable,
        });
    }

    let audit = build_operation_audit(
        "cancel",
        true,
        normalized_filter.clone(),
        rate_limit,
        matched_count,
        0,
        skipped_count,
        &targets,
    );
    append_operation_audit(event_log, &audit).await?;
    Ok(CancelReport {
        operation: "cancel".to_string(),
        dry_run: true,
        filter: normalized_filter,
        matched_count,
        requested_count: 0,
        skipped_count,
        audit_id: Some(audit.id),
        items,
    })
}