Skip to main content

stormchaser_api/routes/
webhook.rs

1use super::{CreateWebhookRequest, UpdateWebhookRequest};
2use crate::db;
3use crate::{AppState, AuthClaims};
4use async_nats::jetstream;
5use axum::{
6    body::Bytes,
7    extract::{Path, State},
8    http::{HeaderMap, StatusCode},
9    response::IntoResponse,
10    Json,
11};
12use chrono::Utc;
13use hmac::{Hmac, Mac};
14use serde_json::Value;
15use sha2::Sha256;
16use std::collections::HashMap;
17use stormchaser_model::event_rules::WebhookConfig;
18use stormchaser_model::events::WorkflowQueuedEvent;
19use stormchaser_model::events::{EventSource, EventType, SchemaVersion, WorkflowEventType};
20use stormchaser_model::nats::{publish_cloudevent, NatsSubject};
21use stormchaser_model::workflow::RunStatus;
22use stormchaser_model::WebhookId;
23
24/// Create webhook.
25#[utoipa::path(
26    post,
27    path = "/api/v1/webhooks",
28    responses(
29        (status = 200, description = "Success"),
30        (status = 400, description = "Bad Request"),
31        (status = 404, description = "Not Found"),
32        (status = 500, description = "Internal Server Error")
33    ),
34    tag = "webhook"
35)]
36pub async fn create_webhook(
37    AuthClaims(_claims): AuthClaims,
38    State(state): State<AppState>,
39    Json(payload): Json<CreateWebhookRequest>,
40) -> Result<impl IntoResponse, StatusCode> {
41    let id = stormchaser_model::WebhookId::new_v4();
42    db::insert_webhook(
43        &state.pool,
44        id,
45        &payload.name,
46        &payload.description,
47        &payload.source_type,
48        &payload.secret_token,
49    )
50    .await
51    .map_err(|e| {
52        tracing::error!("Failed to create webhook: {:?}", e);
53        StatusCode::INTERNAL_SERVER_ERROR
54    })?;
55
56    Ok((StatusCode::CREATED, Json(serde_json::json!({ "id": id }))))
57}
58
59/// List webhooks.
60#[utoipa::path(
61    get,
62    path = "/api/v1/webhooks",
63    responses(
64        (status = 200, description = "Success"),
65        (status = 400, description = "Bad Request"),
66        (status = 404, description = "Not Found"),
67        (status = 500, description = "Internal Server Error")
68    ),
69    tag = "webhook"
70)]
71pub async fn list_webhooks(
72    AuthClaims(_claims): AuthClaims,
73    State(state): State<AppState>,
74) -> Result<impl IntoResponse, StatusCode> {
75    let webhooks = db::list_webhooks(&state.pool)
76        .await
77        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
78
79    Ok(Json(webhooks))
80}
81
82/// Gets a webhook.
83#[utoipa::path(
84    get,
85    path = "/api/v1/webhooks/{id}",
86    params(("id" = stormchaser_model::WebhookId, Path, description="Webhook ID")),
87    responses(
88        (status = 200, description = "Success"),
89        (status = 400, description = "Bad Request"),
90        (status = 404, description = "Not Found"),
91        (status = 500, description = "Internal Server Error")
92    ),
93    tag = "webhook"
94)]
95pub async fn get_webhook(
96    AuthClaims(_claims): AuthClaims,
97    State(state): State<AppState>,
98    Path(id): Path<WebhookId>,
99) -> Result<impl IntoResponse, StatusCode> {
100    let webhook = db::get_webhook(&state.pool, id)
101        .await
102        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
103        .ok_or(StatusCode::NOT_FOUND)?;
104
105    Ok(Json(webhook))
106}
107
108/// Updates a webhook.
109#[utoipa::path(
110    patch,
111    path = "/api/v1/webhooks/{id}",
112    request_body = UpdateWebhookRequest,
113    params(("id" = stormchaser_model::WebhookId, Path, description="Webhook ID")),
114    responses(
115        (status = 200, description = "Success"),
116        (status = 400, description = "Bad Request"),
117        (status = 404, description = "Not Found"),
118        (status = 500, description = "Internal Server Error")
119    ),
120    tag = "webhook"
121)]
122pub async fn update_webhook(
123    AuthClaims(_claims): AuthClaims,
124    State(state): State<AppState>,
125    Path(id): Path<WebhookId>,
126    Json(payload): Json<UpdateWebhookRequest>,
127) -> Result<impl IntoResponse, StatusCode> {
128    let description = match payload.description {
129        Some(s) if s.is_empty() => Some(None),
130        Some(s) => Some(Some(s)),
131        None => None,
132    };
133    let secret_token = match payload.secret_token {
134        Some(s) if s.is_empty() => Some(None),
135        Some(s) => Some(Some(s)),
136        None => None,
137    };
138
139    db::update_webhook(
140        &state.pool,
141        id,
142        payload.name,
143        description,
144        payload.source_type,
145        secret_token,
146        payload.is_active,
147    )
148    .await
149    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
150
151    Ok(StatusCode::OK)
152}
153
154/// Deletes a webhook.
155#[utoipa::path(
156    delete,
157    path = "/api/v1/webhooks/{id}",
158    params(("id" = stormchaser_model::WebhookId, Path, description="Webhook ID")),
159    responses(
160        (status = 200, description = "Success"),
161        (status = 400, description = "Bad Request"),
162        (status = 404, description = "Not Found"),
163        (status = 500, description = "Internal Server Error")
164    ),
165    tag = "webhook"
166)]
167pub async fn delete_webhook(
168    AuthClaims(_claims): AuthClaims,
169    State(state): State<AppState>,
170    Path(id): Path<WebhookId>,
171) -> Result<impl IntoResponse, StatusCode> {
172    db::delete_webhook(&state.pool, id)
173        .await
174        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
175
176    Ok(StatusCode::NO_CONTENT)
177}
178
179#[utoipa::path(
180    post,
181    path = "/api/v1/webhooks/{id}",
182    params(
183        ("id" = stormchaser_model::WebhookId, Path, description = "Webhook ID")
184    ),
185    request_body = String,
186    responses(
187        (status = 200, description = "Webhook handled"),
188        (status = 400, description = "Bad Request"),
189        (status = 404, description = "Webhook not found"),
190        (status = 500, description = "Internal Server Error")
191    ),
192    tag = "webhook"
193)]
194/// Handle webhook.
195pub async fn handle_webhook(
196    Path(webhook_id): Path<WebhookId>,
197    headers: HeaderMap,
198    State(state): State<AppState>,
199    body: Bytes,
200) -> Result<impl IntoResponse, StatusCode> {
201    // 1. Fetch WebhookConfig
202    let webhook: WebhookConfig = db::get_active_webhook(&state.pool, webhook_id)
203        .await
204        .map_err(|e| {
205            tracing::error!("Failed to fetch webhook: {:?}", e);
206            StatusCode::INTERNAL_SERVER_ERROR
207        })?
208        .ok_or(StatusCode::NOT_FOUND)?;
209
210    // 2. Validate Source/Signature
211    let payload: Value = serde_json::from_slice(&body).map_err(|_| StatusCode::BAD_REQUEST)?;
212    let event_type = match webhook.source_type.as_str() {
213        "github" => {
214            validate_github_signature(&headers, &body, webhook.secret_token.as_deref())?;
215            headers
216                .get("X-GitHub-Event")
217                .and_then(|h| h.to_str().ok())
218                .unwrap_or("unknown")
219                .to_string()
220        }
221        "generic" => "generic".to_string(),
222        _ => return Err(StatusCode::NOT_IMPLEMENTED),
223    };
224
225    tracing::info!(
226        "Received webhook event '{}' for webhook '{}' ({})",
227        event_type,
228        webhook.name,
229        webhook.id
230    );
231
232    // 3. Find matching EventRules
233    let rules = db::get_active_event_rules_by_webhook(&state.pool, webhook_id)
234        .await
235        .map_err(|e| {
236            tracing::error!("Failed to fetch rules: {:?}", e);
237            StatusCode::INTERNAL_SERVER_ERROR
238        })?;
239
240    let mut triggered_count = 0;
241    let mut hcl_ctx = hcl::eval::Context::default();
242    hcl_ctx.declare_var("event", hcl_eval::json_to_hcl(payload.clone()));
243    hcl_ctx.declare_var(
244        "headers",
245        hcl_eval::json_to_hcl(
246            serde_json::to_value(
247                headers
248                    .iter()
249                    .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or_default().to_string()))
250                    .collect::<HashMap<_, _>>(),
251            )
252            .unwrap(),
253        ),
254    );
255
256    for rule in rules {
257        // 3a. Check event type pattern (simple regex for now)
258        let re = regex::Regex::new(&rule.event_type_pattern).map_err(|e| {
259            tracing::error!(
260                "Invalid regex pattern '{}': {:?}",
261                rule.event_type_pattern,
262                e
263            );
264            StatusCode::INTERNAL_SERVER_ERROR
265        })?;
266        if !re.is_match(&event_type) {
267            continue;
268        }
269
270        // 3b. Evaluate condition expression
271        if let Some(cond) = &rule.condition_expr {
272            match hcl_eval::evaluate_raw_expr(cond, &hcl_ctx) {
273                Ok(Value::Bool(true)) => {}
274                Ok(_) => continue,
275                Err(e) => {
276                    tracing::error!("Rule '{}' condition evaluation failed: {:?}", rule.name, e);
277                    continue;
278                }
279            }
280        }
281
282        // 3c. Map inputs
283        let mut inputs = serde_json::Map::new();
284        for (name, expr) in rule.get_input_mappings() {
285            match hcl_eval::evaluate_raw_expr(&expr, &hcl_ctx) {
286                Ok(val) => {
287                    inputs.insert(name, val);
288                }
289                Err(e) => {
290                    tracing::error!(
291                        "Rule '{}' input mapping for '{}' failed: {:?}",
292                        rule.name,
293                        name,
294                        e
295                    );
296                    continue;
297                }
298            }
299        }
300
301        // 4. Trigger Workflow
302        let run_id = stormchaser_model::RunId::new_v4();
303
304        tracing::info!(run_id = %run_id, "Enqueuing webhook workflow: {}", rule.workflow_name);
305        let fencing_token = Utc::now().timestamp_nanos_opt().unwrap_or(0);
306
307        let mut tx = state
308            .pool
309            .begin()
310            .await
311            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
312
313        db::insert_workflow_run(
314            &mut tx,
315            run_id,
316            &rule.workflow_name,
317            &format!("webhook:{}", webhook.name),
318            &rule.repo_url,
319            &rule.workflow_path,
320            &rule.git_ref,
321            RunStatus::Queued,
322            fencing_token,
323        )
324        .await
325        .map_err(|e| {
326            tracing::error!(run_id = %run_id, "Failed to insert workflow run: {:?}", e);
327            StatusCode::INTERNAL_SERVER_ERROR
328        })?;
329
330        db::insert_run_context(
331            &mut tx,
332            run_id,
333            "v1",
334            serde_json::json!({}),
335            "",
336            &Value::Object(inputs),
337        )
338        .await
339        .map_err(|e| {
340            tracing::error!(run_id = %run_id, "Failed to insert run context: {:?}", e);
341            StatusCode::INTERNAL_SERVER_ERROR
342        })?;
343
344        db::insert_run_quotas(&mut tx, run_id, 10, "1", "4Gi", "10Gi", "1h")
345            .await
346            .map_err(|e| {
347                tracing::error!(run_id = %run_id, "Failed to insert run quotas: {:?}", e);
348                StatusCode::INTERNAL_SERVER_ERROR
349            })?;
350
351        tx.commit()
352            .await
353            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
354
355        let event = WorkflowQueuedEvent {
356            run_id,
357            event_type: EventType::Workflow(WorkflowEventType::Queued),
358            timestamp: Utc::now(),
359            dsl: None,
360            inputs: None,
361            initiating_user: None,
362        };
363
364        publish_cloudevent(
365            &jetstream::new(state.nats.clone()),
366            NatsSubject::RunQueued,
367            EventType::Workflow(WorkflowEventType::Queued),
368            EventSource::System,
369            serde_json::to_value(event).unwrap(),
370            Some(SchemaVersion::new("1.0".to_string())),
371            None,
372        )
373        .await
374        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
375
376        triggered_count += 1;
377        tracing::info!(
378            "Triggered workflow '{}' (run {}) from rule '{}'",
379            rule.workflow_name,
380            run_id,
381            rule.name
382        );
383    }
384
385    Ok(Json(serde_json::json!({
386        "status": "ok",
387        "event_type": event_type,
388        "triggered_rules": triggered_count
389    })))
390}
391
392use stormchaser_model::hcl_eval;
393
394fn validate_github_signature(
395    headers: &HeaderMap,
396    body: &[u8],
397    secret: Option<&str>,
398) -> Result<(), StatusCode> {
399    let secret = match secret {
400        Some(s) => s,
401        None => {
402            tracing::debug!(
403                "No secret token configured for webhook, skipping signature validation"
404            );
405            return Ok(());
406        }
407    };
408
409    let signature = headers
410        .get("X-Hub-Signature-256")
411        .and_then(|h| h.to_str().ok())
412        .ok_or_else(|| {
413            tracing::warn!("Missing X-Hub-Signature-256 header");
414            StatusCode::UNAUTHORIZED
415        })?;
416
417    if !signature.starts_with("sha256=") {
418        tracing::warn!("Invalid signature format: {}", signature);
419        return Err(StatusCode::UNAUTHORIZED);
420    }
421
422    let signature_hex = &signature["sha256=".len()..];
423    let signature_bytes = hex::decode(signature_hex).map_err(|e| {
424        tracing::warn!("Failed to decode signature hex: {:?}", e);
425        StatusCode::UNAUTHORIZED
426    })?;
427
428    let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).map_err(|e| {
429        tracing::error!("Failed to initialize HMAC: {:?}", e);
430        StatusCode::INTERNAL_SERVER_ERROR
431    })?;
432    mac.update(body);
433
434    if let Err(e) = mac.verify_slice(&signature_bytes) {
435        tracing::warn!("HMAC signature verification failed: {:?}", e);
436        return Err(StatusCode::UNAUTHORIZED);
437    }
438
439    Ok(())
440}