Skip to main content

harn_cli/commands/trigger/
cancel.rs

1use serde::Serialize;
2
3use std::path::Path;
4use std::sync::Arc;
5
6use harn_vm::event_log::AnyEventLog;
7
8use crate::cli::TriggerCancelArgs;
9use crate::commands::trigger::ops::{
10    append_bulk_cancel_requests, append_operation_audit, build_operation_audit,
11    install_trigger_runtime, load_bulk_targets, load_targets_for_event_id,
12    workspace_root_and_event_log, BulkTriggerTarget, ProgressReporter, RateLimiter,
13};
14
15#[derive(Clone, Debug, Serialize)]
16pub struct CancelItem {
17    pub event_id: String,
18    pub binding_id: String,
19    pub binding_version: u32,
20    pub binding_key: String,
21    pub latest_status: String,
22    pub status: String,
23    pub cancellable: bool,
24}
25
26#[derive(Clone, Debug, Serialize)]
27pub struct CancelReport {
28    pub operation: String,
29    pub dry_run: bool,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub filter: Option<String>,
32    pub matched_count: usize,
33    pub requested_count: usize,
34    pub skipped_count: usize,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub audit_id: Option<String>,
37    pub items: Vec<CancelItem>,
38}
39
40pub(crate) async fn run(args: TriggerCancelArgs) -> Result<(), String> {
41    let (workspace_root, event_log) = workspace_root_and_event_log()?;
42    install_trigger_runtime(&workspace_root).await?;
43
44    let (targets, normalized_filter) = match (args.event_id.as_deref(), args.where_expr.as_deref())
45    {
46        (Some(event_id), None) => (
47            load_targets_for_event_id(&event_log, event_id, None).await?,
48            None,
49        ),
50        (None, Some(where_expr)) => {
51            let (targets, normalized_filter) =
52                load_bulk_targets(&event_log, where_expr, None).await?;
53            (targets, Some(normalized_filter))
54        }
55        _ => return Err("expected either an event id or --where".to_string()),
56    };
57
58    let report = cancel_targets(
59        &event_log,
60        targets,
61        normalized_filter,
62        args.dry_run,
63        args.progress,
64        args.rate_limit,
65    )
66    .await?;
67    println!(
68        "{}",
69        serde_json::to_string_pretty(&report)
70            .map_err(|error| format!("failed to encode cancel report: {error}"))?
71    );
72    Ok(())
73}
74
75/// In-process entry point for `harn trigger cancel <event_id>`. Used by
76/// integration tests to drive the cancel path without spawning the `harn`
77/// binary. The binary entry calls the same internals via `run`.
78pub async fn cancel_event_in_process(
79    event_log: Arc<AnyEventLog>,
80    workspace_root: &Path,
81    event_id: &str,
82) -> Result<CancelReport, String> {
83    install_trigger_runtime(workspace_root).await?;
84    let targets = load_targets_for_event_id(&event_log, event_id, None).await?;
85    cancel_targets(&event_log, targets, None, false, false, None).await
86}
87
88async fn cancel_targets(
89    event_log: &std::sync::Arc<harn_vm::event_log::AnyEventLog>,
90    targets: Vec<BulkTriggerTarget>,
91    normalized_filter: Option<String>,
92    dry_run: bool,
93    progress: bool,
94    rate_limit: Option<f64>,
95) -> Result<CancelReport, String> {
96    let matched_count = targets.len();
97    let mut requested_count = 0;
98    let mut skipped_count = 0;
99    let mut items = Vec::new();
100    let mut limiter = RateLimiter::new(rate_limit);
101    let mut progress_reporter = ProgressReporter::new(progress, "cancel", matched_count);
102
103    if !dry_run {
104        let mut request_targets = Vec::new();
105        for target in &targets {
106            limiter.wait().await;
107            let (status, should_request) = if target.cancellable {
108                requested_count += 1;
109                ("requested".to_string(), true)
110            } else {
111                skipped_count += 1;
112                ("not_cancellable".to_string(), false)
113            };
114            if should_request {
115                request_targets.push(target.clone());
116            }
117            progress_reporter.update(status.as_str());
118            items.push(CancelItem {
119                event_id: target.event_id.clone(),
120                binding_id: target.binding_id.clone(),
121                binding_version: target.binding_version,
122                binding_key: target.binding_key.clone(),
123                latest_status: target.latest_status.clone(),
124                status,
125                cancellable: target.cancellable,
126            });
127        }
128        let audit = build_operation_audit(
129            "cancel",
130            false,
131            normalized_filter.clone(),
132            rate_limit,
133            matched_count,
134            requested_count,
135            skipped_count,
136            &targets,
137        );
138        append_bulk_cancel_requests(
139            event_log,
140            &audit.id,
141            audit.requested_by.clone(),
142            &request_targets,
143        )
144        .await?;
145        append_operation_audit(event_log, &audit).await?;
146        return Ok(CancelReport {
147            operation: "cancel".to_string(),
148            dry_run: false,
149            filter: normalized_filter,
150            matched_count,
151            requested_count,
152            skipped_count,
153            audit_id: Some(audit.id),
154            items,
155        });
156    }
157
158    for target in &targets {
159        let status = if target.cancellable {
160            "dry_run"
161        } else {
162            skipped_count += 1;
163            "not_cancellable"
164        };
165        progress_reporter.update(status);
166        items.push(CancelItem {
167            event_id: target.event_id.clone(),
168            binding_id: target.binding_id.clone(),
169            binding_version: target.binding_version,
170            binding_key: target.binding_key.clone(),
171            latest_status: target.latest_status.clone(),
172            status: status.to_string(),
173            cancellable: target.cancellable,
174        });
175    }
176
177    let audit = build_operation_audit(
178        "cancel",
179        true,
180        normalized_filter.clone(),
181        rate_limit,
182        matched_count,
183        0,
184        skipped_count,
185        &targets,
186    );
187    append_operation_audit(event_log, &audit).await?;
188    Ok(CancelReport {
189        operation: "cancel".to_string(),
190        dry_run: true,
191        filter: normalized_filter,
192        matched_count,
193        requested_count: 0,
194        skipped_count,
195        audit_id: Some(audit.id),
196        items,
197    })
198}