1use crate::client::Client;
18use crate::types::message::MessageInfo;
19use log::{debug, info, warn};
20use moka::future::Cache;
21use prost::Message;
22use std::sync::Arc;
23use std::time::Duration;
24use wacore::types::message::{EditAttribute, MessageSource, MsgMetaInfo};
25use wacore_binary::jid::{Jid, JidExt};
26use waproto::whatsapp as wa;
27
28#[derive(Clone, Debug)]
31pub struct PendingPdoRequest {
32 pub message_info: MessageInfo,
33 pub requested_at: std::time::Instant,
34}
35
36pub fn new_pdo_cache() -> Cache<String, PendingPdoRequest> {
39 Cache::builder()
40 .time_to_live(Duration::from_secs(30))
41 .max_capacity(500)
42 .build()
43}
44
45impl Client {
46 pub async fn send_pdo_placeholder_resend_request(
60 self: &Arc<Self>,
61 info: &MessageInfo,
62 ) -> Result<(), anyhow::Error> {
63 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
64
65 let own_pn = device_snapshot
68 .pn
69 .clone()
70 .ok_or_else(|| anyhow::anyhow!("Not logged in - no phone number available for PDO"))?;
71
72 let primary_phone_jid = own_pn.with_device(0);
74
75 let cache_key = format!("{}:{}", info.source.chat, info.id);
78 let pending = PendingPdoRequest {
79 message_info: info.clone(),
80 requested_at: std::time::Instant::now(),
81 };
82 let entry = self
83 .pdo_pending_requests
84 .entry(cache_key.clone())
85 .or_insert(pending)
86 .await;
87
88 if !entry.is_fresh() {
89 debug!(
90 "PDO request already pending for message {} from {}",
91 info.id, info.source.sender
92 );
93 return Ok(());
94 }
95
96 let message_key = wa::MessageKey {
98 remote_jid: Some(info.source.chat.to_string()),
99 from_me: Some(info.source.is_from_me),
100 id: Some(info.id.clone()),
101 participant: if info.source.is_group {
102 Some(info.source.sender.to_string())
103 } else {
104 None
105 },
106 };
107
108 let pdo_request = wa::message::PeerDataOperationRequestMessage {
110 peer_data_operation_request_type: Some(
111 wa::message::PeerDataOperationRequestType::PlaceholderMessageResend as i32,
112 ),
113 placeholder_message_resend_request: vec![
114 wa::message::peer_data_operation_request_message::PlaceholderMessageResendRequest {
115 message_key: Some(message_key),
116 },
117 ],
118 ..Default::default()
119 };
120
121 let protocol_message = wa::message::ProtocolMessage {
123 r#type: Some(
124 wa::message::protocol_message::Type::PeerDataOperationRequestMessage as i32,
125 ),
126 peer_data_operation_request_message: Some(pdo_request),
127 ..Default::default()
128 };
129
130 let msg = wa::Message {
131 protocol_message: Some(Box::new(protocol_message)),
132 ..Default::default()
133 };
134
135 info!(
136 "Sending PDO placeholder resend request for message {} from {} in {} to primary phone {}",
137 info.id, info.source.sender, info.source.chat, primary_phone_jid
138 );
139
140 self.ensure_e2e_sessions(vec![primary_phone_jid.clone()])
142 .await?;
143
144 match self.send_peer_message(primary_phone_jid, &msg).await {
146 Ok(_) => {
147 debug!("PDO request sent successfully for message {}", info.id);
148 Ok(())
149 }
150 Err(e) => {
151 self.pdo_pending_requests.remove(&cache_key).await;
153 warn!(
154 "Failed to send PDO request for message {}: {:?}",
155 info.id, e
156 );
157 Err(e)
158 }
159 }
160 }
161
162 async fn send_peer_message(
165 self: &Arc<Self>,
166 to: Jid,
167 msg: &wa::Message,
168 ) -> Result<String, anyhow::Error> {
169 let msg_id = self.generate_message_id().await;
170
171 self.send_message_impl(
173 to,
174 msg,
175 Some(msg_id.clone()),
176 true, false, None,
179 )
180 .await?;
181
182 Ok(msg_id)
183 }
184
185 pub async fn handle_pdo_response(
192 self: &Arc<Self>,
193 response: &wa::message::PeerDataOperationRequestResponseMessage,
194 _pdo_msg_info: &MessageInfo,
195 ) {
196 debug!(
197 "Received PDO response with {} results",
198 response.peer_data_operation_result.len()
199 );
200
201 for result in &response.peer_data_operation_result {
202 if let Some(placeholder_response) = &result.placeholder_message_resend_response {
203 self.handle_placeholder_resend_response(placeholder_response)
204 .await;
205 }
206 }
207 }
208
209 async fn handle_placeholder_resend_response(
211 self: &Arc<Self>,
212 response: &wa::message::peer_data_operation_request_response_message::peer_data_operation_result::PlaceholderMessageResendResponse,
213 ) {
214 let Some(web_message_info_bytes) = &response.web_message_info_bytes else {
215 warn!("PDO placeholder response missing webMessageInfoBytes");
216 return;
217 };
218
219 let web_msg_info = match wa::WebMessageInfo::decode(web_message_info_bytes.as_slice()) {
221 Ok(info) => info,
222 Err(e) => {
223 warn!("Failed to decode WebMessageInfo from PDO response: {:?}", e);
224 return;
225 }
226 };
227
228 let key = &web_msg_info.key;
230
231 let remote_jid = key.remote_jid.as_deref().unwrap_or("");
232 let msg_id = key.id.as_deref().unwrap_or("");
233 let cache_key = format!("{}:{}", remote_jid, msg_id);
234
235 let pending = self.pdo_pending_requests.remove(&cache_key).await;
237
238 let elapsed = pending
239 .as_ref()
240 .map(|p| p.requested_at.elapsed().as_millis())
241 .unwrap_or(0);
242
243 info!(
244 "Received PDO placeholder response for message {} (took {}ms)",
245 msg_id, elapsed
246 );
247
248 let message_info = if let Some(pending) = pending {
250 pending.message_info
251 } else {
252 match self.message_info_from_web_message_info(&web_msg_info).await {
254 Ok(info) => info,
255 Err(e) => {
256 warn!(
257 "Failed to reconstruct MessageInfo from PDO response: {:?}",
258 e
259 );
260 return;
261 }
262 }
263 };
264
265 let Some(message) = web_msg_info.message else {
267 warn!("PDO response WebMessageInfo missing message content");
268 return;
269 };
270
271 info!(
273 "Dispatching PDO-recovered message {} from {} via phone",
274 message_info.id, message_info.source.sender
275 );
276
277 self.core
278 .event_bus
279 .dispatch(&wacore::types::events::Event::Message(
280 Box::new(message),
281 message_info,
282 ));
283 }
284
285 async fn message_info_from_web_message_info(
288 &self,
289 web_msg: &wa::WebMessageInfo,
290 ) -> Result<MessageInfo, anyhow::Error> {
291 let key = &web_msg.key;
292
293 let remote_jid: Jid = key
294 .remote_jid
295 .as_ref()
296 .ok_or_else(|| anyhow::anyhow!("MessageKey missing remoteJid"))?
297 .parse()?;
298
299 let is_group = remote_jid.is_group();
300 let is_from_me = key.from_me.unwrap_or(false);
301
302 let sender = if is_group {
303 key.participant
304 .as_ref()
305 .map(|p: &String| p.parse())
306 .transpose()?
307 .unwrap_or_else(|| remote_jid.clone())
308 } else if is_from_me {
309 self.persistence_manager
310 .get_device_snapshot()
311 .await
312 .pn
313 .clone()
314 .unwrap_or_else(|| remote_jid.clone())
315 } else {
316 remote_jid.clone()
317 };
318
319 let timestamp = web_msg
320 .message_timestamp
321 .map(|ts| {
322 chrono::DateTime::from_timestamp(ts as i64, 0).unwrap_or_else(chrono::Utc::now)
323 })
324 .unwrap_or_else(chrono::Utc::now);
325
326 Ok(MessageInfo {
327 id: key.id.clone().unwrap_or_default(),
328 server_id: 0,
329 r#type: String::new(),
330 source: MessageSource {
331 chat: remote_jid,
332 sender,
333 sender_alt: None,
334 recipient_alt: None,
335 is_from_me,
336 is_group,
337 addressing_mode: None,
338 broadcast_list_owner: None,
339 recipient: None,
340 },
341 timestamp,
342 push_name: web_msg.push_name.clone().unwrap_or_default(),
343 category: String::new(),
344 multicast: false,
345 media_type: String::new(),
346 edit: EditAttribute::default(),
347 bot_info: None,
348 meta_info: MsgMetaInfo::default(),
349 verified_name: None,
350 device_sent_meta: None,
351 })
352 }
353
354 pub(crate) fn spawn_pdo_request_with_options(
360 self: &Arc<Self>,
361 info: &MessageInfo,
362 immediate: bool,
363 ) {
364 if info.source.is_from_me {
366 return;
367 }
368 if info.source.chat.server == wacore_binary::jid::BROADCAST_SERVER {
369 return;
370 }
371
372 let client_clone = Arc::clone(self);
373 let info_clone = info.clone();
374
375 tokio::spawn(async move {
376 if !immediate {
377 tokio::time::sleep(Duration::from_millis(500)).await;
380 }
381
382 if let Err(e) = client_clone
383 .send_pdo_placeholder_resend_request(&info_clone)
384 .await
385 {
386 warn!(
387 "Failed to send PDO request for message {} from {}: {:?}",
388 info_clone.id, info_clone.source.sender, e
389 );
390 }
391 });
392 }
393
394 pub(crate) fn spawn_pdo_request(self: &Arc<Self>, info: &MessageInfo) {
397 self.spawn_pdo_request_with_options(info, false);
398 }
399}
400
#[cfg(test)]
mod tests {
    use wacore_binary::jid::{DEFAULT_USER_SERVER, Jid, JidExt};

    /// User part shared by every test JID.
    const OWN_USER: &str = "559999999999";

    /// Derives the PDO target the same way the client does: device 0 of our
    /// own JID.
    fn primary_phone_of(own_pn: Jid) -> Jid {
        own_pn.with_device(0)
    }

    #[test]
    fn test_pdo_primary_phone_jid_is_device_0() {
        let target = primary_phone_of(Jid::pn(OWN_USER));

        assert_eq!(target.device, 0);
        assert!(!target.is_ad());
    }

    #[test]
    fn test_pdo_primary_phone_jid_preserves_user() {
        let target = primary_phone_of(Jid::pn(OWN_USER));

        assert_eq!(target.user, OWN_USER);
        assert_eq!(target.server, DEFAULT_USER_SERVER);
    }

    #[test]
    fn test_pdo_primary_phone_jid_from_linked_device() {
        // Starting from a linked device (33) must still resolve to device 0.
        let target = primary_phone_of(Jid::pn_device(OWN_USER, 33));

        assert_eq!(target.user, OWN_USER);
        assert_eq!(target.device, 0);
    }
}