whatsapp_rust/
pdo.rs

1//! PDO (Peer Data Operation) support for requesting message content from the primary device.
2//!
3//! When message decryption fails (e.g., due to session mismatch), instead of only sending
4//! a retry receipt to the sender, we can also request the message content from our own
5//! primary phone device. This is useful because:
6//!
7//! 1. The primary phone has already decrypted the message successfully
8//! 2. It can share the decrypted content with linked devices via PDO
9//! 3. This bypasses session issues entirely since we're asking our own trusted device
10//!
11//! The flow is:
12//! 1. Decryption fails for a message
13//! 2. We send a PeerDataOperationRequestMessage with type PLACEHOLDER_MESSAGE_RESEND
14//! 3. The phone responds with PeerDataOperationRequestResponseMessage containing the decoded message
15//! 4. We emit the message as if we had decrypted it ourselves
16
17use crate::client::Client;
18use crate::types::message::MessageInfo;
19use log::{debug, info, warn};
20use moka::future::Cache;
21use prost::Message;
22use std::sync::Arc;
23use std::time::Duration;
24use wacore::types::message::{EditAttribute, MessageSource, MsgMetaInfo};
25use wacore_binary::jid::{Jid, JidExt};
26use waproto::whatsapp as wa;
27
28/// Cache entry for pending PDO requests.
29/// Contains the original message info needed to properly dispatch the response.
30#[derive(Clone, Debug)]
31pub struct PendingPdoRequest {
32    pub message_info: MessageInfo,
33    pub requested_at: std::time::Instant,
34}
35
36/// Creates a new PDO request cache.
37/// The cache has a TTL of 30 seconds (phone should respond quickly) and limited capacity.
38pub fn new_pdo_cache() -> Cache<String, PendingPdoRequest> {
39    Cache::builder()
40        .time_to_live(Duration::from_secs(30))
41        .max_capacity(500)
42        .build()
43}
44
45impl Client {
46    /// Sends a PDO (Peer Data Operation) request to our own primary phone to get the
47    /// decrypted content of a message that we failed to decrypt.
48    ///
49    /// This is called when decryption fails and we want to ask our phone for the message.
50    /// The phone will respond with a PeerDataOperationRequestResponseMessage containing
51    /// the full WebMessageInfo which we can then dispatch as a normal message event.
52    ///
53    /// # Arguments
54    /// * `info` - The MessageInfo for the message that failed to decrypt
55    ///
56    /// # Returns
57    /// * `Ok(())` if the request was sent successfully
58    /// * `Err` if we couldn't send the request (e.g., not logged in)
59    pub async fn send_pdo_placeholder_resend_request(
60        self: &Arc<Self>,
61        info: &MessageInfo,
62    ) -> Result<(), anyhow::Error> {
63        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
64
65        // We need to send PDO to our PRIMARY PHONE (device 0), not to ourselves (linked device).
66        // The primary phone has already decrypted the message and can share the content with us.
67        let own_pn = device_snapshot
68            .pn
69            .clone()
70            .ok_or_else(|| anyhow::anyhow!("Not logged in - no phone number available for PDO"))?;
71
72        // Create JID for device 0 (primary phone)
73        let primary_phone_jid = own_pn.with_device(0);
74
75        // Atomically check-and-insert to avoid race conditions where two concurrent
76        // calls could both pass a contains_key check before either inserts.
77        let cache_key = format!("{}:{}", info.source.chat, info.id);
78        let pending = PendingPdoRequest {
79            message_info: info.clone(),
80            requested_at: std::time::Instant::now(),
81        };
82        let entry = self
83            .pdo_pending_requests
84            .entry(cache_key.clone())
85            .or_insert(pending)
86            .await;
87
88        if !entry.is_fresh() {
89            debug!(
90                "PDO request already pending for message {} from {}",
91                info.id, info.source.sender
92            );
93            return Ok(());
94        }
95
96        // Build the message key for the placeholder resend request
97        let message_key = wa::MessageKey {
98            remote_jid: Some(info.source.chat.to_string()),
99            from_me: Some(info.source.is_from_me),
100            id: Some(info.id.clone()),
101            participant: if info.source.is_group {
102                Some(info.source.sender.to_string())
103            } else {
104                None
105            },
106        };
107
108        // Build the PDO request message
109        let pdo_request = wa::message::PeerDataOperationRequestMessage {
110            peer_data_operation_request_type: Some(
111                wa::message::PeerDataOperationRequestType::PlaceholderMessageResend as i32,
112            ),
113            placeholder_message_resend_request: vec![
114                wa::message::peer_data_operation_request_message::PlaceholderMessageResendRequest {
115                    message_key: Some(message_key),
116                },
117            ],
118            ..Default::default()
119        };
120
121        // Wrap it in a protocol message
122        let protocol_message = wa::message::ProtocolMessage {
123            r#type: Some(
124                wa::message::protocol_message::Type::PeerDataOperationRequestMessage as i32,
125            ),
126            peer_data_operation_request_message: Some(pdo_request),
127            ..Default::default()
128        };
129
130        let msg = wa::Message {
131            protocol_message: Some(Box::new(protocol_message)),
132            ..Default::default()
133        };
134
135        info!(
136            "Sending PDO placeholder resend request for message {} from {} in {} to primary phone {}",
137            info.id, info.source.sender, info.source.chat, primary_phone_jid
138        );
139
140        // Ensure E2E session exists before sending (matches WhatsApp Web behavior)
141        self.ensure_e2e_sessions(vec![primary_phone_jid.clone()])
142            .await?;
143
144        // Send the message to our primary phone (device 0)
145        match self.send_peer_message(primary_phone_jid, &msg).await {
146            Ok(_) => {
147                debug!("PDO request sent successfully for message {}", info.id);
148                Ok(())
149            }
150            Err(e) => {
151                // Remove from pending cache on failure
152                self.pdo_pending_requests.remove(&cache_key).await;
153                warn!(
154                    "Failed to send PDO request for message {}: {:?}",
155                    info.id, e
156                );
157                Err(e)
158            }
159        }
160    }
161
162    /// Sends a peer message (message to our own devices).
163    /// This is used for PDO requests and similar device-to-device communication.
164    async fn send_peer_message(
165        self: &Arc<Self>,
166        to: Jid,
167        msg: &wa::Message,
168    ) -> Result<String, anyhow::Error> {
169        let msg_id = self.generate_message_id().await;
170
171        // Send with peer category and high priority
172        self.send_message_impl(
173            to,
174            msg,
175            Some(msg_id.clone()),
176            true,  // is_peer_message
177            false, // is_retry
178            None,
179        )
180        .await?;
181
182        Ok(msg_id)
183    }
184
185    /// Handles a PDO response message from our primary phone.
186    /// This is called when we receive a PeerDataOperationRequestResponseMessage.
187    ///
188    /// # Arguments
189    /// * `response` - The PDO response message
190    /// * `info` - The MessageInfo for the PDO response message itself
191    pub async fn handle_pdo_response(
192        self: &Arc<Self>,
193        response: &wa::message::PeerDataOperationRequestResponseMessage,
194        _pdo_msg_info: &MessageInfo,
195    ) {
196        debug!(
197            "Received PDO response with {} results",
198            response.peer_data_operation_result.len()
199        );
200
201        for result in &response.peer_data_operation_result {
202            if let Some(placeholder_response) = &result.placeholder_message_resend_response {
203                self.handle_placeholder_resend_response(placeholder_response)
204                    .await;
205            }
206        }
207    }
208
209    /// Handles a single placeholder message resend response from PDO.
210    async fn handle_placeholder_resend_response(
211        self: &Arc<Self>,
212        response: &wa::message::peer_data_operation_request_response_message::peer_data_operation_result::PlaceholderMessageResendResponse,
213    ) {
214        let Some(web_message_info_bytes) = &response.web_message_info_bytes else {
215            warn!("PDO placeholder response missing webMessageInfoBytes");
216            return;
217        };
218
219        // Decode the WebMessageInfo
220        let web_msg_info = match wa::WebMessageInfo::decode(web_message_info_bytes.as_slice()) {
221            Ok(info) => info,
222            Err(e) => {
223                warn!("Failed to decode WebMessageInfo from PDO response: {:?}", e);
224                return;
225            }
226        };
227
228        // Extract message key to find the original pending request
229        let key = &web_msg_info.key;
230
231        let remote_jid = key.remote_jid.as_deref().unwrap_or("");
232        let msg_id = key.id.as_deref().unwrap_or("");
233        let cache_key = format!("{}:{}", remote_jid, msg_id);
234
235        // Remove from pending requests
236        let pending = self.pdo_pending_requests.remove(&cache_key).await;
237
238        let elapsed = pending
239            .as_ref()
240            .map(|p| p.requested_at.elapsed().as_millis())
241            .unwrap_or(0);
242
243        info!(
244            "Received PDO placeholder response for message {} (took {}ms)",
245            msg_id, elapsed
246        );
247
248        // Build MessageInfo from the WebMessageInfo or use the pending request's info
249        let message_info = if let Some(pending) = pending {
250            pending.message_info
251        } else {
252            // Reconstruct MessageInfo from WebMessageInfo if we don't have it cached
253            match self.message_info_from_web_message_info(&web_msg_info).await {
254                Ok(info) => info,
255                Err(e) => {
256                    warn!(
257                        "Failed to reconstruct MessageInfo from PDO response: {:?}",
258                        e
259                    );
260                    return;
261                }
262            }
263        };
264
265        // Extract the actual message content
266        let Some(message) = web_msg_info.message else {
267            warn!("PDO response WebMessageInfo missing message content");
268            return;
269        };
270
271        // Dispatch the message as a normal message event
272        info!(
273            "Dispatching PDO-recovered message {} from {} via phone",
274            message_info.id, message_info.source.sender
275        );
276
277        self.core
278            .event_bus
279            .dispatch(&wacore::types::events::Event::Message(
280                Box::new(message),
281                message_info,
282            ));
283    }
284
285    /// Reconstructs a MessageInfo from a WebMessageInfo.
286    /// This is used when we receive a PDO response but don't have the original pending request cached.
287    async fn message_info_from_web_message_info(
288        &self,
289        web_msg: &wa::WebMessageInfo,
290    ) -> Result<MessageInfo, anyhow::Error> {
291        let key = &web_msg.key;
292
293        let remote_jid: Jid = key
294            .remote_jid
295            .as_ref()
296            .ok_or_else(|| anyhow::anyhow!("MessageKey missing remoteJid"))?
297            .parse()?;
298
299        let is_group = remote_jid.is_group();
300        let is_from_me = key.from_me.unwrap_or(false);
301
302        let sender = if is_group {
303            key.participant
304                .as_ref()
305                .map(|p: &String| p.parse())
306                .transpose()?
307                .unwrap_or_else(|| remote_jid.clone())
308        } else if is_from_me {
309            self.persistence_manager
310                .get_device_snapshot()
311                .await
312                .pn
313                .clone()
314                .unwrap_or_else(|| remote_jid.clone())
315        } else {
316            remote_jid.clone()
317        };
318
319        let timestamp = web_msg
320            .message_timestamp
321            .map(|ts| {
322                chrono::DateTime::from_timestamp(ts as i64, 0).unwrap_or_else(chrono::Utc::now)
323            })
324            .unwrap_or_else(chrono::Utc::now);
325
326        Ok(MessageInfo {
327            id: key.id.clone().unwrap_or_default(),
328            server_id: 0,
329            r#type: String::new(),
330            source: MessageSource {
331                chat: remote_jid,
332                sender,
333                sender_alt: None,
334                recipient_alt: None,
335                is_from_me,
336                is_group,
337                addressing_mode: None,
338                broadcast_list_owner: None,
339                recipient: None,
340            },
341            timestamp,
342            push_name: web_msg.push_name.clone().unwrap_or_default(),
343            category: String::new(),
344            multicast: false,
345            media_type: String::new(),
346            edit: EditAttribute::default(),
347            bot_info: None,
348            meta_info: MsgMetaInfo::default(),
349            verified_name: None,
350            device_sent_meta: None,
351        })
352    }
353
354    /// Spawns a PDO request for a message that failed to decrypt.
355    /// This is called alongside the retry receipt to increase chances of recovery.
356    ///
357    /// When `immediate` is true, the PDO request is sent without delay.
358    /// This is used when we've exhausted retry attempts and need immediate PDO recovery.
359    pub(crate) fn spawn_pdo_request_with_options(
360        self: &Arc<Self>,
361        info: &MessageInfo,
362        immediate: bool,
363    ) {
364        // Don't send PDO for our own messages or status broadcasts
365        if info.source.is_from_me {
366            return;
367        }
368        if info.source.chat.server == wacore_binary::jid::BROADCAST_SERVER {
369            return;
370        }
371
372        let client_clone = Arc::clone(self);
373        let info_clone = info.clone();
374
375        tokio::spawn(async move {
376            if !immediate {
377                // Add a small delay to allow the retry receipt to be processed first
378                // This avoids overwhelming the phone with simultaneous requests
379                tokio::time::sleep(Duration::from_millis(500)).await;
380            }
381
382            if let Err(e) = client_clone
383                .send_pdo_placeholder_resend_request(&info_clone)
384                .await
385            {
386                warn!(
387                    "Failed to send PDO request for message {} from {}: {:?}",
388                    info_clone.id, info_clone.source.sender, e
389                );
390            }
391        });
392    }
393
394    /// Spawns a PDO request for a message that failed to decrypt.
395    /// This is called alongside the retry receipt to increase chances of recovery.
396    pub(crate) fn spawn_pdo_request(self: &Arc<Self>, info: &MessageInfo) {
397        self.spawn_pdo_request_with_options(info, false);
398    }
399}
400
#[cfg(test)]
mod tests {
    use wacore_binary::jid::{DEFAULT_USER_SERVER, Jid, JidExt};

    /// Phone number used by every test below.
    const OWN_USER: &str = "559999999999";

    /// Derives the primary-phone JID the same way the PDO sender does.
    fn primary_phone_of(own: Jid) -> Jid {
        own.with_device(0)
    }

    /// The PDO target is device 0, which is not an additional device.
    #[test]
    fn test_pdo_primary_phone_jid_is_device_0() {
        let target = primary_phone_of(Jid::pn(OWN_USER));

        assert_eq!(target.device, 0);
        assert!(!target.is_ad());
    }

    /// Switching to device 0 keeps the user and server parts intact.
    #[test]
    fn test_pdo_primary_phone_jid_preserves_user() {
        let target = primary_phone_of(Jid::pn(OWN_USER));

        assert_eq!(target.user, "559999999999");
        assert_eq!(target.server, DEFAULT_USER_SERVER);
    }

    /// Starting from a linked device (33) still yields device 0 of the same user.
    #[test]
    fn test_pdo_primary_phone_jid_from_linked_device() {
        let target = primary_phone_of(Jid::pn_device(OWN_USER, 33));

        assert_eq!(target.user, "559999999999");
        assert_eq!(target.device, 0);
    }
}