worldinterface_daemon/routes/
webhooks.rs1use std::time::{SystemTime, UNIX_EPOCH};
4
5use axum::body::Bytes;
6use axum::extract::{Path, State};
7use axum::http::{HeaderMap, StatusCode};
8use axum::Json;
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use worldinterface_core::flowspec::FlowSpec;
12use worldinterface_core::id::FlowRunId;
13use worldinterface_http_trigger::{validate_webhook_path, WebhookId, WebhookRegistration};
14
15use crate::error::ApiError;
16use crate::state::SharedState;
17
18pub fn register_routes(router: axum::Router<SharedState>) -> axum::Router<SharedState> {
19 router
20 .route("/api/v1/webhooks", axum::routing::post(register_webhook))
21 .route("/api/v1/webhooks", axum::routing::get(list_webhooks))
22 .route("/api/v1/webhooks/:id", axum::routing::get(get_webhook).delete(delete_webhook))
23 .route("/webhooks/*path", axum::routing::post(invoke_webhook))
24}
25
26#[derive(Deserialize)]
29pub struct RegisterWebhookRequest {
30 pub path: String,
31 pub flow_spec: FlowSpec,
32 #[serde(default)]
33 pub description: Option<String>,
34}
35
36#[derive(Serialize)]
37pub struct RegisterWebhookResponse {
38 pub webhook_id: WebhookId,
39 pub path: String,
40 pub invoke_url: String,
41}
42
43#[derive(Serialize)]
44pub struct ListWebhooksResponse {
45 pub webhooks: Vec<WebhookSummary>,
46}
47
48#[derive(Serialize)]
49pub struct WebhookSummary {
50 pub id: WebhookId,
51 pub path: String,
52 pub description: Option<String>,
53 pub created_at: u64,
54 pub invoke_url: String,
55}
56
57#[derive(Serialize)]
58pub struct WebhookInvokeResponse {
59 pub flow_run_id: FlowRunId,
60 pub receipt: Value,
61}
62
63async fn register_webhook(
66 State(state): State<SharedState>,
67 Json(request): Json<RegisterWebhookRequest>,
68) -> Result<(StatusCode, Json<RegisterWebhookResponse>), ApiError> {
69 validate_webhook_path(&request.path)?;
70
71 request
74 .flow_spec
75 .validate()
76 .map_err(|e| ApiError::BadRequest(format!("invalid FlowSpec: {}", e)))?;
77
78 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
79
80 let registration = WebhookRegistration {
81 id: WebhookId::new(),
82 path: request.path.clone(),
83 flow_spec: request.flow_spec,
84 description: request.description,
85 created_at: now,
86 };
87
88 let webhook_id = registration.id;
89 {
90 let mut registry = state.webhook_registry.write().unwrap();
91 registry.register(registration, state.host.context_store())?;
92 }
93
94 Ok((
95 StatusCode::CREATED,
96 Json(RegisterWebhookResponse {
97 webhook_id,
98 path: request.path.clone(),
99 invoke_url: format!("/webhooks/{}", request.path),
100 }),
101 ))
102}
103
104async fn list_webhooks(State(state): State<SharedState>) -> Json<ListWebhooksResponse> {
105 let registry = state.webhook_registry.read().unwrap();
106 let webhooks = registry
107 .list()
108 .into_iter()
109 .map(|r| WebhookSummary {
110 id: r.id,
111 path: r.path.clone(),
112 description: r.description.clone(),
113 created_at: r.created_at,
114 invoke_url: format!("/webhooks/{}", r.path),
115 })
116 .collect();
117 Json(ListWebhooksResponse { webhooks })
118}
119
120async fn get_webhook(
121 State(state): State<SharedState>,
122 Path(id_str): Path<String>,
123) -> Result<Json<WebhookRegistration>, ApiError> {
124 let id: WebhookId = id_str
125 .parse()
126 .map_err(|_| ApiError::BadRequest(format!("invalid webhook ID: {}", id_str)))?;
127 let registry = state.webhook_registry.read().unwrap();
128 let path = registry
129 .by_id_to_path(&id)
130 .ok_or_else(|| ApiError::NotFound(format!("webhook not found: {}", id)))?;
131 let registration = registry.get_by_path(path).expect("inconsistent registry");
132 Ok(Json(registration.clone()))
133}
134
135async fn delete_webhook(
136 State(state): State<SharedState>,
137 Path(id_str): Path<String>,
138) -> Result<StatusCode, ApiError> {
139 let id: WebhookId = id_str
140 .parse()
141 .map_err(|_| ApiError::BadRequest(format!("invalid webhook ID: {}", id_str)))?;
142 {
143 let mut registry = state.webhook_registry.write().unwrap();
144 registry.remove(id, state.host.context_store())?;
145 }
146 Ok(StatusCode::NO_CONTENT)
147}
148
149async fn invoke_webhook(
150 State(state): State<SharedState>,
151 Path(path): Path<String>,
152 headers: HeaderMap,
153 body: Bytes,
154) -> Result<(StatusCode, Json<WebhookInvokeResponse>), ApiError> {
155 state.metrics.collectors().webhook_invocations_total.with_label_values(&[&path]).inc();
157
158 match worldinterface_http_trigger::handle_webhook(
159 &path,
160 &body,
161 &headers,
162 None, &state.host,
164 &state.webhook_registry,
165 )
166 .await
167 {
168 Ok((flow_run_id, receipt)) => {
169 Ok((StatusCode::ACCEPTED, Json(WebhookInvokeResponse { flow_run_id, receipt })))
170 }
171 Err(e) => {
172 state.metrics.collectors().webhook_errors_total.inc();
173 Err(ApiError::from(e))
174 }
175 }
176}