ih_muse_client/
poet_client.rs

1// crates/ih-muse-client/src/poet_client.rs
2
3use std::net::SocketAddr;
4
5use async_trait::async_trait;
6use reqwest::{Client, StatusCode};
7
8use ih_muse_core::{MuseError, MuseResult, Transport};
9use ih_muse_proto::*;
10
11pub struct PoetClient {
12    client: Client,
13    endpoints: Vec<String>,
14    // You can add cache_strategy here if needed in the future
15}
16
17impl PoetClient {
18    pub fn new(endpoints: &[String]) -> Self {
19        let client = Client::new();
20        Self {
21            client,
22            endpoints: endpoints.to_vec(),
23        }
24    }
25
26    fn get_base_url(&self) -> &str {
27        // TODO rotate endpoints on failure
28        self.endpoints.first().unwrap()
29    }
30
31    /// Returns the base URL or constructs it from `node_addr` if provided.
32    fn build_url(&self, path: &str, node_addr: Option<SocketAddr>) -> String {
33        match node_addr {
34            Some(addr) => format!("http://{}{}", addr, path),
35            None => format!("{}{}", self.get_base_url(), path),
36        }
37    }
38}
39
40#[async_trait]
41impl Transport for PoetClient {
42    async fn health_check(&self) -> MuseResult<()> {
43        let url = format!("{}/health", self.get_base_url());
44        let response = self
45            .client
46            .get(&url)
47            .send()
48            .await
49            .map_err(|e| MuseError::Client(format!("Failed to perform health check: {}", e)))?;
50
51        if response.status().is_success() {
52            Ok(())
53        } else {
54            Err(MuseError::Client(format!(
55                "Health check failed: HTTP {}",
56                response.status()
57            )))
58        }
59    }
60
61    async fn get_node_state(&self) -> MuseResult<NodeState> {
62        let url = format!("{}/sync/state", self.get_base_url());
63        let response = self
64            .client
65            .get(&url)
66            .send()
67            .await
68            .map_err(|e| MuseError::Client(format!("Failed to retrieve node state: {}", e)))?;
69
70        if response.status().is_success() {
71            let resp_haikus: NodeState = response.json().await.map_err(|e| {
72                MuseError::Client(format!("Failed to parse response as NodeState: {e}"))
73            })?;
74            Ok(resp_haikus)
75        } else {
76            Err(MuseError::Client(format!(
77                "Get Finest Resolution failed: {}",
78                response.status()
79            )))
80        }
81    }
82
83    async fn get_finest_resolution(&self) -> MuseResult<TimestampResolution> {
84        let url = format!("{}/config/finest_resolution", self.get_base_url());
85        let response = self
86            .client
87            .get(&url)
88            .send()
89            .await
90            .map_err(|e| MuseError::Client(format!("Failed to perform health check: {}", e)))?;
91
92        if response.status().is_success() {
93            let resp_haikus: TimestampResolution = response.json().await.map_err(|e| {
94                MuseError::Client(format!(
95                    "Failed to parse response as TimestampResolution: {e}"
96                ))
97            })?;
98            Ok(resp_haikus)
99        } else {
100            Err(MuseError::Client(format!(
101                "Get Finest Resolution failed: {}",
102                response.status()
103            )))
104        }
105    }
106
107    async fn get_node_elem_ranges(
108        &self,
109        ini: Option<u64>,
110        end: Option<u64>,
111    ) -> MuseResult<Vec<NodeElementRange>> {
112        let url = format!("{}/ds/elements/ranges", self.get_base_url());
113        let response = self
114            .client
115            .get(&url)
116            .json(&GetRangesRequest { ini, end })
117            .send()
118            .await
119            .map_err(|e| MuseError::Client(format!("Failed to retrieve node state: {}", e)))?;
120
121        if response.status().is_success() {
122            let ranges: Vec<NodeElementRange> = response.json().await.map_err(|e| {
123                MuseError::Client(format!(
124                    "Failed to parse response as Vec<NodeElementRange>: {e}"
125                ))
126            })?;
127            Ok(ranges)
128        } else {
129            Err(MuseError::Client(format!(
130                "Get All Element Ranges failed: {}",
131                response.status()
132            )))
133        }
134    }
135
136    async fn register_metrics(&self, payload: &[MetricDefinition]) -> MuseResult<()> {
137        let url = format!("{}/ds/metrics", self.get_base_url());
138        let response = self
139            .client
140            .post(&url)
141            .json(payload)
142            .send()
143            .await
144            .map_err(|e| MuseError::Client(format!("Failed to send metric: {e}")))?;
145
146        if response.status().is_success() {
147            Ok(())
148        } else {
149            Err(MuseError::Client(format!(
150                "Failed to send metric: HTTP {}",
151                response.status()
152            )))
153        }
154    }
155
156    async fn get_metric_order(&self) -> MuseResult<Vec<MetricDefinition>> {
157        let url = format!("{}/ds/metrics", self.get_base_url());
158        let response = self
159            .client
160            .get(&url)
161            .send()
162            .await
163            .map_err(|e| MuseError::Client(format!("Failed to send metric: {}", e)))?;
164
165        if response.status().is_success() {
166            let metric_defs: Vec<MetricDefinition> = response.json().await.map_err(|e| {
167                MuseError::Client(format!(
168                    "Failed to parse response as Vec<MetricDefinition>: {e}"
169                ))
170            })?;
171            Ok(metric_defs)
172        } else {
173            Err(MuseError::Client(format!(
174                "Failed to send metric: HTTP {}",
175                response.status()
176            )))
177        }
178    }
179
180    async fn send_metrics(
181        &self,
182        payload: Vec<MetricPayload>,
183        node_addr: Option<SocketAddr>,
184    ) -> MuseResult<()> {
185        let url = self.build_url("/ds/abs_metrics", node_addr);
186        let response = self
187            .client
188            .post(&url)
189            .json(&payload)
190            .send()
191            .await
192            .map_err(|e| MuseError::Client(format!("Failed to send metric: {}", e)))?;
193
194        if response.status().is_success() {
195            Ok(())
196        } else {
197            Err(MuseError::Client(format!(
198                "Failed to send metric: HTTP {}",
199                response.status()
200            )))
201        }
202    }
203
204    async fn get_metrics(
205        &self,
206        query: &MetricQuery,
207        node_addr: Option<SocketAddr>,
208    ) -> MuseResult<Vec<MetricPayload>> {
209        let url = self.build_url("/ds/abs_metrics", node_addr);
210        let response = self
211            .client
212            .get(&url)
213            .json(query)
214            .send()
215            .await
216            .map_err(|e| MuseError::Client(format!("Failed to get metrics: {}", e)))?;
217
218        if response.status().is_success() {
219            let metrics: Vec<MetricPayload> = response.json().await.map_err(|e| {
220                MuseError::Client(format!("Failed to parse metrics response: {}", e))
221            })?;
222            Ok(metrics)
223        } else {
224            Err(MuseError::Client(format!(
225                "Failed to get metrics: HTTP {}",
226                response.status()
227            )))
228        }
229    }
230
231    async fn register_elements(
232        &self,
233        elements: &[ElementRegistration],
234    ) -> MuseResult<Vec<MuseResult<ElementId>>> {
235        let url = format!("{}/ds/elements", self.get_base_url());
236        let response = self
237            .client
238            .post(&url)
239            .json(elements)
240            .send()
241            .await
242            .map_err(|e| MuseError::Client(format!("Failed to register elements: {}", e)))?;
243
244        match response.status() {
245            StatusCode::CREATED | StatusCode::MULTI_STATUS | StatusCode::BAD_REQUEST => {
246                // Deserialize response as `NewElementsResponse` for all relevant cases
247                let response_data: NewElementsResponse = response
248                    .json()
249                    .await
250                    .map_err(|e| MuseError::Client(format!("Failed to parse response: {}", e)))?;
251
252                // Convert Vec<Result<u64, String>> to Vec<Result<ElementId, Error>>
253                let results = response_data
254                    .results
255                    .into_iter()
256                    .map(|res| res.map_err(MuseError::Client))
257                    .collect();
258
259                Ok(results)
260            }
261            status => {
262                // Handle any unexpected HTTP status codes
263                Err(MuseError::Client(format!(
264                    "Failed to register elements: HTTP {}",
265                    status
266                )))
267            }
268        }
269    }
270
271    async fn register_element_kinds(
272        &self,
273        element_kind: &[ElementKindRegistration],
274    ) -> MuseResult<()> {
275        let url = format!("{}/ds/element_kinds", self.get_base_url());
276        let response = self
277            .client
278            .post(&url)
279            .json(element_kind)
280            .send()
281            .await
282            .map_err(|e| MuseError::Client(format!("Failed to register element kind: {}", e)))?;
283
284        if response.status().is_success() {
285            Ok(())
286        } else {
287            Err(MuseError::Client(format!(
288                "Failed to register element kind: HTTP {}",
289                response.status()
290            )))
291        }
292    }
293}