1use super::{CreateWebhookRequest, UpdateWebhookRequest};
2use crate::db;
3use crate::{AppState, AuthClaims};
4use async_nats::jetstream;
5use axum::{
6 body::Bytes,
7 extract::{Path, State},
8 http::{HeaderMap, StatusCode},
9 response::IntoResponse,
10 Json,
11};
12use chrono::Utc;
13use hmac::{Hmac, Mac};
14use serde_json::Value;
15use sha2::Sha256;
16use std::collections::HashMap;
17use stormchaser_model::event_rules::WebhookConfig;
18use stormchaser_model::events::WorkflowQueuedEvent;
19use stormchaser_model::events::{EventSource, EventType, SchemaVersion, WorkflowEventType};
20use stormchaser_model::nats::{publish_cloudevent, NatsSubject};
21use stormchaser_model::workflow::RunStatus;
22use stormchaser_model::WebhookId;
23
24#[utoipa::path(
26 post,
27 path = "/api/v1/webhooks",
28 responses(
29 (status = 200, description = "Success"),
30 (status = 400, description = "Bad Request"),
31 (status = 404, description = "Not Found"),
32 (status = 500, description = "Internal Server Error")
33 ),
34 tag = "webhook"
35)]
36pub async fn create_webhook(
37 AuthClaims(_claims): AuthClaims,
38 State(state): State<AppState>,
39 Json(payload): Json<CreateWebhookRequest>,
40) -> Result<impl IntoResponse, StatusCode> {
41 let id = stormchaser_model::WebhookId::new_v4();
42 db::insert_webhook(
43 &state.pool,
44 id,
45 &payload.name,
46 &payload.description,
47 &payload.source_type,
48 &payload.secret_token,
49 )
50 .await
51 .map_err(|e| {
52 tracing::error!("Failed to create webhook: {:?}", e);
53 StatusCode::INTERNAL_SERVER_ERROR
54 })?;
55
56 Ok((StatusCode::CREATED, Json(serde_json::json!({ "id": id }))))
57}
58
59#[utoipa::path(
61 get,
62 path = "/api/v1/webhooks",
63 responses(
64 (status = 200, description = "Success"),
65 (status = 400, description = "Bad Request"),
66 (status = 404, description = "Not Found"),
67 (status = 500, description = "Internal Server Error")
68 ),
69 tag = "webhook"
70)]
71pub async fn list_webhooks(
72 AuthClaims(_claims): AuthClaims,
73 State(state): State<AppState>,
74) -> Result<impl IntoResponse, StatusCode> {
75 let webhooks = db::list_webhooks(&state.pool)
76 .await
77 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
78
79 Ok(Json(webhooks))
80}
81
82#[utoipa::path(
84 get,
85 path = "/api/v1/webhooks/{id}",
86 params(("id" = stormchaser_model::WebhookId, Path, description="Webhook ID")),
87 responses(
88 (status = 200, description = "Success"),
89 (status = 400, description = "Bad Request"),
90 (status = 404, description = "Not Found"),
91 (status = 500, description = "Internal Server Error")
92 ),
93 tag = "webhook"
94)]
95pub async fn get_webhook(
96 AuthClaims(_claims): AuthClaims,
97 State(state): State<AppState>,
98 Path(id): Path<WebhookId>,
99) -> Result<impl IntoResponse, StatusCode> {
100 let webhook = db::get_webhook(&state.pool, id)
101 .await
102 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
103 .ok_or(StatusCode::NOT_FOUND)?;
104
105 Ok(Json(webhook))
106}
107
108#[utoipa::path(
110 patch,
111 path = "/api/v1/webhooks/{id}",
112 request_body = UpdateWebhookRequest,
113 params(("id" = stormchaser_model::WebhookId, Path, description="Webhook ID")),
114 responses(
115 (status = 200, description = "Success"),
116 (status = 400, description = "Bad Request"),
117 (status = 404, description = "Not Found"),
118 (status = 500, description = "Internal Server Error")
119 ),
120 tag = "webhook"
121)]
122pub async fn update_webhook(
123 AuthClaims(_claims): AuthClaims,
124 State(state): State<AppState>,
125 Path(id): Path<WebhookId>,
126 Json(payload): Json<UpdateWebhookRequest>,
127) -> Result<impl IntoResponse, StatusCode> {
128 let description = match payload.description {
129 Some(s) if s.is_empty() => Some(None),
130 Some(s) => Some(Some(s)),
131 None => None,
132 };
133 let secret_token = match payload.secret_token {
134 Some(s) if s.is_empty() => Some(None),
135 Some(s) => Some(Some(s)),
136 None => None,
137 };
138
139 db::update_webhook(
140 &state.pool,
141 id,
142 payload.name,
143 description,
144 payload.source_type,
145 secret_token,
146 payload.is_active,
147 )
148 .await
149 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
150
151 Ok(StatusCode::OK)
152}
153
154#[utoipa::path(
156 delete,
157 path = "/api/v1/webhooks/{id}",
158 params(("id" = stormchaser_model::WebhookId, Path, description="Webhook ID")),
159 responses(
160 (status = 200, description = "Success"),
161 (status = 400, description = "Bad Request"),
162 (status = 404, description = "Not Found"),
163 (status = 500, description = "Internal Server Error")
164 ),
165 tag = "webhook"
166)]
167pub async fn delete_webhook(
168 AuthClaims(_claims): AuthClaims,
169 State(state): State<AppState>,
170 Path(id): Path<WebhookId>,
171) -> Result<impl IntoResponse, StatusCode> {
172 db::delete_webhook(&state.pool, id)
173 .await
174 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
175
176 Ok(StatusCode::NO_CONTENT)
177}
178
179#[utoipa::path(
180 post,
181 path = "/api/v1/webhooks/{id}",
182 params(
183 ("id" = stormchaser_model::WebhookId, Path, description = "Webhook ID")
184 ),
185 request_body = String,
186 responses(
187 (status = 200, description = "Webhook handled"),
188 (status = 400, description = "Bad Request"),
189 (status = 404, description = "Webhook not found"),
190 (status = 500, description = "Internal Server Error")
191 ),
192 tag = "webhook"
193)]
194pub async fn handle_webhook(
196 Path(webhook_id): Path<WebhookId>,
197 headers: HeaderMap,
198 State(state): State<AppState>,
199 body: Bytes,
200) -> Result<impl IntoResponse, StatusCode> {
201 let webhook: WebhookConfig = db::get_active_webhook(&state.pool, webhook_id)
203 .await
204 .map_err(|e| {
205 tracing::error!("Failed to fetch webhook: {:?}", e);
206 StatusCode::INTERNAL_SERVER_ERROR
207 })?
208 .ok_or(StatusCode::NOT_FOUND)?;
209
210 let payload: Value = serde_json::from_slice(&body).map_err(|_| StatusCode::BAD_REQUEST)?;
212 let event_type = match webhook.source_type.as_str() {
213 "github" => {
214 validate_github_signature(&headers, &body, webhook.secret_token.as_deref())?;
215 headers
216 .get("X-GitHub-Event")
217 .and_then(|h| h.to_str().ok())
218 .unwrap_or("unknown")
219 .to_string()
220 }
221 "generic" => "generic".to_string(),
222 _ => return Err(StatusCode::NOT_IMPLEMENTED),
223 };
224
225 tracing::info!(
226 "Received webhook event '{}' for webhook '{}' ({})",
227 event_type,
228 webhook.name,
229 webhook.id
230 );
231
232 let rules = db::get_active_event_rules_by_webhook(&state.pool, webhook_id)
234 .await
235 .map_err(|e| {
236 tracing::error!("Failed to fetch rules: {:?}", e);
237 StatusCode::INTERNAL_SERVER_ERROR
238 })?;
239
240 let mut triggered_count = 0;
241 let mut hcl_ctx = hcl::eval::Context::default();
242 hcl_ctx.declare_var("event", hcl_eval::json_to_hcl(payload.clone()));
243 hcl_ctx.declare_var(
244 "headers",
245 hcl_eval::json_to_hcl(
246 serde_json::to_value(
247 headers
248 .iter()
249 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or_default().to_string()))
250 .collect::<HashMap<_, _>>(),
251 )
252 .unwrap(),
253 ),
254 );
255
256 for rule in rules {
257 let re = regex::Regex::new(&rule.event_type_pattern).map_err(|e| {
259 tracing::error!(
260 "Invalid regex pattern '{}': {:?}",
261 rule.event_type_pattern,
262 e
263 );
264 StatusCode::INTERNAL_SERVER_ERROR
265 })?;
266 if !re.is_match(&event_type) {
267 continue;
268 }
269
270 if let Some(cond) = &rule.condition_expr {
272 match hcl_eval::evaluate_raw_expr(cond, &hcl_ctx) {
273 Ok(Value::Bool(true)) => {}
274 Ok(_) => continue,
275 Err(e) => {
276 tracing::error!("Rule '{}' condition evaluation failed: {:?}", rule.name, e);
277 continue;
278 }
279 }
280 }
281
282 let mut inputs = serde_json::Map::new();
284 for (name, expr) in rule.get_input_mappings() {
285 match hcl_eval::evaluate_raw_expr(&expr, &hcl_ctx) {
286 Ok(val) => {
287 inputs.insert(name, val);
288 }
289 Err(e) => {
290 tracing::error!(
291 "Rule '{}' input mapping for '{}' failed: {:?}",
292 rule.name,
293 name,
294 e
295 );
296 continue;
297 }
298 }
299 }
300
301 let run_id = stormchaser_model::RunId::new_v4();
303
304 tracing::info!(run_id = %run_id, "Enqueuing webhook workflow: {}", rule.workflow_name);
305 let fencing_token = Utc::now().timestamp_nanos_opt().unwrap_or(0);
306
307 let mut tx = state
308 .pool
309 .begin()
310 .await
311 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
312
313 db::insert_workflow_run(
314 &mut tx,
315 run_id,
316 &rule.workflow_name,
317 &format!("webhook:{}", webhook.name),
318 &rule.repo_url,
319 &rule.workflow_path,
320 &rule.git_ref,
321 RunStatus::Queued,
322 fencing_token,
323 )
324 .await
325 .map_err(|e| {
326 tracing::error!(run_id = %run_id, "Failed to insert workflow run: {:?}", e);
327 StatusCode::INTERNAL_SERVER_ERROR
328 })?;
329
330 db::insert_run_context(
331 &mut tx,
332 run_id,
333 "v1",
334 serde_json::json!({}),
335 "",
336 &Value::Object(inputs),
337 )
338 .await
339 .map_err(|e| {
340 tracing::error!(run_id = %run_id, "Failed to insert run context: {:?}", e);
341 StatusCode::INTERNAL_SERVER_ERROR
342 })?;
343
344 db::insert_run_quotas(&mut tx, run_id, 10, "1", "4Gi", "10Gi", "1h")
345 .await
346 .map_err(|e| {
347 tracing::error!(run_id = %run_id, "Failed to insert run quotas: {:?}", e);
348 StatusCode::INTERNAL_SERVER_ERROR
349 })?;
350
351 tx.commit()
352 .await
353 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
354
355 let event = WorkflowQueuedEvent {
356 run_id,
357 event_type: EventType::Workflow(WorkflowEventType::Queued),
358 timestamp: Utc::now(),
359 dsl: None,
360 inputs: None,
361 initiating_user: None,
362 };
363
364 publish_cloudevent(
365 &jetstream::new(state.nats.clone()),
366 NatsSubject::RunQueued,
367 EventType::Workflow(WorkflowEventType::Queued),
368 EventSource::System,
369 serde_json::to_value(event).unwrap(),
370 Some(SchemaVersion::new("1.0".to_string())),
371 None,
372 )
373 .await
374 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
375
376 triggered_count += 1;
377 tracing::info!(
378 "Triggered workflow '{}' (run {}) from rule '{}'",
379 rule.workflow_name,
380 run_id,
381 rule.name
382 );
383 }
384
385 Ok(Json(serde_json::json!({
386 "status": "ok",
387 "event_type": event_type,
388 "triggered_rules": triggered_count
389 })))
390}
391
392use stormchaser_model::hcl_eval;
393
394fn validate_github_signature(
395 headers: &HeaderMap,
396 body: &[u8],
397 secret: Option<&str>,
398) -> Result<(), StatusCode> {
399 let secret = match secret {
400 Some(s) => s,
401 None => {
402 tracing::debug!(
403 "No secret token configured for webhook, skipping signature validation"
404 );
405 return Ok(());
406 }
407 };
408
409 let signature = headers
410 .get("X-Hub-Signature-256")
411 .and_then(|h| h.to_str().ok())
412 .ok_or_else(|| {
413 tracing::warn!("Missing X-Hub-Signature-256 header");
414 StatusCode::UNAUTHORIZED
415 })?;
416
417 if !signature.starts_with("sha256=") {
418 tracing::warn!("Invalid signature format: {}", signature);
419 return Err(StatusCode::UNAUTHORIZED);
420 }
421
422 let signature_hex = &signature["sha256=".len()..];
423 let signature_bytes = hex::decode(signature_hex).map_err(|e| {
424 tracing::warn!("Failed to decode signature hex: {:?}", e);
425 StatusCode::UNAUTHORIZED
426 })?;
427
428 let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).map_err(|e| {
429 tracing::error!("Failed to initialize HMAC: {:?}", e);
430 StatusCode::INTERNAL_SERVER_ERROR
431 })?;
432 mac.update(body);
433
434 if let Err(e) = mac.verify_slice(&signature_bytes) {
435 tracing::warn!("HMAC signature verification failed: {:?}", e);
436 return Err(StatusCode::UNAUTHORIZED);
437 }
438
439 Ok(())
440}