enya_client/otlp/
http_logs_client.rs1use poll_promise::Promise;
7
8use crate::error::ClientError;
9use crate::logs::{LogsClient, LogsQuery, LogsResponse, LogsResult, StreamsResult};
10use crate::normalize_url;
11use crate::promise::promise_channel;
12use crate::url_encode;
13use crate::{BackendInfo, HealthCheckResult};
14
15pub struct OtlpHttpLogsClient {
20 base_url: String,
21 http_client: reqwest::Client,
22 #[cfg(not(target_arch = "wasm32"))]
23 runtime_handle: tokio::runtime::Handle,
24}
25
26impl OtlpHttpLogsClient {
27 #[must_use]
37 pub fn new(base_url: impl Into<String>) -> Self {
38 Self {
39 base_url: normalize_url(base_url),
40 http_client: reqwest::Client::new(),
41 #[cfg(not(target_arch = "wasm32"))]
42 runtime_handle: tokio::runtime::Handle::current(),
43 }
44 }
45
46 #[cfg(not(target_arch = "wasm32"))]
48 #[must_use]
49 pub fn with_runtime(base_url: impl Into<String>, handle: tokio::runtime::Handle) -> Self {
50 Self {
51 base_url: normalize_url(base_url),
52 http_client: reqwest::Client::new(),
53 runtime_handle: handle,
54 }
55 }
56
57 #[cfg(not(target_arch = "wasm32"))]
58 fn spawn<F>(&self, future: F)
59 where
60 F: std::future::Future<Output = ()> + Send + 'static,
61 {
62 self.runtime_handle.spawn(future);
63 }
64
65 #[cfg(target_arch = "wasm32")]
66 fn spawn<F>(&self, future: F)
67 where
68 F: std::future::Future<Output = ()> + 'static,
69 {
70 wasm_bindgen_futures::spawn_local(future);
71 }
72
73 fn build_query_url(&self, query: &LogsQuery) -> String {
74 let mut url = format!(
75 "{}/api/otlp/logs/query?start_ns={}&end_ns={}&limit={}",
76 self.base_url, query.start_ns, query.end_ns, query.limit
77 );
78
79 if let Some(ref text) = query.contains {
80 url.push_str("&contains=");
81 url.push_str(&url_encode(text));
82 }
83
84 if !query.labels.is_empty() {
85 if let Ok(labels_json) = serde_json::to_string(&query.labels) {
86 url.push_str("&labels=");
87 url.push_str(&url_encode(&labels_json));
88 }
89 }
90
91 url
92 }
93}
94
95impl LogsClient for OtlpHttpLogsClient {
96 fn query_logs(&self, query: LogsQuery, ctx: &egui::Context) -> Promise<LogsResult> {
97 let url = self.build_query_url(&query);
98
99 log::debug!("OTLP HTTP query_logs: {url}");
100
101 let (sender, promise) = promise_channel();
102 let ctx = ctx.clone();
103 let client = self.http_client.clone();
104
105 self.spawn(async move {
106 let result = match client.get(&url).send().await {
107 Ok(response) => {
108 let status = response.status();
109 if status.is_success() {
110 match response.bytes().await {
111 Ok(bytes) => serde_json::from_slice::<LogsResponse>(&bytes)
112 .map_err(|e| ClientError::ParseError(e.to_string())),
113 Err(e) => Err(ClientError::NetworkError(e.to_string())),
114 }
115 } else {
116 Err(ClientError::BackendError {
117 status: status.as_u16(),
118 message: status.canonical_reason().unwrap_or("Unknown").to_string(),
119 })
120 }
121 }
122 Err(e) => Err(ClientError::NetworkError(e.to_string())),
123 };
124 sender.send(result);
125 ctx.request_repaint();
126 });
127
128 promise
129 }
130
131 fn fetch_streams(&self, ctx: &egui::Context) -> Promise<StreamsResult> {
132 let url = format!("{}/api/otlp/labels", self.base_url);
133
134 let (sender, promise) = promise_channel();
135 let ctx = ctx.clone();
136 let client = self.http_client.clone();
137
138 self.spawn(async move {
139 let result = match client.get(&url).send().await {
140 Ok(response) => {
141 let status = response.status();
142 if status.is_success() {
143 match response.bytes().await {
144 Ok(bytes) => serde_json::from_slice::<Vec<String>>(&bytes)
145 .map_err(|e| ClientError::ParseError(e.to_string())),
146 Err(e) => Err(ClientError::NetworkError(e.to_string())),
147 }
148 } else {
149 Err(ClientError::BackendError {
150 status: status.as_u16(),
151 message: status.canonical_reason().unwrap_or("Unknown").to_string(),
152 })
153 }
154 }
155 Err(e) => Err(ClientError::NetworkError(e.to_string())),
156 };
157 sender.send(result);
158 ctx.request_repaint();
159 });
160
161 promise
162 }
163
164 fn backend_type(&self) -> &'static str {
165 "otlp"
166 }
167
168 fn health_check(&self, ctx: &egui::Context) -> Promise<HealthCheckResult> {
169 let url = format!("{}/api/otlp/health", self.base_url);
170
171 let (sender, promise) = promise_channel();
172 let ctx = ctx.clone();
173 let client = self.http_client.clone();
174
175 self.spawn(async move {
176 let result = match client.get(&url).send().await {
177 Ok(response) => {
178 let status = response.status();
179 if status.is_success() {
180 match response.bytes().await {
181 Ok(bytes) => serde_json::from_slice::<BackendInfo>(&bytes)
182 .map_err(|e| ClientError::ParseError(e.to_string())),
183 Err(e) => Err(ClientError::NetworkError(e.to_string())),
184 }
185 } else {
186 Err(ClientError::BackendError {
187 status: status.as_u16(),
188 message: status.canonical_reason().unwrap_or("Unknown").to_string(),
189 })
190 }
191 }
192 Err(e) => Err(ClientError::NetworkError(e.to_string())),
193 };
194 sender.send(result);
195 ctx.request_repaint();
196 });
197
198 promise
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::url_encode;
    use rustc_hash::FxHashMap;

    /// Backend address used by the tests that do not exercise normalization.
    const LOCAL_BACKEND: &str = "http://localhost:3030";

    /// Runs `body` while a freshly created Tokio runtime is entered, so that
    /// `OtlpHttpLogsClient::new` can capture the current runtime handle.
    fn in_runtime<R>(body: impl FnOnce() -> R) -> R {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let _entered = runtime.enter();
        body()
    }

    /// Builds a client from `input` and returns the base URL it stored.
    fn stored_base_url(input: &str) -> String {
        in_runtime(|| OtlpHttpLogsClient::new(input).base_url)
    }

    /// Builds the query URL for `query` against `LOCAL_BACKEND`.
    fn url_for(query: &LogsQuery) -> String {
        in_runtime(|| OtlpHttpLogsClient::new(LOCAL_BACKEND).build_query_url(query))
    }

    /// Unfiltered query over `0..1000` with limit 50, reading backward.
    fn base_query() -> LogsQuery {
        LogsQuery {
            query: None,
            labels: FxHashMap::default(),
            contains: None,
            start_ns: 0,
            end_ns: 1000,
            limit: 50,
            direction: crate::logs::QueryDirection::Backward,
        }
    }

    #[test]
    fn test_new_removes_trailing_slash() {
        assert_eq!(stored_base_url("http://localhost:3030/"), "http://localhost:3030");
    }

    #[test]
    fn test_new_adds_http_protocol() {
        assert_eq!(stored_base_url("localhost:3030"), "http://localhost:3030");
    }

    #[test]
    fn test_new_preserves_https() {
        assert_eq!(stored_base_url("https://agent.example.com"), "https://agent.example.com");
    }

    #[test]
    fn test_backend_type() {
        let kind = in_runtime(|| OtlpHttpLogsClient::new(LOCAL_BACKEND).backend_type());
        assert_eq!(kind, "otlp");
    }

    #[test]
    fn test_build_query_url_minimal() {
        let url = url_for(&LogsQuery {
            start_ns: 1000,
            end_ns: 2000,
            limit: 100,
            ..base_query()
        });

        assert!(url.starts_with("http://localhost:3030/api/otlp/logs/query?"));
        for required in ["start_ns=1000", "end_ns=2000", "limit=100"] {
            assert!(url.contains(required));
        }
        // Optional filters must be absent when they are not set.
        for absent in ["contains=", "labels="] {
            assert!(!url.contains(absent));
        }
    }

    #[test]
    fn test_build_query_url_with_contains() {
        let url = url_for(&LogsQuery {
            contains: Some("error".to_string()),
            ..base_query()
        });
        assert!(url.contains("contains=error"));
    }

    #[test]
    fn test_build_query_url_with_labels() {
        let mut labels = FxHashMap::default();
        labels.insert("service".to_string(), "api".to_string());

        let url = url_for(&LogsQuery {
            labels,
            ..base_query()
        });
        assert!(url.contains("labels="));
        // "%7B" is the percent-encoded "{" that opens the JSON object.
        assert!(url.contains("%7B"));
    }

    #[test]
    fn test_url_encode_simple() {
        let cases = [("simple", "simple"), ("hello world", "hello%20world")];
        for (input, expected) in cases {
            assert_eq!(url_encode(input), expected);
        }
    }

    #[test]
    fn test_url_encode_special_chars() {
        let cases = [
            ("{\"key\":\"val\"}", "%7B%22key%22:%22val%22%7D"),
            ("a&b=c", "a%26b%3Dc"),
            ("[1,2]", "%5B1,2%5D"),
            ("a+b", "a%2Bb"),
        ];
        for (input, expected) in cases {
            assert_eq!(url_encode(input), expected);
        }
    }
}