hiero_sdk/
node_address_book_query.rs1use std::time::Duration;
4
5use futures_core::future::BoxFuture;
6use futures_core::stream::BoxStream;
7use futures_core::Stream;
8use futures_util::{
9 TryFutureExt,
10 TryStreamExt,
11};
12use hiero_sdk_proto::{
13 mirror,
14 services,
15};
16use mirror::network_service_client::NetworkServiceClient;
17use tonic::transport::Channel;
18use tonic::Response;
19
20use crate::mirror_query::{
21 AnyMirrorQueryData,
22 AnyMirrorQueryMessage,
23 MirrorRequest,
24};
25use crate::protobuf::FromProtobuf;
26use crate::{
27 AnyMirrorQueryResponse,
28 FileId,
29 MirrorQuery,
30 NodeAddress,
31 NodeAddressBook,
32 ToProtobuf,
33};
34
35pub type NodeAddressBookQuery = MirrorQuery<NodeAddressBookQueryData>;
40
41#[derive(Debug, Clone)]
42pub struct NodeAddressBookQueryData {
43 file_id: FileId,
46
47 limit: u32,
50
51 shard: Option<u64>,
53 realm: Option<u64>,
54}
55
56impl NodeAddressBookQueryData {
57 fn map_stream<'a, S>(stream: S) -> impl Stream<Item = crate::Result<NodeAddress>>
58 where
59 S: Stream<Item = crate::Result<services::NodeAddress>> + Send + 'a,
60 {
61 stream.and_then(|it| std::future::ready(NodeAddress::from_protobuf(it)))
62 }
63}
64
65impl Default for NodeAddressBookQueryData {
66 fn default() -> Self {
67 Self {
68 file_id: FileId::get_address_book_file_id_for(0, 0),
69 limit: 0,
70 shard: None,
71 realm: None,
72 }
73 }
74}
75
76impl NodeAddressBookQuery {
77 #[must_use]
79 pub fn get_file_id(&self) -> FileId {
80 self.data.file_id
81 }
82
83 pub fn file_id(&mut self, id: impl Into<FileId>) -> &mut Self {
86 self.data.file_id = id.into();
87 self
88 }
89
90 #[must_use]
92 pub fn get_limit(&self) -> u32 {
93 self.data.limit
94 }
95
96 pub fn limit(&mut self, limit: u32) -> &mut Self {
99 self.data.limit = limit;
100 self
101 }
102
103 pub fn shard(&mut self, shard: u64) -> &mut Self {
105 self.data.shard = Some(shard);
106 self
107 }
108
109 pub fn realm(&mut self, realm: u64) -> &mut Self {
111 self.data.realm = Some(realm);
112 self
113 }
114}
115
116impl From<NodeAddressBookQueryData> for AnyMirrorQueryData {
117 fn from(data: NodeAddressBookQueryData) -> Self {
118 Self::NodeAddressBook(data)
119 }
120}
121
122impl MirrorRequest for NodeAddressBookQueryData {
123 type GrpcItem = services::NodeAddress;
124
125 type ConnectStream = tonic::Streaming<Self::GrpcItem>;
126
127 type Item = NodeAddress;
128
129 type Context = ();
130
131 type Response = NodeAddressBook;
132
133 type ItemStream<'a> = BoxStream<'a, crate::Result<NodeAddress>>;
134
135 fn connect(
136 &self,
137 _context: &Self::Context,
138 channel: Channel,
139 ) -> BoxFuture<'_, tonic::Result<Self::ConnectStream>> {
140 Box::pin(async {
141 let file_id = if self.shard.is_some() && self.realm.is_some() {
142 FileId::get_address_book_file_id_for(self.shard.unwrap(), self.realm.unwrap())
143 .to_protobuf()
144 } else {
145 FileId::get_address_book_file_id_for(0, 0).to_protobuf()
146 };
147
148 let request =
149 mirror::AddressBookQuery { file_id: Some(file_id), limit: self.limit as i32 };
150
151 NetworkServiceClient::new(channel).get_nodes(request).await.map(Response::into_inner)
152 })
153 }
154
155 fn make_item_stream<'a, S>(stream: S) -> Self::ItemStream<'a>
156 where
157 S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a,
158 {
159 Box::pin(Self::map_stream(stream))
160 }
161
162 fn try_collect<'a, S>(stream: S) -> BoxFuture<'a, crate::Result<Self::Response>>
163 where
164 S: Stream<Item = crate::Result<Self::GrpcItem>> + Send + 'a,
165 {
166 Box::pin(
168 Self::map_stream(stream)
169 .try_collect()
170 .map_ok(|addresses| NodeAddressBook { node_addresses: addresses }),
171 )
172 }
173
174 fn update_context(_context: &mut Self::Context, _item: &Self::GrpcItem) {}
175}
176
177impl From<NodeAddress> for AnyMirrorQueryMessage {
178 fn from(value: NodeAddress) -> Self {
179 Self::NodeAddressBook(value)
180 }
181}
182
183impl From<NodeAddressBook> for AnyMirrorQueryResponse {
184 fn from(value: NodeAddressBook) -> Self {
185 Self::NodeAddressBook(value)
186 }
187}
188
189impl NodeAddressBookQuery {
190 pub(crate) async fn execute_mirrornet(
191 &self,
192 channel: Channel,
193 timeout: Option<Duration>,
194 ) -> crate::Result<NodeAddressBook> {
195 let timeout = timeout.unwrap_or_else(|| {
196 std::time::Duration::from_millis(backoff::default::MAX_ELAPSED_TIME_MILLIS)
197 });
198
199 NodeAddressBookQueryData::try_collect(crate::mirror_query::subscribe(
200 channel,
201 timeout,
202 self.data.clone(),
203 ))
204 .await
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use crate::{
211 FileId,
212 NodeAddressBookQuery,
213 };
214
215 #[test]
216 fn get_set_file_id() {
217 let mut query = NodeAddressBookQuery::new();
218 query.file_id(FileId::new(0, 0, 1111));
219
220 assert_eq!(query.get_file_id(), FileId::new(0, 0, 1111));
221 }
222
223 #[test]
224 fn get_set_limit() {
225 let mut query = NodeAddressBookQuery::new();
226 query.limit(231);
227
228 assert_eq!(query.get_limit(), 231);
229 }
230}