whatsapp_rust/request.rs
1use crate::client::Client;
2use crate::socket::error::SocketError;
3use futures::FutureExt;
4use log::warn;
5use std::sync::Arc;
6use std::sync::atomic::Ordering;
7use std::time::Duration;
8use thiserror::Error;
9use wacore::runtime::timeout as rt_timeout;
10use wacore_binary::node::Node;
11
12pub use wacore::request::{InfoQuery, InfoQueryType, RequestUtils};
13
14#[derive(Debug, Error)]
15pub enum IqError {
16 #[error("IQ request timed out")]
17 Timeout,
18 #[error("Client is not connected")]
19 NotConnected,
20 #[error("Socket error: {0}")]
21 Socket(#[from] SocketError),
22 #[error("Received disconnect node during IQ wait: {0:?}")]
23 Disconnected(Node),
24 #[error("Received a server error response: code={code}, text='{text}'")]
25 ServerError { code: u16, text: String },
26 #[error("Internal channel closed unexpectedly")]
27 InternalChannelClosed,
28 #[error("Failed to parse IQ response: {0}")]
29 ParseError(#[from] anyhow::Error),
30}
31
32impl From<wacore::request::IqError> for IqError {
33 fn from(err: wacore::request::IqError) -> Self {
34 match err {
35 wacore::request::IqError::Timeout => Self::Timeout,
36 wacore::request::IqError::NotConnected => Self::NotConnected,
37 wacore::request::IqError::Disconnected(node) => Self::Disconnected(node),
38 wacore::request::IqError::ServerError { code, text } => {
39 Self::ServerError { code, text }
40 }
41 wacore::request::IqError::InternalChannelClosed => Self::InternalChannelClosed,
42 wacore::request::IqError::Network(msg) => Self::Socket(SocketError::Crypto(msg)),
43 }
44 }
45}
46
47impl Client {
48 pub(crate) fn generate_request_id(&self) -> String {
49 self.get_request_utils().generate_request_id()
50 }
51
52 /// Generates a unique message ID that conforms to the WhatsApp protocol format.
53 ///
54 /// This is an advanced function that allows library users to generate message IDs
55 /// that are compatible with the WhatsApp protocol. The generated ID includes
56 /// timestamp, user JID, and random components to ensure uniqueness.
57 ///
58 /// # Advanced Use Case
59 ///
60 /// This function is intended for advanced users who need to build custom protocol
61 /// interactions or manage message IDs manually. Most users should use higher-level
62 /// methods like `send_message` which handle ID generation automatically.
63 ///
64 /// # Returns
65 ///
66 /// A string containing the generated message ID in the format expected by WhatsApp.
67 pub async fn generate_message_id(&self) -> String {
68 let device_snapshot = self.persistence_manager.get_device_snapshot().await;
69 self.get_request_utils()
70 .generate_message_id(device_snapshot.pn.as_ref())
71 }
72
73 fn get_request_utils(&self) -> RequestUtils {
74 RequestUtils::with_counter(self.unique_id.clone(), self.id_counter.clone())
75 }
76
77 /// Sends a custom IQ (Info/Query) stanza to the WhatsApp server.
78 ///
79 /// This is an advanced function that allows library users to send custom IQ stanzas
80 /// for protocol interactions that are not covered by higher-level methods. Common
81 /// use cases include live location updates, custom presence management, or other
82 /// advanced WhatsApp features.
83 ///
84 /// # Advanced Use Case
85 ///
86 /// This function bypasses some of the higher-level abstractions and safety checks
87 /// provided by other client methods. Users should be familiar with the WhatsApp
88 /// protocol and IQ stanza format before using this function.
89 ///
90 /// # Arguments
91 ///
92 /// * `query` - The IQ query to send, containing the stanza type, namespace, content, and optional timeout
93 ///
94 /// # Returns
95 ///
96 /// * `Ok(Node)` - The response node from the server
97 /// * `Err(IqError)` - Various error conditions including timeout, connection issues, or server errors
98 ///
99 /// # Example
100 ///
101 /// ```rust,no_run
102 /// use wacore::request::{InfoQuery, InfoQueryType};
103 /// use wacore_binary::builder::NodeBuilder;
104 /// use wacore_binary::node::NodeContent;
105 /// use wacore_binary::jid::{Jid, SERVER_JID};
106 ///
107 /// // This is a simplified example - real usage requires proper setup
108 /// # async fn example(client: &whatsapp_rust::Client) -> Result<(), Box<dyn std::error::Error>> {
109 /// let query_node = NodeBuilder::new("presence")
110 /// .attr("type", "available")
111 /// .build();
112 ///
113 /// let server_jid = Jid::new("", SERVER_JID);
114 ///
115 /// let query = InfoQuery {
116 /// query_type: InfoQueryType::Set,
117 /// namespace: "presence",
118 /// to: server_jid,
119 /// target: None,
120 /// content: Some(NodeContent::Nodes(vec![query_node])),
121 /// id: None,
122 /// timeout: None,
123 /// };
124 ///
125 /// let response = client.send_iq(query).await?;
126 /// # Ok(())
127 /// # }
128 /// ```
129 pub async fn send_iq(&self, query: InfoQuery<'_>) -> Result<Node, IqError> {
130 // Fail fast if the client is shutting down
131 if !self.is_running.load(Ordering::Relaxed) {
132 return Err(IqError::NotConnected);
133 }
134
135 let req_id = query
136 .id
137 .clone()
138 .unwrap_or_else(|| self.generate_request_id());
139 let default_timeout = Duration::from_secs(75);
140
141 let (tx, rx) = futures::channel::oneshot::channel();
142 self.response_waiters
143 .lock()
144 .await
145 .insert(req_id.clone(), tx);
146
147 let request_utils = self.get_request_utils();
148 let node = request_utils.build_iq_node(&query, Some(req_id.clone()));
149
150 // Register the shutdown listener BEFORE sending to avoid a window where
151 // a shutdown fires between send_node() completing and listen() being called.
152 let shutdown = self.shutdown_notifier.listen();
153
154 // Re-check after registering the listener to close the race window where
155 // shutdown fires between the initial check and the listen() call above.
156 if !self.is_running.load(Ordering::Acquire) {
157 self.response_waiters.lock().await.remove(&req_id);
158 return Err(IqError::NotConnected);
159 }
160
161 if let Err(e) = self.send_node(node).await {
162 self.response_waiters.lock().await.remove(&req_id);
163 return match e {
164 crate::client::ClientError::Socket(s_err) => Err(IqError::Socket(s_err)),
165 crate::client::ClientError::NotConnected => Err(IqError::NotConnected),
166 _ => Err(IqError::Socket(SocketError::Crypto(e.to_string()))),
167 };
168 }
169
170 // Race the IQ response against shutdown so we fail fast on disconnect
171 // instead of waiting the full timeout.
172 let iq_timeout = query.timeout.unwrap_or(default_timeout);
173
174 futures::select! {
175 result = rt_timeout(&*self.runtime, iq_timeout, rx).fuse() => {
176 match result {
177 Ok(Ok(response_node)) => match *request_utils.parse_iq_response(&response_node) {
178 Ok(()) => Ok(response_node),
179 Err(e) => Err(e.into()),
180 },
181 Ok(Err(_)) => Err(IqError::InternalChannelClosed),
182 Err(_) => {
183 self.response_waiters.lock().await.remove(&req_id);
184 Err(IqError::Timeout)
185 }
186 }
187 }
188 _ = shutdown.fuse() => {
189 self.response_waiters.lock().await.remove(&req_id);
190 Err(IqError::NotConnected)
191 }
192 }
193 }
194
195 /// Executes an IQ specification and returns the typed response.
196 ///
197 /// This is a convenience method that combines building the IQ request,
198 /// sending it, and parsing the response into a single operation.
199 ///
200 /// # Example
201 ///
202 /// ```ignore
203 /// use wacore::iq::groups::GroupQueryIq;
204 ///
205 /// let group_info = client.execute(GroupQueryIq::new(&group_jid)).await?;
206 /// println!("Group subject: {}", group_info.subject);
207 /// ```
208 pub async fn execute<S>(&self, spec: S) -> Result<S::Response, IqError>
209 where
210 S: wacore::iq::spec::IqSpec,
211 {
212 let iq = spec.build_iq();
213 let response = self.send_iq(iq).await?;
214 spec.parse_response(&response).map_err(IqError::ParseError)
215 }
216
217 /// Handles an IQ response by checking if there's a waiter for this response ID.
218 ///
219 /// This method accepts an `Arc<Node>` - if there's a waiter, we clone the Arc (cheap)
220 /// and unwrap it if we're the only holder, otherwise clone the inner Node.
221 pub(crate) async fn handle_iq_response(&self, node: Arc<Node>) -> bool {
222 let id_opt = node.attrs.get("id").map(|v| v.as_str().into_owned());
223 if let Some(id) = id_opt {
224 // First check if there's a waiter (without cloning)
225 let waiter = self.response_waiters.lock().await.remove(&id);
226 if let Some(waiter) = waiter {
227 // Try to unwrap the Arc, or clone if there are other references
228 let owned_node = Arc::try_unwrap(node).unwrap_or_else(|arc| (*arc).clone());
229 if waiter.send(owned_node).is_err() {
230 warn!(target: "Client/IQ", "Failed to send IQ response to waiter for ID {id}. Receiver was likely dropped.");
231 }
232 return true;
233 }
234 }
235 false
236 }
237}