Skip to main content

whatsapp_rust/
pdo.rs

//! PDO (Peer Data Operation) support for requesting message content from the primary device.
//!
//! When message decryption fails (e.g., due to a session mismatch), instead of only sending
//! a retry receipt to the sender, we can also request the message content from our own
//! primary phone device. This is useful because:
//!
//! 1. The primary phone has already decrypted the message successfully.
//! 2. It can share the decrypted content with linked devices via PDO.
//! 3. This bypasses session issues entirely, since we are asking our own trusted device.
//!
//! The flow is:
//! 1. Decryption fails for a message.
//! 2. We send a `PeerDataOperationRequestMessage` with type `PLACEHOLDER_MESSAGE_RESEND`.
//! 3. The phone responds with a `PeerDataOperationRequestResponseMessage` containing the
//!    decoded message.
//! 4. We emit the message as if we had decrypted it ourselves.
16
17use crate::client::Client;
18use crate::types::message::MessageInfo;
19use log::{debug, info, warn};
20use moka::future::Cache;
21use prost::Message;
22use std::sync::Arc;
23use std::time::Duration;
24use wacore::types::message::{EditAttribute, MessageSource, MsgMetaInfo};
25use wacore_binary::jid::{Jid, JidExt};
26use waproto::whatsapp as wa;
27
28/// Cache entry for pending PDO requests.
29/// Contains the original message info needed to properly dispatch the response.
30#[derive(Clone, Debug)]
31pub struct PendingPdoRequest {
32    pub message_info: MessageInfo,
33    pub requested_at: std::time::Instant,
34}
35
36/// Creates a new PDO request cache.
37/// The cache has a TTL of 30 seconds (phone should respond quickly) and limited capacity.
38pub fn new_pdo_cache() -> Cache<String, PendingPdoRequest> {
39    Cache::builder()
40        .time_to_live(Duration::from_secs(30))
41        .max_capacity(500)
42        .build()
43}
44
45impl Client {
46    /// Sends a PDO (Peer Data Operation) request to our own primary phone to get the
47    /// decrypted content of a message that we failed to decrypt.
48    ///
49    /// This is called when decryption fails and we want to ask our phone for the message.
50    /// The phone will respond with a PeerDataOperationRequestResponseMessage containing
51    /// the full WebMessageInfo which we can then dispatch as a normal message event.
52    ///
53    /// # Arguments
54    /// * `info` - The MessageInfo for the message that failed to decrypt
55    ///
56    /// # Returns
57    /// * `Ok(())` if the request was sent successfully
58    /// * `Err` if we couldn't send the request (e.g., not logged in)
59    pub async fn send_pdo_placeholder_resend_request(
60        self: &Arc<Self>,
61        info: &MessageInfo,
62    ) -> Result<(), anyhow::Error> {
63        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
64
65        // We need to send PDO to our PRIMARY PHONE (device 0), not to ourselves (linked device).
66        // The primary phone has already decrypted the message and can share the content with us.
67        let own_pn = device_snapshot
68            .pn
69            .clone()
70            .ok_or_else(|| anyhow::anyhow!("Not logged in - no phone number available for PDO"))?;
71
72        // Create JID for device 0 (primary phone)
73        let primary_phone_jid = own_pn.with_device(0);
74
75        // Resolve JIDs to LID for the MessageKey and cache key, matching WhatsApp Web's behavior.
76        // This ensures the cache key matches the JID that the phone will respond with (usually LID).
77        let remote_jid = self.resolve_encryption_jid(&info.source.chat).await;
78        let participant = if info.source.is_group {
79            Some(self.resolve_encryption_jid(&info.source.sender).await)
80        } else {
81            None
82        };
83
84        // Atomically check-and-insert to avoid race conditions where two concurrent
85        // calls could both pass a contains_key check before either inserts.
86        let cache_key = format!("{}:{}", remote_jid, info.id);
87        let pending = PendingPdoRequest {
88            message_info: info.clone(),
89            requested_at: std::time::Instant::now(),
90        };
91        let entry = self
92            .pdo_pending_requests
93            .entry(cache_key.clone())
94            .or_insert(pending)
95            .await;
96
97        if !entry.is_fresh() {
98            debug!(
99                "PDO request already pending for message {} from {} (resolved: {})",
100                info.id, info.source.sender, remote_jid
101            );
102            return Ok(());
103        }
104
105        // Build the message key for the placeholder resend request
106        let message_key = wa::MessageKey {
107            remote_jid: Some(remote_jid.to_string()),
108            from_me: Some(info.source.is_from_me),
109            id: Some(info.id.clone()),
110            participant: participant.map(|p| p.to_string()),
111        };
112
113        // Build the PDO request message
114        let pdo_request = wa::message::PeerDataOperationRequestMessage {
115            peer_data_operation_request_type: Some(
116                wa::message::PeerDataOperationRequestType::PlaceholderMessageResend as i32,
117            ),
118            placeholder_message_resend_request: vec![
119                wa::message::peer_data_operation_request_message::PlaceholderMessageResendRequest {
120                    message_key: Some(message_key),
121                },
122            ],
123            ..Default::default()
124        };
125
126        // Wrap it in a protocol message
127        let protocol_message = wa::message::ProtocolMessage {
128            r#type: Some(
129                wa::message::protocol_message::Type::PeerDataOperationRequestMessage as i32,
130            ),
131            peer_data_operation_request_message: Some(pdo_request),
132            ..Default::default()
133        };
134
135        let msg = wa::Message {
136            protocol_message: Some(Box::new(protocol_message)),
137            ..Default::default()
138        };
139
140        info!(
141            "Sending PDO placeholder resend request for message {} from {} in {} to primary phone {}",
142            info.id, info.source.sender, info.source.chat, primary_phone_jid
143        );
144
145        // Ensure E2E session exists before sending (matches WhatsApp Web behavior)
146        self.ensure_e2e_sessions(vec![primary_phone_jid.clone()])
147            .await?;
148
149        // Send the message to our primary phone (device 0)
150        match self.send_peer_message(primary_phone_jid, &msg).await {
151            Ok(_) => {
152                debug!("PDO request sent successfully for message {}", info.id);
153                Ok(())
154            }
155            Err(e) => {
156                // Remove from pending cache on failure
157                self.pdo_pending_requests.remove(&cache_key).await;
158                warn!(
159                    "Failed to send PDO request for message {}: {:?}",
160                    info.id, e
161                );
162                Err(e)
163            }
164        }
165    }
166
167    /// Sends a peer message (message to our own devices).
168    /// This is used for PDO requests and similar device-to-device communication.
169    async fn send_peer_message(
170        self: &Arc<Self>,
171        to: Jid,
172        msg: &wa::Message,
173    ) -> Result<String, anyhow::Error> {
174        let msg_id = self.generate_message_id().await;
175
176        // Send with peer category and high priority
177        self.send_message_impl(
178            to,
179            msg,
180            Some(msg_id.clone()),
181            true,  // is_peer_message
182            false, // is_retry
183            None,
184            vec![], // No extra stanza nodes for peer messages
185        )
186        .await?;
187
188        Ok(msg_id)
189    }
190
191    /// Handles a PDO response message from our primary phone.
192    /// This is called when we receive a PeerDataOperationRequestResponseMessage.
193    ///
194    /// # Arguments
195    /// * `response` - The PDO response message
196    /// * `info` - The MessageInfo for the PDO response message itself
197    pub async fn handle_pdo_response(
198        self: &Arc<Self>,
199        response: &wa::message::PeerDataOperationRequestResponseMessage,
200        _pdo_msg_info: &MessageInfo,
201    ) {
202        debug!(
203            "Received PDO response with {} results",
204            response.peer_data_operation_result.len()
205        );
206
207        for result in &response.peer_data_operation_result {
208            if let Some(placeholder_response) = &result.placeholder_message_resend_response {
209                self.handle_placeholder_resend_response(placeholder_response)
210                    .await;
211            }
212        }
213    }
214
215    /// Handles a single placeholder message resend response from PDO.
216    async fn handle_placeholder_resend_response(
217        self: &Arc<Self>,
218        response: &wa::message::peer_data_operation_request_response_message::peer_data_operation_result::PlaceholderMessageResendResponse,
219    ) {
220        let Some(web_message_info_bytes) = &response.web_message_info_bytes else {
221            warn!("PDO placeholder response missing webMessageInfoBytes");
222            return;
223        };
224
225        // Decode the WebMessageInfo
226        let web_msg_info = match wa::WebMessageInfo::decode(web_message_info_bytes.as_slice()) {
227            Ok(info) => info,
228            Err(e) => {
229                warn!("Failed to decode WebMessageInfo from PDO response: {:?}", e);
230                return;
231            }
232        };
233
234        // Extract message key to find the original pending request
235        let key = &web_msg_info.key;
236
237        let remote_jid = key.remote_jid.as_deref().unwrap_or("");
238        let msg_id = key.id.as_deref().unwrap_or("");
239        let cache_key = format!("{}:{}", remote_jid, msg_id);
240
241        // Remove from pending requests
242        let pending = self.pdo_pending_requests.remove(&cache_key).await;
243
244        let elapsed = pending
245            .as_ref()
246            .map(|p| p.requested_at.elapsed().as_millis())
247            .unwrap_or(0);
248
249        info!(
250            "Received PDO placeholder response for message {} (took {}ms)",
251            msg_id, elapsed
252        );
253
254        // Build MessageInfo from the WebMessageInfo or use the pending request's info
255        let message_info = if let Some(pending) = pending {
256            pending.message_info
257        } else {
258            // Reconstruct MessageInfo from WebMessageInfo if we don't have it cached
259            match self.message_info_from_web_message_info(&web_msg_info).await {
260                Ok(info) => info,
261                Err(e) => {
262                    warn!(
263                        "Failed to reconstruct MessageInfo from PDO response: {:?}",
264                        e
265                    );
266                    return;
267                }
268            }
269        };
270
271        // Extract the actual message content
272        let Some(message) = web_msg_info.message else {
273            warn!("PDO response WebMessageInfo missing message content");
274            return;
275        };
276
277        // Dispatch the message as a normal message event
278        info!(
279            "Dispatching PDO-recovered message {} from {} via phone",
280            message_info.id, message_info.source.sender
281        );
282
283        self.core
284            .event_bus
285            .dispatch(&wacore::types::events::Event::Message(
286                Box::new(message),
287                message_info,
288            ));
289    }
290
291    /// Reconstructs a MessageInfo from a WebMessageInfo.
292    /// This is used when we receive a PDO response but don't have the original pending request cached.
293    async fn message_info_from_web_message_info(
294        &self,
295        web_msg: &wa::WebMessageInfo,
296    ) -> Result<MessageInfo, anyhow::Error> {
297        let key = &web_msg.key;
298
299        let remote_jid: Jid = key
300            .remote_jid
301            .as_ref()
302            .ok_or_else(|| anyhow::anyhow!("MessageKey missing remoteJid"))?
303            .parse()?;
304
305        let is_group = remote_jid.is_group();
306        let is_from_me = key.from_me.unwrap_or(false);
307
308        let sender = if is_group {
309            key.participant
310                .as_ref()
311                .map(|p: &String| p.parse())
312                .transpose()?
313                .unwrap_or_else(|| remote_jid.clone())
314        } else if is_from_me {
315            self.persistence_manager
316                .get_device_snapshot()
317                .await
318                .pn
319                .clone()
320                .unwrap_or_else(|| remote_jid.clone())
321        } else {
322            remote_jid.clone()
323        };
324
325        let timestamp = web_msg
326            .message_timestamp
327            .map(|ts| {
328                chrono::DateTime::from_timestamp(ts as i64, 0).unwrap_or_else(chrono::Utc::now)
329            })
330            .unwrap_or_else(chrono::Utc::now);
331
332        Ok(MessageInfo {
333            id: key.id.clone().unwrap_or_default(),
334            server_id: 0,
335            r#type: String::new(),
336            source: MessageSource {
337                chat: remote_jid,
338                sender,
339                sender_alt: None,
340                recipient_alt: None,
341                is_from_me,
342                is_group,
343                addressing_mode: None,
344                broadcast_list_owner: None,
345                recipient: None,
346            },
347            timestamp,
348            push_name: web_msg.push_name.clone().unwrap_or_default(),
349            category: String::new(),
350            multicast: false,
351            media_type: String::new(),
352            edit: EditAttribute::default(),
353            bot_info: None,
354            meta_info: MsgMetaInfo::default(),
355            verified_name: None,
356            device_sent_meta: None,
357        })
358    }
359
360    /// Spawns a PDO request for a message that failed to decrypt.
361    /// This is called alongside the retry receipt to increase chances of recovery.
362    ///
363    /// When `immediate` is true, the PDO request is sent without delay.
364    /// This is used when we've exhausted retry attempts and need immediate PDO recovery.
365    pub(crate) fn spawn_pdo_request_with_options(
366        self: &Arc<Self>,
367        info: &MessageInfo,
368        immediate: bool,
369    ) {
370        // Don't send PDO for our own messages or status broadcasts
371        if info.source.is_from_me {
372            return;
373        }
374        if info.source.chat.server == wacore_binary::jid::BROADCAST_SERVER {
375            return;
376        }
377
378        let client_clone = Arc::clone(self);
379        let info_clone = info.clone();
380
381        tokio::spawn(async move {
382            if !immediate {
383                // Add a small delay to allow the retry receipt to be processed first
384                // This avoids overwhelming the phone with simultaneous requests
385                tokio::time::sleep(Duration::from_millis(500)).await;
386            }
387
388            if let Err(e) = client_clone
389                .send_pdo_placeholder_resend_request(&info_clone)
390                .await
391            {
392                warn!(
393                    "Failed to send PDO request for message {} from {}: {:?}",
394                    info_clone.id, info_clone.source.sender, e
395                );
396            }
397        });
398    }
399
400    /// Spawns a PDO request for a message that failed to decrypt.
401    /// This is called alongside the retry receipt to increase chances of recovery.
402    pub(crate) fn spawn_pdo_request(self: &Arc<Self>, info: &MessageInfo) {
403        self.spawn_pdo_request_with_options(info, false);
404    }
405}
406
#[cfg(test)]
mod tests {
    use wacore_binary::jid::{DEFAULT_USER_SERVER, Jid, JidExt};

    /// The PDO target is device 0, which is not reported as an additional device.
    #[test]
    fn test_pdo_primary_phone_jid_is_device_0() {
        let phone = Jid::pn("559999999999").with_device(0);

        assert_eq!(phone.device, 0);
        assert!(!phone.is_ad());
    }

    /// Retargeting to device 0 keeps the user and server parts intact.
    #[test]
    fn test_pdo_primary_phone_jid_preserves_user() {
        let phone = Jid::pn("559999999999").with_device(0);

        assert_eq!(phone.user, "559999999999");
        assert_eq!(phone.server, DEFAULT_USER_SERVER);
    }

    /// Starting from a linked-device JID (device 33) still yields device 0.
    #[test]
    fn test_pdo_primary_phone_jid_from_linked_device() {
        let linked = Jid::pn_device("559999999999", 33);
        let phone = linked.with_device(0);

        assert_eq!(phone.user, "559999999999");
        assert_eq!(phone.device, 0);
    }
}
439}