1use crate::client::Client;
2use crate::store::signal_adapter::SignalProtocolStoreAdapter;
3use anyhow::anyhow;
4use wacore::client::context::SendContextResolver;
5use wacore::libsignal::protocol::SignalProtocolError;
6use wacore::types::jid::JidExt;
7use wacore_binary::jid::{Jid, JidExt as _};
8use waproto::whatsapp as wa;
9
10impl Client {
11 pub async fn send_message(
12 &self,
13 to: Jid,
14 message: wa::Message,
15 ) -> Result<String, anyhow::Error> {
16 let request_id = self.generate_message_id().await;
17 self.send_message_impl(to, &message, Some(request_id.clone()), false, false, None)
18 .await?;
19 Ok(request_id)
20 }
21
22 pub(crate) async fn send_message_impl(
23 &self,
24 to: Jid,
25 message: &wa::Message,
26 request_id_override: Option<String>,
27 peer: bool,
28 force_key_distribution: bool,
29 edit: Option<crate::types::message::EditAttribute>,
30 ) -> Result<(), anyhow::Error> {
31 let request_id = match request_id_override {
33 Some(id) => id,
34 None => self.generate_message_id().await,
35 };
36
37 let stanza_to_send: wacore_binary::Node = if peer && !to.is_group() {
38 let encryption_jid = self.resolve_encryption_jid(&to).await;
41 let signal_addr_str = encryption_jid.to_protocol_address().to_string();
42
43 let session_mutex = self
44 .session_locks
45 .get_with(signal_addr_str.clone(), async {
46 std::sync::Arc::new(tokio::sync::Mutex::new(()))
47 })
48 .await;
49 let _session_guard = session_mutex.lock().await;
50
51 let device_store_arc = self.persistence_manager.get_device_arc().await;
53 let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc);
54
55 wacore::send::prepare_peer_stanza(
56 &mut store_adapter.session_store,
57 &mut store_adapter.identity_store,
58 to,
59 message,
60 request_id,
61 )
62 .await?
63 } else if to.is_group() {
65 let mut group_info = self.groups().query_info(&to).await?;
71
72 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
73 let own_jid = device_snapshot
74 .pn
75 .clone()
76 .ok_or_else(|| anyhow!("Not logged in"))?;
77 let own_lid = device_snapshot
78 .lid
79 .clone()
80 .ok_or_else(|| anyhow!("LID not set, cannot send to group"))?;
81 let account_info = device_snapshot.account.clone();
82
83 self.add_recent_message(to.clone(), request_id.clone(), message)
85 .await;
86
87 let device_store_arc = self.persistence_manager.get_device_arc().await;
88
89 let (own_sending_jid, _) = match group_info.addressing_mode {
90 crate::types::message::AddressingMode::Lid => (own_lid.clone(), "lid"),
91 crate::types::message::AddressingMode::Pn => (own_jid.clone(), "pn"),
92 };
93
94 if !group_info
95 .participants
96 .iter()
97 .any(|participant| participant.is_same_user_as(&own_sending_jid))
98 {
99 group_info.participants.push(own_sending_jid.to_non_ad());
100 }
101
102 let force_skdm = {
103 use wacore::libsignal::protocol::SenderKeyStore;
104 use wacore::libsignal::store::sender_key_name::SenderKeyName;
105 let mut device_guard = device_store_arc.write().await;
106 let sender_address = own_sending_jid.to_protocol_address();
107 let sender_key_name =
108 SenderKeyName::new(to.to_string(), sender_address.to_string());
109
110 let key_exists = device_guard
111 .load_sender_key(&sender_key_name)
112 .await?
113 .is_some();
114
115 force_key_distribution || !key_exists
116 };
117
118 let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc.clone());
119
120 let mut stores = wacore::send::SignalStores {
121 session_store: &mut store_adapter.session_store,
122 identity_store: &mut store_adapter.identity_store,
123 prekey_store: &mut store_adapter.pre_key_store,
124 signed_prekey_store: &store_adapter.signed_pre_key_store,
125 sender_key_store: &mut store_adapter.sender_key_store,
126 };
127
128 let marked_for_fresh_skdm = self
131 .consume_forget_marks(&to.to_string())
132 .await
133 .unwrap_or_default();
134
135 let skdm_target_devices: Option<Vec<Jid>> = if force_skdm {
137 None } else {
140 let known_recipients = self
142 .persistence_manager
143 .get_skdm_recipients(&to.to_string())
144 .await
145 .unwrap_or_default();
146
147 if known_recipients.is_empty() {
148 None
150 } else {
151 let jids_to_resolve: Vec<Jid> = group_info
153 .participants
154 .iter()
155 .map(|jid| jid.to_non_ad())
156 .collect();
157
158 match SendContextResolver::resolve_devices(self, &jids_to_resolve).await {
159 Ok(all_devices) => {
160 let new_devices: Vec<Jid> = all_devices
162 .into_iter()
163 .filter(|device: &Jid| {
164 !known_recipients.contains(&device.to_string())
165 })
166 .collect();
167
168 if new_devices.is_empty() {
169 Some(vec![]) } else {
171 log::debug!(
172 "Found {} new devices needing SKDM for group {}",
173 new_devices.len(),
174 to
175 );
176 Some(new_devices)
177 }
178 }
179 Err(e) => {
180 log::warn!("Failed to resolve devices for SKDM check: {:?}", e);
181 None }
183 }
184 }
185 };
186
187 let skdm_target_devices: Option<Vec<Jid>> = if !marked_for_fresh_skdm.is_empty() {
190 match skdm_target_devices {
191 None => None, Some(mut devices) => {
193 for marked_jid_str in &marked_for_fresh_skdm {
195 if let Ok(marked_jid) = marked_jid_str.parse::<Jid>()
196 && !devices.iter().any(|d| d.to_string() == *marked_jid_str)
197 {
198 log::debug!(
199 "Adding {} to SKDM targets (marked for fresh key)",
200 marked_jid_str
201 );
202 devices.push(marked_jid);
203 }
204 }
205 Some(devices)
206 }
207 }
208 } else {
209 skdm_target_devices
210 };
211
212 let devices_receiving_skdm: Vec<String> = skdm_target_devices
214 .as_ref()
215 .map(|devices: &Vec<Jid>| devices.iter().map(|d: &Jid| d.to_string()).collect())
216 .unwrap_or_default();
217
218 match wacore::send::prepare_group_stanza(
220 &mut stores,
221 self,
222 &mut group_info,
223 &own_jid,
224 &own_lid,
225 account_info.as_ref(),
226 to.clone(),
227 message,
228 request_id.clone(),
229 force_skdm,
230 skdm_target_devices.clone(),
231 edit.clone(),
232 )
233 .await
234 {
235 Ok(stanza) => {
236 if !devices_receiving_skdm.is_empty() {
238 if let Err(e) = self
239 .persistence_manager
240 .add_skdm_recipients(&to.to_string(), &devices_receiving_skdm)
241 .await
242 {
243 log::warn!("Failed to update SKDM recipients: {:?}", e);
244 }
245 } else if force_skdm || skdm_target_devices.is_none() {
246 let jids_to_resolve: Vec<Jid> = group_info
248 .participants
249 .iter()
250 .map(|jid| jid.to_non_ad())
251 .collect();
252
253 if let Ok(all_devices) =
254 SendContextResolver::resolve_devices(self, &jids_to_resolve).await
255 {
256 let all_device_strs: Vec<String> =
257 all_devices.iter().map(|d| d.to_string()).collect();
258 if let Err(e) = self
259 .persistence_manager
260 .add_skdm_recipients(&to.to_string(), &all_device_strs)
261 .await
262 {
263 log::warn!("Failed to update SKDM recipients: {:?}", e);
264 }
265 }
266 }
267 stanza
268 }
269 Err(e) => {
270 if let Some(SignalProtocolError::NoSenderKeyState(_)) =
271 e.downcast_ref::<SignalProtocolError>()
272 {
273 log::warn!("No sender key for group {}, forcing distribution.", to);
274
275 if let Err(e) = self
277 .persistence_manager
278 .clear_skdm_recipients(&to.to_string())
279 .await
280 {
281 log::warn!("Failed to clear SKDM recipients: {:?}", e);
282 }
283
284 let mut store_adapter_retry =
285 SignalProtocolStoreAdapter::new(device_store_arc.clone());
286 let mut stores_retry = wacore::send::SignalStores {
287 session_store: &mut store_adapter_retry.session_store,
288 identity_store: &mut store_adapter_retry.identity_store,
289 prekey_store: &mut store_adapter_retry.pre_key_store,
290 signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
291 sender_key_store: &mut store_adapter_retry.sender_key_store,
292 };
293
294 wacore::send::prepare_group_stanza(
295 &mut stores_retry,
296 self,
297 &mut group_info,
298 &own_jid,
299 &own_lid,
300 account_info.as_ref(),
301 to,
302 message,
303 request_id,
304 true, None, edit.clone(),
307 )
308 .await?
309 } else {
310 return Err(e);
311 }
312 }
313 }
314 } else {
315 let recipient_devices = self.get_user_devices(std::slice::from_ref(&to)).await?;
320 self.ensure_e2e_sessions(recipient_devices).await?;
321
322 let encryption_jid = self.resolve_encryption_jid(&to).await;
324 let signal_addr_str = encryption_jid.to_protocol_address().to_string();
325
326 self.add_recent_message(to.clone(), request_id.clone(), message)
328 .await;
329
330 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
331 let own_jid = device_snapshot
332 .pn
333 .clone()
334 .ok_or_else(|| anyhow!("Not logged in"))?;
335 let account_info = device_snapshot.account.clone();
336
337 let session_mutex = self
339 .session_locks
340 .get_with(signal_addr_str.clone(), async {
341 std::sync::Arc::new(tokio::sync::Mutex::new(()))
342 })
343 .await;
344 let _session_guard = session_mutex.lock().await;
345
346 let device_store_arc = self.persistence_manager.get_device_arc().await;
348 let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc);
349
350 let mut stores = wacore::send::SignalStores {
351 session_store: &mut store_adapter.session_store,
352 identity_store: &mut store_adapter.identity_store,
353 prekey_store: &mut store_adapter.pre_key_store,
354 signed_prekey_store: &store_adapter.signed_pre_key_store,
355 sender_key_store: &mut store_adapter.sender_key_store,
356 };
357
358 wacore::send::prepare_dm_stanza(
359 &mut stores,
360 self,
361 &own_jid,
362 account_info.as_ref(),
363 to,
364 message,
365 request_id,
366 edit,
367 )
368 .await?
369 };
371 self.send_node(stanza_to_send).await.map_err(|e| e.into())
373 }
374}