harn_cli/commands/trigger/
cancel.rs1use serde::Serialize;
2
3use std::path::Path;
4use std::sync::Arc;
5
6use harn_vm::event_log::AnyEventLog;
7
8use crate::cli::TriggerCancelArgs;
9use crate::commands::trigger::ops::{
10 append_bulk_cancel_requests, append_operation_audit, build_operation_audit,
11 install_trigger_runtime, load_bulk_targets, load_targets_for_event_id,
12 workspace_root_and_event_log, BulkTriggerTarget, ProgressReporter, RateLimiter,
13};
14
15#[derive(Clone, Debug, Serialize)]
16pub struct CancelItem {
17 pub event_id: String,
18 pub binding_id: String,
19 pub binding_version: u32,
20 pub binding_key: String,
21 pub latest_status: String,
22 pub status: String,
23 pub cancellable: bool,
24}
25
26#[derive(Clone, Debug, Serialize)]
27pub struct CancelReport {
28 pub operation: String,
29 pub dry_run: bool,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub filter: Option<String>,
32 pub matched_count: usize,
33 pub requested_count: usize,
34 pub skipped_count: usize,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub audit_id: Option<String>,
37 pub items: Vec<CancelItem>,
38}
39
40pub(crate) async fn run(args: TriggerCancelArgs) -> Result<(), String> {
41 let (workspace_root, event_log) = workspace_root_and_event_log()?;
42 install_trigger_runtime(&workspace_root).await?;
43
44 let (targets, normalized_filter) = match (args.event_id.as_deref(), args.where_expr.as_deref())
45 {
46 (Some(event_id), None) => (
47 load_targets_for_event_id(&event_log, event_id, None).await?,
48 None,
49 ),
50 (None, Some(where_expr)) => {
51 let (targets, normalized_filter) =
52 load_bulk_targets(&event_log, where_expr, None).await?;
53 (targets, Some(normalized_filter))
54 }
55 _ => return Err("expected either an event id or --where".to_string()),
56 };
57
58 let report = cancel_targets(
59 &event_log,
60 targets,
61 normalized_filter,
62 args.dry_run,
63 args.progress,
64 args.rate_limit,
65 )
66 .await?;
67 println!(
68 "{}",
69 serde_json::to_string_pretty(&report)
70 .map_err(|error| format!("failed to encode cancel report: {error}"))?
71 );
72 Ok(())
73}
74
75pub async fn cancel_event_in_process(
79 event_log: Arc<AnyEventLog>,
80 workspace_root: &Path,
81 event_id: &str,
82) -> Result<CancelReport, String> {
83 install_trigger_runtime(workspace_root).await?;
84 let targets = load_targets_for_event_id(&event_log, event_id, None).await?;
85 cancel_targets(&event_log, targets, None, false, false, None).await
86}
87
88async fn cancel_targets(
89 event_log: &std::sync::Arc<harn_vm::event_log::AnyEventLog>,
90 targets: Vec<BulkTriggerTarget>,
91 normalized_filter: Option<String>,
92 dry_run: bool,
93 progress: bool,
94 rate_limit: Option<f64>,
95) -> Result<CancelReport, String> {
96 let matched_count = targets.len();
97 let mut requested_count = 0;
98 let mut skipped_count = 0;
99 let mut items = Vec::new();
100 let mut limiter = RateLimiter::new(rate_limit);
101 let mut progress_reporter = ProgressReporter::new(progress, "cancel", matched_count);
102
103 if !dry_run {
104 let mut request_targets = Vec::new();
105 for target in &targets {
106 limiter.wait().await;
107 let (status, should_request) = if target.cancellable {
108 requested_count += 1;
109 ("requested".to_string(), true)
110 } else {
111 skipped_count += 1;
112 ("not_cancellable".to_string(), false)
113 };
114 if should_request {
115 request_targets.push(target.clone());
116 }
117 progress_reporter.update(status.as_str());
118 items.push(CancelItem {
119 event_id: target.event_id.clone(),
120 binding_id: target.binding_id.clone(),
121 binding_version: target.binding_version,
122 binding_key: target.binding_key.clone(),
123 latest_status: target.latest_status.clone(),
124 status,
125 cancellable: target.cancellable,
126 });
127 }
128 let audit = build_operation_audit(
129 "cancel",
130 false,
131 normalized_filter.clone(),
132 rate_limit,
133 matched_count,
134 requested_count,
135 skipped_count,
136 &targets,
137 );
138 append_bulk_cancel_requests(
139 event_log,
140 &audit.id,
141 audit.requested_by.clone(),
142 &request_targets,
143 )
144 .await?;
145 append_operation_audit(event_log, &audit).await?;
146 return Ok(CancelReport {
147 operation: "cancel".to_string(),
148 dry_run: false,
149 filter: normalized_filter,
150 matched_count,
151 requested_count,
152 skipped_count,
153 audit_id: Some(audit.id),
154 items,
155 });
156 }
157
158 for target in &targets {
159 let status = if target.cancellable {
160 "dry_run"
161 } else {
162 skipped_count += 1;
163 "not_cancellable"
164 };
165 progress_reporter.update(status);
166 items.push(CancelItem {
167 event_id: target.event_id.clone(),
168 binding_id: target.binding_id.clone(),
169 binding_version: target.binding_version,
170 binding_key: target.binding_key.clone(),
171 latest_status: target.latest_status.clone(),
172 status: status.to_string(),
173 cancellable: target.cancellable,
174 });
175 }
176
177 let audit = build_operation_audit(
178 "cancel",
179 true,
180 normalized_filter.clone(),
181 rate_limit,
182 matched_count,
183 0,
184 skipped_count,
185 &targets,
186 );
187 append_operation_audit(event_log, &audit).await?;
188 Ok(CancelReport {
189 operation: "cancel".to_string(),
190 dry_run: true,
191 filter: normalized_filter,
192 matched_count,
193 requested_count: 0,
194 skipped_count,
195 audit_id: Some(audit.id),
196 items,
197 })
198}