1use crate::client::Client;
2use crate::store::signal_adapter::SignalProtocolStoreAdapter;
3use anyhow::anyhow;
4use std::sync::Arc;
5use wacore::libsignal::protocol::SignalProtocolError;
6use wacore::types::jid::JidExt;
7use wacore_binary::jid::{Jid, JidExt as _};
8use waproto::whatsapp as wa;
9
10impl Client {
11 pub async fn send_message(
12 &self,
13 to: Jid,
14 message: wa::Message,
15 ) -> Result<String, anyhow::Error> {
16 let request_id = self.generate_message_id().await;
17 self.send_message_impl(
18 to,
19 Arc::new(message),
20 Some(request_id.clone()),
21 false,
22 false,
23 None,
24 )
25 .await?;
26 Ok(request_id)
27 }
28
29 pub(crate) async fn send_message_impl(
30 &self,
31 to: Jid,
32 message: Arc<wa::Message>,
33 request_id_override: Option<String>,
34 peer: bool,
35 force_key_distribution: bool,
36 edit: Option<crate::types::message::EditAttribute>,
37 ) -> Result<(), anyhow::Error> {
38 let chat_mutex = self
39 .chat_locks
40 .entry(to.clone())
41 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
42 .clone();
43 let _chat_guard = chat_mutex.lock().await;
44
45 let request_id = match request_id_override {
46 Some(id) => id,
47 None => self.generate_message_id().await,
48 };
49
50 let stanza_to_send: wacore_binary::Node = if peer {
51 let device_store_arc = self.persistence_manager.get_device_arc().await;
52 let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc);
53
54 wacore::send::prepare_peer_stanza(
55 &mut store_adapter.session_store,
56 &mut store_adapter.identity_store,
57 to,
58 message.as_ref(),
59 request_id,
60 )
61 .await?
62 } else if to.is_group() {
63 let mut group_info = self.query_group_info(&to).await?;
64
65 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
66 let own_jid = device_snapshot
67 .pn
68 .clone()
69 .ok_or_else(|| anyhow!("Not logged in"))?;
70 let own_lid = device_snapshot
71 .lid
72 .clone()
73 .ok_or_else(|| anyhow!("LID not set, cannot send to group"))?;
74 let account_info = device_snapshot.account.clone();
75
76 let _ = self
77 .add_recent_message(to.clone(), request_id.clone(), Arc::clone(&message))
78 .await;
79
80 let device_store_arc = self.persistence_manager.get_device_arc().await;
81
82 let (own_sending_jid, _) = match group_info.addressing_mode {
83 crate::types::message::AddressingMode::Lid => (own_lid.clone(), "lid"),
84 crate::types::message::AddressingMode::Pn => (own_jid.clone(), "pn"),
85 };
86
87 if !group_info
88 .participants
89 .iter()
90 .any(|participant| participant.is_same_user_as(&own_sending_jid))
91 {
92 group_info.participants.push(own_sending_jid.to_non_ad());
93 }
94
95 let force_skdm = {
96 use wacore::libsignal::protocol::SenderKeyStore;
97 use wacore::libsignal::store::sender_key_name::SenderKeyName;
98 let mut device_guard = device_store_arc.write().await;
99 let sender_address = own_sending_jid.to_protocol_address();
100 let sender_key_name =
101 SenderKeyName::new(to.to_string(), sender_address.to_string());
102
103 let key_exists = device_guard
104 .load_sender_key(&sender_key_name)
105 .await?
106 .is_some();
107
108 force_key_distribution || !key_exists
109 };
110
111 let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc.clone());
112
113 let mut stores = wacore::send::SignalStores {
114 session_store: &mut store_adapter.session_store,
115 identity_store: &mut store_adapter.identity_store,
116 prekey_store: &mut store_adapter.pre_key_store,
117 signed_prekey_store: &store_adapter.signed_pre_key_store,
118 sender_key_store: &mut store_adapter.sender_key_store,
119 };
120
121 match wacore::send::prepare_group_stanza(
122 &mut stores,
123 self,
124 &mut group_info,
125 &own_jid,
126 &own_lid,
127 account_info.as_ref(),
128 to.clone(),
129 message.as_ref(),
130 request_id.clone(),
131 force_skdm,
132 edit.clone(),
133 )
134 .await
135 {
136 Ok(stanza) => stanza,
137 Err(e) => {
138 if let Some(SignalProtocolError::NoSenderKeyState) =
139 e.downcast_ref::<SignalProtocolError>()
140 {
141 log::warn!("No sender key for group {}, forcing distribution.", to);
142
143 let mut store_adapter_retry =
144 SignalProtocolStoreAdapter::new(device_store_arc.clone());
145 let mut stores_retry = wacore::send::SignalStores {
146 session_store: &mut store_adapter_retry.session_store,
147 identity_store: &mut store_adapter_retry.identity_store,
148 prekey_store: &mut store_adapter_retry.pre_key_store,
149 signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
150 sender_key_store: &mut store_adapter_retry.sender_key_store,
151 };
152
153 wacore::send::prepare_group_stanza(
154 &mut stores_retry,
155 self,
156 &mut group_info,
157 &own_jid,
158 &own_lid,
159 account_info.as_ref(),
160 to,
161 message.as_ref(),
162 request_id,
163 true, edit.clone(),
165 )
166 .await?
167 } else {
168 return Err(e);
169 }
170 }
171 }
172 } else {
173 let _ = self
174 .add_recent_message(to.clone(), request_id.clone(), Arc::clone(&message))
175 .await;
176
177 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
178 let own_jid = device_snapshot
179 .pn
180 .clone()
181 .ok_or_else(|| anyhow!("Not logged in"))?;
182 let account_info = device_snapshot.account.clone();
183
184 let device_store_arc = self.persistence_manager.get_device_arc().await;
185 let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc);
186
187 let mut stores = wacore::send::SignalStores {
188 session_store: &mut store_adapter.session_store,
189 identity_store: &mut store_adapter.identity_store,
190 prekey_store: &mut store_adapter.pre_key_store,
191 signed_prekey_store: &store_adapter.signed_pre_key_store,
192 sender_key_store: &mut store_adapter.sender_key_store,
193 };
194
195 wacore::send::prepare_dm_stanza(
196 &mut stores,
197 self,
198 &own_jid,
199 account_info.as_ref(),
200 to,
201 message.as_ref(),
202 request_id,
203 edit,
204 )
205 .await?
206 };
207 self.send_node(stanza_to_send).await.map_err(|e| e.into())
208 }
209}