Skip to main content

whatsapp_rust/
request.rs

1use crate::client::Client;
2use crate::socket::error::SocketError;
3use futures::FutureExt;
4use log::warn;
5use std::sync::Arc;
6use std::sync::atomic::Ordering;
7use std::time::Duration;
8use thiserror::Error;
9use wacore::runtime::timeout as rt_timeout;
10use wacore_binary::node::Node;
11
12pub use wacore::request::{InfoQuery, InfoQueryType, RequestUtils};
13
14#[derive(Debug, Error)]
15pub enum IqError {
16    #[error("IQ request timed out")]
17    Timeout,
18    #[error("Client is not connected")]
19    NotConnected,
20    #[error("Socket error: {0}")]
21    Socket(#[from] SocketError),
22    #[error("Received disconnect node during IQ wait: {0:?}")]
23    Disconnected(Node),
24    #[error("Received a server error response: code={code}, text='{text}'")]
25    ServerError { code: u16, text: String },
26    #[error("Internal channel closed unexpectedly")]
27    InternalChannelClosed,
28    #[error("Failed to parse IQ response: {0}")]
29    ParseError(#[from] anyhow::Error),
30}
31
32impl From<wacore::request::IqError> for IqError {
33    fn from(err: wacore::request::IqError) -> Self {
34        match err {
35            wacore::request::IqError::Timeout => Self::Timeout,
36            wacore::request::IqError::NotConnected => Self::NotConnected,
37            wacore::request::IqError::Disconnected(node) => Self::Disconnected(node),
38            wacore::request::IqError::ServerError { code, text } => {
39                Self::ServerError { code, text }
40            }
41            wacore::request::IqError::InternalChannelClosed => Self::InternalChannelClosed,
42            wacore::request::IqError::Network(msg) => Self::Socket(SocketError::Crypto(msg)),
43        }
44    }
45}
46
47impl Client {
48    pub(crate) fn generate_request_id(&self) -> String {
49        self.get_request_utils().generate_request_id()
50    }
51
52    /// Generates a unique message ID that conforms to the WhatsApp protocol format.
53    ///
54    /// This is an advanced function that allows library users to generate message IDs
55    /// that are compatible with the WhatsApp protocol. The generated ID includes
56    /// timestamp, user JID, and random components to ensure uniqueness.
57    ///
58    /// # Advanced Use Case
59    ///
60    /// This function is intended for advanced users who need to build custom protocol
61    /// interactions or manage message IDs manually. Most users should use higher-level
62    /// methods like `send_message` which handle ID generation automatically.
63    ///
64    /// # Returns
65    ///
66    /// A string containing the generated message ID in the format expected by WhatsApp.
67    pub async fn generate_message_id(&self) -> String {
68        let device_snapshot = self.persistence_manager.get_device_snapshot().await;
69        self.get_request_utils()
70            .generate_message_id(device_snapshot.pn.as_ref())
71    }
72
73    fn get_request_utils(&self) -> RequestUtils {
74        RequestUtils::with_counter(self.unique_id.clone(), self.id_counter.clone())
75    }
76
77    /// Sends a custom IQ (Info/Query) stanza to the WhatsApp server.
78    ///
79    /// This is an advanced function that allows library users to send custom IQ stanzas
80    /// for protocol interactions that are not covered by higher-level methods. Common
81    /// use cases include live location updates, custom presence management, or other
82    /// advanced WhatsApp features.
83    ///
84    /// # Advanced Use Case
85    ///
86    /// This function bypasses some of the higher-level abstractions and safety checks
87    /// provided by other client methods. Users should be familiar with the WhatsApp
88    /// protocol and IQ stanza format before using this function.
89    ///
90    /// # Arguments
91    ///
92    /// * `query` - The IQ query to send, containing the stanza type, namespace, content, and optional timeout
93    ///
94    /// # Returns
95    ///
96    /// * `Ok(Node)` - The response node from the server
97    /// * `Err(IqError)` - Various error conditions including timeout, connection issues, or server errors
98    ///
99    /// # Example
100    ///
101    /// ```rust,no_run
102    /// use wacore::request::{InfoQuery, InfoQueryType};
103    /// use wacore_binary::builder::NodeBuilder;
104    /// use wacore_binary::node::NodeContent;
105    /// use wacore_binary::jid::{Jid, SERVER_JID};
106    ///
107    /// // This is a simplified example - real usage requires proper setup
108    /// # async fn example(client: &whatsapp_rust::Client) -> Result<(), Box<dyn std::error::Error>> {
109    /// let query_node = NodeBuilder::new("presence")
110    ///     .attr("type", "available")
111    ///     .build();
112    ///
113    /// let server_jid = Jid::new("", SERVER_JID);
114    ///
115    /// let query = InfoQuery {
116    ///     query_type: InfoQueryType::Set,
117    ///     namespace: "presence",
118    ///     to: server_jid,
119    ///     target: None,
120    ///     content: Some(NodeContent::Nodes(vec![query_node])),
121    ///     id: None,
122    ///     timeout: None,
123    /// };
124    ///
125    /// let response = client.send_iq(query).await?;
126    /// # Ok(())
127    /// # }
128    /// ```
129    pub async fn send_iq(&self, query: InfoQuery<'_>) -> Result<Node, IqError> {
130        // Fail fast if the client is shutting down
131        if !self.is_running.load(Ordering::Relaxed) {
132            return Err(IqError::NotConnected);
133        }
134
135        let req_id = query
136            .id
137            .clone()
138            .unwrap_or_else(|| self.generate_request_id());
139        let default_timeout = Duration::from_secs(75);
140
141        let (tx, rx) = futures::channel::oneshot::channel();
142        self.response_waiters
143            .lock()
144            .await
145            .insert(req_id.clone(), tx);
146
147        let request_utils = self.get_request_utils();
148        let node = request_utils.build_iq_node(&query, Some(req_id.clone()));
149
150        // Register the shutdown listener BEFORE sending to avoid a window where
151        // a shutdown fires between send_node() completing and listen() being called.
152        let shutdown = self.shutdown_notifier.listen();
153
154        // Re-check after registering the listener to close the race window where
155        // shutdown fires between the initial check and the listen() call above.
156        if !self.is_running.load(Ordering::Acquire) {
157            self.response_waiters.lock().await.remove(&req_id);
158            return Err(IqError::NotConnected);
159        }
160
161        if let Err(e) = self.send_node(node).await {
162            self.response_waiters.lock().await.remove(&req_id);
163            return match e {
164                crate::client::ClientError::Socket(s_err) => Err(IqError::Socket(s_err)),
165                crate::client::ClientError::NotConnected => Err(IqError::NotConnected),
166                _ => Err(IqError::Socket(SocketError::Crypto(e.to_string()))),
167            };
168        }
169
170        // Race the IQ response against shutdown so we fail fast on disconnect
171        // instead of waiting the full timeout.
172        let iq_timeout = query.timeout.unwrap_or(default_timeout);
173
174        futures::select! {
175            result = rt_timeout(&*self.runtime, iq_timeout, rx).fuse() => {
176                match result {
177                    Ok(Ok(response_node)) => match *request_utils.parse_iq_response(&response_node) {
178                        Ok(()) => Ok(response_node),
179                        Err(e) => Err(e.into()),
180                    },
181                    Ok(Err(_)) => Err(IqError::InternalChannelClosed),
182                    Err(_) => {
183                        self.response_waiters.lock().await.remove(&req_id);
184                        Err(IqError::Timeout)
185                    }
186                }
187            }
188            _ = shutdown.fuse() => {
189                self.response_waiters.lock().await.remove(&req_id);
190                Err(IqError::NotConnected)
191            }
192        }
193    }
194
195    /// Executes an IQ specification and returns the typed response.
196    ///
197    /// This is a convenience method that combines building the IQ request,
198    /// sending it, and parsing the response into a single operation.
199    ///
200    /// # Example
201    ///
202    /// ```ignore
203    /// use wacore::iq::groups::GroupQueryIq;
204    ///
205    /// let group_info = client.execute(GroupQueryIq::new(&group_jid)).await?;
206    /// println!("Group subject: {}", group_info.subject);
207    /// ```
208    pub async fn execute<S>(&self, spec: S) -> Result<S::Response, IqError>
209    where
210        S: wacore::iq::spec::IqSpec,
211    {
212        let iq = spec.build_iq();
213        let response = self.send_iq(iq).await?;
214        spec.parse_response(&response).map_err(IqError::ParseError)
215    }
216
217    /// Handles an IQ response by checking if there's a waiter for this response ID.
218    ///
219    /// This method accepts an `Arc<Node>` - if there's a waiter, we clone the Arc (cheap)
220    /// and unwrap it if we're the only holder, otherwise clone the inner Node.
221    pub(crate) async fn handle_iq_response(&self, node: Arc<Node>) -> bool {
222        let id_opt = node.attrs.get("id").map(|v| v.as_str().into_owned());
223        if let Some(id) = id_opt {
224            // First check if there's a waiter (without cloning)
225            let waiter = self.response_waiters.lock().await.remove(&id);
226            if let Some(waiter) = waiter {
227                // Try to unwrap the Arc, or clone if there are other references
228                let owned_node = Arc::try_unwrap(node).unwrap_or_else(|arc| (*arc).clone());
229                if waiter.send(owned_node).is_err() {
230                    warn!(target: "Client/IQ", "Failed to send IQ response to waiter for ID {id}. Receiver was likely dropped.");
231                }
232                return true;
233            }
234        }
235        false
236    }
237}