1use axum::body::Body;
2use axum::extract::Query;
3use axum::http::{HeaderMap, Request, StatusCode};
4use axum::response::IntoResponse;
5use chrono::{DateTime, Utc};
6use hmac::{Hmac, Mac};
7use serde::Deserialize;
8use serde_json::{Value, json};
9use sha2::Sha256;
10
11use crate::engine::runtime::IngressEnvelope;
12use crate::ingress::{
13 CanonicalAttachment, CanonicalButton, ProviderIds, build_canonical_payload,
14 canonical_session_key, default_metadata, empty_entities,
15};
16use crate::provider_core_only;
17use crate::routing::TenantRuntimeHandle;
18use crate::runner::ingress_util::{collect_body, mark_processed};
19
/// HMAC keyed with SHA-256; used by `verify_signature` to recompute the
/// `X-Hub-Signature-256` digest of an incoming webhook body.
type HmacSha256 = Hmac<Sha256>;
21
22pub async fn verify(Query(query): Query<VerifyQuery>) -> impl IntoResponse {
23 let expected = std::env::var("WHATSAPP_VERIFY_TOKEN").ok();
24 match (&query.mode, &query.challenge, &query.verify_token, expected) {
25 (Some(mode), Some(challenge), Some(token), Some(expected))
26 if mode == "subscribe" && token == &expected =>
27 {
28 (StatusCode::OK, challenge.clone())
29 }
30 _ => (StatusCode::FORBIDDEN, String::new()),
31 }
32}
33
34pub async fn webhook(
35 TenantRuntimeHandle { tenant, runtime }: TenantRuntimeHandle,
36 request: Request<Body>,
37) -> Result<StatusCode, StatusCode> {
38 if provider_core_only::is_enabled() {
39 tracing::warn!("provider-core only mode enabled; blocking whatsapp webhook");
40 return Err(StatusCode::NOT_IMPLEMENTED);
41 }
42
43 let (parts, body) = request.into_parts();
44 let headers = parts.headers;
45 let bytes = collect_body(body).await?;
46 verify_signature(&headers, &bytes)?;
47
48 let raw_value: Value = serde_json::from_slice(&bytes).map_err(|_| StatusCode::BAD_REQUEST)?;
49 let webhook: WhatsappWebhook =
50 serde_json::from_value(raw_value.clone()).map_err(|_| StatusCode::BAD_REQUEST)?;
51
52 let message = webhook
53 .entry
54 .iter()
55 .flat_map(|entry| &entry.changes)
56 .find_map(|change| {
57 change
58 .value
59 .messages
60 .as_ref()
61 .and_then(|msgs| msgs.first().cloned())
62 });
63
64 if message.is_none() {
65 return Ok(StatusCode::OK);
66 }
67 let message = message.unwrap();
68
69 if mark_processed(runtime.webhook_cache(), &message.id) {
70 return Ok(StatusCode::ACCEPTED);
71 }
72
73 let flow = runtime
74 .engine()
75 .flow_by_type("messaging")
76 .ok_or(StatusCode::NOT_FOUND)?;
77
78 let provider_ids = ProviderIds {
79 conversation_id: Some(message.from.clone()),
80 user_id: Some(message.from.clone()),
81 message_id: Some(message.id.clone()),
82 ..ProviderIds::default()
83 };
84 let session_key = canonical_session_key(&tenant, "whatsapp", &provider_ids);
85 let timestamp = parse_timestamp(message.timestamp.as_deref())?;
86
87 let (text, attachments, buttons, scopes) = map_message_content(&message);
88
89 let canonical_payload = build_canonical_payload(
90 &tenant,
91 "whatsapp",
92 &provider_ids,
93 session_key.clone(),
94 &scopes,
95 timestamp,
96 None,
97 text,
98 attachments,
99 buttons,
100 empty_entities(),
101 default_metadata(),
102 json!({ "type": message.r#type }),
103 raw_value,
104 );
105
106 let envelope = IngressEnvelope {
107 tenant,
108 env: None,
109 pack_id: Some(flow.pack_id.clone()),
110 flow_id: flow.id.clone(),
111 flow_type: Some(flow.flow_type.clone()),
112 action: Some("messaging".into()),
113 session_hint: Some(session_key),
114 provider: Some("whatsapp".into()),
115 channel: Some(message.from.clone()),
116 conversation: Some(message.from.clone()),
117 user: Some(message.from.clone()),
118 activity_id: Some(message.id.clone()),
119 timestamp: Some(timestamp.to_rfc3339()),
120 payload: canonical_payload,
121 metadata: None,
122 reply_scope: None,
123 }
124 .canonicalize();
125
126 runtime
127 .state_machine()
128 .handle(envelope)
129 .await
130 .map_err(|err| {
131 tracing::error!(error = %err, "whatsapp flow execution failed");
132 StatusCode::BAD_GATEWAY
133 })?;
134 Ok(StatusCode::ACCEPTED)
135}
136
137fn map_message_content(
138 message: &WhatsappMessage,
139) -> (Option<String>, Vec<Value>, Vec<Value>, Vec<String>) {
140 let mut attachments = Vec::new();
141 let mut buttons = Vec::new();
142 let mut scopes = vec!["chat".to_string()];
143 let mut text = message.text.as_ref().map(|text| text.body.clone());
144
145 match message.r#type.as_str() {
146 "image" => {
147 if let Some(image) = &message.image {
148 attachments.push(
149 CanonicalAttachment {
150 attachment_type: "image".into(),
151 name: image.caption.clone(),
152 mime: None,
153 size: None,
154 url: image.link.clone(),
155 data_inline_b64: None,
156 }
157 .into_value(),
158 );
159 }
160 }
161 "audio" => {
162 if let Some(audio) = &message.audio {
163 attachments.push(
164 CanonicalAttachment {
165 attachment_type: "audio".into(),
166 name: None,
167 mime: None,
168 size: None,
169 url: audio.link.clone(),
170 data_inline_b64: None,
171 }
172 .into_value(),
173 );
174 }
175 }
176 "video" => {
177 if let Some(video) = &message.video {
178 attachments.push(
179 CanonicalAttachment {
180 attachment_type: "video".into(),
181 name: video.caption.clone(),
182 mime: None,
183 size: None,
184 url: video.link.clone(),
185 data_inline_b64: None,
186 }
187 .into_value(),
188 );
189 }
190 }
191 "document" => {
192 if let Some(doc) = &message.document {
193 attachments.push(
194 CanonicalAttachment {
195 attachment_type: "file".into(),
196 name: doc.filename.clone(),
197 mime: None,
198 size: None,
199 url: doc.link.clone(),
200 data_inline_b64: None,
201 }
202 .into_value(),
203 );
204 }
205 }
206 "location" => {
207 if let Some(location) = &message.location {
208 text = Some(location.name.clone().unwrap_or_else(|| "location".into()));
209 }
210 }
211 "interactive" => {
212 if let Some(interactive) = &message.interactive {
213 if let Some(reply) = &interactive.button_reply {
214 buttons.push(
215 CanonicalButton {
216 id: reply.id.clone(),
217 title: reply.title.clone(),
218 payload: reply.id.clone(),
219 }
220 .into_value(),
221 );
222 text = Some(reply.title.clone());
223 } else if let Some(list) = &interactive.list_reply {
224 buttons.push(
225 CanonicalButton {
226 id: list.id.clone(),
227 title: list.title.clone(),
228 payload: list.id.clone(),
229 }
230 .into_value(),
231 );
232 text = Some(list.title.clone());
233 }
234 }
235 }
236 _ => {}
237 }
238
239 if !attachments.is_empty() {
240 scopes.push("attachments".into());
241 }
242 if !buttons.is_empty() {
243 scopes.push("buttons".into());
244 }
245
246 (text, attachments, buttons, scopes)
247}
248
249fn parse_timestamp(raw: Option<&str>) -> Result<DateTime<Utc>, StatusCode> {
250 if let Some(epoch) = raw.and_then(|value| value.parse::<i64>().ok()) {
251 return DateTime::from_timestamp(epoch, 0).ok_or(StatusCode::BAD_REQUEST);
252 }
253 Ok(Utc::now())
254}
255
256fn verify_signature(headers: &HeaderMap, body: &[u8]) -> Result<(), StatusCode> {
257 if let Ok(secret) = std::env::var("WHATSAPP_APP_SECRET") {
258 let signature = headers
259 .get("X-Hub-Signature-256")
260 .and_then(|value| value.to_str().ok())
261 .and_then(|value| value.strip_prefix("sha256="))
262 .ok_or(StatusCode::UNAUTHORIZED)?;
263 let mut mac =
264 HmacSha256::new_from_slice(secret.as_bytes()).map_err(|_| StatusCode::UNAUTHORIZED)?;
265 mac.update(body);
266 let expected = hex::encode(mac.finalize().into_bytes());
267 if !subtle_equals(signature, &expected) {
268 return Err(StatusCode::UNAUTHORIZED);
269 }
270 }
271 Ok(())
272}
273
/// Compares two strings byte-for-byte without stopping at the first mismatch.
///
/// Strings of different lengths are reported unequal immediately; for equal lengths
/// every byte pair is XOR-ed and accumulated, and the strings are equal only when the
/// accumulated difference is zero.
fn subtle_equals(a: &str, b: &str) -> bool {
    let (lhs, rhs) = (a.as_bytes(), b.as_bytes());
    if lhs.len() != rhs.len() {
        return false;
    }
    let difference = lhs
        .iter()
        .zip(rhs)
        .fold(0u8, |acc, (left, right)| acc | (left ^ right));
    difference == 0
}
284
/// Query parameters of the verification request handled by `verify`.
///
/// All fields are optional; `verify` answers `403 Forbidden` unless every one of
/// them is present and valid.
#[derive(Debug, Deserialize)]
pub struct VerifyQuery {
    /// `hub.mode`; `verify` only accepts the value `"subscribe"`.
    #[serde(rename = "hub.mode")]
    mode: Option<String>,
    /// `hub.challenge`; echoed back verbatim on successful verification.
    #[serde(rename = "hub.challenge")]
    challenge: Option<String>,
    /// `hub.verify_token`; compared against the `WHATSAPP_VERIFY_TOKEN` environment variable.
    #[serde(rename = "hub.verify_token")]
    verify_token: Option<String>,
}
294
/// Top-level body of a webhook delivery, reduced to the fields this module reads.
#[derive(Debug, Deserialize)]
struct WhatsappWebhook {
    /// Entries of the delivery; required, so a body without `entry` fails to deserialize.
    entry: Vec<WhatsappEntry>,
}
299
/// One element of the webhook's `entry` array.
#[derive(Debug, Deserialize)]
struct WhatsappEntry {
    /// Changes reported by this entry; required for deserialization to succeed.
    changes: Vec<WhatsappChange>,
}
304
/// One element of an entry's `changes` array.
#[derive(Debug, Deserialize)]
struct WhatsappChange {
    /// Payload of the change; the handler looks for messages inside it.
    value: WhatsappValue,
}
309
/// The `value` object of a change, reduced to its message list.
#[derive(Debug, Deserialize)]
struct WhatsappValue {
    /// Inbound messages, or `None` when the change carries no `messages` key.
    #[serde(default)]
    messages: Option<Vec<WhatsappMessage>>,
}
315
/// A single inbound message as delivered in the webhook payload.
///
/// Only `id` and `type` are required; the content objects are optional and
/// `map_message_content` reads the one matching `type`.
#[derive(Debug, Clone, Deserialize)]
struct WhatsappMessage {
    /// Message identifier; used for duplicate detection and as the activity id.
    id: String,
    /// Sender identifier; used as conversation, user and channel. Defaults to an
    /// empty string when the field is absent.
    #[serde(default)]
    from: String,
    /// Message kind (e.g. `"text"`, `"image"`, `"interactive"`); selects the content mapping.
    #[serde(rename = "type")]
    r#type: String,
    /// Timestamp string; `parse_timestamp` interprets it as epoch seconds.
    #[serde(default)]
    timestamp: Option<String>,
    /// Text content; its body becomes the initial canonical text.
    #[serde(default)]
    text: Option<WhatsappText>,
    /// Image content, read when `type` is `"image"`.
    #[serde(default)]
    image: Option<WhatsappMedia>,
    /// Audio content, read when `type` is `"audio"`.
    #[serde(default)]
    audio: Option<WhatsappMedia>,
    /// Video content, read when `type` is `"video"`.
    #[serde(default)]
    video: Option<WhatsappMedia>,
    /// Document content, read when `type` is `"document"`.
    #[serde(default)]
    document: Option<WhatsappDocument>,
    /// Location content, read when `type` is `"location"`.
    #[serde(default)]
    location: Option<WhatsappLocation>,
    /// Interactive reply content, read when `type` is `"interactive"`.
    #[serde(default)]
    interactive: Option<WhatsappInteractive>,
}
340
/// The `text` object of a message.
#[derive(Debug, Clone, Deserialize)]
struct WhatsappText {
    /// The message text; required whenever a `text` object is present.
    body: String,
}
345
/// Media object shared by `image`, `audio` and `video` messages.
///
/// NOTE(review): inbound Cloud API media objects appear to carry `id` / `mime_type`
/// rather than `link`; if so, `link` is always `None` here and the attachment URL
/// stays empty — confirm against the provider's webhook payload reference.
#[derive(Debug, Clone, Deserialize)]
struct WhatsappMedia {
    /// URL of the media; becomes the canonical attachment's `url`.
    #[serde(default)]
    link: Option<String>,
    /// Caption; used as the attachment `name` for images and videos (not for audio).
    #[serde(default)]
    caption: Option<String>,
}
353
/// The `document` object of a message.
///
/// NOTE(review): as with other media, inbound payloads may not include `link` —
/// confirm against the provider's webhook payload reference.
#[derive(Debug, Clone, Deserialize)]
struct WhatsappDocument {
    /// URL of the document; becomes the canonical attachment's `url`.
    #[serde(default)]
    link: Option<String>,
    /// File name; becomes the canonical attachment's `name`.
    #[serde(default)]
    filename: Option<String>,
}
361
/// The `location` object of a message, reduced to its name.
#[derive(Debug, Clone, Deserialize)]
struct WhatsappLocation {
    /// Name of the location; when absent the canonical text is the literal `"location"`.
    #[serde(default)]
    name: Option<String>,
}
367
/// The `interactive` object of a message: the user's reply to buttons or a list.
///
/// When both replies are present, `map_message_content` uses `button_reply`.
#[derive(Debug, Clone, Deserialize)]
struct WhatsappInteractive {
    /// Reply produced by tapping a button.
    #[serde(default)]
    button_reply: Option<WhatsappButtonReply>,
    /// Reply produced by choosing a list row.
    #[serde(default)]
    list_reply: Option<WhatsappListReply>,
}
375
/// The button chosen in an interactive button reply.
#[derive(Debug, Clone, Deserialize)]
struct WhatsappButtonReply {
    /// Identifier of the button; used as both canonical button `id` and `payload`.
    id: String,
    /// Title of the button; also becomes the canonical text.
    title: String,
}
381
/// The row chosen in an interactive list reply.
#[derive(Debug, Clone, Deserialize)]
struct WhatsappListReply {
    /// Identifier of the row; used as both canonical button `id` and `payload`.
    id: String,
    /// Title of the row; also becomes the canonical text.
    title: String,
}
387
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A plain text message must produce a canonical payload with the expected provider,
    /// session key and text, and with empty attachment and button lists.
    #[test]
    fn whatsapp_message_maps_to_canonical_payload() {
        const TENANT: &str = "zain-kuwait";
        const EXPECTED_SESSION_KEY: &str = "zain-kuwait:whatsapp:447700900123:447700900123";

        let payload = json!({
            "entry": [{
                "changes": [{
                    "value": {
                        "messages": [{
                            "id": "wamid.HBgM",
                            "from": "447700900123",
                            "timestamp": "1731315600",
                            "type": "text",
                            "text": { "body": "Hi" }
                        }]
                    }
                }]
            }]
        });

        // Walk the typed view down to the single message in the fixture.
        let parsed: WhatsappWebhook = serde_json::from_value(payload.clone()).unwrap();
        let inbound = parsed.entry[0].changes[0]
            .value
            .messages
            .as_ref()
            .unwrap()[0]
            .clone();

        let ids = ProviderIds {
            conversation_id: Some(inbound.from.clone()),
            user_id: Some(inbound.from.clone()),
            message_id: Some(inbound.id.clone()),
            ..ProviderIds::default()
        };
        let key = canonical_session_key(TENANT, "whatsapp", &ids);
        assert_eq!(key, EXPECTED_SESSION_KEY);

        let received_at = parse_timestamp(inbound.timestamp.as_deref()).unwrap();
        let (text, attachments, buttons, scopes) = map_message_content(&inbound);
        let canonical = build_canonical_payload(
            TENANT,
            "whatsapp",
            &ids,
            key,
            &scopes,
            received_at,
            None,
            text,
            attachments,
            buttons,
            empty_entities(),
            default_metadata(),
            json!({ "type": inbound.r#type }),
            payload,
        );

        assert_eq!(canonical["provider"], json!("whatsapp"));
        assert_eq!(canonical["session"]["key"], json!(EXPECTED_SESSION_KEY));
        assert_eq!(canonical["text"], json!("Hi"));
        assert_eq!(canonical["attachments"], json!([]));
        assert_eq!(canonical["buttons"], json!([]));
    }
}