1use crate::client::Client;
18use crate::types::message::MessageInfo;
19use log::{debug, info, warn};
20use moka::future::Cache;
21use prost::Message;
22use std::sync::Arc;
23use std::time::Duration;
24use wacore::types::message::{EditAttribute, MessageSource, MsgMetaInfo};
25use wacore_binary::jid::{Jid, JidExt};
26use waproto::whatsapp as wa;
27
28#[derive(Clone, Debug)]
31pub struct PendingPdoRequest {
32 pub message_info: MessageInfo,
33 pub requested_at: std::time::Instant,
34}
35
36pub fn new_pdo_cache() -> Cache<String, PendingPdoRequest> {
39 Cache::builder()
40 .time_to_live(Duration::from_secs(30))
41 .max_capacity(500)
42 .build()
43}
44
45impl Client {
46 pub async fn send_pdo_placeholder_resend_request(
60 self: &Arc<Self>,
61 info: &MessageInfo,
62 ) -> Result<(), anyhow::Error> {
63 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
64
65 let own_pn = device_snapshot
68 .pn
69 .clone()
70 .ok_or_else(|| anyhow::anyhow!("Not logged in - no phone number available for PDO"))?;
71
72 let primary_phone_jid = own_pn.with_device(0);
74
75 let remote_jid = self.resolve_encryption_jid(&info.source.chat).await;
78 let participant = if info.source.is_group {
79 Some(self.resolve_encryption_jid(&info.source.sender).await)
80 } else {
81 None
82 };
83
84 let cache_key = format!("{}:{}", remote_jid, info.id);
87 let pending = PendingPdoRequest {
88 message_info: info.clone(),
89 requested_at: std::time::Instant::now(),
90 };
91 let entry = self
92 .pdo_pending_requests
93 .entry(cache_key.clone())
94 .or_insert(pending)
95 .await;
96
97 if !entry.is_fresh() {
98 debug!(
99 "PDO request already pending for message {} from {} (resolved: {})",
100 info.id, info.source.sender, remote_jid
101 );
102 return Ok(());
103 }
104
105 let message_key = wa::MessageKey {
107 remote_jid: Some(remote_jid.to_string()),
108 from_me: Some(info.source.is_from_me),
109 id: Some(info.id.clone()),
110 participant: participant.map(|p| p.to_string()),
111 };
112
113 let pdo_request = wa::message::PeerDataOperationRequestMessage {
115 peer_data_operation_request_type: Some(
116 wa::message::PeerDataOperationRequestType::PlaceholderMessageResend as i32,
117 ),
118 placeholder_message_resend_request: vec![
119 wa::message::peer_data_operation_request_message::PlaceholderMessageResendRequest {
120 message_key: Some(message_key),
121 },
122 ],
123 ..Default::default()
124 };
125
126 let protocol_message = wa::message::ProtocolMessage {
128 r#type: Some(
129 wa::message::protocol_message::Type::PeerDataOperationRequestMessage as i32,
130 ),
131 peer_data_operation_request_message: Some(pdo_request),
132 ..Default::default()
133 };
134
135 let msg = wa::Message {
136 protocol_message: Some(Box::new(protocol_message)),
137 ..Default::default()
138 };
139
140 info!(
141 "Sending PDO placeholder resend request for message {} from {} in {} to primary phone {}",
142 info.id, info.source.sender, info.source.chat, primary_phone_jid
143 );
144
145 self.ensure_e2e_sessions(vec![primary_phone_jid.clone()])
147 .await?;
148
149 match self.send_peer_message(primary_phone_jid, &msg).await {
151 Ok(_) => {
152 debug!("PDO request sent successfully for message {}", info.id);
153 Ok(())
154 }
155 Err(e) => {
156 self.pdo_pending_requests.remove(&cache_key).await;
158 warn!(
159 "Failed to send PDO request for message {}: {:?}",
160 info.id, e
161 );
162 Err(e)
163 }
164 }
165 }
166
167 async fn send_peer_message(
170 self: &Arc<Self>,
171 to: Jid,
172 msg: &wa::Message,
173 ) -> Result<String, anyhow::Error> {
174 let msg_id = self.generate_message_id().await;
175
176 self.send_message_impl(
178 to,
179 msg,
180 Some(msg_id.clone()),
181 true, false, None,
184 vec![], )
186 .await?;
187
188 Ok(msg_id)
189 }
190
191 pub async fn handle_pdo_response(
198 self: &Arc<Self>,
199 response: &wa::message::PeerDataOperationRequestResponseMessage,
200 _pdo_msg_info: &MessageInfo,
201 ) {
202 debug!(
203 "Received PDO response with {} results",
204 response.peer_data_operation_result.len()
205 );
206
207 for result in &response.peer_data_operation_result {
208 if let Some(placeholder_response) = &result.placeholder_message_resend_response {
209 self.handle_placeholder_resend_response(placeholder_response)
210 .await;
211 }
212 }
213 }
214
215 async fn handle_placeholder_resend_response(
217 self: &Arc<Self>,
218 response: &wa::message::peer_data_operation_request_response_message::peer_data_operation_result::PlaceholderMessageResendResponse,
219 ) {
220 let Some(web_message_info_bytes) = &response.web_message_info_bytes else {
221 warn!("PDO placeholder response missing webMessageInfoBytes");
222 return;
223 };
224
225 let web_msg_info = match wa::WebMessageInfo::decode(web_message_info_bytes.as_slice()) {
227 Ok(info) => info,
228 Err(e) => {
229 warn!("Failed to decode WebMessageInfo from PDO response: {:?}", e);
230 return;
231 }
232 };
233
234 let key = &web_msg_info.key;
236
237 let remote_jid = key.remote_jid.as_deref().unwrap_or("");
238 let msg_id = key.id.as_deref().unwrap_or("");
239 let cache_key = format!("{}:{}", remote_jid, msg_id);
240
241 let pending = self.pdo_pending_requests.remove(&cache_key).await;
243
244 let elapsed = pending
245 .as_ref()
246 .map(|p| p.requested_at.elapsed().as_millis())
247 .unwrap_or(0);
248
249 info!(
250 "Received PDO placeholder response for message {} (took {}ms)",
251 msg_id, elapsed
252 );
253
254 let message_info = if let Some(pending) = pending {
256 pending.message_info
257 } else {
258 match self.message_info_from_web_message_info(&web_msg_info).await {
260 Ok(info) => info,
261 Err(e) => {
262 warn!(
263 "Failed to reconstruct MessageInfo from PDO response: {:?}",
264 e
265 );
266 return;
267 }
268 }
269 };
270
271 let Some(message) = web_msg_info.message else {
273 warn!("PDO response WebMessageInfo missing message content");
274 return;
275 };
276
277 info!(
279 "Dispatching PDO-recovered message {} from {} via phone",
280 message_info.id, message_info.source.sender
281 );
282
283 self.core
284 .event_bus
285 .dispatch(&wacore::types::events::Event::Message(
286 Box::new(message),
287 message_info,
288 ));
289 }
290
291 async fn message_info_from_web_message_info(
294 &self,
295 web_msg: &wa::WebMessageInfo,
296 ) -> Result<MessageInfo, anyhow::Error> {
297 let key = &web_msg.key;
298
299 let remote_jid: Jid = key
300 .remote_jid
301 .as_ref()
302 .ok_or_else(|| anyhow::anyhow!("MessageKey missing remoteJid"))?
303 .parse()?;
304
305 let is_group = remote_jid.is_group();
306 let is_from_me = key.from_me.unwrap_or(false);
307
308 let sender = if is_group {
309 key.participant
310 .as_ref()
311 .map(|p: &String| p.parse())
312 .transpose()?
313 .unwrap_or_else(|| remote_jid.clone())
314 } else if is_from_me {
315 self.persistence_manager
316 .get_device_snapshot()
317 .await
318 .pn
319 .clone()
320 .unwrap_or_else(|| remote_jid.clone())
321 } else {
322 remote_jid.clone()
323 };
324
325 let timestamp = web_msg
326 .message_timestamp
327 .map(|ts| {
328 chrono::DateTime::from_timestamp(ts as i64, 0).unwrap_or_else(chrono::Utc::now)
329 })
330 .unwrap_or_else(chrono::Utc::now);
331
332 Ok(MessageInfo {
333 id: key.id.clone().unwrap_or_default(),
334 server_id: 0,
335 r#type: String::new(),
336 source: MessageSource {
337 chat: remote_jid,
338 sender,
339 sender_alt: None,
340 recipient_alt: None,
341 is_from_me,
342 is_group,
343 addressing_mode: None,
344 broadcast_list_owner: None,
345 recipient: None,
346 },
347 timestamp,
348 push_name: web_msg.push_name.clone().unwrap_or_default(),
349 category: String::new(),
350 multicast: false,
351 media_type: String::new(),
352 edit: EditAttribute::default(),
353 bot_info: None,
354 meta_info: MsgMetaInfo::default(),
355 verified_name: None,
356 device_sent_meta: None,
357 })
358 }
359
360 pub(crate) fn spawn_pdo_request_with_options(
366 self: &Arc<Self>,
367 info: &MessageInfo,
368 immediate: bool,
369 ) {
370 if info.source.is_from_me {
372 return;
373 }
374 if info.source.chat.server == wacore_binary::jid::BROADCAST_SERVER {
375 return;
376 }
377
378 let client_clone = Arc::clone(self);
379 let info_clone = info.clone();
380
381 tokio::spawn(async move {
382 if !immediate {
383 tokio::time::sleep(Duration::from_millis(500)).await;
386 }
387
388 if let Err(e) = client_clone
389 .send_pdo_placeholder_resend_request(&info_clone)
390 .await
391 {
392 warn!(
393 "Failed to send PDO request for message {} from {}: {:?}",
394 info_clone.id, info_clone.source.sender, e
395 );
396 }
397 });
398 }
399
400 pub(crate) fn spawn_pdo_request(self: &Arc<Self>, info: &MessageInfo) {
403 self.spawn_pdo_request_with_options(info, false);
404 }
405}
406
#[cfg(test)]
mod tests {
    use wacore_binary::jid::{DEFAULT_USER_SERVER, Jid, JidExt};

    /// User part shared by every JID built in these tests.
    const OWN_USER: &str = "559999999999";

    /// A bare phone-number JID maps to device 0, which is not an AD JID.
    #[test]
    fn test_pdo_primary_phone_jid_is_device_0() {
        let primary = Jid::pn(OWN_USER).with_device(0);

        assert_eq!(primary.device, 0);
        assert!(!primary.is_ad());
    }

    /// Switching to device 0 keeps the user and server parts untouched.
    #[test]
    fn test_pdo_primary_phone_jid_preserves_user() {
        let primary = Jid::pn(OWN_USER).with_device(0);

        assert_eq!(primary.user, OWN_USER);
        assert_eq!(primary.server, DEFAULT_USER_SERVER);
    }

    /// Starting from a linked device (device 33) still resolves to device 0
    /// of the same user.
    #[test]
    fn test_pdo_primary_phone_jid_from_linked_device() {
        let primary = Jid::pn_device(OWN_USER, 33).with_device(0);

        assert_eq!(primary.user, OWN_USER);
        assert_eq!(primary.device, 0);
    }
}