ih_muse_client/
poet_client.rs1use std::net::SocketAddr;
4
5use async_trait::async_trait;
6use reqwest::{Client, StatusCode};
7
8use ih_muse_core::{MuseError, MuseResult, Transport};
9use ih_muse_proto::*;
10
11pub struct PoetClient {
12 client: Client,
13 endpoints: Vec<String>,
14 }
16
17impl PoetClient {
18 pub fn new(endpoints: &[String]) -> Self {
19 let client = Client::new();
20 Self {
21 client,
22 endpoints: endpoints.to_vec(),
23 }
24 }
25
26 fn get_base_url(&self) -> &str {
27 self.endpoints.first().unwrap()
29 }
30
31 fn build_url(&self, path: &str, node_addr: Option<SocketAddr>) -> String {
33 match node_addr {
34 Some(addr) => format!("http://{}{}", addr, path),
35 None => format!("{}{}", self.get_base_url(), path),
36 }
37 }
38}
39
40#[async_trait]
41impl Transport for PoetClient {
42 async fn health_check(&self) -> MuseResult<()> {
43 let url = format!("{}/health", self.get_base_url());
44 let response = self
45 .client
46 .get(&url)
47 .send()
48 .await
49 .map_err(|e| MuseError::Client(format!("Failed to perform health check: {}", e)))?;
50
51 if response.status().is_success() {
52 Ok(())
53 } else {
54 Err(MuseError::Client(format!(
55 "Health check failed: HTTP {}",
56 response.status()
57 )))
58 }
59 }
60
61 async fn get_node_state(&self) -> MuseResult<NodeState> {
62 let url = format!("{}/sync/state", self.get_base_url());
63 let response = self
64 .client
65 .get(&url)
66 .send()
67 .await
68 .map_err(|e| MuseError::Client(format!("Failed to retrieve node state: {}", e)))?;
69
70 if response.status().is_success() {
71 let resp_haikus: NodeState = response.json().await.map_err(|e| {
72 MuseError::Client(format!("Failed to parse response as NodeState: {e}"))
73 })?;
74 Ok(resp_haikus)
75 } else {
76 Err(MuseError::Client(format!(
77 "Get Finest Resolution failed: {}",
78 response.status()
79 )))
80 }
81 }
82
83 async fn get_finest_resolution(&self) -> MuseResult<TimestampResolution> {
84 let url = format!("{}/config/finest_resolution", self.get_base_url());
85 let response = self
86 .client
87 .get(&url)
88 .send()
89 .await
90 .map_err(|e| MuseError::Client(format!("Failed to perform health check: {}", e)))?;
91
92 if response.status().is_success() {
93 let resp_haikus: TimestampResolution = response.json().await.map_err(|e| {
94 MuseError::Client(format!(
95 "Failed to parse response as TimestampResolution: {e}"
96 ))
97 })?;
98 Ok(resp_haikus)
99 } else {
100 Err(MuseError::Client(format!(
101 "Get Finest Resolution failed: {}",
102 response.status()
103 )))
104 }
105 }
106
107 async fn get_node_elem_ranges(
108 &self,
109 ini: Option<u64>,
110 end: Option<u64>,
111 ) -> MuseResult<Vec<NodeElementRange>> {
112 let url = format!("{}/ds/elements/ranges", self.get_base_url());
113 let response = self
114 .client
115 .get(&url)
116 .json(&GetRangesRequest { ini, end })
117 .send()
118 .await
119 .map_err(|e| MuseError::Client(format!("Failed to retrieve node state: {}", e)))?;
120
121 if response.status().is_success() {
122 let ranges: Vec<NodeElementRange> = response.json().await.map_err(|e| {
123 MuseError::Client(format!(
124 "Failed to parse response as Vec<NodeElementRange>: {e}"
125 ))
126 })?;
127 Ok(ranges)
128 } else {
129 Err(MuseError::Client(format!(
130 "Get All Element Ranges failed: {}",
131 response.status()
132 )))
133 }
134 }
135
136 async fn register_metrics(&self, payload: &[MetricDefinition]) -> MuseResult<()> {
137 let url = format!("{}/ds/metrics", self.get_base_url());
138 let response = self
139 .client
140 .post(&url)
141 .json(payload)
142 .send()
143 .await
144 .map_err(|e| MuseError::Client(format!("Failed to send metric: {e}")))?;
145
146 if response.status().is_success() {
147 Ok(())
148 } else {
149 Err(MuseError::Client(format!(
150 "Failed to send metric: HTTP {}",
151 response.status()
152 )))
153 }
154 }
155
156 async fn get_metric_order(&self) -> MuseResult<Vec<MetricDefinition>> {
157 let url = format!("{}/ds/metrics", self.get_base_url());
158 let response = self
159 .client
160 .get(&url)
161 .send()
162 .await
163 .map_err(|e| MuseError::Client(format!("Failed to send metric: {}", e)))?;
164
165 if response.status().is_success() {
166 let metric_defs: Vec<MetricDefinition> = response.json().await.map_err(|e| {
167 MuseError::Client(format!(
168 "Failed to parse response as Vec<MetricDefinition>: {e}"
169 ))
170 })?;
171 Ok(metric_defs)
172 } else {
173 Err(MuseError::Client(format!(
174 "Failed to send metric: HTTP {}",
175 response.status()
176 )))
177 }
178 }
179
180 async fn send_metrics(
181 &self,
182 payload: Vec<MetricPayload>,
183 node_addr: Option<SocketAddr>,
184 ) -> MuseResult<()> {
185 let url = self.build_url("/ds/abs_metrics", node_addr);
186 let response = self
187 .client
188 .post(&url)
189 .json(&payload)
190 .send()
191 .await
192 .map_err(|e| MuseError::Client(format!("Failed to send metric: {}", e)))?;
193
194 if response.status().is_success() {
195 Ok(())
196 } else {
197 Err(MuseError::Client(format!(
198 "Failed to send metric: HTTP {}",
199 response.status()
200 )))
201 }
202 }
203
204 async fn get_metrics(
205 &self,
206 query: &MetricQuery,
207 node_addr: Option<SocketAddr>,
208 ) -> MuseResult<Vec<MetricPayload>> {
209 let url = self.build_url("/ds/abs_metrics", node_addr);
210 let response = self
211 .client
212 .get(&url)
213 .json(query)
214 .send()
215 .await
216 .map_err(|e| MuseError::Client(format!("Failed to get metrics: {}", e)))?;
217
218 if response.status().is_success() {
219 let metrics: Vec<MetricPayload> = response.json().await.map_err(|e| {
220 MuseError::Client(format!("Failed to parse metrics response: {}", e))
221 })?;
222 Ok(metrics)
223 } else {
224 Err(MuseError::Client(format!(
225 "Failed to get metrics: HTTP {}",
226 response.status()
227 )))
228 }
229 }
230
231 async fn register_elements(
232 &self,
233 elements: &[ElementRegistration],
234 ) -> MuseResult<Vec<MuseResult<ElementId>>> {
235 let url = format!("{}/ds/elements", self.get_base_url());
236 let response = self
237 .client
238 .post(&url)
239 .json(elements)
240 .send()
241 .await
242 .map_err(|e| MuseError::Client(format!("Failed to register elements: {}", e)))?;
243
244 match response.status() {
245 StatusCode::CREATED | StatusCode::MULTI_STATUS | StatusCode::BAD_REQUEST => {
246 let response_data: NewElementsResponse = response
248 .json()
249 .await
250 .map_err(|e| MuseError::Client(format!("Failed to parse response: {}", e)))?;
251
252 let results = response_data
254 .results
255 .into_iter()
256 .map(|res| res.map_err(MuseError::Client))
257 .collect();
258
259 Ok(results)
260 }
261 status => {
262 Err(MuseError::Client(format!(
264 "Failed to register elements: HTTP {}",
265 status
266 )))
267 }
268 }
269 }
270
271 async fn register_element_kinds(
272 &self,
273 element_kind: &[ElementKindRegistration],
274 ) -> MuseResult<()> {
275 let url = format!("{}/ds/element_kinds", self.get_base_url());
276 let response = self
277 .client
278 .post(&url)
279 .json(element_kind)
280 .send()
281 .await
282 .map_err(|e| MuseError::Client(format!("Failed to register element kind: {}", e)))?;
283
284 if response.status().is_success() {
285 Ok(())
286 } else {
287 Err(MuseError::Client(format!(
288 "Failed to register element kind: HTTP {}",
289 response.status()
290 )))
291 }
292 }
293}