1use crate::{AppState, JWT_SECRET};
2use aes_gcm::{
3 aead::{Aead, KeyInit},
4 Aes256Gcm, Nonce,
5};
6use axum::{
7 extract::{Path, State},
8 http::{header::AUTHORIZATION, HeaderMap, StatusCode},
9 response::IntoResponse,
10 Json,
11};
12use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use stormchaser_model::auth::ApprovalOpaContext;
16use stormchaser_model::events::{
17 EventSource, EventType, SchemaVersion, StepCompletedEvent, StepEventType, StepFailedEvent,
18};
19use stormchaser_model::step::StepStatus;
20use stormchaser_model::EventId;
21use stormchaser_model::RunId;
22use stormchaser_model::StepInstanceId;
23
24use crate::auth::AuthClaims;
25use crate::db::{
26 delete_event_correlation, get_event_correlation, get_run_outputs_for_opa,
27 get_step_instance_for_approval, get_workflow_context_for_opa, insert_approval_registry,
28};
29use async_nats::jetstream::new as new_jetstream;
30use chrono::Utc;
31use stormchaser_model::dsl::{Step, Workflow};
32use stormchaser_model::nats::{publish_cloudevent, NatsSubject};
33
34#[derive(serde::Deserialize, serde::Serialize)]
35struct ApprovalLinkPayload {
36 run_id: RunId,
37 step_id: StepInstanceId,
38 action: String,
39 #[serde(default)]
40 inputs: Value,
41}
42
43#[utoipa::path(
45 get,
46 path = "/api/v1/approve-link/{token}",
47 params(
48 ("token" = String, Path, description = "Encrypted approval token")
49 ),
50 responses(
51 (status = 200, description = "Step approved or rejected successfully"),
52 (status = 400, description = "Invalid token or step state"),
53 (status = 404, description = "Step not found"),
54 (status = 500, description = "Internal server error")
55 ),
56 tag = "hitl"
57)]
58pub async fn approve_step_link(
60 State(state): State<AppState>,
61 Path(token): Path<String>,
62) -> impl IntoResponse {
63 let mut hasher = Sha256::new();
65 hasher.update(JWT_SECRET);
66 let key_bytes = hasher.finalize();
67 let key = aes_gcm::Key::<Aes256Gcm>::from_slice(&key_bytes);
68 let cipher = Aes256Gcm::new(key);
69
70 let decoded = match URL_SAFE_NO_PAD.decode(token) {
72 Ok(d) => d,
73 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid token encoding").into_response(),
74 };
75
76 if decoded.len() < 12 {
77 return (StatusCode::BAD_REQUEST, "Token too short").into_response();
78 }
79
80 let (nonce_bytes, ciphertext) = decoded.split_at(12);
82 let nonce = Nonce::from_slice(nonce_bytes);
83 let plaintext = match cipher.decrypt(nonce, ciphertext) {
84 Ok(p) => p,
85 Err(_) => {
86 return (
87 StatusCode::BAD_REQUEST,
88 "Invalid token signature or ciphertext",
89 )
90 .into_response()
91 }
92 };
93
94 let payload: ApprovalLinkPayload = match serde_json::from_slice(&plaintext) {
96 Ok(p) => p,
97 Err(_) => return (StatusCode::BAD_REQUEST, "Invalid token payload").into_response(),
98 };
99
100 let step = get_step_instance_for_approval(&state.pool, payload.step_id, payload.run_id)
102 .await
103 .unwrap_or(None);
104
105 let step = match step {
106 Some(s) => s,
107 None => return (StatusCode::NOT_FOUND, "Step not found").into_response(),
108 };
109
110 if step.status != StepStatus::WaitingForEvent {
111 return (StatusCode::BAD_REQUEST, "Step is not waiting for approval").into_response();
112 }
113
114 let is_approve = payload.action.to_lowercase() == "approve";
115 let status_str = if is_approve { "approved" } else { "rejected" };
116
117 let _ = insert_approval_registry(
119 &state.pool,
120 EventId::new_v4(),
121 payload.step_id,
122 "system-link",
123 status_str,
124 &payload.inputs,
125 )
126 .await;
127
128 let nats_payload = if is_approve {
130 json!({
131 "run_id": payload.run_id.to_string(),
132 "step_id": payload.step_id.to_string(),
133 "exit_code": 0,
134 "outputs": payload.inputs,
135 })
136 } else {
137 json!({
138 "run_id": payload.run_id.to_string(),
139 "step_id": payload.step_id.to_string(),
140 "exit_code": 1,
141 "error": "Rejected by human via link",
142 })
143 };
144
145 let subject = if is_approve {
146 "stormchaser.v1.step.completed"
147 } else {
148 "stormchaser.v1.step.failed"
149 };
150
151 match state
152 .nats
153 .publish(subject, nats_payload.to_string().into())
154 .await
155 {
156 Ok(_) => (StatusCode::OK, format!("Successfully {}", status_str)).into_response(),
157 Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish").into_response(),
158 }
159}
160
161async fn check_approval_opa(
162 state: &AppState,
163 run_id: RunId,
164 step_name: &str,
165 token: Option<&str>,
166) -> Result<(), (StatusCode, String)> {
167 if !state.opa.is_configured() {
168 return Ok(());
169 }
170
171 let context_row = match get_workflow_context_for_opa(&state.pool, run_id).await {
172 Ok(context_row) => context_row,
173 Err(err) => {
174 eprintln!(
175 "Failed to load workflow context for approval OPA evaluation for run {}: {:?}",
176 run_id, err
177 );
178 return Err((
179 StatusCode::INTERNAL_SERVER_ERROR,
180 "Failed to load workflow context for approval policy evaluation".to_string(),
181 ));
182 }
183 };
184
185 if let Some(context_data) = context_row {
186 let mut step_ast = json!({});
187 if let Ok(workflow) = serde_json::from_value::<Workflow>(context_data.workflow_definition) {
188 if let Some(s) = find_step(&workflow.steps, step_name) {
189 step_ast = serde_json::to_value(s).unwrap_or(json!({}));
190 }
191 }
192
193 let run_outputs_map = match get_run_outputs_for_opa(&state.pool, run_id).await {
194 Ok(map) => map,
195 Err(err) => {
196 tracing::error!(
197 "Failed to load run outputs for approval OPA evaluation for run {}: {:?}",
198 run_id,
199 err
200 );
201 return Err((
202 StatusCode::INTERNAL_SERVER_ERROR,
203 "Failed to load run outputs for approval policy evaluation".to_string(),
204 ));
205 }
206 };
207
208 let opa_context = ApprovalOpaContext {
209 run_id,
210 initiating_user: context_data.initiating_user,
211 step_ast,
212 inputs: context_data.run_inputs,
213 run_outputs: Value::Object(run_outputs_map),
214 token,
215 };
216
217 match state.opa.check_approval(opa_context).await {
218 Ok(true) => Ok(()),
219 Ok(false) => Err((
220 StatusCode::FORBIDDEN,
221 "Approval denied by OPA policy".to_string(),
222 )),
223 Err(_) => Err((
224 StatusCode::INTERNAL_SERVER_ERROR,
225 "OPA policy evaluation failed".to_string(),
226 )),
227 }
228 } else {
229 Ok(())
230 }
231}
232
233fn find_step(steps: &[Step], name: &str) -> Option<Step> {
235 for s in steps {
236 if s.name == name {
237 return Some(s.clone());
238 }
239 if let Some(inner) = &s.steps {
240 if let Some(found) = find_step(inner, name) {
241 return Some(found);
242 }
243 }
244 }
245 None
246}
247
248pub async fn approve_step(
250 State(state): State<AppState>,
251 AuthClaims(claims): AuthClaims,
252 headers: HeaderMap,
253 Path((run_id, step_id)): Path<(RunId, StepInstanceId)>,
254 Json(inputs): Json<Value>,
255) -> impl IntoResponse {
256 let token = headers
257 .get(AUTHORIZATION)
258 .and_then(|h| h.to_str().ok())
259 .and_then(|s| s.strip_prefix("Bearer "));
260
261 let step = get_step_instance_for_approval(&state.pool, step_id, run_id)
263 .await
264 .unwrap_or(None);
265
266 let step = match step {
267 Some(s) => s,
268 None => return (StatusCode::NOT_FOUND, "Step not found").into_response(),
269 };
270
271 if step.status != StepStatus::WaitingForEvent {
272 return (StatusCode::BAD_REQUEST, "Step is not waiting for approval").into_response();
273 }
274
275 if let Err((status, msg)) = check_approval_opa(&state, run_id, &step.step_name, token).await {
277 return (status, msg).into_response();
278 }
279
280 let _ = insert_approval_registry(
282 &state.pool,
283 EventId::new_v4(),
284 step_id,
285 &claims.sub,
286 "approved",
287 &inputs,
288 )
289 .await;
290
291 let completion_event = StepCompletedEvent {
293 run_id,
294 step_id,
295 event_type: EventType::Step(StepEventType::Completed),
296 runner_id: None,
297 exit_code: Some(0),
298 storage_hashes: None,
299 artifacts: None,
300 test_reports: None,
301 outputs: serde_json::from_value(inputs).ok(),
302 timestamp: Utc::now(),
303 };
304
305 match publish_cloudevent(
306 &new_jetstream(state.nats.clone()),
307 NatsSubject::StepCompleted,
308 EventType::Step(StepEventType::Completed),
309 EventSource::Api,
310 serde_json::to_value(completion_event).unwrap(),
311 Some(SchemaVersion::new("1.0".to_string())),
312 None,
313 )
314 .await
315 {
316 Ok(_) => (StatusCode::OK, "Approved").into_response(),
317 Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish").into_response(),
318 }
319}
320
321pub async fn reject_step(
323 State(state): State<AppState>,
324 AuthClaims(claims): AuthClaims,
325 headers: HeaderMap,
326 Path((run_id, step_id)): Path<(RunId, StepInstanceId)>,
327) -> impl IntoResponse {
328 let token = headers
329 .get(AUTHORIZATION)
330 .and_then(|h| h.to_str().ok())
331 .and_then(|s| s.strip_prefix("Bearer "));
332
333 let step = get_step_instance_for_approval(&state.pool, step_id, run_id)
334 .await
335 .unwrap_or(None);
336
337 let step = match step {
338 Some(s) => s,
339 None => return (StatusCode::NOT_FOUND, "Step not found").into_response(),
340 };
341
342 if step.status != StepStatus::WaitingForEvent {
343 return (StatusCode::BAD_REQUEST, "Step is not waiting for approval").into_response();
344 }
345
346 if let Err((status, msg)) = check_approval_opa(&state, run_id, &step.step_name, token).await {
348 return (status, msg).into_response();
349 }
350
351 let _ = insert_approval_registry(
352 &state.pool,
353 EventId::new_v4(),
354 step_id,
355 &claims.sub,
356 "rejected",
357 &json!({}),
358 )
359 .await;
360
361 let event = StepFailedEvent {
362 run_id,
363 step_id,
364 event_type: EventType::Step(StepEventType::Failed),
365 error: "Rejected by human".to_string(),
366 exit_code: Some(1),
367 runner_id: None,
368 storage_hashes: None,
369 artifacts: None,
370 test_reports: None,
371 outputs: None,
372 timestamp: Utc::now(),
373 };
374
375 match publish_cloudevent(
376 &new_jetstream(state.nats.clone()),
377 NatsSubject::StepFailed,
378 EventType::Step(StepEventType::Failed),
379 EventSource::Api,
380 serde_json::to_value(event).unwrap(),
381 Some(SchemaVersion::new("1.0".to_string())),
382 None,
383 )
384 .await
385 {
386 Ok(_) => (StatusCode::OK, "Rejected").into_response(),
387 Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish").into_response(),
388 }
389}
390
391pub async fn correlate_event(
393 State(state): State<AppState>,
394 Json(payload): Json<Value>,
395) -> impl IntoResponse {
396 let key = payload.get("key").and_then(|v| v.as_str()).unwrap_or("");
399 let value = payload.get("value").and_then(|v| v.as_str()).unwrap_or("");
400
401 let correlation = get_event_correlation(&state.pool, key, value)
402 .await
403 .unwrap_or(None);
404
405 let corr = match correlation {
406 Some(c) => c,
407 None => return (StatusCode::NOT_FOUND, "No correlation matched").into_response(),
408 };
409
410 let completion_event = StepCompletedEvent {
412 run_id: corr.run_id,
413 step_id: corr.step_instance_id,
414 event_type: EventType::Step(StepEventType::Completed),
415 runner_id: None,
416 exit_code: Some(0),
417 storage_hashes: None,
418 artifacts: None,
419 test_reports: None,
420 outputs: serde_json::from_value(payload.clone()).ok(),
421 timestamp: Utc::now(),
422 };
423
424 match publish_cloudevent(
425 &new_jetstream(state.nats.clone()),
426 NatsSubject::StepCompleted,
427 EventType::Step(StepEventType::Completed),
428 EventSource::Api,
429 serde_json::to_value(completion_event).unwrap(),
430 Some(SchemaVersion::new("1.0".to_string())),
431 None,
432 )
433 .await
434 {
435 Ok(_) => {
436 let _ = delete_event_correlation(&state.pool, corr.id).await;
438 (StatusCode::OK, "Event Correlated").into_response()
439 }
440 Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish").into_response(),
441 }
442}