1use crate::cache::Cache;
18use crate::client::Client;
19use crate::types::message::MessageInfo;
20use log::{debug, info, warn};
21use prost::Message;
22use std::sync::Arc;
23use std::time::Duration;
24use wacore::types::message::{EditAttribute, MessageSource, MsgMetaInfo};
25use wacore_binary::jid::{Jid, JidExt};
26use waproto::whatsapp as wa;
27
28#[derive(Clone, Debug)]
31pub struct PendingPdoRequest {
32 pub message_info: MessageInfo,
33 pub requested_at: wacore::time::Instant,
34}
35
36pub fn new_pdo_cache() -> Cache<String, PendingPdoRequest> {
39 Cache::builder()
40 .time_to_live(Duration::from_secs(30))
41 .max_capacity(500)
42 .build()
43}
44
45impl Client {
46 pub async fn send_pdo_placeholder_resend_request(
60 self: &Arc<Self>,
61 info: &MessageInfo,
62 ) -> Result<(), anyhow::Error> {
63 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
64
65 let own_pn = device_snapshot
68 .pn
69 .clone()
70 .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
71
72 let primary_phone_jid = own_pn.with_device(0);
74
75 let remote_jid = self.resolve_encryption_jid(&info.source.chat).await;
78 let participant = if info.source.is_group {
79 Some(self.resolve_encryption_jid(&info.source.sender).await)
80 } else {
81 None
82 };
83
84 let cache_key = format!("{}:{}", remote_jid, info.id);
86
87 if self.pdo_pending_requests.get(&cache_key).await.is_some() {
88 debug!(
89 "PDO request already pending for message {} from {} (resolved: {})",
90 info.id, info.source.sender, remote_jid
91 );
92 return Ok(());
93 }
94
95 let pending = PendingPdoRequest {
96 message_info: info.clone(),
97 requested_at: wacore::time::Instant::now(),
98 };
99 self.pdo_pending_requests
100 .insert(cache_key.clone(), pending)
101 .await;
102
103 let message_key = wa::MessageKey {
105 remote_jid: Some(remote_jid.to_string()),
106 from_me: Some(info.source.is_from_me),
107 id: Some(info.id.clone()),
108 participant: participant.map(|p| p.to_string()),
109 };
110
111 let pdo_request = wa::message::PeerDataOperationRequestMessage {
113 peer_data_operation_request_type: Some(
114 wa::message::PeerDataOperationRequestType::PlaceholderMessageResend as i32,
115 ),
116 placeholder_message_resend_request: vec![
117 wa::message::peer_data_operation_request_message::PlaceholderMessageResendRequest {
118 message_key: Some(message_key),
119 },
120 ],
121 ..Default::default()
122 };
123
124 let protocol_message = wa::message::ProtocolMessage {
126 r#type: Some(
127 wa::message::protocol_message::Type::PeerDataOperationRequestMessage as i32,
128 ),
129 peer_data_operation_request_message: Some(pdo_request),
130 ..Default::default()
131 };
132
133 let msg = wa::Message {
134 protocol_message: Some(Box::new(protocol_message)),
135 ..Default::default()
136 };
137
138 info!(
139 "Sending PDO placeholder resend request for message {} from {} in {} to primary phone {}",
140 info.id, info.source.sender, info.source.chat, primary_phone_jid
141 );
142
143 self.ensure_e2e_sessions(vec![primary_phone_jid.clone()])
145 .await?;
146
147 match self.send_peer_message(primary_phone_jid, &msg).await {
149 Ok(_) => {
150 debug!("PDO request sent successfully for message {}", info.id);
151 Ok(())
152 }
153 Err(e) => {
154 self.pdo_pending_requests.remove(&cache_key).await;
156 warn!(
157 "Failed to send PDO request for message {}: {:?}",
158 info.id, e
159 );
160 Err(e)
161 }
162 }
163 }
164
165 pub async fn fetch_message_history(
167 self: &Arc<Self>,
168 chat_jid: &Jid,
169 oldest_msg_id: &str,
170 oldest_msg_from_me: bool,
171 oldest_msg_timestamp_ms: i64,
172 count: i32,
173 ) -> Result<String, anyhow::Error> {
174 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
175 let own_pn = device_snapshot
176 .pn
177 .clone()
178 .ok_or_else(|| anyhow::Error::from(crate::client::ClientError::NotLoggedIn))?;
179 let primary_phone_jid = own_pn.with_device(0);
180
181 let pdo_request = wa::message::PeerDataOperationRequestMessage {
182 peer_data_operation_request_type: Some(
183 wa::message::PeerDataOperationRequestType::HistorySyncOnDemand as i32,
184 ),
185 history_sync_on_demand_request: Some(
186 wa::message::peer_data_operation_request_message::HistorySyncOnDemandRequest {
187 chat_jid: Some(chat_jid.to_string()),
188 oldest_msg_id: Some(oldest_msg_id.to_string()),
189 oldest_msg_from_me: Some(oldest_msg_from_me),
190 oldest_msg_timestamp_ms: Some(oldest_msg_timestamp_ms),
191 on_demand_msg_count: Some(count),
192 ..Default::default()
193 },
194 ),
195 ..Default::default()
196 };
197
198 let protocol_message = wa::message::ProtocolMessage {
199 r#type: Some(
200 wa::message::protocol_message::Type::PeerDataOperationRequestMessage as i32,
201 ),
202 peer_data_operation_request_message: Some(pdo_request),
203 ..Default::default()
204 };
205
206 let msg = wa::Message {
207 protocol_message: Some(Box::new(protocol_message)),
208 ..Default::default()
209 };
210
211 info!(
212 "Sending PDO history sync on-demand request for chat {} (count={}) to primary phone {}",
213 chat_jid, count, primary_phone_jid
214 );
215
216 self.ensure_e2e_sessions(vec![primary_phone_jid.clone()])
217 .await?;
218 self.send_peer_message(primary_phone_jid, &msg).await
219 }
220
221 async fn send_peer_message(
224 self: &Arc<Self>,
225 to: Jid,
226 msg: &wa::Message,
227 ) -> Result<String, anyhow::Error> {
228 let msg_id = self.generate_message_id().await;
229
230 self.send_message_impl(
232 to,
233 msg,
234 Some(msg_id.clone()),
235 true, false, None,
238 vec![], )
240 .await?;
241
242 Ok(msg_id)
243 }
244
245 pub async fn handle_pdo_response(
252 self: &Arc<Self>,
253 response: &wa::message::PeerDataOperationRequestResponseMessage,
254 _pdo_msg_info: &MessageInfo,
255 ) {
256 debug!(
257 "Received PDO response with {} results",
258 response.peer_data_operation_result.len()
259 );
260
261 for result in &response.peer_data_operation_result {
262 if let Some(placeholder_response) = &result.placeholder_message_resend_response {
263 self.handle_placeholder_resend_response(placeholder_response)
264 .await;
265 }
266 }
267 }
268
269 async fn handle_placeholder_resend_response(
271 self: &Arc<Self>,
272 response: &wa::message::peer_data_operation_request_response_message::peer_data_operation_result::PlaceholderMessageResendResponse,
273 ) {
274 let Some(web_message_info_bytes) = &response.web_message_info_bytes else {
275 warn!("PDO placeholder response missing webMessageInfoBytes");
276 return;
277 };
278
279 let web_msg_info = match wa::WebMessageInfo::decode(web_message_info_bytes.as_slice()) {
281 Ok(info) => info,
282 Err(e) => {
283 warn!("Failed to decode WebMessageInfo from PDO response: {:?}", e);
284 return;
285 }
286 };
287
288 let key = &web_msg_info.key;
290
291 let remote_jid = key.remote_jid.as_deref().unwrap_or("");
292 let msg_id = key.id.as_deref().unwrap_or("");
293 let cache_key = format!("{}:{}", remote_jid, msg_id);
294
295 let pending = self.pdo_pending_requests.remove(&cache_key).await;
297
298 let elapsed = pending
299 .as_ref()
300 .map(|p| p.requested_at.elapsed().as_millis())
301 .unwrap_or(0);
302
303 info!(
304 "Received PDO placeholder response for message {} (took {}ms)",
305 msg_id, elapsed
306 );
307
308 let message_info = if let Some(pending) = pending {
310 pending.message_info
311 } else {
312 match self.message_info_from_web_message_info(&web_msg_info).await {
314 Ok(info) => info,
315 Err(e) => {
316 warn!(
317 "Failed to reconstruct MessageInfo from PDO response: {:?}",
318 e
319 );
320 return;
321 }
322 }
323 };
324
325 let Some(message) = web_msg_info.message else {
327 warn!("PDO response WebMessageInfo missing message content");
328 return;
329 };
330
331 info!(
333 "Dispatching PDO-recovered message {} from {} via phone",
334 message_info.id, message_info.source.sender
335 );
336
337 self.core
338 .event_bus
339 .dispatch(&wacore::types::events::Event::Message(
340 Box::new(message),
341 message_info,
342 ));
343 }
344
345 async fn message_info_from_web_message_info(
348 &self,
349 web_msg: &wa::WebMessageInfo,
350 ) -> Result<MessageInfo, anyhow::Error> {
351 let key = &web_msg.key;
352
353 let remote_jid: Jid = key
354 .remote_jid
355 .as_ref()
356 .ok_or_else(|| anyhow::anyhow!("MessageKey missing remoteJid"))?
357 .parse()?;
358
359 let is_group = remote_jid.is_group();
360 let is_from_me = key.from_me.unwrap_or(false);
361
362 let sender = if is_group {
363 key.participant
364 .as_ref()
365 .map(|p: &String| p.parse())
366 .transpose()?
367 .unwrap_or_else(|| remote_jid.clone())
368 } else if is_from_me {
369 self.persistence_manager
370 .get_device_snapshot()
371 .await
372 .pn
373 .clone()
374 .unwrap_or_else(|| remote_jid.clone())
375 } else {
376 remote_jid.clone()
377 };
378
379 let timestamp = web_msg
380 .message_timestamp
381 .map(|ts| {
382 chrono::DateTime::from_timestamp(ts as i64, 0).unwrap_or_else(chrono::Utc::now)
383 })
384 .unwrap_or_else(chrono::Utc::now);
385
386 Ok(MessageInfo {
387 id: key.id.clone().unwrap_or_default(),
388 server_id: 0,
389 r#type: String::new(),
390 source: MessageSource {
391 chat: remote_jid,
392 sender,
393 sender_alt: None,
394 recipient_alt: None,
395 is_from_me,
396 is_group,
397 addressing_mode: None,
398 broadcast_list_owner: None,
399 recipient: None,
400 },
401 timestamp,
402 push_name: web_msg.push_name.clone().unwrap_or_default(),
403 category: String::new(),
404 multicast: false,
405 media_type: String::new(),
406 edit: EditAttribute::default(),
407 bot_info: None,
408 meta_info: MsgMetaInfo::default(),
409 verified_name: None,
410 device_sent_meta: None,
411 })
412 }
413
414 pub(crate) fn spawn_pdo_request_with_options(
420 self: &Arc<Self>,
421 info: &MessageInfo,
422 immediate: bool,
423 ) {
424 if info.source.is_from_me {
426 return;
427 }
428 if info.source.chat.server == wacore_binary::jid::BROADCAST_SERVER {
429 return;
430 }
431
432 let client_clone = Arc::clone(self);
433 let info_clone = info.clone();
434
435 self.runtime
436 .spawn(Box::pin(async move {
437 if !immediate {
438 client_clone.runtime.sleep(Duration::from_millis(500)).await;
441 }
442
443 if let Err(e) = client_clone
444 .send_pdo_placeholder_resend_request(&info_clone)
445 .await
446 {
447 warn!(
448 "Failed to send PDO request for message {} from {}: {:?}",
449 info_clone.id, info_clone.source.sender, e
450 );
451 }
452 }))
453 .detach();
454 }
455
456 pub(crate) fn spawn_pdo_request(self: &Arc<Self>, info: &MessageInfo) {
459 self.spawn_pdo_request_with_options(info, false);
460 }
461}
462
463#[cfg(test)]
464mod tests {
465 use wacore_binary::jid::{DEFAULT_USER_SERVER, Jid, JidExt};
466
467 #[test]
468 fn test_pdo_primary_phone_jid_is_device_0() {
469 let own_pn = Jid::pn("559999999999");
471 let primary_phone_jid = own_pn.with_device(0);
472
473 assert_eq!(primary_phone_jid.device, 0);
474 assert!(!primary_phone_jid.is_ad()); }
476
477 #[test]
478 fn test_pdo_primary_phone_jid_preserves_user() {
479 let own_pn = Jid::pn("559999999999");
480 let primary_phone_jid = own_pn.with_device(0);
481
482 assert_eq!(primary_phone_jid.user, "559999999999");
483 assert_eq!(primary_phone_jid.server, DEFAULT_USER_SERVER);
484 }
485
486 #[test]
487 fn test_pdo_primary_phone_jid_from_linked_device() {
488 let own_pn = Jid::pn_device("559999999999", 33);
490 let primary_phone_jid = own_pn.with_device(0);
491
492 assert_eq!(primary_phone_jid.user, "559999999999");
493 assert_eq!(primary_phone_jid.device, 0);
494 }
495}