1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

#[cfg(not(target_family = "wasm"))]
use {
    crate::NetworkInfo,
    iota_types::{api::response::InfoResponse, block::protocol::ProtocolParameters},
    std::collections::HashMap,
    std::{
        collections::HashSet,
        sync::{Arc, RwLock},
        time::Duration,
    },
    tokio::{runtime::Runtime, sync::broadcast::Receiver, time::sleep},
};

use super::Node;
use crate::{Client, Error, Result};

impl Client {
    /// Get a node candidate from the healthy node pool.
    pub fn get_node(&self) -> Result<Node> {
        if let Some(primary_node) = &self.node_manager.primary_node {
            return Ok(primary_node.clone());
        }

        let pool = self.node_manager.nodes.clone();

        pool.into_iter().next().ok_or(Error::HealthyNodePoolEmpty)
    }

    /// Lists the configured nodes that are currently absent from the healthy-node map.
    ///
    /// If the `healthy_nodes` lock is poisoned, an empty set is returned.
    #[cfg(not(target_family = "wasm"))]
    pub fn unhealthy_nodes(&self) -> HashSet<&Node> {
        match self.node_manager.healthy_nodes.read() {
            Ok(healthy) => self
                .node_manager
                .nodes
                .iter()
                .filter(|candidate| !healthy.contains_key(*candidate))
                .collect(),
            // A poisoned lock is reported as "nothing unhealthy" rather than as an error.
            Err(_) => HashSet::new(),
        }
    }

    /// Spawns a background task on `runtime` that re-syncs the node lists every
    /// `node_sync_interval`.
    ///
    /// Each iteration sleeps for `node_sync_interval` and then calls [`Client::sync_nodes`] with
    /// `sync`, `nodes` and `network_info`; a failed sync is logged at `warn` level and the loop
    /// carries on.
    ///
    /// The task stops as soon as `kill.recv()` resolves, whether with a message, a lag error or a
    /// closed channel. An iteration that is sleeping or syncing at that moment is cancelled.
    #[cfg(not(target_family = "wasm"))]
    pub(crate) fn start_sync_process(
        runtime: &Runtime,
        sync: Arc<RwLock<HashMap<Node, InfoResponse>>>,
        nodes: HashSet<Node>,
        node_sync_interval: Duration,
        network_info: Arc<RwLock<NetworkInfo>>,
        mut kill: Receiver<()>,
    ) {
        runtime.spawn(async move {
            loop {
                tokio::select! {
                    _ = async {
                        // delay first since the first `sync_nodes` call is made by the builder
                        // to ensure the node list is filled before the client is used
                        sleep(node_sync_interval).await;
                        if let Err(e) = Client::sync_nodes(&sync, &nodes, &network_info).await {
                            log::warn!("Syncing nodes failed: {e}");
                        }
                    } => {}
                    // Leave the loop on any outcome of `recv`. Without the `break` the task
                    // would never terminate, and once the sender is dropped `recv` resolves
                    // immediately on every iteration, turning the loop into a busy spin.
                    _ = kill.recv() => {
                        break;
                    }
                }
            }
        });
    }

    /// Queries every node in `nodes` and rebuilds the healthy-node map stored in `sync`.
    ///
    /// Nodes that respond and report a healthy status are grouped by the network name they
    /// report. Only the group with the most nodes is kept; on a tie the group met first while
    /// iterating the map wins, which is effectively arbitrary. `network_info` is refreshed from
    /// the first node of that group and `sync` is replaced with that group's nodes. When no node
    /// is healthy, `sync` is replaced with an empty map and `network_info` is left untouched.
    ///
    /// # Errors
    ///
    /// Returns an error if the selected node's protocol info cannot be converted into
    /// [`ProtocolParameters`], or [`Error::PoisonError`] if one of the locks is poisoned. In
    /// either case `sync` keeps its previous contents.
    #[cfg(not(target_family = "wasm"))]
    pub(crate) async fn sync_nodes(
        sync: &Arc<RwLock<HashMap<Node, InfoResponse>>>,
        nodes: &HashSet<Node>,
        network_info: &Arc<RwLock<NetworkInfo>>,
    ) -> Result<()> {
        log::debug!("sync_nodes");
        let mut healthy_nodes: HashMap<Node, InfoResponse> = HashMap::new();
        let mut network_nodes: HashMap<String, Vec<(InfoResponse, Node)>> = HashMap::new();

        for node in nodes {
            // Group every healthy node under the network name it reports.
            if let Ok(info) = Client::get_node_info(node.url.as_ref(), None).await {
                if info.status.is_healthy {
                    network_nodes
                        .entry(info.protocol.network_name.clone())
                        .or_default()
                        .push((info, node.clone()));
                } else {
                    log::debug!("{} is not healthy: {:?}", node.url, info);
                }
            } else {
                log::error!("Couldn't get the node info from {}", node.url);
            }
        }

        // Pick the network with the most healthy nodes. The strict `>` keeps the group that was
        // encountered first when several groups have the same size.
        let mut largest_network: Option<&[(InfoResponse, Node)]> = None;
        for entries in network_nodes.values() {
            if largest_network.map_or(true, |current| entries.len() > current.len()) {
                largest_network = Some(entries.as_slice());
            }
        }

        if let Some(entries) = largest_network {
            if let Some((info, _node)) = entries.first() {
                // Convert before taking the lock: a failed conversion must not leave
                // `network_info` half-updated, and the write lock is held as briefly as possible.
                let protocol_parameters = ProtocolParameters::try_from(info.protocol.clone())?;

                let mut network_info = network_info.write().map_err(|_| crate::Error::PoisonError)?;
                network_info.latest_milestone_timestamp = info.status.latest_milestone.timestamp;
                network_info.protocol_parameters = protocol_parameters;
            }

            healthy_nodes.extend(entries.iter().map(|(info, node)| (node.clone(), info.clone())));
        }

        // Update the sync list.
        *sync.write().map_err(|_| crate::Error::PoisonError)? = healthy_nodes;

        Ok(())
    }
}