whatsapp_rust/
send.rs

1use crate::client::Client;
2use crate::store::signal_adapter::SignalProtocolStoreAdapter;
3use anyhow::anyhow;
4use std::sync::Arc;
5use wacore::libsignal::protocol::SignalProtocolError;
6use wacore::types::jid::JidExt;
7use wacore_binary::jid::{Jid, JidExt as _};
8use waproto::whatsapp as wa;
9
10impl Client {
11    pub async fn send_message(
12        &self,
13        to: Jid,
14        message: wa::Message,
15    ) -> Result<String, anyhow::Error> {
16        let request_id = self.generate_message_id().await;
17        self.send_message_impl(
18            to,
19            Arc::new(message),
20            Some(request_id.clone()),
21            false,
22            false,
23            None,
24        )
25        .await?;
26        Ok(request_id)
27    }
28
29    pub(crate) async fn send_message_impl(
30        &self,
31        to: Jid,
32        message: Arc<wa::Message>,
33        request_id_override: Option<String>,
34        peer: bool,
35        force_key_distribution: bool,
36        edit: Option<crate::types::message::EditAttribute>,
37    ) -> Result<(), anyhow::Error> {
38        let chat_mutex = self
39            .chat_locks
40            .entry(to.clone())
41            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
42            .clone();
43        let _chat_guard = chat_mutex.lock().await;
44
45        let request_id = match request_id_override {
46            Some(id) => id,
47            None => self.generate_message_id().await,
48        };
49
50        let stanza_to_send: wacore_binary::Node = if peer {
51            let device_store_arc = self.persistence_manager.get_device_arc().await;
52            let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc);
53
54            wacore::send::prepare_peer_stanza(
55                &mut store_adapter.session_store,
56                &mut store_adapter.identity_store,
57                to,
58                message.as_ref(),
59                request_id,
60            )
61            .await?
62        } else if to.is_group() {
63            let mut group_info = self.query_group_info(&to).await?;
64
65            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
66            let own_jid = device_snapshot
67                .pn
68                .clone()
69                .ok_or_else(|| anyhow!("Not logged in"))?;
70            let own_lid = device_snapshot
71                .lid
72                .clone()
73                .ok_or_else(|| anyhow!("LID not set, cannot send to group"))?;
74            let account_info = device_snapshot.account.clone();
75
76            let _ = self
77                .add_recent_message(to.clone(), request_id.clone(), Arc::clone(&message))
78                .await;
79
80            let device_store_arc = self.persistence_manager.get_device_arc().await;
81
82            let (own_sending_jid, _) = match group_info.addressing_mode {
83                crate::types::message::AddressingMode::Lid => (own_lid.clone(), "lid"),
84                crate::types::message::AddressingMode::Pn => (own_jid.clone(), "pn"),
85            };
86
87            if !group_info
88                .participants
89                .iter()
90                .any(|participant| participant.is_same_user_as(&own_sending_jid))
91            {
92                group_info.participants.push(own_sending_jid.to_non_ad());
93            }
94
95            let force_skdm = {
96                use wacore::libsignal::protocol::SenderKeyStore;
97                use wacore::libsignal::store::sender_key_name::SenderKeyName;
98                let mut device_guard = device_store_arc.write().await;
99                let sender_address = own_sending_jid.to_protocol_address();
100                let sender_key_name =
101                    SenderKeyName::new(to.to_string(), sender_address.to_string());
102
103                let key_exists = device_guard
104                    .load_sender_key(&sender_key_name)
105                    .await?
106                    .is_some();
107
108                force_key_distribution || !key_exists
109            };
110
111            let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc.clone());
112
113            let mut stores = wacore::send::SignalStores {
114                session_store: &mut store_adapter.session_store,
115                identity_store: &mut store_adapter.identity_store,
116                prekey_store: &mut store_adapter.pre_key_store,
117                signed_prekey_store: &store_adapter.signed_pre_key_store,
118                sender_key_store: &mut store_adapter.sender_key_store,
119            };
120
121            match wacore::send::prepare_group_stanza(
122                &mut stores,
123                self,
124                &mut group_info,
125                &own_jid,
126                &own_lid,
127                account_info.as_ref(),
128                to.clone(),
129                message.as_ref(),
130                request_id.clone(),
131                force_skdm,
132                edit.clone(),
133            )
134            .await
135            {
136                Ok(stanza) => stanza,
137                Err(e) => {
138                    if let Some(SignalProtocolError::NoSenderKeyState) =
139                        e.downcast_ref::<SignalProtocolError>()
140                    {
141                        log::warn!("No sender key for group {}, forcing distribution.", to);
142
143                        let mut store_adapter_retry =
144                            SignalProtocolStoreAdapter::new(device_store_arc.clone());
145                        let mut stores_retry = wacore::send::SignalStores {
146                            session_store: &mut store_adapter_retry.session_store,
147                            identity_store: &mut store_adapter_retry.identity_store,
148                            prekey_store: &mut store_adapter_retry.pre_key_store,
149                            signed_prekey_store: &store_adapter_retry.signed_pre_key_store,
150                            sender_key_store: &mut store_adapter_retry.sender_key_store,
151                        };
152
153                        wacore::send::prepare_group_stanza(
154                            &mut stores_retry,
155                            self,
156                            &mut group_info,
157                            &own_jid,
158                            &own_lid,
159                            account_info.as_ref(),
160                            to,
161                            message.as_ref(),
162                            request_id,
163                            true, // Force distribution on retry
164                            edit.clone(),
165                        )
166                        .await?
167                    } else {
168                        return Err(e);
169                    }
170                }
171            }
172        } else {
173            let _ = self
174                .add_recent_message(to.clone(), request_id.clone(), Arc::clone(&message))
175                .await;
176
177            let device_snapshot = self.persistence_manager.get_device_snapshot().await;
178            let own_jid = device_snapshot
179                .pn
180                .clone()
181                .ok_or_else(|| anyhow!("Not logged in"))?;
182            let account_info = device_snapshot.account.clone();
183
184            let device_store_arc = self.persistence_manager.get_device_arc().await;
185            let mut store_adapter = SignalProtocolStoreAdapter::new(device_store_arc);
186
187            let mut stores = wacore::send::SignalStores {
188                session_store: &mut store_adapter.session_store,
189                identity_store: &mut store_adapter.identity_store,
190                prekey_store: &mut store_adapter.pre_key_store,
191                signed_prekey_store: &store_adapter.signed_pre_key_store,
192                sender_key_store: &mut store_adapter.sender_key_store,
193            };
194
195            wacore::send::prepare_dm_stanza(
196                &mut stores,
197                self,
198                &own_jid,
199                account_info.as_ref(),
200                to,
201                message.as_ref(),
202                request_id,
203                edit,
204            )
205            .await?
206        };
207        self.send_node(stanza_to_send).await.map_err(|e| e.into())
208    }
209}