Skip to main content

ferro_whatsapp/webhook/
events.rs

1use crate::client::WhatsApp;
2use crate::message::{DeliveryStatus, SenderIdentity};
3use ferro_events::Event;
4
5/// Ferro event emitted when a WhatsApp text message is received.
6///
7/// Dispatched by [`ProcessWhatsAppWebhook`] for each inbound message in
8/// the Meta webhook payload. Attach a [`ferro_events::Listener`] to handle it.
9#[derive(Debug, Clone)]
10pub struct WhatsAppTextReceived {
11    /// WhatsApp message ID (`wamid.xxx`). Use for delivery status correlation.
12    pub wamid: String,
13    /// Sender identity resolved via [the `is_owner` closure in `WhatsAppConfig`].
14    pub sender_identity: SenderIdentity,
15    /// Text body of the message.
16    pub text: String,
17    /// Unix timestamp of the message.
18    pub timestamp: i64,
19    /// Raw Meta webhook message object for custom field access.
20    pub raw: serde_json::Value,
21}
22
23impl Event for WhatsAppTextReceived {
24    fn name(&self) -> &'static str {
25        "whatsapp.message.received"
26    }
27}
28
29/// Ferro event emitted when a WhatsApp delivery status update is received.
30///
31/// Dispatched by [`ProcessWhatsAppWebhook`] for each status entry in
32/// the Meta webhook payload.
33#[derive(Debug, Clone)]
34pub struct WhatsAppStatusUpdate {
35    /// WhatsApp message ID (`wamid.xxx`) this status refers to.
36    pub wamid: String,
37    /// Delivery status from Meta.
38    pub status: DeliveryStatus,
39    /// Unix timestamp of the status update.
40    pub timestamp: i64,
41}
42
43impl Event for WhatsAppStatusUpdate {
44    fn name(&self) -> &'static str {
45        "whatsapp.status.update"
46    }
47}
48
49/// Background job that processes a WhatsApp webhook payload asynchronously.
50///
51/// Webhook handlers dispatch this job immediately after HMAC verification,
52/// returning HTTP 200 to Meta without blocking on event processing.
53/// The job parses the Meta JSON envelope and dispatches typed ferro-events.
54///
55/// ## Deduplication
56///
57/// Deduplication is handled at the application layer. Wire an
58/// [`InMemoryDeduplicationStore`][crate::InMemoryDeduplicationStore] (or custom
59/// [`DeduplicationStore`][crate::DeduplicationStore]) into your webhook handler
60/// before dispatching this job to avoid processing duplicate `wamid`s.
61#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
62pub struct ProcessWhatsAppWebhook {
63    /// The raw JSON body received from Meta's webhook POST.
64    pub payload_json: String,
65}
66
67#[ferro_queue::async_trait]
68impl ferro_queue::Job for ProcessWhatsAppWebhook {
69    async fn handle(&self) -> Result<(), ferro_queue::Error> {
70        let v: serde_json::Value = serde_json::from_str(&self.payload_json)
71            .map_err(|e| ferro_queue::Error::custom(format!("invalid webhook JSON: {e}")))?;
72
73        let value = &v["entry"][0]["changes"][0]["value"];
74        let is_owner = &WhatsApp::config().is_owner;
75
76        for event in parse_text_messages(value, is_owner.as_ref()) {
77            event.dispatch_sync();
78        }
79
80        for event in parse_status_updates(value) {
81            event.dispatch_sync();
82        }
83
84        Ok(())
85    }
86
87    fn name(&self) -> &'static str {
88        "ProcessWhatsAppWebhook"
89    }
90}
91
92fn parse_text_messages(
93    value: &serde_json::Value,
94    is_owner: &dyn Fn(&str) -> bool,
95) -> Vec<WhatsAppTextReceived> {
96    let Some(messages) = value["messages"].as_array() else {
97        return vec![];
98    };
99
100    messages
101        .iter()
102        .filter_map(|msg| {
103            let wamid = msg["id"].as_str()?.to_string();
104            let from_phone = msg["from"].as_str()?.to_string();
105            let timestamp: i64 = msg["timestamp"].as_str()?.parse().ok()?;
106            let text = msg["text"]["body"].as_str()?.to_string();
107            let sender_identity = resolve_identity(&from_phone, is_owner);
108            Some(WhatsAppTextReceived {
109                wamid,
110                sender_identity,
111                text,
112                timestamp,
113                raw: msg.clone(),
114            })
115        })
116        .collect()
117}
118
119fn parse_status_updates(value: &serde_json::Value) -> Vec<WhatsAppStatusUpdate> {
120    let Some(statuses) = value["statuses"].as_array() else {
121        return vec![];
122    };
123
124    statuses
125        .iter()
126        .filter_map(|status| {
127            let wamid = status["id"].as_str()?.to_string();
128            let status_str = status["status"].as_str()?;
129            let timestamp: i64 = status["timestamp"].as_str()?.parse().ok()?;
130            let delivery_status: DeliveryStatus =
131                serde_json::from_value(serde_json::Value::String(status_str.to_string()))
132                    .unwrap_or(DeliveryStatus::Unknown);
133            Some(WhatsAppStatusUpdate {
134                wamid,
135                status: delivery_status,
136                timestamp,
137            })
138        })
139        .collect()
140}
141
142fn resolve_identity(phone: &str, is_owner: &dyn Fn(&str) -> bool) -> SenderIdentity {
143    if is_owner(phone) {
144        SenderIdentity::Owner(phone.to_string())
145    } else {
146        SenderIdentity::Customer(phone.to_string())
147    }
148}
149
#[cfg(test)]
mod tests {
    use super::*;
    use ferro_queue::Job;

    /// Wraps a webhook `value` object in the `entry[0].changes[0]` envelope.
    fn envelope(value: serde_json::Value) -> serde_json::Value {
        serde_json::json!({
            "entry": [{
                "changes": [{
                    "value": value
                }]
            }]
        })
    }

    /// Builds a webhook payload carrying a single text message.
    fn sample_text_webhook(
        wamid: &str,
        from: &str,
        text: &str,
        timestamp: &str,
    ) -> serde_json::Value {
        envelope(serde_json::json!({
            "messages": [{
                "id": wamid,
                "from": from,
                "timestamp": timestamp,
                "type": "text",
                "text": { "body": text }
            }]
        }))
    }

    /// Builds a webhook payload carrying a single delivery status.
    fn sample_status_webhook(wamid: &str, status: &str, timestamp: &str) -> serde_json::Value {
        envelope(serde_json::json!({
            "statuses": [{
                "id": wamid,
                "status": status,
                "timestamp": timestamp
            }]
        }))
    }

    #[test]
    fn text_received_event_name() {
        let received = WhatsAppTextReceived {
            wamid: String::from("wamid.001"),
            sender_identity: SenderIdentity::Customer(String::from("393401234567")),
            text: String::from("hello"),
            timestamp: 1700000000,
            raw: serde_json::Value::Null,
        };
        assert_eq!(received.name(), "whatsapp.message.received");
    }

    #[test]
    fn status_update_event_name() {
        let update = WhatsAppStatusUpdate {
            wamid: String::from("wamid.001"),
            status: DeliveryStatus::Delivered,
            timestamp: 1700000001,
        };
        assert_eq!(update.name(), "whatsapp.status.update");
    }

    #[test]
    fn process_job_name() {
        let job = ProcessWhatsAppWebhook {
            payload_json: String::from("{}"),
        };
        assert_eq!(job.name(), "ProcessWhatsAppWebhook");
    }

    #[test]
    fn sender_identity_owner() {
        let owner_number = "393401234567";
        let identity = resolve_identity(owner_number, &|phone| phone == owner_number);
        assert!(
            matches!(identity, SenderIdentity::Owner(_)),
            "is_owner=true must produce Owner"
        );
        assert_eq!(identity.phone(), "393401234567");
    }

    #[test]
    fn sender_identity_customer() {
        let customer_number = "393409999999";
        let identity = resolve_identity(customer_number, &|_| false);
        assert!(
            matches!(identity, SenderIdentity::Customer(_)),
            "is_owner=false must produce Customer (safe default)"
        );
        assert_eq!(identity.phone(), "393409999999");
    }

    #[test]
    fn parse_text_message() {
        let payload = sample_text_webhook("wamid.abc", "393401234567", "Ciao!", "1700000000");
        let parsed = parse_text_messages(&payload["entry"][0]["changes"][0]["value"], &|_| false);

        assert_eq!(parsed.len(), 1);
        let first = &parsed[0];
        assert_eq!(first.wamid, "wamid.abc");
        assert_eq!(first.text, "Ciao!");
        assert_eq!(first.timestamp, 1700000000);
        assert!(matches!(first.sender_identity, SenderIdentity::Customer(_)));
    }

    #[test]
    fn parse_status_update() {
        let payload = sample_status_webhook("wamid.xyz", "delivered", "1700000001");
        let parsed = parse_status_updates(&payload["entry"][0]["changes"][0]["value"]);

        assert_eq!(parsed.len(), 1);
        let first = &parsed[0];
        assert_eq!(first.wamid, "wamid.xyz");
        assert_eq!(first.timestamp, 1700000001);
        assert!(matches!(first.status, DeliveryStatus::Delivered));
    }

    // Compile-time check: instantiating this for a type proves it is Clone + Send + Sync.
    fn _assert_clone_send_sync<T>()
    where
        T: Clone + Send + Sync,
    {
    }

    #[test]
    fn events_are_clone_send_sync() {
        _assert_clone_send_sync::<WhatsAppTextReceived>();
        _assert_clone_send_sync::<WhatsAppStatusUpdate>();
        _assert_clone_send_sync::<ProcessWhatsAppWebhook>();
    }
}