Skip to main content

enya_client/otlp/
http_logs_client.rs

1//! HTTP-based OTLP logs client.
2//!
3//! Queries the agent daemon's OTLP store over HTTP, following the same
4//! promise-based async pattern as [`LokiClient`](crate::logs::LokiClient).
5
6use poll_promise::Promise;
7
8use crate::error::ClientError;
9use crate::logs::{LogsClient, LogsQuery, LogsResponse, LogsResult, StreamsResult};
10use crate::normalize_url;
11use crate::promise::promise_channel;
12use crate::url_encode;
13use crate::{BackendInfo, HealthCheckResult};
14
15/// HTTP client for querying the agent's OTLP log store.
16///
17/// Sends GET requests to `/api/otlp/logs/query`, `/api/otlp/labels`,
18/// and `/api/otlp/health` endpoints on the agent daemon.
19pub struct OtlpHttpLogsClient {
20    base_url: String,
21    http_client: reqwest::Client,
22    #[cfg(not(target_arch = "wasm32"))]
23    runtime_handle: tokio::runtime::Handle,
24}
25
26impl OtlpHttpLogsClient {
27    /// Create a new OTLP HTTP logs client.
28    ///
29    /// # Arguments
30    ///
31    /// * `base_url` - The agent daemon URL (e.g., "http://localhost:3030")
32    ///
33    /// # Panics
34    ///
35    /// On native, panics if called outside a tokio runtime context.
36    #[must_use]
37    pub fn new(base_url: impl Into<String>) -> Self {
38        Self {
39            base_url: normalize_url(base_url),
40            http_client: reqwest::Client::new(),
41            #[cfg(not(target_arch = "wasm32"))]
42            runtime_handle: tokio::runtime::Handle::current(),
43        }
44    }
45
46    /// Create a new client with an explicit runtime handle.
47    #[cfg(not(target_arch = "wasm32"))]
48    #[must_use]
49    pub fn with_runtime(base_url: impl Into<String>, handle: tokio::runtime::Handle) -> Self {
50        Self {
51            base_url: normalize_url(base_url),
52            http_client: reqwest::Client::new(),
53            runtime_handle: handle,
54        }
55    }
56
57    #[cfg(not(target_arch = "wasm32"))]
58    fn spawn<F>(&self, future: F)
59    where
60        F: std::future::Future<Output = ()> + Send + 'static,
61    {
62        self.runtime_handle.spawn(future);
63    }
64
65    #[cfg(target_arch = "wasm32")]
66    fn spawn<F>(&self, future: F)
67    where
68        F: std::future::Future<Output = ()> + 'static,
69    {
70        wasm_bindgen_futures::spawn_local(future);
71    }
72
73    fn build_query_url(&self, query: &LogsQuery) -> String {
74        let mut url = format!(
75            "{}/api/otlp/logs/query?start_ns={}&end_ns={}&limit={}",
76            self.base_url, query.start_ns, query.end_ns, query.limit
77        );
78
79        if let Some(ref text) = query.contains {
80            url.push_str("&contains=");
81            url.push_str(&url_encode(text));
82        }
83
84        if !query.labels.is_empty() {
85            if let Ok(labels_json) = serde_json::to_string(&query.labels) {
86                url.push_str("&labels=");
87                url.push_str(&url_encode(&labels_json));
88            }
89        }
90
91        url
92    }
93}
94
95impl LogsClient for OtlpHttpLogsClient {
96    fn query_logs(&self, query: LogsQuery, ctx: &egui::Context) -> Promise<LogsResult> {
97        let url = self.build_query_url(&query);
98
99        log::debug!("OTLP HTTP query_logs: {url}");
100
101        let (sender, promise) = promise_channel();
102        let ctx = ctx.clone();
103        let client = self.http_client.clone();
104
105        self.spawn(async move {
106            let result = match client.get(&url).send().await {
107                Ok(response) => {
108                    let status = response.status();
109                    if status.is_success() {
110                        match response.bytes().await {
111                            Ok(bytes) => serde_json::from_slice::<LogsResponse>(&bytes)
112                                .map_err(|e| ClientError::ParseError(e.to_string())),
113                            Err(e) => Err(ClientError::NetworkError(e.to_string())),
114                        }
115                    } else {
116                        Err(ClientError::BackendError {
117                            status: status.as_u16(),
118                            message: status.canonical_reason().unwrap_or("Unknown").to_string(),
119                        })
120                    }
121                }
122                Err(e) => Err(ClientError::NetworkError(e.to_string())),
123            };
124            sender.send(result);
125            ctx.request_repaint();
126        });
127
128        promise
129    }
130
131    fn fetch_streams(&self, ctx: &egui::Context) -> Promise<StreamsResult> {
132        let url = format!("{}/api/otlp/labels", self.base_url);
133
134        let (sender, promise) = promise_channel();
135        let ctx = ctx.clone();
136        let client = self.http_client.clone();
137
138        self.spawn(async move {
139            let result = match client.get(&url).send().await {
140                Ok(response) => {
141                    let status = response.status();
142                    if status.is_success() {
143                        match response.bytes().await {
144                            Ok(bytes) => serde_json::from_slice::<Vec<String>>(&bytes)
145                                .map_err(|e| ClientError::ParseError(e.to_string())),
146                            Err(e) => Err(ClientError::NetworkError(e.to_string())),
147                        }
148                    } else {
149                        Err(ClientError::BackendError {
150                            status: status.as_u16(),
151                            message: status.canonical_reason().unwrap_or("Unknown").to_string(),
152                        })
153                    }
154                }
155                Err(e) => Err(ClientError::NetworkError(e.to_string())),
156            };
157            sender.send(result);
158            ctx.request_repaint();
159        });
160
161        promise
162    }
163
164    fn backend_type(&self) -> &'static str {
165        "otlp"
166    }
167
168    fn health_check(&self, ctx: &egui::Context) -> Promise<HealthCheckResult> {
169        let url = format!("{}/api/otlp/health", self.base_url);
170
171        let (sender, promise) = promise_channel();
172        let ctx = ctx.clone();
173        let client = self.http_client.clone();
174
175        self.spawn(async move {
176            let result = match client.get(&url).send().await {
177                Ok(response) => {
178                    let status = response.status();
179                    if status.is_success() {
180                        match response.bytes().await {
181                            Ok(bytes) => serde_json::from_slice::<BackendInfo>(&bytes)
182                                .map_err(|e| ClientError::ParseError(e.to_string())),
183                            Err(e) => Err(ClientError::NetworkError(e.to_string())),
184                        }
185                    } else {
186                        Err(ClientError::BackendError {
187                            status: status.as_u16(),
188                            message: status.canonical_reason().unwrap_or("Unknown").to_string(),
189                        })
190                    }
191                }
192                Err(e) => Err(ClientError::NetworkError(e.to_string())),
193            };
194            sender.send(result);
195            ctx.request_repaint();
196        });
197
198        promise
199    }
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205    use crate::url_encode;
206    use rustc_hash::FxHashMap;
207
208    fn with_runtime<F: FnOnce()>(f: F) {
209        let rt = tokio::runtime::Runtime::new().unwrap();
210        let _guard = rt.enter();
211        f();
212    }
213
214    #[test]
215    fn test_new_removes_trailing_slash() {
216        with_runtime(|| {
217            let client = OtlpHttpLogsClient::new("http://localhost:3030/");
218            assert_eq!(client.base_url, "http://localhost:3030");
219        });
220    }
221
222    #[test]
223    fn test_new_adds_http_protocol() {
224        with_runtime(|| {
225            let client = OtlpHttpLogsClient::new("localhost:3030");
226            assert_eq!(client.base_url, "http://localhost:3030");
227        });
228    }
229
230    #[test]
231    fn test_new_preserves_https() {
232        with_runtime(|| {
233            let client = OtlpHttpLogsClient::new("https://agent.example.com");
234            assert_eq!(client.base_url, "https://agent.example.com");
235        });
236    }
237
238    #[test]
239    fn test_backend_type() {
240        with_runtime(|| {
241            let client = OtlpHttpLogsClient::new("http://localhost:3030");
242            assert_eq!(client.backend_type(), "otlp");
243        });
244    }
245
246    #[test]
247    fn test_build_query_url_minimal() {
248        with_runtime(|| {
249            let client = OtlpHttpLogsClient::new("http://localhost:3030");
250            let query = LogsQuery {
251                query: None,
252                labels: FxHashMap::default(),
253                contains: None,
254                start_ns: 1000,
255                end_ns: 2000,
256                limit: 100,
257                direction: crate::logs::QueryDirection::Backward,
258            };
259            let url = client.build_query_url(&query);
260            assert!(url.starts_with("http://localhost:3030/api/otlp/logs/query?"));
261            assert!(url.contains("start_ns=1000"));
262            assert!(url.contains("end_ns=2000"));
263            assert!(url.contains("limit=100"));
264            assert!(!url.contains("contains="));
265            assert!(!url.contains("labels="));
266        });
267    }
268
269    #[test]
270    fn test_build_query_url_with_contains() {
271        with_runtime(|| {
272            let client = OtlpHttpLogsClient::new("http://localhost:3030");
273            let query = LogsQuery {
274                query: None,
275                labels: FxHashMap::default(),
276                contains: Some("error".to_string()),
277                start_ns: 0,
278                end_ns: 1000,
279                limit: 50,
280                direction: crate::logs::QueryDirection::Backward,
281            };
282            let url = client.build_query_url(&query);
283            assert!(url.contains("contains=error"));
284        });
285    }
286
287    #[test]
288    fn test_build_query_url_with_labels() {
289        with_runtime(|| {
290            let client = OtlpHttpLogsClient::new("http://localhost:3030");
291            let mut labels = FxHashMap::default();
292            labels.insert("service".to_string(), "api".to_string());
293            let query = LogsQuery {
294                query: None,
295                labels,
296                contains: None,
297                start_ns: 0,
298                end_ns: 1000,
299                limit: 50,
300                direction: crate::logs::QueryDirection::Backward,
301            };
302            let url = client.build_query_url(&query);
303            assert!(url.contains("labels="));
304            // Labels are JSON-encoded, so should contain %7B (encoded '{')
305            assert!(url.contains("%7B"));
306        });
307    }
308
309    #[test]
310    fn test_url_encode_simple() {
311        assert_eq!(url_encode("simple"), "simple");
312        assert_eq!(url_encode("hello world"), "hello%20world");
313    }
314
315    #[test]
316    fn test_url_encode_special_chars() {
317        assert_eq!(url_encode("{\"key\":\"val\"}"), "%7B%22key%22:%22val%22%7D");
318        assert_eq!(url_encode("a&b=c"), "a%26b%3Dc");
319        assert_eq!(url_encode("[1,2]"), "%5B1,2%5D");
320        assert_eq!(url_encode("a+b"), "a%2Bb");
321    }
322}