Skip to main content

whatsapp_rust/
pdo.rs

1//! PDO (Peer Data Operation) support for requesting message content from the primary device.
2//!
3//! When message decryption fails (e.g., due to session mismatch), instead of only sending
4//! a retry receipt to the sender, we can also request the message content from our own
5//! primary phone device. This is useful because:
6//!
7//! 1. The primary phone has already decrypted the message successfully
8//! 2. It can share the decrypted content with linked devices via PDO
9//! 3. This bypasses session issues entirely since we're asking our own trusted device
10//!
11//! The flow is:
12//! 1. Decryption fails for a message
13//! 2. We send a PeerDataOperationRequestMessage with type PLACEHOLDER_MESSAGE_RESEND
14//! 3. The phone responds with PeerDataOperationRequestResponseMessage containing the decoded message
15//! 4. We emit the message as if we had decrypted it ourselves
16
17use crate::cache::Cache;
18use crate::client::Client;
19use crate::types::message::MessageInfo;
20use log::{debug, info, warn};
21use prost::Message;
22use std::sync::Arc;
23use std::time::Duration;
24use wacore::types::message::{EditAttribute, MessageSource, MsgMetaInfo};
25use wacore_binary::jid::{Jid, JidExt};
26use waproto::whatsapp as wa;
27
28/// Cache entry for pending PDO requests.
29/// Contains the original message info needed to properly dispatch the response.
30#[derive(Clone, Debug)]
31pub struct PendingPdoRequest {
32    pub message_info: MessageInfo,
33    pub requested_at: wacore::time::Instant,
34}
35
36/// Creates a new PDO request cache.
37/// The cache has a TTL of 30 seconds (phone should respond quickly) and limited capacity.
38pub fn new_pdo_cache() -> Cache<String, PendingPdoRequest> {
39    Cache::builder()
40        .time_to_live(Duration::from_secs(30))
41        .max_capacity(500)
42        .build()
43}
44
45impl Client {
46    /// Sends a PDO (Peer Data Operation) request to our own primary phone to get the
47    /// decrypted content of a message that we failed to decrypt.
48    ///
49    /// This is called when decryption fails and we want to ask our phone for the message.
50    /// The phone will respond with a PeerDataOperationRequestResponseMessage containing
51    /// the full WebMessageInfo which we can then dispatch as a normal message event.
52    ///
53    /// # Arguments
54    /// * `info` - The MessageInfo for the message that failed to decrypt
55    ///
56    /// # Returns
57    /// * `Ok(())` if the request was sent successfully
58    /// * `Err` if we couldn't send the request (e.g., not logged in)
59    pub async fn send_pdo_placeholder_resend_request(
60        self: &Arc<Self>,
61        info: &MessageInfo,
62    ) -> Result<(), anyhow::Error> {
63        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
64
65        // We need to send PDO to our PRIMARY PHONE (device 0), not to ourselves (linked device).
66        // The primary phone has already decrypted the message and can share the content with us.
67        let own_pn = device_snapshot
68            .pn
69            .clone()
70            .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
71
72        // Create JID for device 0 (primary phone)
73        let primary_phone_jid = own_pn.with_device(0);
74
75        // Resolve JIDs to LID for the MessageKey and cache key, matching WhatsApp Web's behavior.
76        // This ensures the cache key matches the JID that the phone will respond with (usually LID).
77        let remote_jid = self.resolve_encryption_jid(&info.source.chat).await;
78        let participant = if info.source.is_group {
79            Some(self.resolve_encryption_jid(&info.source.sender).await)
80        } else {
81            None
82        };
83
84        // Check-and-insert to avoid duplicate PDO requests for the same message.
85        let cache_key = format!("{}:{}", remote_jid, info.id);
86
87        if self.pdo_pending_requests.get(&cache_key).await.is_some() {
88            debug!(
89                "PDO request already pending for message {} from {} (resolved: {})",
90                info.id, info.source.sender, remote_jid
91            );
92            return Ok(());
93        }
94
95        let pending = PendingPdoRequest {
96            message_info: info.clone(),
97            requested_at: wacore::time::Instant::now(),
98        };
99        self.pdo_pending_requests
100            .insert(cache_key.clone(), pending)
101            .await;
102
103        // Build the message key for the placeholder resend request
104        let message_key = wa::MessageKey {
105            remote_jid: Some(remote_jid.to_string()),
106            from_me: Some(info.source.is_from_me),
107            id: Some(info.id.clone()),
108            participant: participant.map(|p| p.to_string()),
109        };
110
111        // Build the PDO request message
112        let pdo_request = wa::message::PeerDataOperationRequestMessage {
113            peer_data_operation_request_type: Some(
114                wa::message::PeerDataOperationRequestType::PlaceholderMessageResend as i32,
115            ),
116            placeholder_message_resend_request: vec![
117                wa::message::peer_data_operation_request_message::PlaceholderMessageResendRequest {
118                    message_key: Some(message_key),
119                },
120            ],
121            ..Default::default()
122        };
123
124        // Wrap it in a protocol message
125        let protocol_message = wa::message::ProtocolMessage {
126            r#type: Some(
127                wa::message::protocol_message::Type::PeerDataOperationRequestMessage as i32,
128            ),
129            peer_data_operation_request_message: Some(pdo_request),
130            ..Default::default()
131        };
132
133        let msg = wa::Message {
134            protocol_message: Some(Box::new(protocol_message)),
135            ..Default::default()
136        };
137
138        info!(
139            "Sending PDO placeholder resend request for message {} from {} in {} to primary phone {}",
140            info.id, info.source.sender, info.source.chat, primary_phone_jid
141        );
142
143        // Ensure E2E session exists before sending (matches WhatsApp Web behavior)
144        self.ensure_e2e_sessions(vec![primary_phone_jid.clone()])
145            .await?;
146
147        // Send the message to our primary phone (device 0)
148        match self.send_peer_message(primary_phone_jid, &msg).await {
149            Ok(_) => {
150                debug!("PDO request sent successfully for message {}", info.id);
151                Ok(())
152            }
153            Err(e) => {
154                // Remove from pending cache on failure
155                self.pdo_pending_requests.remove(&cache_key).await;
156                warn!(
157                    "Failed to send PDO request for message {}: {:?}",
158                    info.id, e
159                );
160                Err(e)
161            }
162        }
163    }
164
165    /// Request on-demand message history from the primary phone via PDO.
166    pub async fn fetch_message_history(
167        self: &Arc<Self>,
168        chat_jid: &Jid,
169        oldest_msg_id: &str,
170        oldest_msg_from_me: bool,
171        oldest_msg_timestamp_ms: i64,
172        count: i32,
173    ) -> Result<String, anyhow::Error> {
174        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
175        let own_pn = device_snapshot
176            .pn
177            .clone()
178            .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
179        let primary_phone_jid = own_pn.with_device(0);
180
181        let pdo_request = wa::message::PeerDataOperationRequestMessage {
182            peer_data_operation_request_type: Some(
183                wa::message::PeerDataOperationRequestType::HistorySyncOnDemand as i32,
184            ),
185            history_sync_on_demand_request: Some(
186                wa::message::peer_data_operation_request_message::HistorySyncOnDemandRequest {
187                    chat_jid: Some(chat_jid.to_string()),
188                    oldest_msg_id: Some(oldest_msg_id.to_string()),
189                    oldest_msg_from_me: Some(oldest_msg_from_me),
190                    oldest_msg_timestamp_ms: Some(oldest_msg_timestamp_ms),
191                    on_demand_msg_count: Some(count),
192                    ..Default::default()
193                },
194            ),
195            ..Default::default()
196        };
197
198        let protocol_message = wa::message::ProtocolMessage {
199            r#type: Some(
200                wa::message::protocol_message::Type::PeerDataOperationRequestMessage as i32,
201            ),
202            peer_data_operation_request_message: Some(pdo_request),
203            ..Default::default()
204        };
205
206        let msg = wa::Message {
207            protocol_message: Some(Box::new(protocol_message)),
208            ..Default::default()
209        };
210
211        info!(
212            "Sending PDO history sync on-demand request for chat {} (count={}) to primary phone {}",
213            chat_jid, count, primary_phone_jid
214        );
215
216        self.ensure_e2e_sessions(vec![primary_phone_jid.clone()])
217            .await?;
218        self.send_peer_message(primary_phone_jid, &msg).await
219    }
220
221    /// Sends a peer message (message to our own devices).
222    /// This is used for PDO requests and similar device-to-device communication.
223    async fn send_peer_message(
224        self: &Arc<Self>,
225        to: Jid,
226        msg: &wa::Message,
227    ) -> Result<String, anyhow::Error> {
228        let msg_id = self.generate_message_id().await;
229
230        // Send with peer category and high priority
231        self.send_message_impl(
232            to,
233            msg,
234            Some(msg_id.clone()),
235            true,  // is_peer_message
236            false, // is_retry
237            None,
238            vec![], // No extra stanza nodes for peer messages
239        )
240        .await?;
241
242        Ok(msg_id)
243    }
244
245    /// Handles a PDO response message from our primary phone.
246    /// This is called when we receive a PeerDataOperationRequestResponseMessage.
247    ///
248    /// # Arguments
249    /// * `response` - The PDO response message
250    /// * `info` - The MessageInfo for the PDO response message itself
251    pub async fn handle_pdo_response(
252        self: &Arc<Self>,
253        response: &wa::message::PeerDataOperationRequestResponseMessage,
254        _pdo_msg_info: &MessageInfo,
255    ) {
256        debug!(
257            "Received PDO response with {} results",
258            response.peer_data_operation_result.len()
259        );
260
261        for result in &response.peer_data_operation_result {
262            if let Some(placeholder_response) = &result.placeholder_message_resend_response {
263                self.handle_placeholder_resend_response(placeholder_response)
264                    .await;
265            }
266        }
267    }
268
269    /// Handles a single placeholder message resend response from PDO.
270    async fn handle_placeholder_resend_response(
271        self: &Arc<Self>,
272        response: &wa::message::peer_data_operation_request_response_message::peer_data_operation_result::PlaceholderMessageResendResponse,
273    ) {
274        let Some(web_message_info_bytes) = &response.web_message_info_bytes else {
275            warn!("PDO placeholder response missing webMessageInfoBytes");
276            return;
277        };
278
279        // Decode the WebMessageInfo
280        let web_msg_info = match wa::WebMessageInfo::decode(web_message_info_bytes.as_slice()) {
281            Ok(info) => info,
282            Err(e) => {
283                warn!("Failed to decode WebMessageInfo from PDO response: {:?}", e);
284                return;
285            }
286        };
287
288        // Extract message key to find the original pending request
289        let key = &web_msg_info.key;
290
291        let remote_jid = key.remote_jid.as_deref().unwrap_or("");
292        let msg_id = key.id.as_deref().unwrap_or("");
293        let cache_key = format!("{}:{}", remote_jid, msg_id);
294
295        // Remove from pending requests
296        let pending = self.pdo_pending_requests.remove(&cache_key).await;
297
298        let elapsed = pending
299            .as_ref()
300            .map(|p| p.requested_at.elapsed().as_millis())
301            .unwrap_or(0);
302
303        info!(
304            "Received PDO placeholder response for message {} (took {}ms)",
305            msg_id, elapsed
306        );
307
308        // Build MessageInfo from the WebMessageInfo or use the pending request's info
309        let message_info = if let Some(pending) = pending {
310            pending.message_info
311        } else {
312            // Reconstruct MessageInfo from WebMessageInfo if we don't have it cached
313            match self.message_info_from_web_message_info(&web_msg_info).await {
314                Ok(info) => info,
315                Err(e) => {
316                    warn!(
317                        "Failed to reconstruct MessageInfo from PDO response: {:?}",
318                        e
319                    );
320                    return;
321                }
322            }
323        };
324
325        // Extract the actual message content
326        let Some(message) = web_msg_info.message else {
327            warn!("PDO response WebMessageInfo missing message content");
328            return;
329        };
330
331        // Dispatch the message as a normal message event
332        info!(
333            "Dispatching PDO-recovered message {} from {} via phone",
334            message_info.id, message_info.source.sender
335        );
336
337        self.core
338            .event_bus
339            .dispatch(&wacore::types::events::Event::Message(
340                Box::new(message),
341                message_info,
342            ));
343    }
344
345    /// Reconstructs a MessageInfo from a WebMessageInfo.
346    /// This is used when we receive a PDO response but don't have the original pending request cached.
347    async fn message_info_from_web_message_info(
348        &self,
349        web_msg: &wa::WebMessageInfo,
350    ) -> Result<MessageInfo, anyhow::Error> {
351        let key = &web_msg.key;
352
353        let remote_jid: Jid = key
354            .remote_jid
355            .as_ref()
356            .ok_or_else(|| anyhow::anyhow!("MessageKey missing remoteJid"))?
357            .parse()?;
358
359        let is_group = remote_jid.is_group();
360        let is_from_me = key.from_me.unwrap_or(false);
361
362        let sender = if is_group {
363            key.participant
364                .as_ref()
365                .map(|p: &String| p.parse())
366                .transpose()?
367                .unwrap_or_else(|| remote_jid.clone())
368        } else if is_from_me {
369            self.persistence_manager
370                .get_device_snapshot()
371                .await
372                .pn
373                .clone()
374                .unwrap_or_else(|| remote_jid.clone())
375        } else {
376            remote_jid.clone()
377        };
378
379        let timestamp = web_msg
380            .message_timestamp
381            .map(|ts| {
382                chrono::DateTime::from_timestamp(ts as i64, 0).unwrap_or_else(chrono::Utc::now)
383            })
384            .unwrap_or_else(chrono::Utc::now);
385
386        Ok(MessageInfo {
387            id: key.id.clone().unwrap_or_default(),
388            server_id: 0,
389            r#type: String::new(),
390            source: MessageSource {
391                chat: remote_jid,
392                sender,
393                sender_alt: None,
394                recipient_alt: None,
395                is_from_me,
396                is_group,
397                addressing_mode: None,
398                broadcast_list_owner: None,
399                recipient: None,
400            },
401            timestamp,
402            push_name: web_msg.push_name.clone().unwrap_or_default(),
403            category: String::new(),
404            multicast: false,
405            media_type: String::new(),
406            edit: EditAttribute::default(),
407            bot_info: None,
408            meta_info: MsgMetaInfo::default(),
409            verified_name: None,
410            device_sent_meta: None,
411        })
412    }
413
414    /// Spawns a PDO request for a message that failed to decrypt.
415    /// This is called alongside the retry receipt to increase chances of recovery.
416    ///
417    /// When `immediate` is true, the PDO request is sent without delay.
418    /// This is used when we've exhausted retry attempts and need immediate PDO recovery.
419    pub(crate) fn spawn_pdo_request_with_options(
420        self: &Arc<Self>,
421        info: &MessageInfo,
422        immediate: bool,
423    ) {
424        // Don't send PDO for our own messages or status broadcasts
425        if info.source.is_from_me {
426            return;
427        }
428        if info.source.chat.server == wacore_binary::jid::BROADCAST_SERVER {
429            return;
430        }
431
432        let client_clone = Arc::clone(self);
433        let info_clone = info.clone();
434
435        self.runtime
436            .spawn(Box::pin(async move {
437                if !immediate {
438                    // Add a small delay to allow the retry receipt to be processed first
439                    // This avoids overwhelming the phone with simultaneous requests
440                    client_clone.runtime.sleep(Duration::from_millis(500)).await;
441                }
442
443                if let Err(e) = client_clone
444                    .send_pdo_placeholder_resend_request(&info_clone)
445                    .await
446                {
447                    warn!(
448                        "Failed to send PDO request for message {} from {}: {:?}",
449                        info_clone.id, info_clone.source.sender, e
450                    );
451                }
452            }))
453            .detach();
454    }
455
456    /// Spawns a PDO request for a message that failed to decrypt.
457    /// This is called alongside the retry receipt to increase chances of recovery.
458    pub(crate) fn spawn_pdo_request(self: &Arc<Self>, info: &MessageInfo) {
459        self.spawn_pdo_request_with_options(info, false);
460    }
461}
462
463#[cfg(test)]
464mod tests {
465    use wacore_binary::jid::{DEFAULT_USER_SERVER, Jid, JidExt};
466
467    #[test]
468    fn test_pdo_primary_phone_jid_is_device_0() {
469        // PDO sends to device 0 (primary phone)
470        let own_pn = Jid::pn("559999999999");
471        let primary_phone_jid = own_pn.with_device(0);
472
473        assert_eq!(primary_phone_jid.device, 0);
474        assert!(!primary_phone_jid.is_ad()); // Device 0 is NOT an additional device
475    }
476
477    #[test]
478    fn test_pdo_primary_phone_jid_preserves_user() {
479        let own_pn = Jid::pn("559999999999");
480        let primary_phone_jid = own_pn.with_device(0);
481
482        assert_eq!(primary_phone_jid.user, "559999999999");
483        assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
484    }
485
486    #[test]
487    fn test_pdo_primary_phone_jid_from_linked_device() {
488        // Even if we're device 33, PDO should send to device 0
489        let own_pn = Jid::pn_device("559999999999", 33);
490        let primary_phone_jid = own_pn.with_device(0);
491
492        assert_eq!(primary_phone_jid.user, "559999999999");
493        assert_eq!(primary_phone_jid.device, 0);
494    }
495}