Skip to main content

stormchaser_api/
hitl.rs

1use crate::{AppState, JWT_SECRET};
2use aes_gcm::{
3    aead::{Aead, KeyInit},
4    Aes256Gcm, Nonce,
5};
6use axum::{
7    extract::{Path, State},
8    http::{header::AUTHORIZATION, HeaderMap, StatusCode},
9    response::IntoResponse,
10    Json,
11};
12use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
13use serde_json::{json, Value};
14use sha2::{Digest, Sha256};
15use stormchaser_model::auth::ApprovalOpaContext;
16use stormchaser_model::events::{
17    EventSource, EventType, SchemaVersion, StepCompletedEvent, StepEventType, StepFailedEvent,
18};
19use stormchaser_model::step::StepStatus;
20use stormchaser_model::EventId;
21use stormchaser_model::RunId;
22use stormchaser_model::StepInstanceId;
23
24use crate::auth::AuthClaims;
25use crate::db::{
26    delete_event_correlation, get_event_correlation, get_run_outputs_for_opa,
27    get_step_instance_for_approval, get_workflow_context_for_opa, insert_approval_registry,
28};
29use async_nats::jetstream::new as new_jetstream;
30use chrono::Utc;
31use stormchaser_model::dsl::{Step, Workflow};
32use stormchaser_model::nats::{publish_cloudevent, NatsSubject};
33
34#[derive(serde::Deserialize, serde::Serialize)]
35struct ApprovalLinkPayload {
36    run_id: RunId,
37    step_id: StepInstanceId,
38    action: String,
39    #[serde(default)]
40    inputs: Value,
41}
42
43/// Approves a step via an encrypted link.
44#[utoipa::path(
45    get,
46    path = "/api/v1/approve-link/{token}",
47    params(
48        ("token" = String, Path, description = "Encrypted approval token")
49    ),
50    responses(
51        (status = 200, description = "Step approved or rejected successfully"),
52        (status = 400, description = "Invalid token or step state"),
53        (status = 404, description = "Step not found"),
54        (status = 500, description = "Internal server error")
55    ),
56    tag = "hitl"
57)]
58/// Approve step link.
59pub async fn approve_step_link(
60    State(state): State<AppState>,
61    Path(token): Path<String>,
62) -> impl IntoResponse {
63    // 1. Derive key from JWT_SECRET
64    let mut hasher = Sha256::new();
65    hasher.update(JWT_SECRET);
66    let key_bytes = hasher.finalize();
67    let key = aes_gcm::Key::<Aes256Gcm>::from_slice(&key_bytes);
68    let cipher = Aes256Gcm::new(key);
69
70    // 2. Decode base64
71    let decoded = match URL_SAFE_NO_PAD.decode(token) {
72        Ok(d) => d,
73        Err(_) => return (StatusCode::BAD_REQUEST, "Invalid token encoding").into_response(),
74    };
75
76    if decoded.len() < 12 {
77        return (StatusCode::BAD_REQUEST, "Token too short").into_response();
78    }
79
80    // 3. Decrypt
81    let (nonce_bytes, ciphertext) = decoded.split_at(12);
82    let nonce = Nonce::from_slice(nonce_bytes);
83    let plaintext = match cipher.decrypt(nonce, ciphertext) {
84        Ok(p) => p,
85        Err(_) => {
86            return (
87                StatusCode::BAD_REQUEST,
88                "Invalid token signature or ciphertext",
89            )
90                .into_response()
91        }
92    };
93
94    // 4. Parse payload
95    let payload: ApprovalLinkPayload = match serde_json::from_slice(&plaintext) {
96        Ok(p) => p,
97        Err(_) => return (StatusCode::BAD_REQUEST, "Invalid token payload").into_response(),
98    };
99
100    // 5. Verify step exists and is WaitingForEvent
101    let step = get_step_instance_for_approval(&state.pool, payload.step_id, payload.run_id)
102        .await
103        .unwrap_or(None);
104
105    let step = match step {
106        Some(s) => s,
107        None => return (StatusCode::NOT_FOUND, "Step not found").into_response(),
108    };
109
110    if step.status != StepStatus::WaitingForEvent {
111        return (StatusCode::BAD_REQUEST, "Step is not waiting for approval").into_response();
112    }
113
114    let is_approve = payload.action.to_lowercase() == "approve";
115    let status_str = if is_approve { "approved" } else { "rejected" };
116
117    // 6. Insert into approval_registry
118    let _ = insert_approval_registry(
119        &state.pool,
120        EventId::new_v4(),
121        payload.step_id,
122        "system-link",
123        status_str,
124        &payload.inputs,
125    )
126    .await;
127
128    // 7. Publish to NATS simulating step completion/failure
129    let nats_payload = if is_approve {
130        json!({
131            "run_id": payload.run_id.to_string(),
132            "step_id": payload.step_id.to_string(),
133            "exit_code": 0,
134            "outputs": payload.inputs,
135        })
136    } else {
137        json!({
138            "run_id": payload.run_id.to_string(),
139            "step_id": payload.step_id.to_string(),
140            "exit_code": 1,
141            "error": "Rejected by human via link",
142        })
143    };
144
145    let subject = if is_approve {
146        "stormchaser.v1.step.completed"
147    } else {
148        "stormchaser.v1.step.failed"
149    };
150
151    match state
152        .nats
153        .publish(subject, nats_payload.to_string().into())
154        .await
155    {
156        Ok(_) => (StatusCode::OK, format!("Successfully {}", status_str)).into_response(),
157        Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish").into_response(),
158    }
159}
160
161async fn check_approval_opa(
162    state: &AppState,
163    run_id: RunId,
164    step_name: &str,
165    token: Option<&str>,
166) -> Result<(), (StatusCode, String)> {
167    if !state.opa.is_configured() {
168        return Ok(());
169    }
170
171    let context_row = match get_workflow_context_for_opa(&state.pool, run_id).await {
172        Ok(context_row) => context_row,
173        Err(err) => {
174            eprintln!(
175                "Failed to load workflow context for approval OPA evaluation for run {}: {:?}",
176                run_id, err
177            );
178            return Err((
179                StatusCode::INTERNAL_SERVER_ERROR,
180                "Failed to load workflow context for approval policy evaluation".to_string(),
181            ));
182        }
183    };
184
185    if let Some(context_data) = context_row {
186        let mut step_ast = json!({});
187        if let Ok(workflow) = serde_json::from_value::<Workflow>(context_data.workflow_definition) {
188            if let Some(s) = find_step(&workflow.steps, step_name) {
189                step_ast = serde_json::to_value(s).unwrap_or(json!({}));
190            }
191        }
192
193        let run_outputs_map = match get_run_outputs_for_opa(&state.pool, run_id).await {
194            Ok(map) => map,
195            Err(err) => {
196                tracing::error!(
197                    "Failed to load run outputs for approval OPA evaluation for run {}: {:?}",
198                    run_id,
199                    err
200                );
201                return Err((
202                    StatusCode::INTERNAL_SERVER_ERROR,
203                    "Failed to load run outputs for approval policy evaluation".to_string(),
204                ));
205            }
206        };
207
208        let opa_context = ApprovalOpaContext {
209            run_id,
210            initiating_user: context_data.initiating_user,
211            step_ast,
212            inputs: context_data.run_inputs,
213            run_outputs: Value::Object(run_outputs_map),
214            token,
215        };
216
217        match state.opa.check_approval(opa_context).await {
218            Ok(true) => Ok(()),
219            Ok(false) => Err((
220                StatusCode::FORBIDDEN,
221                "Approval denied by OPA policy".to_string(),
222            )),
223            Err(_) => Err((
224                StatusCode::INTERNAL_SERVER_ERROR,
225                "OPA policy evaluation failed".to_string(),
226            )),
227        }
228    } else {
229        Ok(())
230    }
231}
232
233// Recursively find the step AST
234fn find_step(steps: &[Step], name: &str) -> Option<Step> {
235    for s in steps {
236        if s.name == name {
237            return Some(s.clone());
238        }
239        if let Some(inner) = &s.steps {
240            if let Some(found) = find_step(inner, name) {
241                return Some(found);
242            }
243        }
244    }
245    None
246}
247
248/// Approves a step.
249pub async fn approve_step(
250    State(state): State<AppState>,
251    AuthClaims(claims): AuthClaims,
252    headers: HeaderMap,
253    Path((run_id, step_id)): Path<(RunId, StepInstanceId)>,
254    Json(inputs): Json<Value>,
255) -> impl IntoResponse {
256    let token = headers
257        .get(AUTHORIZATION)
258        .and_then(|h| h.to_str().ok())
259        .and_then(|s| s.strip_prefix("Bearer "));
260
261    // 1. Verify step exists and is WaitingForEvent
262    let step = get_step_instance_for_approval(&state.pool, step_id, run_id)
263        .await
264        .unwrap_or(None);
265
266    let step = match step {
267        Some(s) => s,
268        None => return (StatusCode::NOT_FOUND, "Step not found").into_response(),
269    };
270
271    if step.status != StepStatus::WaitingForEvent {
272        return (StatusCode::BAD_REQUEST, "Step is not waiting for approval").into_response();
273    }
274
275    // 1.5 OPA ABAC Engine check
276    if let Err((status, msg)) = check_approval_opa(&state, run_id, &step.step_name, token).await {
277        return (status, msg).into_response();
278    }
279
280    // 2. Insert into approval_registry
281    let _ = insert_approval_registry(
282        &state.pool,
283        EventId::new_v4(),
284        step_id,
285        &claims.sub,
286        "approved",
287        &inputs,
288    )
289    .await;
290
291    // 3. Publish to NATS simulating step completion
292    let completion_event = StepCompletedEvent {
293        run_id,
294        step_id,
295        event_type: EventType::Step(StepEventType::Completed),
296        runner_id: None,
297        exit_code: Some(0),
298        storage_hashes: None,
299        artifacts: None,
300        test_reports: None,
301        outputs: serde_json::from_value(inputs).ok(),
302        timestamp: Utc::now(),
303    };
304
305    match publish_cloudevent(
306        &new_jetstream(state.nats.clone()),
307        NatsSubject::StepCompleted,
308        EventType::Step(StepEventType::Completed),
309        EventSource::Api,
310        serde_json::to_value(completion_event).unwrap(),
311        Some(SchemaVersion::new("1.0".to_string())),
312        None,
313    )
314    .await
315    {
316        Ok(_) => (StatusCode::OK, "Approved").into_response(),
317        Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish").into_response(),
318    }
319}
320
321/// Rejects a step.
322pub async fn reject_step(
323    State(state): State<AppState>,
324    AuthClaims(claims): AuthClaims,
325    headers: HeaderMap,
326    Path((run_id, step_id)): Path<(RunId, StepInstanceId)>,
327) -> impl IntoResponse {
328    let token = headers
329        .get(AUTHORIZATION)
330        .and_then(|h| h.to_str().ok())
331        .and_then(|s| s.strip_prefix("Bearer "));
332
333    let step = get_step_instance_for_approval(&state.pool, step_id, run_id)
334        .await
335        .unwrap_or(None);
336
337    let step = match step {
338        Some(s) => s,
339        None => return (StatusCode::NOT_FOUND, "Step not found").into_response(),
340    };
341
342    if step.status != StepStatus::WaitingForEvent {
343        return (StatusCode::BAD_REQUEST, "Step is not waiting for approval").into_response();
344    }
345
346    // 1.5 OPA ABAC Engine check
347    if let Err((status, msg)) = check_approval_opa(&state, run_id, &step.step_name, token).await {
348        return (status, msg).into_response();
349    }
350
351    let _ = insert_approval_registry(
352        &state.pool,
353        EventId::new_v4(),
354        step_id,
355        &claims.sub,
356        "rejected",
357        &json!({}),
358    )
359    .await;
360
361    let event = StepFailedEvent {
362        run_id,
363        step_id,
364        event_type: EventType::Step(StepEventType::Failed),
365        error: "Rejected by human".to_string(),
366        exit_code: Some(1),
367        runner_id: None,
368        storage_hashes: None,
369        artifacts: None,
370        test_reports: None,
371        outputs: None,
372        timestamp: Utc::now(),
373    };
374
375    match publish_cloudevent(
376        &new_jetstream(state.nats.clone()),
377        NatsSubject::StepFailed,
378        EventType::Step(StepEventType::Failed),
379        EventSource::Api,
380        serde_json::to_value(event).unwrap(),
381        Some(SchemaVersion::new("1.0".to_string())),
382        None,
383    )
384    .await
385    {
386        Ok(_) => (StatusCode::OK, "Rejected").into_response(),
387        Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish").into_response(),
388    }
389}
390
391/// Correlates an event.
392pub async fn correlate_event(
393    State(state): State<AppState>,
394    Json(payload): Json<Value>,
395) -> impl IntoResponse {
396    // 1. Iterate over event_correlations, match payload against correlation_key
397    // For simplicity, let's assume payload has exactly { "key": "...", "value": "..." }
398    let key = payload.get("key").and_then(|v| v.as_str()).unwrap_or("");
399    let value = payload.get("value").and_then(|v| v.as_str()).unwrap_or("");
400
401    let correlation = get_event_correlation(&state.pool, key, value)
402        .await
403        .unwrap_or(None);
404
405    let corr = match correlation {
406        Some(c) => c,
407        None => return (StatusCode::NOT_FOUND, "No correlation matched").into_response(),
408    };
409
410    // 2. Publish to stormchaser.step.completed
411    let completion_event = StepCompletedEvent {
412        run_id: corr.run_id,
413        step_id: corr.step_instance_id,
414        event_type: EventType::Step(StepEventType::Completed),
415        runner_id: None,
416        exit_code: Some(0),
417        storage_hashes: None,
418        artifacts: None,
419        test_reports: None,
420        outputs: serde_json::from_value(payload.clone()).ok(),
421        timestamp: Utc::now(),
422    };
423
424    match publish_cloudevent(
425        &new_jetstream(state.nats.clone()),
426        NatsSubject::StepCompleted,
427        EventType::Step(StepEventType::Completed),
428        EventSource::Api,
429        serde_json::to_value(completion_event).unwrap(),
430        Some(SchemaVersion::new("1.0".to_string())),
431        None,
432    )
433    .await
434    {
435        Ok(_) => {
436            // Delete correlation so it doesn't match again
437            let _ = delete_event_correlation(&state.pool, corr.id).await;
438            (StatusCode::OK, "Event Correlated").into_response()
439        }
440        Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish").into_response(),
441    }
442}