enya_client/otlp/
http_metrics_client.rs1use poll_promise::Promise;
7
8use crate::error::ClientError;
9use crate::normalize_url;
10use crate::now_unix_secs;
11use crate::promise::promise_channel;
12use crate::url_encode;
13use crate::{
14 BackendInfo, HealthCheckResult, LabelsResult, MetricLabelsResult, MetricsClient, QueryRequest,
15 QueryResult,
16};
17
18pub struct OtlpHttpMetricsClient {
22 base_url: String,
23 http_client: reqwest::Client,
24 #[cfg(not(target_arch = "wasm32"))]
25 runtime_handle: tokio::runtime::Handle,
26}
27
28impl OtlpHttpMetricsClient {
29 #[must_use]
35 pub fn new(base_url: impl Into<String>) -> Self {
36 Self {
37 base_url: normalize_url(base_url),
38 http_client: reqwest::Client::new(),
39 #[cfg(not(target_arch = "wasm32"))]
40 runtime_handle: tokio::runtime::Handle::current(),
41 }
42 }
43
44 #[cfg(not(target_arch = "wasm32"))]
46 #[must_use]
47 pub fn with_runtime(base_url: impl Into<String>, handle: tokio::runtime::Handle) -> Self {
48 Self {
49 base_url: normalize_url(base_url),
50 http_client: reqwest::Client::new(),
51 runtime_handle: handle,
52 }
53 }
54
55 #[cfg(not(target_arch = "wasm32"))]
56 fn spawn<F>(&self, future: F)
57 where
58 F: std::future::Future<Output = ()> + Send + 'static,
59 {
60 self.runtime_handle.spawn(future);
61 }
62
63 #[cfg(target_arch = "wasm32")]
64 fn spawn<F>(&self, future: F)
65 where
66 F: std::future::Future<Output = ()> + 'static,
67 {
68 wasm_bindgen_futures::spawn_local(future);
69 }
70}
71
72impl MetricsClient for OtlpHttpMetricsClient {
73 fn query(&self, request: QueryRequest, ctx: &egui::Context) -> Promise<QueryResult> {
74 let now_ns = now_unix_secs() as u128 * 1_000_000_000;
75 let start_ns = request.start.unwrap_or(now_ns - 3_600_000_000_000);
76 let end_ns = request.end.unwrap_or(now_ns);
77 let step_ns = request.step_secs as u128 * 1_000_000_000;
78
79 let url = format!(
80 "{}/api/otlp/metrics/query?metric={}&start_ns={}&end_ns={}&step_ns={}",
81 self.base_url,
82 url_encode(&request.metric),
83 start_ns,
84 end_ns,
85 step_ns,
86 );
87
88 let (sender, promise) = promise_channel();
89 let ctx = ctx.clone();
90 let client = self.http_client.clone();
91
92 self.spawn(async move {
93 let result = fetch_json(&client, &url).await;
94 sender.send(result);
95 ctx.request_repaint();
96 });
97
98 promise
99 }
100
101 fn fetch_label_names(&self, ctx: &egui::Context) -> Promise<LabelsResult> {
102 let url = format!("{}/api/otlp/metrics/labels", self.base_url);
103
104 let (sender, promise) = promise_channel();
105 let ctx = ctx.clone();
106 let client = self.http_client.clone();
107
108 self.spawn(async move {
109 let result = fetch_json(&client, &url).await;
110 sender.send(result);
111 ctx.request_repaint();
112 });
113
114 promise
115 }
116
117 fn fetch_label_values(&self, label: &str, ctx: &egui::Context) -> Promise<LabelsResult> {
118 let url = format!(
119 "{}/api/otlp/metrics/label_values?label={}",
120 self.base_url,
121 url_encode(label)
122 );
123
124 let (sender, promise) = promise_channel();
125 let ctx = ctx.clone();
126 let client = self.http_client.clone();
127
128 self.spawn(async move {
129 let result = fetch_json(&client, &url).await;
130 sender.send(result);
131 ctx.request_repaint();
132 });
133
134 promise
135 }
136
137 fn fetch_metric_names(&self, ctx: &egui::Context) -> Promise<LabelsResult> {
138 let url = format!("{}/api/otlp/metrics/names", self.base_url);
139
140 let (sender, promise) = promise_channel();
141 let ctx = ctx.clone();
142 let client = self.http_client.clone();
143
144 self.spawn(async move {
145 let result = fetch_json(&client, &url).await;
146 sender.send(result);
147 ctx.request_repaint();
148 });
149
150 promise
151 }
152
153 fn fetch_metric_labels(
154 &self,
155 metric: &str,
156 ctx: &egui::Context,
157 ) -> Promise<MetricLabelsResult> {
158 let url = format!(
159 "{}/api/otlp/metrics/metric_labels?metric={}",
160 self.base_url,
161 url_encode(metric)
162 );
163
164 let (sender, promise) = promise_channel();
165 let ctx = ctx.clone();
166 let client = self.http_client.clone();
167
168 self.spawn(async move {
169 let result = fetch_json(&client, &url).await;
170 sender.send(result);
171 ctx.request_repaint();
172 });
173
174 promise
175 }
176
177 fn backend_type(&self) -> &'static str {
178 "otlp"
179 }
180
181 fn health_check(&self, ctx: &egui::Context) -> Promise<HealthCheckResult> {
182 let url = format!("{}/api/otlp/health", self.base_url);
183
184 let (sender, promise) = promise_channel();
185 let ctx = ctx.clone();
186 let client = self.http_client.clone();
187
188 self.spawn(async move {
189 let result: Result<BackendInfo, ClientError> = fetch_json(&client, &url).await;
190 sender.send(result);
191 ctx.request_repaint();
192 });
193
194 promise
195 }
196}
197
198async fn fetch_json<T: serde::de::DeserializeOwned>(
200 client: &reqwest::Client,
201 url: &str,
202) -> Result<T, ClientError> {
203 match client.get(url).send().await {
204 Ok(response) => {
205 let status = response.status();
206 if status.is_success() {
207 match response.bytes().await {
208 Ok(bytes) => serde_json::from_slice::<T>(&bytes)
209 .map_err(|e| ClientError::ParseError(e.to_string())),
210 Err(e) => Err(ClientError::NetworkError(e.to_string())),
211 }
212 } else {
213 Err(ClientError::BackendError {
214 status: status.as_u16(),
215 message: status.canonical_reason().unwrap_or("Unknown").to_string(),
216 })
217 }
218 }
219 Err(e) => Err(ClientError::NetworkError(e.to_string())),
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `body` while a freshly created Tokio runtime is entered on the
    /// current thread, so that `OtlpHttpMetricsClient::new` can obtain the
    /// current runtime handle.
    fn inside_runtime(body: impl FnOnce()) {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        // Keep the guard alive for the whole duration of `body`.
        let _entered = runtime.enter();
        body();
    }

    #[test]
    fn test_new_normalizes_url() {
        inside_runtime(|| {
            let metrics_client = OtlpHttpMetricsClient::new("localhost:3030/");
            assert_eq!(metrics_client.base_url, "http://localhost:3030");
        });
    }

    #[test]
    fn test_backend_type() {
        inside_runtime(|| {
            let metrics_client = OtlpHttpMetricsClient::new("http://localhost:3030");
            assert_eq!(metrics_client.backend_type(), "otlp");
        });
    }
}