use super::{CreateEventRuleRequest, CreateWebhookRequest};
use crate::{AppState, AuthClaims};
use axum::{
body::Bytes,
extract::{Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
Json,
};
use hmac::{Hmac, Mac};
use serde_json::Value;
use sha2::Sha256;
use std::collections::HashMap;
use stormchaser_model::event_rules::WebhookConfig;
use stormchaser_model::workflow::RunStatus;
use uuid::Uuid;
pub async fn create_webhook(
AuthClaims(_claims): AuthClaims,
State(state): State<AppState>,
Json(payload): Json<CreateWebhookRequest>,
) -> Result<impl IntoResponse, StatusCode> {
let id = Uuid::new_v4();
sqlx::query(
"INSERT INTO webhooks (id, name, description, source_type, secret_token) VALUES ($1, $2, $3, $4, $5)"
)
.bind(id)
.bind(&payload.name)
.bind(&payload.description)
.bind(&payload.source_type)
.bind(&payload.secret_token)
.execute(&state.pool)
.await
.map_err(|e| {
tracing::error!("Failed to create webhook: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok((StatusCode::CREATED, Json(serde_json::json!({ "id": id }))))
}
pub async fn list_webhooks(
AuthClaims(_claims): AuthClaims,
State(state): State<AppState>,
) -> Result<impl IntoResponse, StatusCode> {
let webhooks = crate::db::list_webhooks(&state.pool)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(webhooks))
}
pub async fn get_webhook(
AuthClaims(_claims): AuthClaims,
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> Result<impl IntoResponse, StatusCode> {
let webhook = crate::db::get_webhook(&state.pool, id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::NOT_FOUND)?;
Ok(Json(webhook))
}
pub async fn delete_webhook(
AuthClaims(_claims): AuthClaims,
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> Result<impl IntoResponse, StatusCode> {
crate::db::delete_webhook(&state.pool, id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(StatusCode::NO_CONTENT)
}
pub async fn create_event_rule(
AuthClaims(_claims): AuthClaims,
State(state): State<AppState>,
Json(payload): Json<CreateEventRuleRequest>,
) -> Result<impl IntoResponse, StatusCode> {
let id = Uuid::new_v4();
crate::db::create_event_rule(
&state.pool,
id,
&payload.name,
&payload.description,
Some(payload.webhook_id),
&payload.event_type_pattern,
&payload.condition_expr,
&payload.workflow_name,
&payload.repo_url,
&payload.workflow_path,
&payload.git_ref,
serde_json::to_value(&payload.input_mappings).unwrap_or(serde_json::json!({})),
)
.await
.map_err(|e| {
tracing::error!("Failed to create rule: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok((StatusCode::CREATED, Json(serde_json::json!({ "id": id }))))
}
pub async fn list_event_rules(
AuthClaims(_claims): AuthClaims,
State(state): State<AppState>,
) -> Result<impl IntoResponse, StatusCode> {
let rules = crate::db::list_event_rules(&state.pool)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(rules))
}
pub async fn delete_event_rule(
AuthClaims(_claims): AuthClaims,
State(state): State<AppState>,
Path(id): Path<Uuid>,
) -> Result<impl IntoResponse, StatusCode> {
crate::db::delete_event_rule(&state.pool, id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(StatusCode::NO_CONTENT)
}
pub async fn handle_webhook(
Path(webhook_id): Path<Uuid>,
headers: HeaderMap,
State(state): State<AppState>,
body: Bytes,
) -> Result<impl IntoResponse, StatusCode> {
let webhook: WebhookConfig =
sqlx::query_as("SELECT * FROM webhooks WHERE id = $1 AND is_active = TRUE")
.bind(webhook_id)
.fetch_optional(&state.pool)
.await
.map_err(|e| {
tracing::error!("Failed to fetch webhook: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.ok_or(StatusCode::NOT_FOUND)?;
let payload: Value = serde_json::from_slice(&body).map_err(|_| StatusCode::BAD_REQUEST)?;
let event_type = match webhook.source_type.as_str() {
"github" => {
validate_github_signature(&headers, &body, webhook.secret_token.as_deref())?;
headers
.get("X-GitHub-Event")
.and_then(|h| h.to_str().ok())
.unwrap_or("unknown")
.to_string()
}
"generic" => "generic".to_string(),
_ => return Err(StatusCode::NOT_IMPLEMENTED),
};
tracing::info!(
"Received webhook event '{}' for webhook '{}' ({})",
event_type,
webhook.name,
webhook.id
);
let rules = crate::db::get_active_event_rules_by_webhook(&state.pool, webhook_id)
.await
.map_err(|e| {
tracing::error!("Failed to fetch rules: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let mut triggered_count = 0;
let mut hcl_ctx = hcl::eval::Context::default();
hcl_ctx.declare_var("event", hcl_eval::json_to_hcl(payload.clone()));
hcl_ctx.declare_var(
"headers",
hcl_eval::json_to_hcl(
serde_json::to_value(
headers
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or_default().to_string()))
.collect::<HashMap<_, _>>(),
)
.unwrap(),
),
);
for rule in rules {
let re = regex::Regex::new(&rule.event_type_pattern).map_err(|e| {
tracing::error!(
"Invalid regex pattern '{}': {:?}",
rule.event_type_pattern,
e
);
StatusCode::INTERNAL_SERVER_ERROR
})?;
if !re.is_match(&event_type) {
continue;
}
if let Some(cond) = &rule.condition_expr {
match hcl_eval::evaluate_raw_expr(cond, &hcl_ctx) {
Ok(Value::Bool(true)) => {}
Ok(_) => continue,
Err(e) => {
tracing::error!("Rule '{}' condition evaluation failed: {:?}", rule.name, e);
continue;
}
}
}
let mut inputs = serde_json::Map::new();
for (name, expr) in rule.get_input_mappings() {
match hcl_eval::evaluate_raw_expr(&expr, &hcl_ctx) {
Ok(val) => {
inputs.insert(name, val);
}
Err(e) => {
tracing::error!(
"Rule '{}' input mapping for '{}' failed: {:?}",
rule.name,
name,
e
);
continue;
}
}
}
let run_id = Uuid::new_v4();
tracing::info!(run_id = %run_id, "Enqueuing webhook workflow: {}", rule.workflow_name);
let fencing_token = chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0);
let mut tx = state
.pool
.begin()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
crate::db::insert_workflow_run(
&mut tx,
run_id,
&rule.workflow_name,
&format!("webhook:{}", webhook.name),
&rule.repo_url,
&rule.workflow_path,
&rule.git_ref,
RunStatus::Queued,
fencing_token,
)
.await
.map_err(|e| {
tracing::error!(run_id = %run_id, "Failed to insert workflow run: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
crate::db::insert_run_context(
&mut tx,
run_id,
"v1",
serde_json::json!({}),
"",
&Value::Object(inputs),
)
.await
.map_err(|e| {
tracing::error!(run_id = %run_id, "Failed to insert run context: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
crate::db::insert_run_quotas(&mut tx, run_id, 10, "1", "4Gi", "10Gi", "1h")
.await
.map_err(|e| {
tracing::error!(run_id = %run_id, "Failed to insert run quotas: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
tx.commit()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let event = serde_json::json!({
"run_id": run_id,
"event_type": "workflow_queued",
"timestamp": chrono::Utc::now(),
});
state
.nats
.publish("stormchaser.run.queued", event.to_string().into())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
triggered_count += 1;
tracing::info!(
"Triggered workflow '{}' (run {}) from rule '{}'",
rule.workflow_name,
run_id,
rule.name
);
}
Ok(Json(serde_json::json!({
"status": "ok",
"event_type": event_type,
"triggered_rules": triggered_count
})))
}
use stormchaser_model::hcl_eval;
fn validate_github_signature(
headers: &HeaderMap,
body: &[u8],
secret: Option<&str>,
) -> Result<(), StatusCode> {
let secret = match secret {
Some(s) => s,
None => {
tracing::debug!(
"No secret token configured for webhook, skipping signature validation"
);
return Ok(());
}
};
let signature = headers
.get("X-Hub-Signature-256")
.and_then(|h| h.to_str().ok())
.ok_or_else(|| {
tracing::warn!("Missing X-Hub-Signature-256 header");
StatusCode::UNAUTHORIZED
})?;
if !signature.starts_with("sha256=") {
tracing::warn!("Invalid signature format: {}", signature);
return Err(StatusCode::UNAUTHORIZED);
}
let signature_hex = &signature["sha256=".len()..];
let signature_bytes = hex::decode(signature_hex).map_err(|e| {
tracing::warn!("Failed to decode signature hex: {:?}", e);
StatusCode::UNAUTHORIZED
})?;
let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).map_err(|e| {
tracing::error!("Failed to initialize HMAC: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
mac.update(body);
if let Err(e) = mac.verify_slice(&signature_bytes) {
tracing::warn!("HMAC signature verification failed: {:?}", e);
return Err(StatusCode::UNAUTHORIZED);
}
Ok(())
}