1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
use crate::client::Client;
use crate::client::ClientError;
use crate::socket::error::{EncryptSendError, SocketError};
use futures::FutureExt;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
use thiserror::Error;
use wacore::runtime::timeout as rt_timeout;
use wacore_binary::Node;
pub use wacore::request::{InfoQuery, InfoQueryType, RequestUtils};
/// Errors that can occur while sending an IQ request and waiting for its response.
///
/// Variants that wrap another error expose it through the standard error
/// source chain rather than repeating it in their own display message.
#[derive(Debug, Error)]
pub enum IqError {
/// No response arrived before the request's timeout elapsed.
#[error("IQ request timed out")]
Timeout,
/// The client was not running when the request was attempted, or the
/// connection shut down while the request was still pending.
#[error("client is not connected")]
NotConnected,
/// Sending the request failed at the socket layer.
#[error("socket error")]
Socket(#[from] SocketError),
/// Sending the request failed in the encrypted send pipeline.
#[error("encrypted send pipeline failed")]
EncryptSend(#[from] EncryptSendError),
/// The send was rejected because of the client's current state; the
/// wrapped [`ClientError`] carries the specific reason.
#[error("client state prevented send")]
ClientState(#[source] ClientError),
/// A disconnect node was received while waiting for the IQ response.
/// The payload is the node that was received.
#[error("received disconnect node during IQ wait: {0:?}")]
Disconnected(Node),
/// The server answered with an error response carrying `code` and `text`.
#[error("received a server error response: code={code}, text='{text}'")]
ServerError { code: u16, text: String },
/// The internal response channel was closed before a response was delivered.
#[error("internal channel closed unexpectedly")]
InternalChannelClosed,
/// Encoding the outgoing IQ request failed.
#[error("failed to encode IQ request")]
EncodeError(#[source] anyhow::Error),
/// Parsing the IQ response into its typed form failed.
///
/// Because this variant is `#[from] anyhow::Error`, `?` on an
/// `anyhow::Error` converts into this variant.
#[error("failed to parse IQ response")]
ParseError(#[from] anyhow::Error),
}
impl From<wacore::request::IqError> for IqError {
    /// Lifts a core-layer IQ error into this crate's [`IqError`], mapping each
    /// core variant onto the variant of the same name and carrying any payload
    /// across unchanged.
    fn from(core_err: wacore::request::IqError) -> Self {
        // Local alias keeps the arms short; the match stays exhaustive so a new
        // core variant becomes a compile error here rather than a silent gap.
        use wacore::request::IqError as Core;

        match core_err {
            Core::Timeout => Self::Timeout,
            Core::NotConnected => Self::NotConnected,
            Core::InternalChannelClosed => Self::InternalChannelClosed,
            Core::Disconnected(node) => Self::Disconnected(node),
            Core::ServerError { code, text } => Self::ServerError { code, text },
        }
    }
}
impl Client {
pub(crate) fn generate_request_id(&self) -> String {
self.get_request_utils().generate_request_id()
}
/// Generates a unique message ID that conforms to the WhatsApp protocol format.
///
/// This is an advanced function that allows library users to generate message IDs
/// that are compatible with the WhatsApp protocol. The generated ID includes
/// timestamp, user JID, and random components to ensure uniqueness.
///
/// # Advanced Use Case
///
/// This function is intended for advanced users who need to build custom protocol
/// interactions or manage message IDs manually. Most users should use higher-level
/// methods like `send_message` which handle ID generation automatically.
///
/// # Returns
///
/// A string containing the generated message ID in the format expected by WhatsApp.
pub async fn generate_message_id(&self) -> String {
let device_snapshot = self.persistence_manager.get_device_snapshot().await;
self.get_request_utils()
.generate_message_id(device_snapshot.pn.as_ref())
}
fn get_request_utils(&self) -> RequestUtils {
RequestUtils::with_counter(self.unique_id.clone(), self.id_counter.clone())
}
/// Sends a custom IQ (Info/Query) stanza to the WhatsApp server.
///
/// This is an advanced function that allows library users to send custom IQ stanzas
/// for protocol interactions that are not covered by higher-level methods. Common
/// use cases include live location updates, custom presence management, or other
/// advanced WhatsApp features.
///
/// # Advanced Use Case
///
/// This function bypasses some of the higher-level abstractions and safety checks
/// provided by other client methods. Users should be familiar with the WhatsApp
/// protocol and IQ stanza format before using this function.
///
/// # Arguments
///
/// * `query` - The IQ query to send, containing the stanza type, namespace, content, and optional timeout
///
/// # Returns
///
/// * `Ok(Arc<OwnedNodeRef>)` - The response node from the server (zero-copy, borrowed from decode buffer)
/// * `Err(IqError)` - Various error conditions including timeout, connection issues, or server errors
///
/// # Example
///
/// ```rust,no_run
/// use wacore::request::{InfoQuery, InfoQueryType};
/// use wacore_binary::builder::NodeBuilder;
/// use wacore_binary::NodeContent;
/// use wacore_binary::{Jid, Server};
///
/// // This is a simplified example - real usage requires proper setup
/// # async fn example(client: &whatsapp_rust::Client) -> Result<(), Box<dyn std::error::Error>> {
/// let query_node = NodeBuilder::new("presence")
/// .attr("type", "available")
/// .build();
///
/// let server_jid = Jid::new("", Server::Pn);
///
/// let query = InfoQuery {
/// query_type: InfoQueryType::Set,
/// namespace: "presence",
/// to: server_jid,
/// target: None,
/// content: Some(NodeContent::Nodes(vec![query_node])),
/// id: None,
/// timeout: None,
/// };
///
/// let response = client.send_iq(query).await?;
/// // Access the node via response.get()
/// # Ok(())
/// # }
/// ```
pub async fn send_iq(
&self,
query: InfoQuery<'_>,
) -> Result<Arc<wacore_binary::OwnedNodeRef>, IqError> {
let default_timeout = Duration::from_secs(75);
let iq_timeout = query.timeout.unwrap_or(default_timeout);
let req_id = query
.id
.clone()
.unwrap_or_else(|| self.generate_request_id());
let request_utils = self.get_request_utils();
let node = request_utils.build_iq_node(query, Some(req_id.clone()));
self.send_and_wait_iq(req_id, iq_timeout, async { self.send_node(node).await })
.await
}
/// Executes an IQ specification and returns the typed response.
///
/// This is a convenience method that combines building the IQ request,
/// sending it, and parsing the response into a single operation.
///
/// # Example
///
/// ```ignore
/// use wacore::iq::groups::GroupQueryIq;
///
/// let group_info = client.execute(GroupQueryIq::new(&group_jid)).await?;
/// println!("Group subject: {}", group_info.subject);
/// ```
pub async fn execute<S>(&self, spec: S) -> Result<S::Response, IqError>
where
S: wacore::iq::spec::IqSpec,
{
let req_id = self.generate_request_id();
// Direct-encode fast path: skip Node tree for hot IQ specs (e.g. PreKeyUploadSpec)
{
let mut buf = Vec::new();
match spec.encode_iq_direct(&req_id, &mut buf) {
Ok(true) => {
let response = self
.send_and_wait_iq(req_id, Duration::from_secs(75), async {
self.send_raw_bytes(buf).await
})
.await?;
return spec
.parse_response(response.get())
.map_err(IqError::ParseError);
}
Err(e) => return Err(IqError::EncodeError(e)),
Ok(false) => {}
}
}
let mut iq = spec.build_iq();
if iq.id.is_none() {
iq.id = Some(req_id);
}
let response = self.send_iq(iq).await?;
spec.parse_response(response.get())
.map_err(IqError::ParseError)
}
/// Centralizes waiter registration and shutdown/timeout handling.
async fn send_and_wait_iq<F>(
&self,
req_id: String,
timeout: Duration,
send_fn: F,
) -> Result<Arc<wacore_binary::OwnedNodeRef>, IqError>
where
F: std::future::Future<Output = Result<(), crate::client::ClientError>>,
{
if !self.is_running.load(Ordering::Relaxed) {
return Err(IqError::NotConnected);
}
let (tx, rx) = futures::channel::oneshot::channel();
self.response_waiters
.lock()
.await
.insert(req_id.clone(), tx);
// Per-connection: pending IQ requests are bound to the current socket;
// a reconnect aborts them (sender retries on the new connection).
let shutdown = wacore::runtime::wait_for_shutdown(&self.connection_shutdown_signal());
if !self.is_running.load(Ordering::Acquire) {
self.response_waiters.lock().await.remove(&req_id);
return Err(IqError::NotConnected);
}
if let Err(e) = send_fn.await {
self.response_waiters.lock().await.remove(&req_id);
return match e {
ClientError::Socket(s_err) => Err(IqError::Socket(s_err)),
ClientError::EncryptSend(es_err) => Err(IqError::EncryptSend(es_err)),
ClientError::NotConnected => Err(IqError::NotConnected),
other @ (ClientError::AlreadyConnected | ClientError::NotLoggedIn) => {
Err(IqError::ClientState(other))
}
};
}
let request_utils = self.get_request_utils();
futures::select! {
result = rt_timeout(&*self.runtime, timeout, rx).fuse() => {
match result {
Ok(Ok(response_node)) => match request_utils.parse_iq_response(response_node.get()) {
Ok(()) => Ok(response_node),
Err(e) => Err(e.into()),
},
Ok(Err(_)) => Err(IqError::InternalChannelClosed),
Err(_) => {
self.response_waiters.lock().await.remove(&req_id);
Err(IqError::Timeout)
}
}
}
_ = shutdown.fuse() => {
self.response_waiters.lock().await.remove(&req_id);
Err(IqError::NotConnected)
}
}
}
}