Skip to main content

hiero_sdk/
node_address_book_query.rs

1// SPDX-License-Identifier: Apache-2.0
2
3use std::time::Duration;
4
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_core::Stream;
8use futures_util::{
9    TryFutureExt,
10    TryStreamExt,
11};
12use hiero_sdk_proto::{
13    mirror,
14    services,
15};
16use mirror::network_service_client::NetworkServiceClient;
17use tonic::transport::Channel;
18use tonic::Response;
19
20use crate::mirror_query::{
21    AnyMirrorQueryData,
22    AnyMirrorQueryMessage,
23    MirrorRequest,
24};
25use crate::protobuf::FromProtobuf;
26use crate::{
27    AnyMirrorQueryResponse,
28    FileId,
29    MirrorQuery,
30    NodeAddress,
31    NodeAddressBook,
32    ToProtobuf,
33};
34
35// TODO: validate checksums after PR is merged
36
37/// Query for an address book and return its nodes.
38/// The nodes are returned in ascending order by node ID.
39pub type NodeAddressBookQuery = MirrorQuery<NodeAddressBookQueryData>;
40
41#[derive(Debug, Clone)]
42pub struct NodeAddressBookQueryData {
43    /// The ID of the address book file on the network.
44    /// Can either be `0.0.101` or `0.0.102`. Defaults to `0.0.102`.
45    file_id: FileId,
46
47    /// The maximum number of node addresses to receive.
48    /// Defaults to _all_.
49    limit: u32,
50
51    /// The shard and realm of the address book file on the network.
52    shard: Option<u64>,
53    realm: Option<u64>,
54}
55
56impl NodeAddressBookQueryData {
57    fn map_stream<'a, S>(stream: S) -> impl Stream<Item = crate::Result<NodeAddress>>
58    where
59        S: Stream<Item = crate::Result<services::NodeAddress>> + Send + 'a,
60    {
61        stream.and_then(|it| std::future::ready(NodeAddress::from_protobuf(it)))
62    }
63}
64
65impl Default for NodeAddressBookQueryData {
66    fn default() -> Self {
67        Self {
68            file_id: FileId::get_address_book_file_id_for(0, 0),
69            limit: 0,
70            shard: None,
71            realm: None,
72        }
73    }
74}
75
76impl NodeAddressBookQuery {
77    /// Returns the file ID of the address book file on the network.
78    #[must_use]
79    pub fn get_file_id(&self) -> FileId {
80        self.data.file_id
81    }
82
83    /// Sets the ID of the address book file on the network.
84    /// Can either be `0.0.101` or `0.0.102`. Defaults to `0.0.102`.
85    pub fn file_id(&mut self, id: impl Into<FileId>) -> &mut Self {
86        self.data.file_id = id.into();
87        self
88    }
89
90    /// Returns the configured limit of node addresses to receive.
91    #[must_use]
92    pub fn get_limit(&self) -> u32 {
93        self.data.limit
94    }
95
96    /// Sets the maximum number of node addresses to receive.
97    /// Defaults to _all_.
98    pub fn limit(&mut self, limit: u32) -> &mut Self {
99        self.data.limit = limit;
100        self
101    }
102
103    /// Sets the shard of the address book file on the network.
104    pub fn shard(&mut self, shard: u64) -> &mut Self {
105        self.data.shard = Some(shard);
106        self
107    }
108
109    /// Sets the realm of the address book file on the network.
110    pub fn realm(&mut self, realm: u64) -> &mut Self {
111        self.data.realm = Some(realm);
112        self
113    }
114}
115
116impl From<NodeAddressBookQueryData> for AnyMirrorQueryData {
117    fn from(data: NodeAddressBookQueryData) -> Self {
118        Self::NodeAddressBook(data)
119    }
120}
121
122impl MirrorRequest for NodeAddressBookQueryData {
123    type GrpcItem = services::NodeAddress;
124
125    type ConnectStream = tonic::Streaming<Self::GrpcItem>;
126
127    type Item = NodeAddress;
128
129    type Context = ();
130
131    type Response = NodeAddressBook;
132
133    type ItemStream<'a> = BoxStream<'a, crate::Result<NodeAddress>>;
134
135    fn connect(
136        &self,
137        _context: &Self::Context,
138        channel: Channel,
139    ) -> BoxFuture<'_, tonic::Result<Self::ConnectStream>> {
140        Box::pin(async {
141            let file_id = if self.shard.is_some() && self.realm.is_some() {
142                FileId::get_address_book_file_id_for(self.shard.unwrap(), self.realm.unwrap())
143                    .to_protobuf()
144            } else {
145                FileId::get_address_book_file_id_for(0, 0).to_protobuf()
146            };
147
148            let request =
149                mirror::AddressBookQuery { file_id: Some(file_id), limit: self.limit as i32 };
150
151            NetworkServiceClient::new(channel).get_nodes(request).await.map(Response::into_inner)
152        })
153    }
154
155    fn make_item_stream<'a, S>(stream: S) -> Self::ItemStream<'a>
156    where
157        S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a,
158    {
159        Box::pin(Self::map_stream(stream))
160    }
161
162    fn try_collect<'a, S>(stream: S) -> BoxFuture<'a, crate::Result<Self::Response>>
163    where
164        S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a,
165    {
166        // this doesn't reuse the work in `make_item_stream`
167        Box::pin(
168            Self::map_stream(stream)
169                .try_collect()
170                .map_ok(|addresses| NodeAddressBook { node_addresses: addresses }),
171        )
172    }
173
174    fn update_context(_context: &mut Self::Context, _item: &Self::GrpcItem) {}
175}
176
177impl From<NodeAddress> for AnyMirrorQueryMessage {
178    fn from(value: NodeAddress) -> Self {
179        Self::NodeAddressBook(value)
180    }
181}
182
183impl From<NodeAddressBook> for AnyMirrorQueryResponse {
184    fn from(value: NodeAddressBook) -> Self {
185        Self::NodeAddressBook(value)
186    }
187}
188
189impl NodeAddressBookQuery {
190    pub(crate) async fn execute_mirrornet(
191        &self,
192        channel: Channel,
193        timeout: Option<Duration>,
194    ) -> crate::Result<NodeAddressBook> {
195        let timeout = timeout.unwrap_or_else(|| {
196            std::time::Duration::from_millis(backoff::default::MAX_ELAPSED_TIME_MILLIS)
197        });
198
199        NodeAddressBookQueryData::try_collect(crate::mirror_query::subscribe(
200            channel,
201            timeout,
202            self.data.clone(),
203        ))
204        .await
205    }
206}
207
208#[cfg(test)]
209mod tests {
210    use crate::{
211        FileId,
212        NodeAddressBookQuery,
213    };
214
215    #[test]
216    fn get_set_file_id() {
217        let mut query = NodeAddressBookQuery::new();
218        query.file_id(FileId::new(0, 0, 1111));
219
220        assert_eq!(query.get_file_id(), FileId::new(0, 0, 1111));
221    }
222
223    #[test]
224    fn get_set_limit() {
225        let mut query = NodeAddressBookQuery::new();
226        query.limit(231);
227
228        assert_eq!(query.get_limit(), 231);
229    }
230}