Skip to main content

enya_client/otlp/
http_metrics_client.rs

1//! HTTP-based OTLP metrics client.
2//!
3//! Queries the agent daemon's OTLP metrics store over HTTP, following the same
4//! promise-based async pattern as [`PrometheusClient`](crate::prometheus::PrometheusClient).
5
6use poll_promise::Promise;
7
8use crate::error::ClientError;
9use crate::normalize_url;
10use crate::now_unix_secs;
11use crate::promise::promise_channel;
12use crate::url_encode;
13use crate::{
14    BackendInfo, HealthCheckResult, LabelsResult, MetricLabelsResult, MetricsClient, QueryRequest,
15    QueryResult,
16};
17
18/// HTTP client for querying the agent's OTLP metrics store.
19///
20/// Sends GET requests to `/api/otlp/metrics/*` endpoints on the agent daemon.
21pub struct OtlpHttpMetricsClient {
22    base_url: String,
23    http_client: reqwest::Client,
24    #[cfg(not(target_arch = "wasm32"))]
25    runtime_handle: tokio::runtime::Handle,
26}
27
28impl OtlpHttpMetricsClient {
29    /// Create a new OTLP HTTP metrics client.
30    ///
31    /// # Panics
32    ///
33    /// On native, panics if called outside a tokio runtime context.
34    #[must_use]
35    pub fn new(base_url: impl Into<String>) -> Self {
36        Self {
37            base_url: normalize_url(base_url),
38            http_client: reqwest::Client::new(),
39            #[cfg(not(target_arch = "wasm32"))]
40            runtime_handle: tokio::runtime::Handle::current(),
41        }
42    }
43
44    /// Create a new client with an explicit runtime handle.
45    #[cfg(not(target_arch = "wasm32"))]
46    #[must_use]
47    pub fn with_runtime(base_url: impl Into<String>, handle: tokio::runtime::Handle) -> Self {
48        Self {
49            base_url: normalize_url(base_url),
50            http_client: reqwest::Client::new(),
51            runtime_handle: handle,
52        }
53    }
54
55    #[cfg(not(target_arch = "wasm32"))]
56    fn spawn<F>(&self, future: F)
57    where
58        F: std::future::Future<Output = ()> + Send + 'static,
59    {
60        self.runtime_handle.spawn(future);
61    }
62
63    #[cfg(target_arch = "wasm32")]
64    fn spawn<F>(&self, future: F)
65    where
66        F: std::future::Future<Output = ()> + 'static,
67    {
68        wasm_bindgen_futures::spawn_local(future);
69    }
70}
71
72impl MetricsClient for OtlpHttpMetricsClient {
73    fn query(&self, request: QueryRequest, ctx: &egui::Context) -> Promise<QueryResult> {
74        let now_ns = now_unix_secs() as u128 * 1_000_000_000;
75        let start_ns = request.start.unwrap_or(now_ns - 3_600_000_000_000);
76        let end_ns = request.end.unwrap_or(now_ns);
77        let step_ns = request.step_secs as u128 * 1_000_000_000;
78
79        let url = format!(
80            "{}/api/otlp/metrics/query?metric={}&start_ns={}&end_ns={}&step_ns={}",
81            self.base_url,
82            url_encode(&request.metric),
83            start_ns,
84            end_ns,
85            step_ns,
86        );
87
88        let (sender, promise) = promise_channel();
89        let ctx = ctx.clone();
90        let client = self.http_client.clone();
91
92        self.spawn(async move {
93            let result = fetch_json(&client, &url).await;
94            sender.send(result);
95            ctx.request_repaint();
96        });
97
98        promise
99    }
100
101    fn fetch_label_names(&self, ctx: &egui::Context) -> Promise<LabelsResult> {
102        let url = format!("{}/api/otlp/metrics/labels", self.base_url);
103
104        let (sender, promise) = promise_channel();
105        let ctx = ctx.clone();
106        let client = self.http_client.clone();
107
108        self.spawn(async move {
109            let result = fetch_json(&client, &url).await;
110            sender.send(result);
111            ctx.request_repaint();
112        });
113
114        promise
115    }
116
117    fn fetch_label_values(&self, label: &str, ctx: &egui::Context) -> Promise<LabelsResult> {
118        let url = format!(
119            "{}/api/otlp/metrics/label_values?label={}",
120            self.base_url,
121            url_encode(label)
122        );
123
124        let (sender, promise) = promise_channel();
125        let ctx = ctx.clone();
126        let client = self.http_client.clone();
127
128        self.spawn(async move {
129            let result = fetch_json(&client, &url).await;
130            sender.send(result);
131            ctx.request_repaint();
132        });
133
134        promise
135    }
136
137    fn fetch_metric_names(&self, ctx: &egui::Context) -> Promise<LabelsResult> {
138        let url = format!("{}/api/otlp/metrics/names", self.base_url);
139
140        let (sender, promise) = promise_channel();
141        let ctx = ctx.clone();
142        let client = self.http_client.clone();
143
144        self.spawn(async move {
145            let result = fetch_json(&client, &url).await;
146            sender.send(result);
147            ctx.request_repaint();
148        });
149
150        promise
151    }
152
153    fn fetch_metric_labels(
154        &self,
155        metric: &str,
156        ctx: &egui::Context,
157    ) -> Promise<MetricLabelsResult> {
158        let url = format!(
159            "{}/api/otlp/metrics/metric_labels?metric={}",
160            self.base_url,
161            url_encode(metric)
162        );
163
164        let (sender, promise) = promise_channel();
165        let ctx = ctx.clone();
166        let client = self.http_client.clone();
167
168        self.spawn(async move {
169            let result = fetch_json(&client, &url).await;
170            sender.send(result);
171            ctx.request_repaint();
172        });
173
174        promise
175    }
176
177    fn backend_type(&self) -> &'static str {
178        "otlp"
179    }
180
181    fn health_check(&self, ctx: &egui::Context) -> Promise<HealthCheckResult> {
182        let url = format!("{}/api/otlp/health", self.base_url);
183
184        let (sender, promise) = promise_channel();
185        let ctx = ctx.clone();
186        let client = self.http_client.clone();
187
188        self.spawn(async move {
189            let result: Result<BackendInfo, ClientError> = fetch_json(&client, &url).await;
190            sender.send(result);
191            ctx.request_repaint();
192        });
193
194        promise
195    }
196}
197
198/// Generic JSON fetch helper.
199async fn fetch_json<T: serde::de::DeserializeOwned>(
200    client: &reqwest::Client,
201    url: &str,
202) -> Result<T, ClientError> {
203    match client.get(url).send().await {
204        Ok(response) => {
205            let status = response.status();
206            if status.is_success() {
207                match response.bytes().await {
208                    Ok(bytes) => serde_json::from_slice::<T>(&bytes)
209                        .map_err(|e| ClientError::ParseError(e.to_string())),
210                    Err(e) => Err(ClientError::NetworkError(e.to_string())),
211                }
212            } else {
213                Err(ClientError::BackendError {
214                    status: status.as_u16(),
215                    message: status.canonical_reason().unwrap_or("Unknown").to_string(),
216                })
217            }
218        }
219        Err(e) => Err(ClientError::NetworkError(e.to_string())),
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// Run `body` while a freshly created tokio runtime is entered, so that
    /// `OtlpHttpMetricsClient::new` can capture the current runtime handle.
    fn in_runtime_context<F>(body: F)
    where
        F: FnOnce(),
    {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        // Keep the guard alive for the whole call; dropping it leaves the context.
        let _entered = runtime.enter();
        body();
    }

    /// A scheme-less URL with a trailing slash is stored in normalized form.
    #[test]
    fn test_new_normalizes_url() {
        in_runtime_context(|| {
            let client = OtlpHttpMetricsClient::new("localhost:3030/");
            assert_eq!(client.base_url, "http://localhost:3030");
        });
    }

    /// The backend identifier is the fixed string `"otlp"`.
    #[test]
    fn test_backend_type() {
        in_runtime_context(|| {
            let client = OtlpHttpMetricsClient::new("http://localhost:3030");
            assert_eq!(client.backend_type(), "otlp");
        });
    }
}
248}