Skip to main content

worldinterface_daemon/routes/
webhooks.rs

1//! Webhook CRUD and invocation endpoints.
2
3use std::time::{SystemTime, UNIX_EPOCH};
4
5use axum::body::Bytes;
6use axum::extract::{Path, State};
7use axum::http::{HeaderMap, StatusCode};
8use axum::Json;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use worldinterface_core::flowspec::FlowSpec;
12use worldinterface_core::id::FlowRunId;
13use worldinterface_http_trigger::{validate_webhook_path, WebhookId, WebhookRegistration};
14
15use crate::error::ApiError;
16use crate::state::SharedState;
17
18pub fn register_routes(router: axum::Router<SharedState>) -> axum::Router<SharedState> {
19    router
20        .route("/api/v1/webhooks", axum::routing::post(register_webhook))
21        .route("/api/v1/webhooks", axum::routing::get(list_webhooks))
22        .route("/api/v1/webhooks/:id", axum::routing::get(get_webhook).delete(delete_webhook))
23        .route("/webhooks/*path", axum::routing::post(invoke_webhook))
24}
25
26// --- CRUD types ---
27
28#[derive(Deserialize)]
29pub struct RegisterWebhookRequest {
30    pub path: String,
31    pub flow_spec: FlowSpec,
32    #[serde(default)]
33    pub description: Option<String>,
34}
35
36#[derive(Serialize)]
37pub struct RegisterWebhookResponse {
38    pub webhook_id: WebhookId,
39    pub path: String,
40    pub invoke_url: String,
41}
42
43#[derive(Serialize)]
44pub struct ListWebhooksResponse {
45    pub webhooks: Vec<WebhookSummary>,
46}
47
48#[derive(Serialize)]
49pub struct WebhookSummary {
50    pub id: WebhookId,
51    pub path: String,
52    pub description: Option<String>,
53    pub created_at: u64,
54    pub invoke_url: String,
55}
56
57#[derive(Serialize)]
58pub struct WebhookInvokeResponse {
59    pub flow_run_id: FlowRunId,
60    pub receipt: Value,
61}
62
63// --- Handlers ---
64
65async fn register_webhook(
66    State(state): State<SharedState>,
67    Json(request): Json<RegisterWebhookRequest>,
68) -> Result<(StatusCode, Json<RegisterWebhookResponse>), ApiError> {
69    validate_webhook_path(&request.path)?;
70
71    // Validate FlowSpec — compile to verify it produces a valid AQ DAG.
72    // Compilation errors map to BadRequest via the From<HostError> impl.
73    request
74        .flow_spec
75        .validate()
76        .map_err(|e| ApiError::BadRequest(format!("invalid FlowSpec: {}", e)))?;
77
78    let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
79
80    let registration = WebhookRegistration {
81        id: WebhookId::new(),
82        path: request.path.clone(),
83        flow_spec: request.flow_spec,
84        description: request.description,
85        created_at: now,
86    };
87
88    let webhook_id = registration.id;
89    {
90        let mut registry = state.webhook_registry.write().unwrap();
91        registry.register(registration, state.host.context_store())?;
92    }
93
94    Ok((
95        StatusCode::CREATED,
96        Json(RegisterWebhookResponse {
97            webhook_id,
98            path: request.path.clone(),
99            invoke_url: format!("/webhooks/{}", request.path),
100        }),
101    ))
102}
103
104async fn list_webhooks(State(state): State<SharedState>) -> Json<ListWebhooksResponse> {
105    let registry = state.webhook_registry.read().unwrap();
106    let webhooks = registry
107        .list()
108        .into_iter()
109        .map(|r| WebhookSummary {
110            id: r.id,
111            path: r.path.clone(),
112            description: r.description.clone(),
113            created_at: r.created_at,
114            invoke_url: format!("/webhooks/{}", r.path),
115        })
116        .collect();
117    Json(ListWebhooksResponse { webhooks })
118}
119
120async fn get_webhook(
121    State(state): State<SharedState>,
122    Path(id_str): Path<String>,
123) -> Result<Json<WebhookRegistration>, ApiError> {
124    let id: WebhookId = id_str
125        .parse()
126        .map_err(|_| ApiError::BadRequest(format!("invalid webhook ID: {}", id_str)))?;
127    let registry = state.webhook_registry.read().unwrap();
128    let path = registry
129        .by_id_to_path(&id)
130        .ok_or_else(|| ApiError::NotFound(format!("webhook not found: {}", id)))?;
131    let registration = registry.get_by_path(path).expect("inconsistent registry");
132    Ok(Json(registration.clone()))
133}
134
135async fn delete_webhook(
136    State(state): State<SharedState>,
137    Path(id_str): Path<String>,
138) -> Result<StatusCode, ApiError> {
139    let id: WebhookId = id_str
140        .parse()
141        .map_err(|_| ApiError::BadRequest(format!("invalid webhook ID: {}", id_str)))?;
142    {
143        let mut registry = state.webhook_registry.write().unwrap();
144        registry.remove(id, state.host.context_store())?;
145    }
146    Ok(StatusCode::NO_CONTENT)
147}
148
149async fn invoke_webhook(
150    State(state): State<SharedState>,
151    Path(path): Path<String>,
152    headers: HeaderMap,
153    body: Bytes,
154) -> Result<(StatusCode, Json<WebhookInvokeResponse>), ApiError> {
155    // Record webhook invocation metric
156    state.metrics.collectors().webhook_invocations_total.with_label_values(&[&path]).inc();
157
158    match worldinterface_http_trigger::handle_webhook(
159        &path,
160        &body,
161        &headers,
162        None, // skip source_addr extraction for v1.0-alpha (H-6)
163        &state.host,
164        &state.webhook_registry,
165    )
166    .await
167    {
168        Ok((flow_run_id, receipt)) => {
169            Ok((StatusCode::ACCEPTED, Json(WebhookInvokeResponse { flow_run_id, receipt })))
170        }
171        Err(e) => {
172            state.metrics.collectors().webhook_errors_total.inc();
173            Err(ApiError::from(e))
174        }
175    }
176}