Skip to main content

greentic_runner_host/runner/
adapt_whatsapp.rs

1use axum::body::Body;
2use axum::extract::Query;
3use axum::http::{HeaderMap, Request, StatusCode};
4use axum::response::IntoResponse;
5use chrono::{DateTime, Utc};
6use hmac::{Hmac, Mac};
7use serde::Deserialize;
8use serde_json::{Value, json};
9use sha2::Sha256;
10
11use crate::engine::runtime::IngressEnvelope;
12use crate::ingress::{
13    CanonicalAttachment, CanonicalButton, ProviderIds, build_canonical_payload,
14    canonical_session_key, default_metadata, empty_entities,
15};
16use crate::provider_core_only;
17use crate::routing::TenantRuntimeHandle;
18use crate::runner::ingress_util::{collect_body, mark_processed};
19
20type HmacSha256 = Hmac<Sha256>;
21
22pub async fn verify(Query(query): Query<VerifyQuery>) -> impl IntoResponse {
23    let expected = std::env::var("WHATSAPP_VERIFY_TOKEN").ok();
24    match (&query.mode, &query.challenge, &query.verify_token, expected) {
25        (Some(mode), Some(challenge), Some(token), Some(expected))
26            if mode == "subscribe" && token == &expected =>
27        {
28            (StatusCode::OK, challenge.clone())
29        }
30        _ => (StatusCode::FORBIDDEN, String::new()),
31    }
32}
33
34pub async fn webhook(
35    TenantRuntimeHandle { tenant, runtime }: TenantRuntimeHandle,
36    request: Request<Body>,
37) -> Result<StatusCode, StatusCode> {
38    if provider_core_only::is_enabled() {
39        tracing::warn!("provider-core only mode enabled; blocking whatsapp webhook");
40        return Err(StatusCode::NOT_IMPLEMENTED);
41    }
42
43    let (parts, body) = request.into_parts();
44    let headers = parts.headers;
45    let bytes = collect_body(body).await?;
46    verify_signature(&headers, &bytes)?;
47
48    let raw_value: Value = serde_json::from_slice(&bytes).map_err(|_| StatusCode::BAD_REQUEST)?;
49    let webhook: WhatsappWebhook =
50        serde_json::from_value(raw_value.clone()).map_err(|_| StatusCode::BAD_REQUEST)?;
51
52    let message = webhook
53        .entry
54        .iter()
55        .flat_map(|entry| &entry.changes)
56        .find_map(|change| {
57            change
58                .value
59                .messages
60                .as_ref()
61                .and_then(|msgs| msgs.first().cloned())
62        });
63
64    if message.is_none() {
65        return Ok(StatusCode::OK);
66    }
67    let message = message.unwrap();
68
69    if mark_processed(runtime.webhook_cache(), &message.id) {
70        return Ok(StatusCode::ACCEPTED);
71    }
72
73    let flow = runtime
74        .engine()
75        .flow_by_type("messaging")
76        .ok_or(StatusCode::NOT_FOUND)?;
77
78    let provider_ids = ProviderIds {
79        conversation_id: Some(message.from.clone()),
80        user_id: Some(message.from.clone()),
81        message_id: Some(message.id.clone()),
82        ..ProviderIds::default()
83    };
84    let session_key = canonical_session_key(&tenant, "whatsapp", &provider_ids);
85    let timestamp = parse_timestamp(message.timestamp.as_deref())?;
86
87    let (text, attachments, buttons, scopes) = map_message_content(&message);
88
89    let canonical_payload = build_canonical_payload(
90        &tenant,
91        "whatsapp",
92        &provider_ids,
93        session_key.clone(),
94        &scopes,
95        timestamp,
96        None,
97        text,
98        attachments,
99        buttons,
100        empty_entities(),
101        default_metadata(),
102        json!({ "type": message.r#type }),
103        raw_value,
104    );
105
106    let envelope = IngressEnvelope {
107        tenant,
108        env: None,
109        pack_id: Some(flow.pack_id.clone()),
110        flow_id: flow.id.clone(),
111        flow_type: Some(flow.flow_type.clone()),
112        action: Some("messaging".into()),
113        session_hint: Some(session_key),
114        provider: Some("whatsapp".into()),
115        channel: Some(message.from.clone()),
116        conversation: Some(message.from.clone()),
117        user: Some(message.from.clone()),
118        activity_id: Some(message.id.clone()),
119        timestamp: Some(timestamp.to_rfc3339()),
120        payload: canonical_payload,
121        metadata: None,
122        reply_scope: None,
123    }
124    .canonicalize();
125
126    runtime
127        .state_machine()
128        .handle(envelope)
129        .await
130        .map_err(|err| {
131            tracing::error!(error = %err, "whatsapp flow execution failed");
132            StatusCode::BAD_GATEWAY
133        })?;
134    Ok(StatusCode::ACCEPTED)
135}
136
137fn map_message_content(
138    message: &WhatsappMessage,
139) -> (Option<String>, Vec<Value>, Vec<Value>, Vec<String>) {
140    let mut attachments = Vec::new();
141    let mut buttons = Vec::new();
142    let mut scopes = vec!["chat".to_string()];
143    let mut text = message.text.as_ref().map(|text| text.body.clone());
144
145    match message.r#type.as_str() {
146        "image" => {
147            if let Some(image) = &message.image {
148                attachments.push(
149                    CanonicalAttachment {
150                        attachment_type: "image".into(),
151                        name: image.caption.clone(),
152                        mime: None,
153                        size: None,
154                        url: image.link.clone(),
155                        data_inline_b64: None,
156                    }
157                    .into_value(),
158                );
159            }
160        }
161        "audio" => {
162            if let Some(audio) = &message.audio {
163                attachments.push(
164                    CanonicalAttachment {
165                        attachment_type: "audio".into(),
166                        name: None,
167                        mime: None,
168                        size: None,
169                        url: audio.link.clone(),
170                        data_inline_b64: None,
171                    }
172                    .into_value(),
173                );
174            }
175        }
176        "video" => {
177            if let Some(video) = &message.video {
178                attachments.push(
179                    CanonicalAttachment {
180                        attachment_type: "video".into(),
181                        name: video.caption.clone(),
182                        mime: None,
183                        size: None,
184                        url: video.link.clone(),
185                        data_inline_b64: None,
186                    }
187                    .into_value(),
188                );
189            }
190        }
191        "document" => {
192            if let Some(doc) = &message.document {
193                attachments.push(
194                    CanonicalAttachment {
195                        attachment_type: "file".into(),
196                        name: doc.filename.clone(),
197                        mime: None,
198                        size: None,
199                        url: doc.link.clone(),
200                        data_inline_b64: None,
201                    }
202                    .into_value(),
203                );
204            }
205        }
206        "location" => {
207            if let Some(location) = &message.location {
208                text = Some(location.name.clone().unwrap_or_else(|| "location".into()));
209            }
210        }
211        "interactive" => {
212            if let Some(interactive) = &message.interactive {
213                if let Some(reply) = &interactive.button_reply {
214                    buttons.push(
215                        CanonicalButton {
216                            id: reply.id.clone(),
217                            title: reply.title.clone(),
218                            payload: reply.id.clone(),
219                        }
220                        .into_value(),
221                    );
222                    text = Some(reply.title.clone());
223                } else if let Some(list) = &interactive.list_reply {
224                    buttons.push(
225                        CanonicalButton {
226                            id: list.id.clone(),
227                            title: list.title.clone(),
228                            payload: list.id.clone(),
229                        }
230                        .into_value(),
231                    );
232                    text = Some(list.title.clone());
233                }
234            }
235        }
236        _ => {}
237    }
238
239    if !attachments.is_empty() {
240        scopes.push("attachments".into());
241    }
242    if !buttons.is_empty() {
243        scopes.push("buttons".into());
244    }
245
246    (text, attachments, buttons, scopes)
247}
248
249fn parse_timestamp(raw: Option<&str>) -> Result<DateTime<Utc>, StatusCode> {
250    if let Some(epoch) = raw.and_then(|value| value.parse::<i64>().ok()) {
251        return DateTime::from_timestamp(epoch, 0).ok_or(StatusCode::BAD_REQUEST);
252    }
253    Ok(Utc::now())
254}
255
256fn verify_signature(headers: &HeaderMap, body: &[u8]) -> Result<(), StatusCode> {
257    if let Ok(secret) = std::env::var("WHATSAPP_APP_SECRET") {
258        let signature = headers
259            .get("X-Hub-Signature-256")
260            .and_then(|value| value.to_str().ok())
261            .and_then(|value| value.strip_prefix("sha256="))
262            .ok_or(StatusCode::UNAUTHORIZED)?;
263        let mut mac =
264            HmacSha256::new_from_slice(secret.as_bytes()).map_err(|_| StatusCode::UNAUTHORIZED)?;
265        mac.update(body);
266        let expected = hex::encode(mac.finalize().into_bytes());
267        if !subtle_equals(signature, &expected) {
268            return Err(StatusCode::UNAUTHORIZED);
269        }
270    }
271    Ok(())
272}
273
/// Compares two strings byte-wise without stopping at the first difference.
///
/// Strings of different length are rejected immediately; for equal lengths every byte
/// pair is XOR-ed and accumulated, so the loop always visits the whole input.
fn subtle_equals(a: &str, b: &str) -> bool {
    let (lhs, rhs) = (a.as_bytes(), b.as_bytes());
    lhs.len() == rhs.len()
        && lhs
            .iter()
            .zip(rhs)
            .fold(0u8, |acc, (left, right)| acc | (left ^ right))
            == 0
}
284
285#[derive(Debug, Deserialize)]
286pub struct VerifyQuery {
287    #[serde(rename = "hub.mode")]
288    mode: Option<String>,
289    #[serde(rename = "hub.challenge")]
290    challenge: Option<String>,
291    #[serde(rename = "hub.verify_token")]
292    verify_token: Option<String>,
293}
294
295#[derive(Debug, Deserialize)]
296struct WhatsappWebhook {
297    entry: Vec<WhatsappEntry>,
298}
299
300#[derive(Debug, Deserialize)]
301struct WhatsappEntry {
302    changes: Vec<WhatsappChange>,
303}
304
305#[derive(Debug, Deserialize)]
306struct WhatsappChange {
307    value: WhatsappValue,
308}
309
310#[derive(Debug, Deserialize)]
311struct WhatsappValue {
312    #[serde(default)]
313    messages: Option<Vec<WhatsappMessage>>,
314}
315
316#[derive(Debug, Clone, Deserialize)]
317struct WhatsappMessage {
318    id: String,
319    #[serde(default)]
320    from: String,
321    #[serde(rename = "type")]
322    r#type: String,
323    #[serde(default)]
324    timestamp: Option<String>,
325    #[serde(default)]
326    text: Option<WhatsappText>,
327    #[serde(default)]
328    image: Option<WhatsappMedia>,
329    #[serde(default)]
330    audio: Option<WhatsappMedia>,
331    #[serde(default)]
332    video: Option<WhatsappMedia>,
333    #[serde(default)]
334    document: Option<WhatsappDocument>,
335    #[serde(default)]
336    location: Option<WhatsappLocation>,
337    #[serde(default)]
338    interactive: Option<WhatsappInteractive>,
339}
340
341#[derive(Debug, Clone, Deserialize)]
342struct WhatsappText {
343    body: String,
344}
345
346#[derive(Debug, Clone, Deserialize)]
347struct WhatsappMedia {
348    #[serde(default)]
349    link: Option<String>,
350    #[serde(default)]
351    caption: Option<String>,
352}
353
354#[derive(Debug, Clone, Deserialize)]
355struct WhatsappDocument {
356    #[serde(default)]
357    link: Option<String>,
358    #[serde(default)]
359    filename: Option<String>,
360}
361
362#[derive(Debug, Clone, Deserialize)]
363struct WhatsappLocation {
364    #[serde(default)]
365    name: Option<String>,
366}
367
368#[derive(Debug, Clone, Deserialize)]
369struct WhatsappInteractive {
370    #[serde(default)]
371    button_reply: Option<WhatsappButtonReply>,
372    #[serde(default)]
373    list_reply: Option<WhatsappListReply>,
374}
375
376#[derive(Debug, Clone, Deserialize)]
377struct WhatsappButtonReply {
378    id: String,
379    title: String,
380}
381
382#[derive(Debug, Clone, Deserialize)]
383struct WhatsappListReply {
384    id: String,
385    title: String,
386}
387
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Tenant used by the fixture.
    const TENANT: &str = "zain-kuwait";
    /// Session key expected for the fixture's tenant and sender.
    const SESSION_KEY: &str = "zain-kuwait:whatsapp:447700900123:447700900123";

    /// A plain text message must produce a canonical payload with the WhatsApp
    /// provider, the expected session key, the text body and no attachments or buttons.
    #[test]
    fn whatsapp_message_maps_to_canonical_payload() {
        let body = json!({
            "entry": [{
                "changes": [{
                    "value": {
                        "messages": [{
                            "id": "wamid.HBgM",
                            "from": "447700900123",
                            "timestamp": "1731315600",
                            "type": "text",
                            "text": { "body": "Hi" }
                        }]
                    }
                }]
            }]
        });
        let parsed: WhatsappWebhook = serde_json::from_value(body.clone()).unwrap();
        let inbound = parsed.entry[0].changes[0].value.messages.as_ref().unwrap()[0].clone();

        let ids = ProviderIds {
            conversation_id: Some(inbound.from.clone()),
            user_id: Some(inbound.from.clone()),
            message_id: Some(inbound.id.clone()),
            ..ProviderIds::default()
        };
        let key = canonical_session_key(TENANT, "whatsapp", &ids);
        assert_eq!(key, SESSION_KEY);

        let received_at = parse_timestamp(inbound.timestamp.as_deref()).unwrap();
        let (text, attachments, buttons, scopes) = map_message_content(&inbound);
        let canonical = build_canonical_payload(
            TENANT,
            "whatsapp",
            &ids,
            key,
            &scopes,
            received_at,
            None,
            text,
            attachments,
            buttons,
            empty_entities(),
            default_metadata(),
            json!({ "type": inbound.r#type }),
            body,
        );

        assert_eq!(canonical["provider"], json!("whatsapp"));
        assert_eq!(canonical["session"]["key"], json!(SESSION_KEY));
        assert_eq!(canonical["text"], json!("Hi"));
        assert_eq!(canonical["attachments"], json!([]));
        assert_eq!(canonical["buttons"], json!([]));
    }
}