iota_client/
node_manager.rs

1// Copyright 2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
//! The node manager that takes care of sending requests and, if enabled, of quorum handling
5
6use bee_rest_api::types::{body::SuccessBody, responses::InfoResponse};
7
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9use serde_json::Value;
10
11use crate::{
12    builder::NetworkInfo,
13    error::{Error, Result},
14};
15use log::warn;
16use regex::Regex;
17use std::sync::RwLock;
18#[cfg(all(feature = "sync", not(feature = "async")))]
19use ureq::{Agent, AgentBuilder};
20use url::Url;
21
22use std::{
23    collections::{HashMap, HashSet},
24    sync::Arc,
25    time::Duration,
26};
27
/// Default value for the builder's `sync_interval`.
const NODE_SYNC_INTERVAL: Duration = Duration::from_secs(60);
/// Default number of node responses collected for a quorum request.
const DEFAULT_QUORUM_SIZE: usize = 3;
/// Default quorum threshold, as a percentage of the quorum size.
const DEFAULT_QUORUM_THRESHOLD: usize = 66;
31
32/// Node struct
33#[derive(Debug, Clone, Eq, PartialEq, Hash)]
34pub struct Node {
35    /// node url
36    pub url: Url,
37    /// node jwt
38    pub jwt: Option<String>,
39}
40
41// Nodemanger, takes care of selecting node(s) for requests until a result is returned or if quorum
42// is enabled it will send the requests for some endpoints to multiple nodes and compares the results
43#[derive(Clone)]
44pub(crate) struct NodeManager {
45    pub(crate) primary_node: Option<Node>,
46    primary_pow_node: Option<Node>,
47    pub(crate) nodes: HashSet<Node>,
48    permanodes: Option<HashSet<Node>>,
49    pub(crate) sync: bool,
50    sync_interval: Duration,
51    pub(crate) synced_nodes: Arc<RwLock<HashSet<Node>>>,
52    quorum: bool,
53    quorum_size: usize,
54    quorum_threshold: usize,
55    pub(crate) http_client: HttpClient,
56}
57
58impl std::fmt::Debug for NodeManager {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        let mut d = f.debug_struct("NodeManager");
61        d.field("primary_node", &self.primary_node);
62        d.field("primary_pow_node", &self.primary_pow_node);
63        d.field("nodes", &self.nodes);
64        d.field("permanodes", &self.permanodes);
65        d.field("sync", &self.sync);
66        d.field("sync_interval", &self.sync_interval);
67        d.field("synced_nodes", &self.synced_nodes);
68        d.field("quorum", &self.quorum);
69        d.field("quorum_size", &self.quorum_size);
70        d.field("quorum_threshold", &self.quorum_threshold).finish()
71    }
72}
73
74impl NodeManager {
75    pub(crate) fn builder() -> NodeManagerBuilder {
76        NodeManagerBuilder::new()
77    }
78    pub(crate) async fn get_nodes(
79        &self,
80        path: &str,
81        query: Option<&str>,
82        use_primary_pow_node: bool,
83    ) -> Result<Vec<Node>> {
84        let mut nodes_with_modified_url = Vec::new();
85
86        // Endpoints for which only permanodes will be used if provided
87        let permanode_regexes = lazy_static!(
88            [
89              Regex::new(r"messages/([A-Fa-f0-9]{64})").expect("regex failed"),
90              Regex::new(r"messages/([A-Fa-f0-9]{64})/metadata").expect("regex failed"),
91              Regex::new(r"messages/([A-Fa-f0-9]{64})/children").expect("regex failed"),
92              Regex::new(r"outputs/([A-Fa-f0-9]{64})(\d{4})").expect("regex failed"),
93              // BIP-173 compliant bech32 address
94              Regex::new("addresses/[\x21-\x7E]{1,30}1[A-Za-z0-9]").expect("regex failed"),
95              Regex::new("addresses/[\x21-\x7E]{1,30}1[A-Za-z0-9]+/outputs").expect("regex failed"),
96              // ED25519 address hex
97              Regex::new("addresses/ed25519/([A-Fa-f0-9]{64})").expect("regex failed"),
98              Regex::new("addresses/ed25519/([A-Fa-f0-9]{64})/outputs").expect("regex failed"),
99              Regex::new(r"transactions/([A-Fa-f0-9]{64})/included-message").expect("regex failed"),
100              Regex::new(r"milestones/[0-9]").expect("regex failed"),
101            ].to_vec() => Vec<Regex>
102        );
103        if permanode_regexes.iter().any(|re| re.is_match(path)) || (path == "api/v1/messages" && query.is_some()) {
104            if let Some(permanodes) = self.permanodes.clone() {
105                // remove api/v1/ since permanodes can have custom keyspaces
106                // https://editor.swagger.io/?url=https://raw.githubusercontent.com/iotaledger/chronicle.rs/main/docs/api.yaml
107                let path = &path["api/v1/".len()..];
108                for mut permanode in permanodes {
109                    permanode.url.set_path(&format!("{}{}", permanode.url.path(), path));
110                    permanode.url.set_query(query);
111                    nodes_with_modified_url.push(permanode);
112                }
113            }
114        }
115
116        if use_primary_pow_node {
117            if let Some(mut pow_node) = self.primary_pow_node.clone() {
118                pow_node.url.set_path(path);
119                pow_node.url.set_query(query);
120                nodes_with_modified_url.push(pow_node);
121            }
122        }
123        if let Some(mut primary_node) = self.primary_node.clone() {
124            primary_node.url.set_path(path);
125            primary_node.url.set_query(query);
126            nodes_with_modified_url.push(primary_node);
127        }
128        let nodes = if self.sync {
129            #[cfg(not(feature = "wasm"))]
130            {
131                self.synced_nodes
132                    .read()
133                    .map_err(|_| crate::Error::NodeReadError)?
134                    .clone()
135            }
136            #[cfg(feature = "wasm")]
137            {
138                self.nodes.clone()
139            }
140        } else {
141            self.nodes.clone()
142        };
143        for mut node in nodes {
144            node.url.set_path(path);
145            node.url.set_query(query);
146            nodes_with_modified_url.push(node);
147        }
148        Ok(nodes_with_modified_url)
149    }
150
151    pub(crate) async fn get_request<T: serde::de::DeserializeOwned + std::fmt::Debug + serde::Serialize>(
152        &self,
153        path: &str,
154        query: Option<&str>,
155        timeout: Duration,
156    ) -> Result<T> {
157        // Endpoints for which quorum will be used if enabled
158        let quorum_regexes = lazy_static!(
159            [
160              Regex::new(r"messages/([A-Fa-f0-9]{64})/metadata").expect("regex failed"),
161              Regex::new(r"outputs/([A-Fa-f0-9]{64})(\d{4})").expect("regex failed"),
162              // BIP-173 compliant bech32 address
163              Regex::new("addresses/[\x21-\x7E]{1,30}1[A-Za-z0-9]").expect("regex failed"),
164              Regex::new("addresses/[\x21-\x7E]{1,30}1[A-Za-z0-9]+/outputs").expect("regex failed"),
165              // ED25519 address hex
166              Regex::new("addresses/ed25519/([A-Fa-f0-9]{64})").expect("regex failed"),
167              Regex::new("addresses/ed25519/([A-Fa-f0-9]{64})/outputs").expect("regex failed"),
168              Regex::new(r"transactions/([A-Fa-f0-9]{64})/included-message").expect("regex failed"),
169            ].to_vec() => Vec<Regex>
170        );
171        let mut result: HashMap<String, usize> = HashMap::new();
172        // submit message with local PoW should use primary pow node
173        // Get node urls and set path
174        let nodes = self.get_nodes(path, query, false).await?;
175        if self.quorum && quorum_regexes.iter().any(|re| re.is_match(path)) && nodes.len() < self.quorum_size {
176            return Err(Error::QuorumPoolSizeError(nodes.len(), self.quorum_size));
177        }
178
179        // Track amount of results for quorum
180        let mut result_counter = 0;
181        let mut error = None;
182        // Send requests parallel for quorum
183        #[cfg(feature = "wasm")]
184        let wasm = true;
185        #[cfg(not(feature = "wasm"))]
186        let wasm = false;
187        if !wasm && self.quorum && quorum_regexes.iter().any(|re| re.is_match(path)) && query.is_none() {
188            #[cfg(not(feature = "wasm"))]
189            {
190                let mut tasks = Vec::new();
191                let nodes_ = nodes.clone();
192                for (index, node) in nodes_.into_iter().enumerate() {
193                    if index < self.quorum_size {
194                        let client_ = self.http_client.clone();
195                        tasks.push(async move { tokio::spawn(async move { client_.get(node, timeout).await }).await });
196                    }
197                }
198                for res in futures::future::try_join_all(tasks).await? {
199                    match res {
200                        Ok(res) => {
201                            if let Ok(res_text) = res.text().await {
202                                let counters = result.entry(res_text.to_string()).or_insert(0);
203                                *counters += 1;
204                                result_counter += 1;
205                            } else {
206                                warn!("Couldn't convert noderesult to text");
207                            }
208                        }
209                        Err(err) => {
210                            error.replace(err);
211                        }
212                    }
213                }
214            }
215        } else {
216            // Send requests
217            for node in nodes {
218                match self.http_client.get(node.clone(), timeout).await {
219                    Ok(res) => {
220                        let status = res.status();
221                        if let Ok(res_text) = res.text().await {
222                            match status {
223                                200 => {
224                                    // Handle nodeinfo extra because we also want to return the url
225                                    if path == "api/v1/info" {
226                                        if let Ok(nodeinfo) =
227                                            serde_json::from_str::<SuccessBody<InfoResponse>>(&res_text)
228                                        {
229                                            let wrapper = crate::client::NodeInfoWrapper {
230                                                nodeinfo: nodeinfo.data,
231                                                url: format!(
232                                                    "{}://{}",
233                                                    node.url.scheme(),
234                                                    node.url.host_str().unwrap_or("")
235                                                ),
236                                            };
237                                            let serde_res = serde_json::to_string(&wrapper)?;
238                                            return Ok(serde_json::from_str(&serde_res)?);
239                                        }
240                                    }
241
242                                    match serde_json::from_str::<T>(&res_text) {
243                                        Ok(result_data) => {
244                                            let counters =
245                                                result.entry(serde_json::to_string(&result_data)?).or_insert(0);
246                                            *counters += 1;
247                                            result_counter += 1;
248                                            // Without quorum it's enough if we got one response
249                                            if !self.quorum
250                                            || result_counter >= self.quorum_size
251                                            || !quorum_regexes.iter().any(|re| re.is_match(path))
252                                            // with query we ignore quorum because the nodes can store a different amount of history
253                                            || query.is_some()
254                                            {
255                                                break;
256                                            }
257                                        }
258                                        Err(e) => {
259                                            error.replace(e.into());
260                                        }
261                                    }
262                                }
263                                _ => {
264                                    error.replace(crate::Error::NodeError(res_text));
265                                }
266                            }
267                        } else {
268                            warn!("Couldn't convert noderesult to text");
269                        }
270                    }
271                    Err(err) => {
272                        error.replace(err);
273                    }
274                }
275            }
276        }
277
278        let res = result
279            .into_iter()
280            .max_by_key(|v| v.1)
281            .ok_or_else(|| error.unwrap_or_else(|| Error::NodeError("Couldn't get a result from any node".into())))?;
282
283        // Return if quorum is false or check if quorum was reached
284        if !self.quorum
285            || res.1 as f64 >= self.quorum_size as f64 * (self.quorum_threshold as f64 / 100.0)
286            || !quorum_regexes.iter().any(|re| re.is_match(path))
287            // with query we ignore quorum because the nodes can store a different amount of history
288            || query.is_some()
289        {
290            Ok(serde_json::from_str(&res.0)?)
291        } else {
292            Err(Error::QuorumThresholdError(res.1, self.quorum_size))
293        }
294    }
295    // Only used for api/v1/messages/{messageID}/raw, that's why we don't need the quorum stuff
296    pub(crate) async fn get_request_text(&self, path: &str, query: Option<&str>, timeout: Duration) -> Result<String> {
297        // Get node urls and set path
298        let nodes = self.get_nodes(path, query, false).await?;
299        let mut error = None;
300        // Send requests
301        for node in nodes {
302            match self.http_client.get(node, timeout).await {
303                Ok(res) => {
304                    let status = res.status();
305                    if let Ok(res_text) = res.text().await {
306                        // Without quorum it's enough if we got one response
307                        match status {
308                            200 => return Ok(res_text),
309                            _ => error.replace(crate::Error::NodeError(res_text)),
310                        };
311                    }
312                }
313                Err(e) => {
314                    error.replace(crate::Error::NodeError(e.to_string()));
315                }
316            }
317        }
318        Err(error.unwrap_or_else(|| Error::NodeError("Couldn't get a result from any node".into())))
319    }
320    pub(crate) async fn post_request_bytes<T: serde::de::DeserializeOwned>(
321        &self,
322        path: &str,
323        timeout: Duration,
324        body: &[u8],
325        local_pow: bool,
326    ) -> Result<T> {
327        // primary_pow_node should only be used when remote PoW is used
328        let nodes = self.get_nodes(path, None, !local_pow).await?;
329        if nodes.is_empty() {
330            return Err(Error::NodeError("No available nodes with remote PoW".into()));
331        }
332        let mut error = None;
333        // Send requests
334        for node in nodes {
335            match self.http_client.post_bytes(node, timeout, body).await {
336                Ok(res) => {
337                    let status = res.status();
338                    if let Ok(res_text) = res.text().await {
339                        match status {
340                            200 | 201 => match serde_json::from_str(&res_text) {
341                                Ok(res) => return Ok(res),
342                                Err(e) => error.replace(e.into()),
343                            },
344                            _ => error.replace(crate::Error::NodeError(res_text)),
345                        };
346                    }
347                }
348                Err(e) => {
349                    error.replace(crate::Error::NodeError(e.to_string()));
350                }
351            }
352        }
353        Err(error.unwrap_or_else(|| Error::NodeError("Couldn't get a result from any node".into())))
354    }
355
356    pub(crate) async fn post_request_json<T: serde::de::DeserializeOwned>(
357        &self,
358        path: &str,
359        timeout: Duration,
360        json: Value,
361        local_pow: bool,
362    ) -> Result<T> {
363        // primary_pow_node should only be used when remote PoW is used
364        let nodes = self.get_nodes(path, None, !local_pow).await?;
365        if nodes.is_empty() {
366            return Err(Error::NodeError("No available nodes with remote PoW".into()));
367        }
368        let mut error = None;
369        // Send requests
370        for node in nodes {
371            match self.http_client.post_json(node, timeout, json.clone()).await {
372                Ok(res) => {
373                    let status = res.status();
374                    if let Ok(res_text) = res.text().await {
375                        match status {
376                            200 | 201 => match serde_json::from_str(&res_text) {
377                                Ok(res) => return Ok(res),
378                                Err(e) => error.replace(e.into()),
379                            },
380                            _ => error.replace(crate::Error::NodeError(res_text)),
381                        };
382                    }
383                }
384                Err(e) => {
385                    error.replace(crate::Error::NodeError(e.to_string()));
386                }
387            }
388        }
389        Err(error.unwrap_or_else(|| Error::NodeError("Couldn't get a result from any node".into())))
390    }
391}
392
393#[derive(Clone)]
394pub(crate) struct NodeManagerBuilder {
395    pub(crate) primary_node: Option<Node>,
396    primary_pow_node: Option<Node>,
397    pub(crate) nodes: HashSet<Node>,
398    pub(crate) permanodes: Option<HashSet<Node>>,
399    sync: bool,
400    sync_interval: Duration,
401    quorum: bool,
402    quorum_size: usize,
403    quorum_threshold: usize,
404}
405
406impl NodeManagerBuilder {
407    pub(crate) fn new() -> Self {
408        Default::default()
409    }
410    pub(crate) fn with_node(mut self, url: &str) -> Result<Self> {
411        let url = validate_url(Url::parse(url)?)?;
412        self.nodes.insert(Node { url, jwt: None });
413        Ok(self)
414    }
415    pub(crate) fn with_primary_node(
416        mut self,
417        url: &str,
418        jwt: Option<String>,
419        basic_auth_name_pwd: Option<(&str, &str)>,
420    ) -> Result<Self> {
421        let mut url = validate_url(Url::parse(url)?)?;
422        if let Some((name, password)) = basic_auth_name_pwd {
423            url.set_username(name)
424                .map_err(|_| crate::Error::UrlAuthError("username".to_string()))?;
425            url.set_password(Some(password))
426                .map_err(|_| crate::Error::UrlAuthError("password".to_string()))?;
427        }
428        self.primary_node.replace(Node { url, jwt });
429        Ok(self)
430    }
431    pub(crate) fn with_primary_pow_node(
432        mut self,
433        url: &str,
434        jwt: Option<String>,
435        basic_auth_name_pwd: Option<(&str, &str)>,
436    ) -> Result<Self> {
437        let mut url = validate_url(Url::parse(url)?)?;
438        if let Some((name, password)) = basic_auth_name_pwd {
439            url.set_username(name)
440                .map_err(|_| crate::Error::UrlAuthError("username".to_string()))?;
441            url.set_password(Some(password))
442                .map_err(|_| crate::Error::UrlAuthError("password".to_string()))?;
443        }
444        self.primary_pow_node.replace(Node { url, jwt });
445        Ok(self)
446    }
447    pub(crate) fn with_permanode(
448        mut self,
449        url: &str,
450        jwt: Option<String>,
451        basic_auth_name_pwd: Option<(&str, &str)>,
452    ) -> Result<Self> {
453        let mut url = validate_url(Url::parse(url)?)?;
454        if let Some((name, password)) = basic_auth_name_pwd {
455            url.set_username(name)
456                .map_err(|_| crate::Error::UrlAuthError("username".to_string()))?;
457            url.set_password(Some(password))
458                .map_err(|_| crate::Error::UrlAuthError("password".to_string()))?;
459        }
460        match self.permanodes {
461            Some(ref mut permanodes) => {
462                permanodes.insert(Node { url, jwt });
463            }
464            None => {
465                let mut permanodes = HashSet::new();
466                permanodes.insert(Node { url, jwt });
467                self.permanodes.replace(permanodes);
468            }
469        }
470        Ok(self)
471    }
472    pub(crate) fn with_node_sync_disabled(mut self) -> Self {
473        self.sync = false;
474        self
475    }
476    pub(crate) fn with_node_auth(
477        mut self,
478        url: &str,
479        jwt: Option<String>,
480        basic_auth_name_pwd: Option<(&str, &str)>,
481    ) -> Result<Self> {
482        let mut url = validate_url(Url::parse(url)?)?;
483        if let Some((name, password)) = basic_auth_name_pwd {
484            url.set_username(name)
485                .map_err(|_| crate::Error::UrlAuthError("username".to_string()))?;
486            url.set_password(Some(password))
487                .map_err(|_| crate::Error::UrlAuthError("password".to_string()))?;
488        }
489        self.nodes.insert(Node { url, jwt });
490        Ok(self)
491    }
492    pub(crate) fn with_nodes(mut self, urls: &[&str]) -> Result<Self> {
493        for url in urls {
494            let url = validate_url(Url::parse(url)?)?;
495            self.nodes.insert(Node { url, jwt: None });
496        }
497        Ok(self)
498    }
499    /// Get node list from the node_pool_urls
500    pub(crate) async fn with_node_pool_urls(mut self, node_pool_urls: &[String]) -> Result<Self> {
501        for pool_url in node_pool_urls {
502            let http_client = crate::node_manager::HttpClient::new();
503            let nodes_details: Vec<NodeDetail> = http_client
504                .get(
505                    Node {
506                        url: validate_url(Url::parse(pool_url)?)?,
507                        jwt: None,
508                    },
509                    crate::builder::GET_API_TIMEOUT,
510                )
511                .await?
512                .json()
513                .await?;
514            for node_detail in nodes_details {
515                let url = validate_url(Url::parse(&node_detail.node)?)?;
516                self.nodes.insert(Node { url, jwt: None });
517            }
518        }
519        Ok(self)
520    }
521    pub(crate) fn with_node_sync_interval(mut self, node_sync_interval: Duration) -> Self {
522        self.sync_interval = node_sync_interval;
523        self
524    }
525    pub(crate) fn with_quorum(mut self, quorum: bool) -> Self {
526        self.quorum = quorum;
527        self
528    }
529    pub(crate) fn with_quorum_size(mut self, quorum_size: usize) -> Self {
530        self.quorum_size = quorum_size;
531        self
532    }
533    pub(crate) fn with_quorum_threshold(mut self, threshold: usize) -> Self {
534        self.quorum_threshold = threshold;
535        self
536    }
537    pub(crate) async fn add_default_nodes(mut self, network_info: &NetworkInfo) -> Result<Self> {
538        // todo update with new node pool
539        // let default_testnet_node_pools = vec!["https://giftiota.com/nodes.json".to_string()];
540        let default_testnet_nodes = vec![
541            "https://api.lb-0.h.chrysalis-devnet.iota.cafe/",
542            "https://api.lb-1.h.chrysalis-devnet.iota.cafe/",
543        ];
544        if self.nodes.is_empty() && self.primary_node.is_none() {
545            match network_info.network {
546                Some(ref network) => match network.to_lowercase().as_str() {
547                    "testnet" | "devnet" | "test" | "dev" => {
548                        self = self.with_nodes(&default_testnet_nodes[..])?;
549                        // self = self.with_node_pool_urls(&default_testnet_node_pools[..]).await?;
550                    }
551                    _ => return Err(Error::SyncedNodePoolEmpty),
552                },
553                _ => {
554                    self = self.with_nodes(&default_testnet_nodes[..])?;
555                    // self = self.with_node_pool_urls(&default_testnet_node_pools[..]).await?;
556                }
557            }
558        }
559        Ok(self)
560    }
561    pub(crate) fn build(self, synced_nodes: Arc<RwLock<HashSet<Node>>>) -> NodeManager {
562        NodeManager {
563            primary_node: self.primary_node,
564            primary_pow_node: self.primary_pow_node,
565            nodes: self.nodes,
566            permanodes: self.permanodes,
567            sync: self.sync,
568            sync_interval: self.sync_interval,
569            synced_nodes,
570            quorum: self.quorum,
571            quorum_size: self.quorum_size,
572            quorum_threshold: self.quorum_threshold,
573            http_client: HttpClient::new(),
574        }
575    }
576}
577
578impl Default for NodeManagerBuilder {
579    fn default() -> Self {
580        Self {
581            primary_node: None,
582            primary_pow_node: None,
583            nodes: HashSet::new(),
584            permanodes: None,
585            sync: true,
586            sync_interval: NODE_SYNC_INTERVAL,
587            quorum: false,
588            quorum_size: DEFAULT_QUORUM_SIZE,
589            quorum_threshold: DEFAULT_QUORUM_THRESHOLD,
590        }
591    }
592}
593
/// HTTP response wrapper for the `ureq` based client.
#[cfg(all(feature = "sync", not(feature = "async")))]
pub(crate) struct Response(ureq::Response);
596
#[cfg(all(feature = "sync", not(feature = "async")))]
impl From<ureq::Response> for Response {
    /// Wraps a `ureq` response.
    fn from(response: ureq::Response) -> Self {
        Response(response)
    }
}
603
#[cfg(all(feature = "sync", not(feature = "async")))]
impl Response {
    /// HTTP status code of the response.
    pub(crate) fn status(&self) -> u16 {
        self.0.status()
    }

    /// Consumes the response and deserializes its body from JSON.
    pub(crate) async fn json<T: DeserializeOwned>(self) -> Result<T> {
        let parsed = self.0.into_json()?;
        Ok(parsed)
    }

    /// Consumes the response and returns its body as text.
    pub(crate) async fn text(self) -> Result<String> {
        let body = self.0.into_string()?;
        Ok(body)
    }
}
618
/// HTTP response wrapper for the `reqwest` based client.
#[cfg(any(feature = "async", feature = "wasm"))]
pub(crate) struct Response(reqwest::Response);
621
#[cfg(any(feature = "async", feature = "wasm"))]
impl Response {
    /// HTTP status code of the response.
    pub(crate) fn status(&self) -> u16 {
        let code = self.0.status();
        code.as_u16()
    }

    /// Consumes the response and deserializes its body from JSON.
    pub(crate) async fn json<T: DeserializeOwned>(self) -> Result<T> {
        let parsed = self.0.json().await?;
        Ok(parsed)
    }

    /// Consumes the response and returns its body as text.
    pub(crate) async fn text(self) -> Result<String> {
        let body = self.0.text().await?;
        Ok(body)
    }
}
636
/// HTTP client backed by a `reqwest::Client`.
#[cfg(any(feature = "async", feature = "wasm"))]
#[derive(Clone)]
pub(crate) struct HttpClient {
    /// The underlying `reqwest` client used for all requests.
    client: reqwest::Client,
}
642
/// HTTP client for the `ureq` based build; it holds no state.
#[cfg(all(feature = "sync", not(feature = "async")))]
#[derive(Clone)]
pub(crate) struct HttpClient;
646
#[cfg(any(feature = "async", feature = "wasm"))]
impl HttpClient {
    /// Creates a client with a new `reqwest::Client`.
    pub(crate) fn new() -> Self {
        Self {
            client: reqwest::Client::new(),
        }
    }

    /// Wraps a successful response; a non-success status is turned into
    /// `Error::ResponseError` carrying the status code and the response body.
    async fn parse_response(response: reqwest::Response) -> Result<Response> {
        let status = response.status();
        if status.is_success() {
            Ok(Response(response))
        } else {
            Err(Error::ResponseError(status.as_u16(), response.text().await?))
        }
    }

    /// Sends a GET request to the node's URL, with the JWT as bearer token if present, and
    /// logs the request duration on debug level.
    ///
    /// The timeout is not applied when built for wasm.
    pub(crate) async fn get(&self, node: Node, _timeout: Duration) -> Result<Response> {
        #[cfg(feature = "wasm")]
        let start_time = instant::Instant::now();
        #[cfg(not(feature = "wasm"))]
        let start_time = std::time::Instant::now();

        // The builder stores basic auth credentials inside the URL, so log a copy with the
        // credentials stripped instead of the URL itself. Failures are ignored on purpose:
        // this copy is only used for logging.
        let mut log_url = node.url.clone();
        let _ = log_url.set_password(None);
        let _ = log_url.set_username("");

        let mut request_builder = self.client.get(node.url);
        if let Some(jwt) = node.jwt {
            request_builder = request_builder.bearer_auth(jwt);
        }
        #[cfg(not(feature = "wasm"))]
        {
            request_builder = request_builder.timeout(_timeout);
        }
        let resp = request_builder.send().await?;
        let response = Self::parse_response(resp).await;
        log::debug!(
            "GET request took {:?} ms for {}",
            start_time.elapsed().as_millis(),
            log_url
        );
        response
    }

    /// Sends a POST request with `body` as `application/octet-stream` to the node's URL, with
    /// the JWT as bearer token if present.
    ///
    /// The timeout is not applied when built for wasm.
    pub(crate) async fn post_bytes(&self, node: Node, _timeout: Duration, body: &[u8]) -> Result<Response> {
        let mut request_builder = self.client.post(node.url);
        if let Some(jwt) = node.jwt {
            request_builder = request_builder.bearer_auth(jwt);
        }
        #[cfg(not(feature = "wasm"))]
        {
            request_builder = request_builder.timeout(_timeout);
        }
        request_builder = request_builder.header("Content-Type", "application/octet-stream");
        Self::parse_response(request_builder.body(body.to_vec()).send().await?).await
    }

    /// Sends a POST request with `json` as body to the node's URL, with the JWT as bearer
    /// token if present.
    ///
    /// The timeout is not applied when built for wasm.
    pub(crate) async fn post_json(&self, node: Node, _timeout: Duration, json: Value) -> Result<Response> {
        let mut request_builder = self.client.post(node.url);
        if let Some(jwt) = node.jwt {
            request_builder = request_builder.bearer_auth(jwt);
        }
        #[cfg(not(feature = "wasm"))]
        {
            request_builder = request_builder.timeout(_timeout);
        }
        Self::parse_response(request_builder.json(&json).send().await?).await
    }
}
712
#[cfg(all(feature = "sync", not(feature = "async")))]
impl HttpClient {
    /// Creates the client.
    pub(crate) fn new() -> Self {
        Self
    }

    /// Returns another client.
    pub(crate) fn clone(&self) -> Self {
        Self
    }

    /// Sends a GET request to the node's URL, with the JWT as bearer token if present.
    pub(crate) async fn get(&self, node: Node, timeout: Duration) -> Result<Response> {
        let agent = Self::get_ureq_agent(timeout);
        let mut request = agent.get(node.url.as_str());
        if let Some(jwt) = node.jwt {
            request = request.set("Authorization", &format!("Bearer {}", jwt));
        }
        let response = request.call()?;
        Ok(response.into())
    }

    /// Sends a POST request with `body` as `application/octet-stream` to the node's URL, with
    /// the JWT as bearer token if present.
    pub(crate) async fn post_bytes(&self, node: Node, timeout: Duration, body: &[u8]) -> Result<Response> {
        let agent = Self::get_ureq_agent(timeout);
        let mut request = agent.post(node.url.as_str());
        if let Some(jwt) = node.jwt {
            request = request.set("Authorization", &format!("Bearer {}", jwt));
        }
        request = request.set("Content-Type", "application/octet-stream");
        let response = request.send_bytes(body)?;
        Ok(response.into())
    }

    /// Sends a POST request with `json` as body to the node's URL, with the JWT as bearer
    /// token if present.
    pub(crate) async fn post_json(&self, node: Node, timeout: Duration, json: Value) -> Result<Response> {
        let agent = Self::get_ureq_agent(timeout);
        let mut request = agent.post(node.url.as_str());
        if let Some(jwt) = node.jwt {
            request = request.set("Authorization", &format!("Bearer {}", jwt));
        }
        let response = request.send_json(json)?;
        Ok(response.into())
    }

    /// Builds a `ureq` agent that uses `timeout` as both read and write timeout.
    fn get_ureq_agent(timeout: Duration) -> Agent {
        let builder = AgentBuilder::new().timeout_read(timeout).timeout_write(timeout);
        builder.build()
    }
}
752
753/// Validates if the url starts with http or https
754pub fn validate_url(url: Url) -> Result<Url> {
755    if url.scheme() != "http" && url.scheme() != "https" {
756        return Err(Error::UrlValidationError(format!("Invalid scheme: {}", url.scheme())));
757    }
758    Ok(url)
759}
760
761/// JSON struct for NodeDetail from the node_pool_urls
762#[derive(Debug, Serialize, Deserialize)]
763pub struct NodeDetail {
764    /// Iota node url
765    pub node: String,
766    /// Network id
767    pub network_id: String,
768    /// Implementation name
769    pub implementation: String,
770    /// Enabled PoW
771    pub pow: bool,
772}