1use axum::body::Body;
2use axum::extract::Query;
3use axum::http::{HeaderMap, Request, StatusCode};
4use axum::response::IntoResponse;
5use chrono::{DateTime, Utc};
6use hmac::{Hmac, Mac};
7use serde::Deserialize;
8use serde_json::{Value, json};
9use sha2::Sha256;
10
11use crate::engine::runtime::IngressEnvelope;
12use crate::ingress::{
13 CanonicalAttachment, CanonicalButton, ProviderIds, build_canonical_payload,
14 canonical_session_key, default_metadata, empty_entities,
15};
16use crate::provider_core_only;
17use crate::routing::TenantRuntimeHandle;
18use crate::runner::ingress_util::{collect_body, mark_processed};
19
20type HmacSha256 = Hmac<Sha256>;
21
22pub async fn verify(Query(query): Query<VerifyQuery>) -> impl IntoResponse {
23 let expected = std::env::var("WHATSAPP_VERIFY_TOKEN").ok();
24 match (&query.mode, &query.challenge, &query.verify_token, expected) {
25 (Some(mode), Some(challenge), Some(token), Some(expected))
26 if mode == "subscribe" && token == &expected =>
27 {
28 (StatusCode::OK, challenge.clone())
29 }
30 _ => (StatusCode::FORBIDDEN, String::new()),
31 }
32}
33
34pub async fn webhook(
35 TenantRuntimeHandle { tenant, runtime }: TenantRuntimeHandle,
36 request: Request<Body>,
37) -> Result<StatusCode, StatusCode> {
38 if provider_core_only::is_enabled() {
39 tracing::warn!("provider-core only mode enabled; blocking whatsapp webhook");
40 return Err(StatusCode::NOT_IMPLEMENTED);
41 }
42
43 let (parts, body) = request.into_parts();
44 let headers = parts.headers;
45 let bytes = collect_body(body).await?;
46 verify_signature(&headers, &bytes)?;
47
48 let raw_value: Value = serde_json::from_slice(&bytes).map_err(|_| StatusCode::BAD_REQUEST)?;
49 let webhook: WhatsappWebhook =
50 serde_json::from_value(raw_value.clone()).map_err(|_| StatusCode::BAD_REQUEST)?;
51
52 let message = webhook
53 .entry
54 .iter()
55 .flat_map(|entry| &entry.changes)
56 .find_map(|change| {
57 change
58 .value
59 .messages
60 .as_ref()
61 .and_then(|msgs| msgs.first().cloned())
62 });
63
64 if message.is_none() {
65 return Ok(StatusCode::OK);
66 }
67 let message = message.unwrap();
68
69 if mark_processed(runtime.webhook_cache(), &message.id) {
70 return Ok(StatusCode::ACCEPTED);
71 }
72
73 let flow = runtime
74 .engine()
75 .flow_by_type("messaging")
76 .ok_or(StatusCode::NOT_FOUND)?;
77
78 let provider_ids = ProviderIds {
79 conversation_id: Some(message.from.clone()),
80 user_id: Some(message.from.clone()),
81 message_id: Some(message.id.clone()),
82 ..ProviderIds::default()
83 };
84 let session_key = canonical_session_key(&tenant, "whatsapp", &provider_ids);
85 let timestamp = parse_timestamp(message.timestamp.as_deref())?;
86
87 let (text, attachments, buttons, scopes) = map_message_content(&message);
88
89 let canonical_payload = build_canonical_payload(
90 &tenant,
91 "whatsapp",
92 &provider_ids,
93 session_key.clone(),
94 &scopes,
95 timestamp,
96 None,
97 text,
98 attachments,
99 buttons,
100 empty_entities(),
101 default_metadata(),
102 json!({ "type": message.r#type }),
103 raw_value,
104 );
105
106 let envelope = IngressEnvelope {
107 tenant,
108 env: None,
109 flow_id: flow.id.clone(),
110 flow_type: Some(flow.flow_type.clone()),
111 action: Some("messaging".into()),
112 session_hint: Some(session_key),
113 provider: Some("whatsapp".into()),
114 channel: Some(message.from.clone()),
115 conversation: Some(message.from.clone()),
116 user: Some(message.from.clone()),
117 activity_id: Some(message.id.clone()),
118 timestamp: Some(timestamp.to_rfc3339()),
119 payload: canonical_payload,
120 metadata: None,
121 }
122 .canonicalize();
123
124 runtime
125 .state_machine()
126 .handle(envelope)
127 .await
128 .map_err(|err| {
129 tracing::error!(error = %err, "whatsapp flow execution failed");
130 StatusCode::BAD_GATEWAY
131 })?;
132 Ok(StatusCode::ACCEPTED)
133}
134
135fn map_message_content(
136 message: &WhatsappMessage,
137) -> (Option<String>, Vec<Value>, Vec<Value>, Vec<String>) {
138 let mut attachments = Vec::new();
139 let mut buttons = Vec::new();
140 let mut scopes = vec!["chat".to_string()];
141 let mut text = message.text.as_ref().map(|text| text.body.clone());
142
143 match message.r#type.as_str() {
144 "image" => {
145 if let Some(image) = &message.image {
146 attachments.push(
147 CanonicalAttachment {
148 attachment_type: "image".into(),
149 name: image.caption.clone(),
150 mime: None,
151 size: None,
152 url: image.link.clone(),
153 data_inline_b64: None,
154 }
155 .into_value(),
156 );
157 }
158 }
159 "audio" => {
160 if let Some(audio) = &message.audio {
161 attachments.push(
162 CanonicalAttachment {
163 attachment_type: "audio".into(),
164 name: None,
165 mime: None,
166 size: None,
167 url: audio.link.clone(),
168 data_inline_b64: None,
169 }
170 .into_value(),
171 );
172 }
173 }
174 "video" => {
175 if let Some(video) = &message.video {
176 attachments.push(
177 CanonicalAttachment {
178 attachment_type: "video".into(),
179 name: video.caption.clone(),
180 mime: None,
181 size: None,
182 url: video.link.clone(),
183 data_inline_b64: None,
184 }
185 .into_value(),
186 );
187 }
188 }
189 "document" => {
190 if let Some(doc) = &message.document {
191 attachments.push(
192 CanonicalAttachment {
193 attachment_type: "file".into(),
194 name: doc.filename.clone(),
195 mime: None,
196 size: None,
197 url: doc.link.clone(),
198 data_inline_b64: None,
199 }
200 .into_value(),
201 );
202 }
203 }
204 "location" => {
205 if let Some(location) = &message.location {
206 text = Some(location.name.clone().unwrap_or_else(|| "location".into()));
207 }
208 }
209 "interactive" => {
210 if let Some(interactive) = &message.interactive {
211 if let Some(reply) = &interactive.button_reply {
212 buttons.push(
213 CanonicalButton {
214 id: reply.id.clone(),
215 title: reply.title.clone(),
216 payload: reply.id.clone(),
217 }
218 .into_value(),
219 );
220 text = Some(reply.title.clone());
221 } else if let Some(list) = &interactive.list_reply {
222 buttons.push(
223 CanonicalButton {
224 id: list.id.clone(),
225 title: list.title.clone(),
226 payload: list.id.clone(),
227 }
228 .into_value(),
229 );
230 text = Some(list.title.clone());
231 }
232 }
233 }
234 _ => {}
235 }
236
237 if !attachments.is_empty() {
238 scopes.push("attachments".into());
239 }
240 if !buttons.is_empty() {
241 scopes.push("buttons".into());
242 }
243
244 (text, attachments, buttons, scopes)
245}
246
247fn parse_timestamp(raw: Option<&str>) -> Result<DateTime<Utc>, StatusCode> {
248 if let Some(epoch) = raw.and_then(|value| value.parse::<i64>().ok()) {
249 return DateTime::from_timestamp(epoch, 0).ok_or(StatusCode::BAD_REQUEST);
250 }
251 Ok(Utc::now())
252}
253
254fn verify_signature(headers: &HeaderMap, body: &[u8]) -> Result<(), StatusCode> {
255 if let Ok(secret) = std::env::var("WHATSAPP_APP_SECRET") {
256 let signature = headers
257 .get("X-Hub-Signature-256")
258 .and_then(|value| value.to_str().ok())
259 .and_then(|value| value.strip_prefix("sha256="))
260 .ok_or(StatusCode::UNAUTHORIZED)?;
261 let mut mac =
262 HmacSha256::new_from_slice(secret.as_bytes()).map_err(|_| StatusCode::UNAUTHORIZED)?;
263 mac.update(body);
264 let expected = hex::encode(mac.finalize().into_bytes());
265 if !subtle_equals(signature, &expected) {
266 return Err(StatusCode::UNAUTHORIZED);
267 }
268 }
269 Ok(())
270}
271
272fn subtle_equals(a: &str, b: &str) -> bool {
273 if a.len() != b.len() {
274 return false;
275 }
276 let mut diff = 0u8;
277 for (x, y) in a.as_bytes().iter().zip(b.as_bytes()) {
278 diff |= x ^ y;
279 }
280 diff == 0
281}
282
283#[derive(Debug, Deserialize)]
284pub struct VerifyQuery {
285 #[serde(rename = "hub.mode")]
286 mode: Option<String>,
287 #[serde(rename = "hub.challenge")]
288 challenge: Option<String>,
289 #[serde(rename = "hub.verify_token")]
290 verify_token: Option<String>,
291}
292
293#[derive(Debug, Deserialize)]
294struct WhatsappWebhook {
295 entry: Vec<WhatsappEntry>,
296}
297
298#[derive(Debug, Deserialize)]
299struct WhatsappEntry {
300 changes: Vec<WhatsappChange>,
301}
302
303#[derive(Debug, Deserialize)]
304struct WhatsappChange {
305 value: WhatsappValue,
306}
307
308#[derive(Debug, Deserialize)]
309struct WhatsappValue {
310 #[serde(default)]
311 messages: Option<Vec<WhatsappMessage>>,
312}
313
314#[derive(Debug, Clone, Deserialize)]
315struct WhatsappMessage {
316 id: String,
317 #[serde(default)]
318 from: String,
319 #[serde(rename = "type")]
320 r#type: String,
321 #[serde(default)]
322 timestamp: Option<String>,
323 #[serde(default)]
324 text: Option<WhatsappText>,
325 #[serde(default)]
326 image: Option<WhatsappMedia>,
327 #[serde(default)]
328 audio: Option<WhatsappMedia>,
329 #[serde(default)]
330 video: Option<WhatsappMedia>,
331 #[serde(default)]
332 document: Option<WhatsappDocument>,
333 #[serde(default)]
334 location: Option<WhatsappLocation>,
335 #[serde(default)]
336 interactive: Option<WhatsappInteractive>,
337}
338
339#[derive(Debug, Clone, Deserialize)]
340struct WhatsappText {
341 body: String,
342}
343
344#[derive(Debug, Clone, Deserialize)]
345struct WhatsappMedia {
346 #[serde(default)]
347 link: Option<String>,
348 #[serde(default)]
349 caption: Option<String>,
350}
351
352#[derive(Debug, Clone, Deserialize)]
353struct WhatsappDocument {
354 #[serde(default)]
355 link: Option<String>,
356 #[serde(default)]
357 filename: Option<String>,
358}
359
360#[derive(Debug, Clone, Deserialize)]
361struct WhatsappLocation {
362 #[serde(default)]
363 name: Option<String>,
364}
365
366#[derive(Debug, Clone, Deserialize)]
367struct WhatsappInteractive {
368 #[serde(default)]
369 button_reply: Option<WhatsappButtonReply>,
370 #[serde(default)]
371 list_reply: Option<WhatsappListReply>,
372}
373
374#[derive(Debug, Clone, Deserialize)]
375struct WhatsappButtonReply {
376 id: String,
377 title: String,
378}
379
380#[derive(Debug, Clone, Deserialize)]
381struct WhatsappListReply {
382 id: String,
383 title: String,
384}
385
386#[cfg(test)]
387mod tests {
388 use super::*;
389 use serde_json::json;
390
391 #[test]
392 fn whatsapp_message_maps_to_canonical_payload() {
393 let raw = json!({
394 "entry": [{
395 "changes": [{
396 "value": {
397 "messages": [{
398 "id": "wamid.HBgM",
399 "from": "447700900123",
400 "timestamp": "1731315600",
401 "type": "text",
402 "text": { "body": "Hi" }
403 }]
404 }
405 }]
406 }]
407 });
408 let webhook: WhatsappWebhook = serde_json::from_value(raw.clone()).unwrap();
409 let message = webhook.entry[0].changes[0].value.messages.as_ref().unwrap()[0].clone();
410
411 let provider_ids = ProviderIds {
412 conversation_id: Some(message.from.clone()),
413 user_id: Some(message.from.clone()),
414 message_id: Some(message.id.clone()),
415 ..ProviderIds::default()
416 };
417 let session_key = canonical_session_key("zain-kuwait", "whatsapp", &provider_ids);
418 assert_eq!(
419 session_key,
420 "zain-kuwait:whatsapp:447700900123:447700900123"
421 );
422 let timestamp = parse_timestamp(message.timestamp.as_deref()).unwrap();
423 let (text, attachments, buttons, scopes) = map_message_content(&message);
424 let canonical = build_canonical_payload(
425 "zain-kuwait",
426 "whatsapp",
427 &provider_ids,
428 session_key,
429 &scopes,
430 timestamp,
431 None,
432 text,
433 attachments,
434 buttons,
435 empty_entities(),
436 default_metadata(),
437 json!({ "type": message.r#type }),
438 raw,
439 );
440
441 assert_eq!(canonical["provider"], json!("whatsapp"));
442 assert_eq!(
443 canonical["session"]["key"],
444 json!("zain-kuwait:whatsapp:447700900123:447700900123")
445 );
446 assert_eq!(canonical["text"], json!("Hi"));
447 assert_eq!(canonical["attachments"], json!([]));
448 assert_eq!(canonical["buttons"], json!([]));
449 }
450}