iota_client/
client.rs

1// Copyright 2021 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! The Client module to connect through HORNET or Bee with API usages
5use crate::{
6    api::*,
7    builder::{ClientBuilder, NetworkInfo, GET_API_TIMEOUT},
8    error::*,
9    node::*,
10    node_manager::Node,
11};
12use bee_common::packable::Packable;
13use bee_message::{
14    constants::INPUT_OUTPUT_COUNT_MAX,
15    payload::Payload,
16    prelude::{
17        Address, Ed25519Address, Message, MessageBuilder, MessageId, Parents, TransactionId, UtxoInput,
18        ED25519_ADDRESS_LENGTH,
19    },
20};
21use bee_pow::providers::{
22    miner::{MinerBuilder, MinerCancel},
23    NonceProvider, NonceProviderBuilder,
24};
25use bee_rest_api::types::{
26    body::SuccessBody,
27    dtos::{LedgerInclusionStateDto, MessageDto, PeerDto, ReceiptDto},
28    responses::{
29        BalanceAddressResponse, InfoResponse as NodeInfo, MessageResponse, MilestoneResponse as MilestoneResponseDto,
30        OutputResponse, PeersResponse, ReceiptsResponse, SubmitMessageResponse, TipsResponse, TreasuryResponse,
31        UtxoChangesResponse as MilestoneUTXOChanges,
32    },
33};
34use crypto::{
35    hashes::{blake2b::Blake2b256, Digest},
36    keys::{
37        bip39::{mnemonic_to_seed, wordlist},
38        slip10::Seed,
39    },
40    utils,
41};
42
43use zeroize::Zeroize;
44
45use crate::builder::TIPS_INTERVAL;
46#[cfg(feature = "wasm")]
47use gloo_timers::future::TimeoutFuture;
48#[cfg(feature = "mqtt")]
49use rumqttc::AsyncClient as MqttClient;
50#[cfg(feature = "mqtt")]
51use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender};
52#[cfg(not(feature = "wasm"))]
53use tokio::{
54    runtime::Runtime,
55    sync::broadcast::{Receiver, Sender},
56    time::{sleep, Duration as TokioDuration},
57};
58use url::Url;
59
60use std::{
61    collections::{HashMap, HashSet},
62    convert::{TryFrom, TryInto},
63    hash::Hash,
64    ops::Range,
65    str::FromStr,
66    sync::{Arc, RwLock},
67    time::Duration,
68};
69
70const RESPONSE_MAX_OUTPUTS: usize = 1000;
71const DUST_THRESHOLD: u64 = 1_000_000;
72
73/// NodeInfo wrapper which contains the nodeinfo and the url from the node (useful when multiple nodes are used)
74#[derive(Debug, Serialize, Deserialize)]
75pub struct NodeInfoWrapper {
76    /// The returned nodeinfo
77    pub nodeinfo: NodeInfo,
78    /// The url from the node which returned the nodeinfo
79    pub url: String,
80}
81
82#[derive(Debug, Serialize, Clone, Copy)]
83/// Milestone data.
84pub struct MilestoneResponse {
85    /// Milestone index.
86    pub index: u32,
87    /// Milestone message id.
88    #[serde(rename = "messageId")]
89    pub message_id: MessageId,
90    /// Milestone timestamp.
91    pub timestamp: u64,
92}
93
94#[cfg(feature = "mqtt")]
95type TopicHandler = Box<dyn Fn(&TopicEvent) + Send + Sync>;
96#[cfg(feature = "mqtt")]
97pub(crate) type TopicHandlerMap = HashMap<Topic, Vec<Arc<TopicHandler>>>;
98
99/// An event from a MQTT topic.
100#[cfg(feature = "mqtt")]
101#[derive(Debug, Clone, serde::Serialize)]
102pub struct TopicEvent {
103    /// the MQTT topic.
104    pub topic: String,
105    /// The MQTT event payload.
106    pub payload: String,
107}
108
109/// Mqtt events.
110#[cfg(feature = "mqtt")]
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub enum MqttEvent {
113    /// Client was connected.
114    Connected,
115    /// Client was disconnected.
116    Disconnected,
117}
118
119/// The MQTT broker options.
120#[cfg(feature = "mqtt")]
121#[derive(Debug, Clone, serde::Deserialize)]
122pub struct BrokerOptions {
123    #[serde(default = "default_broker_automatic_disconnect", rename = "automaticDisconnect")]
124    pub(crate) automatic_disconnect: bool,
125    #[serde(default = "default_broker_timeout")]
126    pub(crate) timeout: Duration,
127    #[serde(default = "default_broker_use_ws", rename = "useWs")]
128    pub(crate) use_ws: bool,
129    #[serde(default = "default_broker_port")]
130    pub(crate) port: u16,
131    #[serde(default = "default_max_reconnection_attempts", rename = "maxReconnectionAttempts")]
132    pub(crate) max_reconnection_attempts: usize,
133}
134
135#[cfg(feature = "mqtt")]
136fn default_broker_automatic_disconnect() -> bool {
137    true
138}
139
140#[cfg(feature = "mqtt")]
141fn default_broker_timeout() -> Duration {
142    Duration::from_secs(30)
143}
144#[cfg(feature = "mqtt")]
145fn default_broker_use_ws() -> bool {
146    true
147}
148
149#[cfg(feature = "mqtt")]
150fn default_broker_port() -> u16 {
151    1883
152}
153
154#[cfg(feature = "mqtt")]
155fn default_max_reconnection_attempts() -> usize {
156    0
157}
158
159#[cfg(feature = "mqtt")]
160impl Default for BrokerOptions {
161    fn default() -> Self {
162        Self {
163            automatic_disconnect: default_broker_automatic_disconnect(),
164            timeout: default_broker_timeout(),
165            use_ws: default_broker_use_ws(),
166            port: default_broker_port(),
167            max_reconnection_attempts: default_max_reconnection_attempts(),
168        }
169    }
170}
171
172#[cfg(feature = "mqtt")]
173impl BrokerOptions {
174    /// Creates the default broker options.
175    pub fn new() -> Self {
176        Default::default()
177    }
178
179    /// Whether the MQTT broker should be automatically disconnected when all topics are unsubscribed or not.
180    pub fn automatic_disconnect(mut self, automatic_disconnect: bool) -> Self {
181        self.automatic_disconnect = automatic_disconnect;
182        self
183    }
184
185    /// Sets the timeout used for the MQTT operations.
186    pub fn timeout(mut self, timeout: Duration) -> Self {
187        self.timeout = timeout;
188        self
189    }
190
191    /// Sets the use_ws used for the MQTT operations.
192    pub fn use_ws(mut self, use_ws: bool) -> Self {
193        self.use_ws = use_ws;
194        self
195    }
196
197    /// Sets the port used for the MQTT operations.
198    pub fn port(mut self, port: u16) -> Self {
199        self.port = port;
200        self
201    }
202
203    /// Sets the maximum number of reconnection attempts. 0 is unlimited.
204    pub fn max_reconnection_attempts(mut self, max_reconnection_attempts: usize) -> Self {
205        self.max_reconnection_attempts = max_reconnection_attempts;
206        self
207    }
208}
209
210/// The miner builder.
211#[derive(Default)]
212pub struct ClientMinerBuilder {
213    local_pow: bool,
214    cancel: MinerCancel,
215}
216
217impl ClientMinerBuilder {
218    /// Sets the local PoW config
219    pub fn with_local_pow(mut self, value: bool) -> Self {
220        self.local_pow = value;
221        self
222    }
223    /// Set cancel miner
224    pub fn with_cancel(mut self, cancel: MinerCancel) -> Self {
225        self.cancel = cancel;
226        self
227    }
228}
229
230impl NonceProviderBuilder for ClientMinerBuilder {
231    type Provider = ClientMiner;
232
233    fn new() -> Self {
234        Self::default()
235    }
236
237    fn finish(self) -> ClientMiner {
238        ClientMiner {
239            local_pow: self.local_pow,
240            cancel: self.cancel,
241        }
242    }
243}
244
245/// The miner used for PoW
246pub struct ClientMiner {
247    local_pow: bool,
248    cancel: MinerCancel,
249}
250
251impl NonceProvider for ClientMiner {
252    type Builder = ClientMinerBuilder;
253    type Error = crate::Error;
254
255    fn nonce(&self, bytes: &[u8], target_score: f64) -> std::result::Result<u64, Self::Error> {
256        if self.local_pow {
257            MinerBuilder::new()
258                .with_num_workers(num_cpus::get())
259                .with_cancel(self.cancel.clone())
260                .finish()
261                .nonce(bytes, target_score)
262                .map_err(|e| crate::Error::Pow(e.to_string()))
263        } else {
264            Ok(0)
265        }
266    }
267}
268
269/// Each of the node APIs the client uses.
270#[derive(Clone, Eq, PartialEq, Hash)]
271pub enum Api {
272    /// `get_health` API
273    GetHealth,
274    /// `get_info`API
275    GetInfo,
276    /// `get_peers`API
277    GetPeers,
278    /// `get_tips` API
279    GetTips,
280    /// `post_message` API
281    PostMessage,
282    /// `post_message` API with remote pow
283    PostMessageWithRemotePow,
284    /// `get_output` API
285    GetOutput,
286    /// `get_milestone` API
287    GetMilestone,
288    /// `get_message` API
289    GetMessage,
290    /// `get_balance` API
291    GetBalance,
292}
293
294impl FromStr for Api {
295    type Err = String;
296
297    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
298        let t = match s {
299            "GetHealth" => Self::GetHealth,
300            "GetInfo" => Self::GetInfo,
301            "GetPeers" => Self::GetPeers,
302            "GetTips" => Self::GetTips,
303            "PostMessage" => Self::PostMessage,
304            "PostMessageWithRemotePow" => Self::PostMessageWithRemotePow,
305            "GetOutput" => Self::GetOutput,
306            "GetMilestone" => Self::GetMilestone,
307            "GetMessage" => Self::GetMessage,
308            "GetBalance" => Self::GetBalance,
309            _ => return Err(format!("unknown api kind `{s}`")),
310        };
311        Ok(t)
312    }
313}
314
315/// An instance of the client using HORNET or Bee URI
316#[cfg_attr(feature = "wasm", derive(Clone))]
317pub struct Client {
318    #[allow(dead_code)]
319    #[cfg(not(feature = "wasm"))]
320    pub(crate) runtime: Option<Runtime>,
321    /// Node manager
322    pub(crate) node_manager: crate::node_manager::NodeManager,
323    /// Flag to stop the node syncing
324    #[cfg(not(feature = "wasm"))]
325    pub(crate) sync_kill_sender: Option<Arc<Sender<()>>>,
326    /// A MQTT client to subscribe/unsubscribe to topics.
327    #[cfg(feature = "mqtt")]
328    pub(crate) mqtt_client: Option<MqttClient>,
329    #[cfg(feature = "mqtt")]
330    pub(crate) mqtt_topic_handlers: Arc<tokio::sync::RwLock<TopicHandlerMap>>,
331    #[cfg(feature = "mqtt")]
332    pub(crate) broker_options: BrokerOptions,
333    #[cfg(feature = "mqtt")]
334    pub(crate) mqtt_event_channel: (Arc<WatchSender<MqttEvent>>, WatchReceiver<MqttEvent>),
335    pub(crate) network_info: Arc<RwLock<NetworkInfo>>,
336    /// HTTP request timeout.
337    pub(crate) request_timeout: Duration,
338    /// HTTP request timeout for each API call.
339    pub(crate) api_timeout: HashMap<Api, Duration>,
340}
341
342impl std::fmt::Debug for Client {
343    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
344        let mut d = f.debug_struct("Client");
345        d.field("node_manager", &self.node_manager);
346        #[cfg(feature = "mqtt")]
347        d.field("broker_options", &self.broker_options);
348        d.field("network_info", &self.network_info).finish()
349    }
350}
351
352impl Drop for Client {
353    /// Gracefully shutdown the `Client`
354    fn drop(&mut self) {
355        #[cfg(not(feature = "wasm"))]
356        if let Some(sender) = self.sync_kill_sender.take() {
357            sender.send(()).expect("failed to stop syncing process");
358        }
359
360        #[cfg(not(feature = "wasm"))]
361        if let Some(runtime) = self.runtime.take() {
362            runtime.shutdown_background();
363        }
364
365        #[cfg(feature = "mqtt")]
366        if let Some(mqtt_client) = self.mqtt_client.take() {
367            std::thread::spawn(move || {
368                // ignore errors in case the event loop was already dropped
369                // .cancel() finishes the event loop right away
370                let _ = crate::async_runtime::block_on(mqtt_client.cancel());
371            })
372            .join()
373            .unwrap();
374        }
375    }
376}
377
378impl Client {
379    /// Create the builder to instntiate the IOTA Client.
380    pub fn builder() -> ClientBuilder {
381        ClientBuilder::new()
382    }
383
384    /// Sync the node lists per node_sync_interval milliseconds
385    #[cfg(not(feature = "wasm"))]
386    pub(crate) fn start_sync_process(
387        runtime: &Runtime,
388        sync: Arc<RwLock<HashSet<Node>>>,
389        nodes: HashSet<Node>,
390        node_sync_interval: Duration,
391        network_info: Arc<RwLock<NetworkInfo>>,
392        mut kill: Receiver<()>,
393    ) {
394        let node_sync_interval =
395            TokioDuration::from_nanos(node_sync_interval.as_nanos().try_into().unwrap_or(TIPS_INTERVAL));
396
397        runtime.spawn(async move {
398            loop {
399                tokio::select! {
400                    _ = async {
401                            // delay first since the first `sync_nodes` call is made by the builder
402                            // to ensure the node list is filled before the client is used
403                            sleep(node_sync_interval).await;
404                            Client::sync_nodes(&sync, &nodes, &network_info).await;
405                    } => {}
406                    _ = kill.recv() => {}
407                }
408            }
409        });
410    }
411
412    #[cfg(not(feature = "wasm"))]
413    pub(crate) async fn sync_nodes(
414        sync: &Arc<RwLock<HashSet<Node>>>,
415        nodes: &HashSet<Node>,
416        network_info: &Arc<RwLock<NetworkInfo>>,
417    ) {
418        let mut synced_nodes = HashSet::new();
419        let mut network_nodes: HashMap<String, Vec<(NodeInfo, Node)>> = HashMap::new();
420        for node in nodes {
421            // Put the healthy node url into the network_nodes
422            if let Ok(info) = Client::get_node_info(node.url.as_ref(), None, None).await {
423                if info.is_healthy {
424                    match network_nodes.get_mut(&info.network_id) {
425                        Some(network_id_entry) => {
426                            network_id_entry.push((info, node.clone()));
427                        }
428                        None => match &network_info
429                            .read()
430                            .map_or(NetworkInfo::default().network, |info| info.network.clone())
431                        {
432                            Some(id) => {
433                                if info.network_id.contains(id) {
434                                    network_nodes.insert(info.network_id.clone(), vec![(info, node.clone())]);
435                                }
436                            }
437                            None => {
438                                network_nodes.insert(info.network_id.clone(), vec![(info, node.clone())]);
439                            }
440                        },
441                    }
442                }
443            }
444        }
445        // Get network_id with the most nodes
446        let mut most_nodes = ("network_id", 0);
447        for (network_id, node) in network_nodes.iter() {
448            if node.len() > most_nodes.1 {
449                most_nodes.0 = network_id;
450                most_nodes.1 = node.len();
451            }
452        }
453        if let Some(nodes) = network_nodes.get(most_nodes.0) {
454            for (info, node_url) in nodes.iter() {
455                if let Ok(mut client_network_info) = network_info.write() {
456                    client_network_info.network_id = hash_network(&info.network_id).ok();
457                    client_network_info.min_pow_score = info.min_pow_score;
458                    client_network_info.bech32_hrp = info.bech32_hrp.clone();
459                    if !client_network_info.local_pow {
460                        if info.features.contains(&"PoW".to_string()) {
461                            synced_nodes.insert(node_url.clone());
462                        }
463                    } else {
464                        synced_nodes.insert(node_url.clone());
465                    }
466                }
467            }
468        }
469
470        // Update the sync list
471        if let Ok(mut sync) = sync.write() {
472            *sync = synced_nodes;
473        }
474    }
475
476    /// Get a node candidate from the synced node pool.
477    pub async fn get_node(&self) -> Result<Node> {
478        if let Some(primary_node) = &self.node_manager.primary_node {
479            return Ok(primary_node.clone());
480        }
481        let pool = self.node_manager.nodes.clone();
482        pool.into_iter().next().ok_or(Error::SyncedNodePoolEmpty)
483    }
484
485    /// Gets the network id of the node we're connecting to.
486    pub async fn get_network_id(&self) -> Result<u64> {
487        let network_info = self.get_network_info().await?;
488        network_info
489            .network_id
490            .ok_or(Error::MissingParameter("Missing network id."))
491    }
492
493    /// Gets the miner to use based on the PoW setting
494    pub async fn get_pow_provider(&self) -> ClientMiner {
495        ClientMinerBuilder::new()
496            .with_local_pow(self.get_local_pow().await)
497            .finish()
498    }
499
500    /// Gets the network related information such as network_id and min_pow_score
501    /// and if it's the default one, sync it first.
502    pub async fn get_network_info(&self) -> Result<NetworkInfo> {
503        let not_synced = self.network_info.read().map_or(true, |info| info.network_id.is_none());
504
505        if not_synced {
506            let info = self.get_info().await?.nodeinfo;
507            let network_id = hash_network(&info.network_id).ok();
508            {
509                let mut client_network_info = self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
510                client_network_info.network_id = network_id;
511                client_network_info.min_pow_score = info.min_pow_score;
512                client_network_info.bech32_hrp = info.bech32_hrp;
513            }
514        }
515        let res = self
516            .network_info
517            .read()
518            .map_or(NetworkInfo::default(), |info| info.clone());
519        Ok(res)
520    }
521
522    /// returns the bech32_hrp
523    pub async fn get_bech32_hrp(&self) -> Result<String> {
524        Ok(self.get_network_info().await?.bech32_hrp)
525    }
526
527    /// returns the min pow score
528    pub async fn get_min_pow_score(&self) -> Result<f64> {
529        Ok(self.get_network_info().await?.min_pow_score)
530    }
531
532    /// returns the tips interval
533    pub async fn get_tips_interval(&self) -> u64 {
534        self.network_info
535            .read()
536            .map_or(TIPS_INTERVAL, |info| info.tips_interval)
537    }
538
539    /// returns the local pow
540    pub async fn get_local_pow(&self) -> bool {
541        self.network_info
542            .read()
543            .map_or(NetworkInfo::default().local_pow, |info| info.local_pow)
544    }
545
546    /// returns the fallback_to_local_pow
547    pub async fn get_fallback_to_local_pow(&self) -> bool {
548        self.network_info
549            .read()
550            .map_or(NetworkInfo::default().fallback_to_local_pow, |info| {
551                info.fallback_to_local_pow
552            })
553    }
554
555    /// returns the unsynced nodes.
556    #[cfg(not(feature = "wasm"))]
557    pub async fn unsynced_nodes(&self) -> HashSet<&Node> {
558        self.node_manager.synced_nodes.read().map_or(HashSet::new(), |synced| {
559            self.node_manager
560                .nodes
561                .iter()
562                .filter(|node| !synced.contains(node))
563                .collect()
564        })
565    }
566
567    /// Generates a new mnemonic.
568    pub fn generate_mnemonic() -> Result<String> {
569        let mut entropy = [0u8; 32];
570        utils::rand::fill(&mut entropy)?;
571        let mnemonic = wordlist::encode(&entropy, &crypto::keys::bip39::wordlist::ENGLISH)
572            .map_err(|e| crate::Error::MnemonicError(format!("{e:?}")))?;
573        entropy.zeroize();
574        Ok(mnemonic)
575    }
576
577    /// Returns a hex encoded seed for a mnemonic.
578    pub fn mnemonic_to_hex_seed(mnemonic: &str) -> Result<String> {
579        // trim because empty spaces could create a different seed https://github.com/iotaledger/crypto.rs/issues/125
580        let mnemonic = mnemonic.trim();
581        // first we check if the mnemonic is valid to give meaningful errors
582        crypto::keys::bip39::wordlist::verify(mnemonic, &crypto::keys::bip39::wordlist::ENGLISH)
583            .map_err(|e| crate::Error::InvalidMnemonic(format!("{e:?}")))?;
584        let mut mnemonic_seed = [0u8; 64];
585        mnemonic_to_seed(mnemonic, "", &mut mnemonic_seed);
586        Ok(hex::encode(mnemonic_seed))
587    }
588
589    /// Function to find inputs from addresses for a provided amount (useful for offline signing)
590    pub async fn find_inputs(&self, addresses: Vec<String>, amount: u64) -> Result<Vec<UtxoInput>> {
591        // Get outputs from node and select inputs
592        let mut available_outputs = Vec::new();
593        for address in addresses {
594            available_outputs.extend_from_slice(&self.get_address().outputs(&address, Default::default()).await?);
595        }
596
597        let mut signature_locked_outputs = Vec::new();
598        let mut dust_allowance_outputs = Vec::new();
599
600        for output in available_outputs.into_iter() {
601            let output_data = self.get_output(&output).await?;
602            let (amount, _, signature_locked) =
603                ClientMessageBuilder::get_output_amount_and_address(&output_data.output)?;
604            if signature_locked {
605                signature_locked_outputs.push((output, amount));
606            } else {
607                dust_allowance_outputs.push((output, amount));
608            }
609        }
610        signature_locked_outputs.sort_by(|l, r| r.1.cmp(&l.1));
611        dust_allowance_outputs.sort_by(|l, r| r.1.cmp(&l.1));
612
613        let mut total_already_spent = 0;
614        let mut selected_inputs = Vec::new();
615        for (_offset, output_wrapper) in signature_locked_outputs
616            .into_iter()
617            .chain(dust_allowance_outputs.into_iter())
618            // Max inputs is 127
619            .take(INPUT_OUTPUT_COUNT_MAX)
620            .enumerate()
621        {
622            // Break if we have enough funds and don't create dust for the remainder
623            if total_already_spent == amount || total_already_spent >= amount + DUST_THRESHOLD {
624                break;
625            }
626            selected_inputs.push(output_wrapper.0.clone());
627            total_already_spent += output_wrapper.1;
628        }
629
630        if total_already_spent < amount
631            || (total_already_spent != amount && total_already_spent < amount + DUST_THRESHOLD)
632        {
633            return Err(crate::Error::NotEnoughBalance(total_already_spent, amount));
634        }
635
636        Ok(selected_inputs)
637    }
638
639    ///////////////////////////////////////////////////////////////////////
640    // MQTT API
641    //////////////////////////////////////////////////////////////////////
642
643    /// Returns a handle to the MQTT topics manager.
644    #[cfg(feature = "mqtt")]
645    pub fn subscriber(&mut self) -> MqttManager<'_> {
646        MqttManager::new(self)
647    }
648
649    /// Returns the mqtt event receiver.
650    #[cfg(feature = "mqtt")]
651    pub fn mqtt_event_receiver(&self) -> WatchReceiver<MqttEvent> {
652        self.mqtt_event_channel.1.clone()
653    }
654
655    //////////////////////////////////////////////////////////////////////
656    // Node API
657    //////////////////////////////////////////////////////////////////////
658
659    pub(crate) fn get_timeout(&self, api: Api) -> Duration {
660        *self.api_timeout.get(&api).unwrap_or(&self.request_timeout)
661    }
662
663    /// GET /health endpoint
664    pub async fn get_node_health(url: &str) -> Result<bool> {
665        let mut url = Url::parse(url)?;
666        url.set_path("health");
667        let status = crate::node_manager::HttpClient::new()
668            .get(Node { url, jwt: None }, GET_API_TIMEOUT)
669            .await?
670            .status();
671        match status {
672            200 => Ok(true),
673            _ => Ok(false),
674        }
675    }
676
677    /// GET /health endpoint
678    pub async fn get_health(&self) -> Result<bool> {
679        let mut node = self.get_node().await?;
680        node.url.set_path("health");
681        let status = self.node_manager.http_client.get(node, GET_API_TIMEOUT).await?.status();
682        match status {
683            200 => Ok(true),
684            _ => Ok(false),
685        }
686    }
687
688    /// GET /api/v1/info endpoint
689    pub async fn get_node_info(
690        url: &str,
691        jwt: Option<String>,
692        auth_name_pwd: Option<(&str, &str)>,
693    ) -> Result<NodeInfo> {
694        let mut url = crate::node_manager::validate_url(Url::parse(url)?)?;
695        if let Some((name, password)) = auth_name_pwd {
696            url.set_username(name)
697                .map_err(|_| crate::Error::UrlAuthError("username".to_string()))?;
698            url.set_password(Some(password))
699                .map_err(|_| crate::Error::UrlAuthError("password".to_string()))?;
700        }
701
702        let path = "api/v1/info";
703        url.set_path(path);
704
705        let resp: SuccessBody<NodeInfo> = crate::node_manager::HttpClient::new()
706            .get(Node { url, jwt }, GET_API_TIMEOUT)
707            .await?
708            .json()
709            .await?;
710
711        Ok(resp.data)
712    }
713
714    /// GET /api/v1/info endpoint
715    pub async fn get_info(&self) -> Result<NodeInfoWrapper> {
716        let path = "api/v1/info";
717
718        let resp: NodeInfoWrapper = self
719            .node_manager
720            .get_request(path, None, self.get_timeout(Api::GetInfo))
721            .await?;
722
723        Ok(resp)
724    }
725
726    /// GET /api/v1/peers endpoint
727    pub async fn get_peers(&self) -> Result<Vec<PeerDto>> {
728        let path = "api/v1/peers";
729
730        let resp: SuccessBody<PeersResponse> = self
731            .node_manager
732            .get_request(path, None, self.get_timeout(Api::GetPeers))
733            .await?;
734
735        Ok(resp.data.0)
736    }
737
738    /// GET /api/v1/tips endpoint
739    pub async fn get_tips(&self) -> Result<Vec<MessageId>> {
740        let path = "api/v1/tips";
741
742        let resp: SuccessBody<TipsResponse> = self
743            .node_manager
744            .get_request(path, None, self.get_timeout(Api::GetTips))
745            .await?;
746
747        let mut tips = Vec::new();
748        for tip in resp.data.tip_message_ids {
749            let mut new_tip = [0u8; 32];
750            hex::decode_to_slice(tip, &mut new_tip)?;
751            tips.push(MessageId::from(new_tip));
752        }
753        Ok(tips)
754    }
755
756    /// POST /api/v1/messages endpoint
757    pub async fn post_message(&self, message: &Message) -> Result<MessageId> {
758        let path = "api/v1/messages";
759        let local_pow = self.get_local_pow().await;
760        let timeout = if local_pow {
761            self.get_timeout(Api::PostMessage)
762        } else {
763            self.get_timeout(Api::PostMessageWithRemotePow)
764        };
765
766        // fallback to local PoW if remote PoW fails
767        let resp: SuccessBody<SubmitMessageResponse> = match self
768            .node_manager
769            .post_request_bytes(path, timeout, &message.pack_new(), local_pow)
770            .await
771        {
772            Ok(res) => res,
773            Err(e) => {
774                if let Error::NodeError(e) = e {
775                    let fallback_to_local_pow = self.get_fallback_to_local_pow().await;
776                    // hornet and bee return different error messages
777                    if (e == *"No available nodes with remote PoW"
778                        || e.contains("proof of work is not enabled")
779                        || e.contains("`PoW` not enabled"))
780                        && fallback_to_local_pow
781                    {
782                        // Without this we get:within `impl Future<Output = [async output]>`, the trait `Send` is not
783                        // implemented for `std::sync::RwLockWriteGuard<'_, NetworkInfo>`
784                        {
785                            let mut client_network_info =
786                                self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
787                            // switch to local PoW
788                            client_network_info.local_pow = true;
789                        }
790                        #[cfg(not(feature = "wasm"))]
791                        let msg_res = crate::api::finish_pow(self, message.payload().clone()).await;
792                        #[cfg(feature = "wasm")]
793                        let msg_res = {
794                            let min_pow_score = self.get_min_pow_score().await?;
795                            let network_id = self.get_network_id().await?;
796                            crate::api::finish_single_thread_pow(
797                                self,
798                                network_id,
799                                None,
800                                message.payload().clone(),
801                                min_pow_score,
802                            )
803                            .await
804                        };
805                        let message_with_local_pow = match msg_res {
806                            Ok(msg) => {
807                                // reset local PoW state
808                                let mut client_network_info =
809                                    self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
810                                client_network_info.local_pow = false;
811                                msg
812                            }
813                            Err(e) => {
814                                // reset local PoW state
815                                let mut client_network_info =
816                                    self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
817                                client_network_info.local_pow = false;
818                                return Err(e);
819                            }
820                        };
821                        self.node_manager
822                            .post_request_bytes(path, timeout, &message_with_local_pow.pack_new(), true)
823                            .await?
824                    } else {
825                        return Err(Error::NodeError(e));
826                    }
827                } else {
828                    return Err(e);
829                }
830            }
831        };
832
833        let mut message_id_bytes = [0u8; 32];
834        hex::decode_to_slice(resp.data.message_id, &mut message_id_bytes)?;
835        Ok(MessageId::from(message_id_bytes))
836    }
837
838    /// POST JSON to /api/v1/messages endpoint
839    pub async fn post_message_json(&self, message: &Message) -> Result<MessageId> {
840        let path = "api/v1/messages";
841        let local_pow = self.get_local_pow().await;
842        let timeout = if local_pow {
843            self.get_timeout(Api::PostMessage)
844        } else {
845            self.get_timeout(Api::PostMessageWithRemotePow)
846        };
847        let message_dto = MessageDto::from(message);
848
849        // fallback to local PoW if remote PoW fails
850        let resp: SuccessBody<SubmitMessageResponse> = match self
851            .node_manager
852            .post_request_json(path, timeout, serde_json::to_value(message_dto)?, local_pow)
853            .await
854        {
855            Ok(res) => res,
856            Err(e) => {
857                if let Error::NodeError(e) = e {
858                    let fallback_to_local_pow = self.get_fallback_to_local_pow().await;
859                    // hornet and bee return different error messages
860                    if (e == *"No available nodes with remote PoW"
861                        || e.contains("proof of work is not enabled")
862                        || e.contains("`PoW` not enabled"))
863                        && fallback_to_local_pow
864                    {
865                        // Without this we get:within `impl Future<Output = [async output]>`, the trait `Send` is not
866                        // implemented for `std::sync::RwLockWriteGuard<'_, NetworkInfo>`
867                        {
868                            let mut client_network_info =
869                                self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
870                            // switch to local PoW
871                            client_network_info.local_pow = true;
872                        }
873                        #[cfg(not(feature = "wasm"))]
874                        let msg_res = crate::api::finish_pow(self, message.payload().clone()).await;
875                        #[cfg(feature = "wasm")]
876                        let msg_res = {
877                            let min_pow_score = self.get_min_pow_score().await?;
878                            let network_id = self.get_network_id().await?;
879                            crate::api::finish_single_thread_pow(
880                                self,
881                                network_id,
882                                None,
883                                message.payload().clone(),
884                                min_pow_score,
885                            )
886                            .await
887                        };
888                        let message_with_local_pow = match msg_res {
889                            Ok(msg) => {
890                                // reset local PoW state
891                                let mut client_network_info =
892                                    self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
893                                client_network_info.local_pow = false;
894                                msg
895                            }
896                            Err(e) => {
897                                // reset local PoW state
898                                let mut client_network_info =
899                                    self.network_info.write().map_err(|_| crate::Error::PoisonError)?;
900                                client_network_info.local_pow = false;
901                                return Err(e);
902                            }
903                        };
904                        let message_dto = MessageDto::from(&message_with_local_pow);
905
906                        self.node_manager
907                            .post_request_json(path, timeout, serde_json::to_value(message_dto)?, true)
908                            .await?
909                    } else {
910                        return Err(Error::NodeError(e));
911                    }
912                } else {
913                    return Err(e);
914                }
915            }
916        };
917
918        let mut message_id_bytes = [0u8; 32];
919        hex::decode_to_slice(resp.data.message_id, &mut message_id_bytes)?;
920        Ok(MessageId::from(message_id_bytes))
921    }
922
923    /// GET /api/v1/messages/{messageId} endpoint
924    pub fn get_message(&self) -> GetMessageBuilder<'_> {
925        GetMessageBuilder::new(self)
926    }
927
928    /// GET /api/v1/outputs/{outputId} endpoint
929    /// Find an output by its transaction_id and corresponding output_index.
930    pub async fn get_output(&self, output_id: &UtxoInput) -> Result<OutputResponse> {
931        let path = &format!(
932            "api/v1/outputs/{}{}",
933            output_id.output_id().transaction_id(),
934            hex::encode(output_id.output_id().index().to_le_bytes())
935        );
936
937        let resp: SuccessBody<OutputResponse> = self
938            .node_manager
939            .get_request(path, None, self.get_timeout(Api::GetOutput))
940            .await?;
941
942        Ok(resp.data)
943    }
944
945    /// Find all outputs based on the requests criteria. This method will try to query multiple nodes if
946    /// the request amount exceeds individual node limit.
947    pub async fn find_outputs(&self, outputs: &[UtxoInput], addresses: &[String]) -> Result<Vec<OutputResponse>> {
948        let mut output_metadata = Vec::<OutputResponse>::new();
949        // Use a `HashSet` to prevent duplicate output.
950        let mut output_to_query = HashSet::<UtxoInput>::new();
951
952        // Collect the `UtxoInput` in the HashSet.
953        for output in outputs {
954            output_to_query.insert(output.to_owned());
955        }
956
957        // Use `get_address()` API to get the address outputs first,
958        // then collect the `UtxoInput` in the HashSet.
959        for address in addresses {
960            let address_outputs = self.get_address().outputs(address, Default::default()).await?;
961            for output in address_outputs.iter() {
962                output_to_query.insert(output.to_owned());
963            }
964            // 1000 is the max amount of outputs we get from the node, so if we reach that limit we maybe don't get all
965            // outputs and that's why we additionally only request dust allowance outputs
966            if address_outputs.len() == RESPONSE_MAX_OUTPUTS {
967                let address_dust_allowance_outputs = self
968                    .get_address()
969                    .outputs(
970                        address,
971                        OutputsOptions {
972                            include_spent: false,
973                            output_type: Some(OutputType::SignatureLockedDustAllowance),
974                        },
975                    )
976                    .await?;
977                for output in address_dust_allowance_outputs.iter() {
978                    output_to_query.insert(output.to_owned());
979                }
980            }
981        }
982
983        // Use `get_output` API to get the `OutputMetadata`.
984        for output in output_to_query {
985            let meta_data = self.get_output(&output).await?;
986            output_metadata.push(meta_data);
987        }
988        Ok(output_metadata)
989    }
990
991    /// GET /api/v1/addresses/{address} endpoint
992    pub fn get_address(&self) -> GetAddressBuilder<'_> {
993        GetAddressBuilder::new(self)
994    }
995
996    /// GET /api/v1/milestones/{index} endpoint
997    /// Get the milestone by the given index.
998    pub async fn get_milestone(&self, index: u32) -> Result<MilestoneResponse> {
999        let path = &format!("api/v1/milestones/{index}");
1000
1001        let resp: SuccessBody<MilestoneResponseDto> = self
1002            .node_manager
1003            .get_request(path, None, self.get_timeout(Api::GetMilestone))
1004            .await?;
1005
1006        let milestone = resp.data;
1007        let mut message_id = [0u8; 32];
1008        hex::decode_to_slice(milestone.message_id, &mut message_id)?;
1009        Ok(MilestoneResponse {
1010            index: milestone.milestone_index,
1011            message_id: MessageId::new(message_id),
1012            timestamp: milestone.timestamp,
1013        })
1014    }
1015
1016    /// GET /api/v1/milestones/{index}/utxo-changes endpoint
1017    /// Get the milestone by the given index.
1018    pub async fn get_milestone_utxo_changes(&self, index: u32) -> Result<MilestoneUTXOChanges> {
1019        let path = &format!("api/v1/milestones/{index}/utxo-changes");
1020
1021        let resp: SuccessBody<MilestoneUTXOChanges> = self
1022            .node_manager
1023            .get_request(path, None, self.get_timeout(Api::GetMilestone))
1024            .await?;
1025
1026        Ok(resp.data)
1027    }
1028
1029    /// GET /api/v1/receipts endpoint
1030    /// Get all receipts.
1031    pub async fn get_receipts(&self) -> Result<Vec<ReceiptDto>> {
1032        let path = &"api/v1/receipts";
1033
1034        let resp: SuccessBody<ReceiptsResponse> = self.node_manager.get_request(path, None, GET_API_TIMEOUT).await?;
1035
1036        Ok(resp.data.receipts)
1037    }
1038
1039    /// GET /api/v1/receipts/{migratedAt} endpoint
1040    /// Get the receipts by the given milestone index.
1041    pub async fn get_receipts_migrated_at(&self, milestone_index: u32) -> Result<Vec<ReceiptDto>> {
1042        let path = &format!("api/v1/receipts/{milestone_index}");
1043
1044        let resp: SuccessBody<ReceiptsResponse> = self.node_manager.get_request(path, None, GET_API_TIMEOUT).await?;
1045
1046        Ok(resp.data.receipts)
1047    }
1048
1049    /// GET /api/v1/treasury endpoint
1050    /// Get the treasury output.
1051    pub async fn get_treasury(&self) -> Result<TreasuryResponse> {
1052        let path = "api/v1/treasury";
1053
1054        let resp: SuccessBody<TreasuryResponse> = self.node_manager.get_request(path, None, GET_API_TIMEOUT).await?;
1055
1056        Ok(resp.data)
1057    }
1058
1059    /// GET /api/v1/transactions/{transactionId}/included-message
1060    /// Returns the included message of the transaction.
1061    pub async fn get_included_message(&self, transaction_id: &TransactionId) -> Result<Message> {
1062        let path = &format!("api/v1/transactions/{transaction_id}/included-message");
1063
1064        let resp: SuccessBody<MessageResponse> = self.node_manager.get_request(path, None, GET_API_TIMEOUT).await?;
1065        Ok(Message::try_from(&resp.data.0)?)
1066    }
1067    /// Reattaches messages for provided message id. Messages can be reattached only if they are valid and haven't been
1068    /// confirmed for a while.
1069    pub async fn reattach(&self, message_id: &MessageId) -> Result<(MessageId, Message)> {
1070        let metadata = self.get_message().metadata(message_id).await?;
1071        if metadata.should_reattach.unwrap_or(false) {
1072            self.reattach_unchecked(message_id).await
1073        } else {
1074            Err(Error::NoNeedPromoteOrReattach(message_id.to_string()))
1075        }
1076    }
1077
1078    /// Reattach a message without checking if it should be reattached
1079    pub async fn reattach_unchecked(&self, message_id: &MessageId) -> Result<(MessageId, Message)> {
1080        // Get the Message object by the MessageID.
1081        let message = self.get_message().data(message_id).await?;
1082        let reattach_message = {
1083            #[cfg(feature = "wasm")]
1084            {
1085                let network_id = self.get_network_id().await?;
1086                let mut tips = self.get_tips().await?;
1087                tips.sort_unstable_by_key(|a| a.pack_new());
1088                tips.dedup();
1089                let mut message_builder = MessageBuilder::<ClientMiner>::new()
1090                    .with_network_id(network_id)
1091                    .with_parents(Parents::new(tips)?);
1092                if let Some(p) = message.payload().to_owned() {
1093                    message_builder = message_builder.with_payload(p)
1094                }
1095                message_builder.finish().map_err(Error::MessageError)?
1096            }
1097            #[cfg(not(feature = "wasm"))]
1098            {
1099                finish_pow(self, message.payload().to_owned()).await?
1100            }
1101        };
1102
1103        // Post the modified
1104        let message_id = self.post_message(&reattach_message).await?;
1105        // Get message if we use remote PoW, because the node will change parents and nonce
1106        let msg = match self.get_local_pow().await {
1107            true => reattach_message,
1108            false => self.get_message().data(&message_id).await?,
1109        };
1110        Ok((message_id, msg))
1111    }
1112
1113    /// Promotes a message. The method should validate if a promotion is necessary through get_message. If not, the
1114    /// method should error out and should not allow unnecessary promotions.
1115    pub async fn promote(&self, message_id: &MessageId) -> Result<(MessageId, Message)> {
1116        let metadata = self.get_message().metadata(message_id).await?;
1117        if metadata.should_promote.unwrap_or(false) {
1118            self.promote_unchecked(message_id).await
1119        } else {
1120            Err(Error::NoNeedPromoteOrReattach(message_id.to_string()))
1121        }
1122    }
1123
1124    /// Promote a message without checking if it should be promoted
1125    pub async fn promote_unchecked(&self, message_id: &MessageId) -> Result<(MessageId, Message)> {
1126        // Create a new message (zero value message) for which one tip would be the actual message
1127        let mut tips = self.get_tips().await?;
1128        let min_pow_score = self.get_min_pow_score().await?;
1129        let network_id = self.get_network_id().await?;
1130        tips.push(*message_id);
1131        // Sort tips/parents
1132        tips.sort_unstable_by_key(|a| a.pack_new());
1133        tips.dedup();
1134
1135        let promote_message = MessageBuilder::<ClientMiner>::new()
1136            .with_network_id(network_id)
1137            .with_parents(Parents::new(tips)?)
1138            .with_nonce_provider(self.get_pow_provider().await, min_pow_score)
1139            .finish()
1140            .map_err(|_| Error::TransactionError)?;
1141
1142        let message_id = self.post_message(&promote_message).await?;
1143        // Get message if we use remote PoW, because the node will change parents and nonce
1144        let msg = match self.get_local_pow().await {
1145            true => promote_message,
1146            false => self.get_message().data(&message_id).await?,
1147        };
1148        Ok((message_id, msg))
1149    }
1150
1151    //////////////////////////////////////////////////////////////////////
1152    // High level API
1153    //////////////////////////////////////////////////////////////////////
1154
1155    /// A generic send function for easily sending transaction or indexation messages.
1156    pub fn message(&self) -> ClientMessageBuilder<'_> {
1157        ClientMessageBuilder::new(self)
1158    }
1159
1160    /// Return a valid unspent address.
1161    pub fn get_unspent_address<'a>(&'a self, seed: &'a Seed) -> GetUnspentAddressBuilder<'a> {
1162        GetUnspentAddressBuilder::new(self, seed)
1163    }
1164
1165    /// Return a list of addresses from the seed regardless of their validity.
1166    pub fn get_addresses<'a>(&'a self, seed: &'a Seed) -> GetAddressesBuilder<'a> {
1167        GetAddressesBuilder::new(seed).with_client(self)
1168    }
1169
1170    /// Find all messages by provided message IDs and/or indexation_keys.
1171    pub async fn find_messages<I: AsRef<[u8]>>(
1172        &self,
1173        indexation_keys: &[I],
1174        message_ids: &[MessageId],
1175    ) -> Result<Vec<Message>> {
1176        let mut messages = Vec::new();
1177
1178        // Use a `HashSet` to prevent duplicate message_ids.
1179        let mut message_ids_to_query = HashSet::<MessageId>::new();
1180
1181        // Collect the `MessageId` in the HashSet.
1182        for message_id in message_ids {
1183            message_ids_to_query.insert(message_id.to_owned());
1184        }
1185
1186        // Use `get_message().index()` API to get the message ID first,
1187        // then collect the `MessageId` in the HashSet.
1188        for index in indexation_keys {
1189            let message_ids = self.get_message().index(index).await?;
1190            for message_id in message_ids.iter() {
1191                message_ids_to_query.insert(message_id.to_owned());
1192            }
1193        }
1194
1195        // Use `get_message().data()` API to get the `Message`.
1196        for message_id in message_ids_to_query {
1197            let message = self.get_message().data(&message_id).await?;
1198            messages.push(message);
1199        }
1200        Ok(messages)
1201    }
1202
1203    /// Return the balance for a provided seed
1204    /// Addresses with balance must be consecutive, so this method will return once it encounters a zero
1205    /// balance address.
1206    pub fn get_balance<'a>(&'a self, seed: &'a Seed) -> GetBalanceBuilder<'a> {
1207        GetBalanceBuilder::new(self, seed)
1208    }
1209
1210    /// Return the balance in iota for the given addresses; No seed needed to do this since we are only checking and
1211    /// already know the addresses.
1212    pub async fn get_address_balances(&self, addresses: &[String]) -> Result<Vec<BalanceAddressResponse>> {
1213        let mut address_balance_pairs = Vec::new();
1214        for address in addresses {
1215            let balance_response = self.get_address().balance(address).await?;
1216            address_balance_pairs.push(balance_response);
1217        }
1218        Ok(address_balance_pairs)
1219    }
1220
1221    /// Transforms bech32 to hex
1222    pub fn bech32_to_hex(bech32: &str) -> crate::Result<String> {
1223        let address = Address::try_from_bech32(bech32)?;
1224        let Address::Ed25519(ed) = address;
1225        Ok(ed.to_string())
1226    }
1227
1228    /// Transforms a hex encoded address to a bech32 encoded address
1229    pub async fn hex_to_bech32(&self, hex: &str, bech32_hrp: Option<&str>) -> crate::Result<String> {
1230        let address: Ed25519Address = hex.parse::<Ed25519Address>()?;
1231        match bech32_hrp {
1232            Some(hrp) => Ok(Address::Ed25519(address).to_bech32(hrp)),
1233            None => Ok(Address::Ed25519(address).to_bech32(self.get_bech32_hrp().await?.as_str())),
1234        }
1235    }
1236
1237    /// Transforms a hex encoded public key to a bech32 encoded address
1238    pub async fn hex_public_key_to_bech32_address(&self, hex: &str, bech32_hrp: Option<&str>) -> crate::Result<String> {
1239        let mut public_key = [0u8; ED25519_ADDRESS_LENGTH];
1240        hex::decode_to_slice(hex, &mut public_key)?;
1241
1242        let address = Blake2b256::digest(&public_key)
1243            .try_into()
1244            .map_err(|_e| Error::Blake2b256Error("Hashing the public key failed."))?;
1245        let address: Ed25519Address = Ed25519Address::new(address);
1246        match bech32_hrp {
1247            Some(hrp) => Ok(Address::Ed25519(address).to_bech32(hrp)),
1248            None => Ok(Address::Ed25519(address).to_bech32(self.get_bech32_hrp().await?.as_str())),
1249        }
1250    }
1251
1252    /// Returns a valid Address parsed from a String.
1253    pub fn parse_bech32_address(address: &str) -> crate::Result<Address> {
1254        Ok(Address::try_from_bech32(address)?)
1255    }
1256
1257    /// Checks if a String is a valid bech32 encoded address.
1258    pub fn is_address_valid(address: &str) -> bool {
1259        Address::try_from_bech32(address).is_ok()
1260    }
1261
1262    /// Retries (promotes or reattaches) a message for provided message id. Message should only be
1263    /// retried only if they are valid and haven't been confirmed for a while.
1264    pub async fn retry(&self, message_id: &MessageId) -> Result<(MessageId, Message)> {
1265        // Get the metadata to check if it needs to promote or reattach
1266        let message_metadata = self.get_message().metadata(message_id).await?;
1267        if message_metadata.should_promote.unwrap_or(false) {
1268            self.promote_unchecked(message_id).await
1269        } else if message_metadata.should_reattach.unwrap_or(false) {
1270            self.reattach_unchecked(message_id).await
1271        } else {
1272            Err(Error::NoNeedPromoteOrReattach(message_id.to_string()))
1273        }
1274    }
1275
1276    /// Retries (promotes or reattaches) a message for provided message id until it's included (referenced by a
1277    /// milestone). Default interval is 5 seconds and max attempts is 40. Returns the included message at first position
1278    /// and additional reattached messages
1279    pub async fn retry_until_included(
1280        &self,
1281        message_id: &MessageId,
1282        interval: Option<u64>,
1283        max_attempts: Option<u64>,
1284    ) -> Result<Vec<(MessageId, Message)>> {
1285        // Attachments of the Message to check inclusion state
1286        let mut message_ids = vec![*message_id];
1287        // Reattached Messages that get returned
1288        let mut messages_with_id = Vec::new();
1289        for _ in 0..max_attempts.unwrap_or(40) {
1290            #[cfg(feature = "wasm")]
1291            {
1292                TimeoutFuture::new((interval.unwrap_or(5) * 1000).try_into().unwrap()).await;
1293            }
1294            #[cfg(not(feature = "wasm"))]
1295            sleep(Duration::from_secs(interval.unwrap_or(5))).await;
1296            // Check inclusion state for each attachment
1297            let message_ids_len = message_ids.len();
1298            let mut conflicting = false;
1299            for (index, msg_id) in message_ids.clone().iter().enumerate() {
1300                let message_metadata = self.get_message().metadata(msg_id).await?;
1301                if let Some(inclusion_state) = message_metadata.ledger_inclusion_state {
1302                    match inclusion_state {
1303                        LedgerInclusionStateDto::Included | LedgerInclusionStateDto::NoTransaction => {
1304                            // if original message, request it so we can return it on first position
1305                            if message_id == msg_id {
1306                                let mut included_and_reattached_messages =
1307                                    vec![(*message_id, self.get_message().data(message_id).await?)];
1308                                included_and_reattached_messages.extend(messages_with_id);
1309                                return Ok(included_and_reattached_messages);
1310                            } else {
1311                                // Move included message to first position
1312                                messages_with_id.rotate_left(index);
1313                                return Ok(messages_with_id);
1314                            }
1315                        }
1316                        // only set it as conflicting here and don't return, because another reattached message could
1317                        // have the included transaction
1318                        LedgerInclusionStateDto::Conflicting => conflicting = true,
1319                    };
1320                }
1321                // Only reattach or promote latest attachment of the message
1322                if index == message_ids_len - 1 {
1323                    if message_metadata.should_promote.unwrap_or(false) {
1324                        // Safe to unwrap since we iterate over it
1325                        self.promote_unchecked(message_ids.last().unwrap()).await?;
1326                    } else if message_metadata.should_reattach.unwrap_or(false) {
1327                        // Safe to unwrap since we iterate over it
1328                        let reattached = self.reattach_unchecked(message_ids.last().unwrap()).await?;
1329                        message_ids.push(reattached.0);
1330                        messages_with_id.push(reattached);
1331                    }
1332                }
1333            }
1334            // After we checked all our reattached messages, check if the transaction got reattached in another message
1335            // and confirmed
1336            if conflicting {
1337                let message = self.get_message().data(message_id).await?;
1338                if let Some(Payload::Transaction(transaction_payload)) = message.payload() {
1339                    let included_message = self.get_included_message(&transaction_payload.id()).await?;
1340                    let mut included_and_reattached_messages = vec![(included_message.id().0, included_message)];
1341                    included_and_reattached_messages.extend(messages_with_id);
1342                    return Ok(included_and_reattached_messages);
1343                }
1344            }
1345        }
1346        Err(Error::TangleInclusionError(message_id.to_string()))
1347    }
1348
1349    /// Function to consolidate all funds from a range of addresses to the address with the lowest index in that range
1350    /// Returns the address to which the funds got consolidated, if any were available
1351    pub async fn consolidate_funds(
1352        &self,
1353        seed: &Seed,
1354        account_index: usize,
1355        address_range: Range<usize>,
1356    ) -> crate::Result<String> {
1357        crate::api::consolidate_funds(self, seed, account_index, address_range).await
1358    }
1359}
1360
1361/// Hash the network id str from the nodeinfo to an u64 for the messageBuilder
1362pub fn hash_network(network_id_string: &str) -> Result<u64> {
1363    let bytes = Blake2b256::digest(network_id_string.as_bytes())[0..8]
1364        .try_into()
1365        .map_err(|_e| Error::Blake2b256Error("Hashing the network id failed."))?;
1366
1367    Ok(u64::from_le_bytes(bytes))
1368}