Skip to main content

enya_client/logs/loki/
client.rs

1//! Loki HTTP client implementation.
2
3use poll_promise::Promise;
4
5use crate::error::ClientError;
6use crate::logs::{LogsClient, LogsQuery, LogsResult, QueryDirection, StreamsResult};
7use crate::normalize_url;
8use crate::promise::promise_channel;
9use crate::url_encode;
10use crate::{BackendInfo, HealthCheckResult};
11
12use super::response::{parse_buildinfo_response, parse_labels_response, parse_logs_response};
13
14/// Client for querying Loki via its HTTP API.
15///
16/// Executes LogQL queries against the `/loki/api/v1/query_range` endpoint.
17/// Uses `reqwest` for HTTP requests on both native (with tokio) and WASM
18/// (with wasm-bindgen-futures).
19///
20/// # Example
21///
22/// ```ignore
23/// use enya_client::logs::{LokiClient, LogsClient, LogsQuery};
24///
25/// let client = LokiClient::new("http://localhost:3100");
26/// let query = LogsQuery::new(start_ns, end_ns)
27///     .with_label("app", "myservice");
28/// let promise = client.query_logs(query, &ctx);
29/// ```
30pub struct LokiClient {
31    base_url: String,
32    http_client: reqwest::Client,
33    #[cfg(not(target_arch = "wasm32"))]
34    runtime_handle: tokio::runtime::Handle,
35}
36
37impl LokiClient {
38    /// Create a new Loki client.
39    ///
40    /// # Arguments
41    ///
42    /// * `base_url` - The base URL of the Loki server (e.g., "http://localhost:3100")
43    ///
44    /// If no protocol is specified, `http://` is assumed.
45    ///
46    /// # Panics
47    ///
48    /// On native, panics if called outside a tokio runtime context.
49    #[must_use]
50    pub fn new(base_url: impl Into<String>) -> Self {
51        Self {
52            base_url: normalize_url(base_url),
53            http_client: reqwest::Client::new(),
54            #[cfg(not(target_arch = "wasm32"))]
55            runtime_handle: tokio::runtime::Handle::current(),
56        }
57    }
58
59    /// Create a new Loki client with an explicit runtime handle.
60    ///
61    /// Use this when creating the client outside a tokio runtime context.
62    #[cfg(not(target_arch = "wasm32"))]
63    #[must_use]
64    pub fn with_runtime(base_url: impl Into<String>, handle: tokio::runtime::Handle) -> Self {
65        Self {
66            base_url: normalize_url(base_url),
67            http_client: reqwest::Client::new(),
68            runtime_handle: handle,
69        }
70    }
71
72    /// Spawn an async task on the runtime.
73    #[cfg(not(target_arch = "wasm32"))]
74    fn spawn<F>(&self, future: F)
75    where
76        F: std::future::Future<Output = ()> + Send + 'static,
77    {
78        self.runtime_handle.spawn(future);
79    }
80
81    /// Spawn an async task using wasm-bindgen-futures.
82    #[cfg(target_arch = "wasm32")]
83    fn spawn<F>(&self, future: F)
84    where
85        F: std::future::Future<Output = ()> + 'static,
86    {
87        wasm_bindgen_futures::spawn_local(future);
88    }
89
90    /// Build the query_range URL for a logs query.
91    fn build_query_url(&self, query: &LogsQuery) -> String {
92        // Convert nanoseconds to seconds for the API
93        let start_secs = query.start_ns / 1_000_000_000;
94        let end_secs = query.end_ns / 1_000_000_000;
95
96        // Build LogQL query
97        let logql = self.build_logql(query);
98        let encoded_query = url_encode(&logql);
99
100        let direction = match query.direction {
101            QueryDirection::Forward => "forward",
102            QueryDirection::Backward => "backward",
103        };
104
105        format!(
106            "{}/loki/api/v1/query_range?query={}&start={}&end={}&limit={}&direction={}",
107            self.base_url, encoded_query, start_secs, end_secs, query.limit, direction
108        )
109    }
110
111    /// Build a LogQL query string from query parameters.
112    fn build_logql(&self, query: &LogsQuery) -> String {
113        // If a raw query is provided, use it directly
114        if let Some(ref raw) = query.query {
115            return raw.clone();
116        }
117
118        // Build stream selector from labels
119        let mut selector_parts: Vec<String> = query
120            .labels
121            .iter()
122            .map(|(k, v)| format!("{k}=\"{v}\""))
123            .collect();
124
125        // If no labels specified, match all streams
126        if selector_parts.is_empty() {
127            selector_parts.push("__name__=~\".+\"".to_string());
128        }
129
130        let selector = format!("{{{}}}", selector_parts.join(", "));
131
132        // Add line filter if contains is specified
133        if let Some(ref text) = query.contains {
134            format!("{selector} |= \"{text}\"")
135        } else {
136            selector
137        }
138    }
139}
140
141impl LogsClient for LokiClient {
142    fn query_logs(&self, query: LogsQuery, ctx: &egui::Context) -> Promise<LogsResult> {
143        let url = self.build_query_url(&query);
144
145        log::debug!("Loki query: {url}");
146
147        let (sender, promise) = promise_channel();
148        let ctx = ctx.clone();
149        let client = self.http_client.clone();
150
151        self.spawn(async move {
152            let result = match client.get(&url).send().await {
153                Ok(response) => {
154                    let status = response.status();
155                    if status.is_success() {
156                        match response.bytes().await {
157                            Ok(bytes) => parse_logs_response(&bytes),
158                            Err(e) => Err(ClientError::NetworkError(e.to_string())),
159                        }
160                    } else {
161                        Err(ClientError::BackendError {
162                            status: status.as_u16(),
163                            message: status.canonical_reason().unwrap_or("Unknown").to_string(),
164                        })
165                    }
166                }
167                Err(e) => Err(ClientError::NetworkError(e.to_string())),
168            };
169            sender.send(result);
170            ctx.request_repaint();
171        });
172
173        promise
174    }
175
176    fn fetch_streams(&self, ctx: &egui::Context) -> Promise<StreamsResult> {
177        let url = format!("{}/loki/api/v1/labels", self.base_url);
178
179        log::debug!("Loki fetch labels: {url}");
180
181        let (sender, promise) = promise_channel();
182        let ctx = ctx.clone();
183        let client = self.http_client.clone();
184
185        self.spawn(async move {
186            let result = match client.get(&url).send().await {
187                Ok(response) => {
188                    let status = response.status();
189                    if status.is_success() {
190                        match response.bytes().await {
191                            Ok(bytes) => parse_labels_response(&bytes),
192                            Err(e) => Err(ClientError::NetworkError(e.to_string())),
193                        }
194                    } else {
195                        Err(ClientError::BackendError {
196                            status: status.as_u16(),
197                            message: status.canonical_reason().unwrap_or("Unknown").to_string(),
198                        })
199                    }
200                }
201                Err(e) => Err(ClientError::NetworkError(e.to_string())),
202            };
203            sender.send(result);
204            ctx.request_repaint();
205        });
206
207        promise
208    }
209
210    fn backend_type(&self) -> &'static str {
211        "loki"
212    }
213
214    fn health_check(&self, ctx: &egui::Context) -> Promise<HealthCheckResult> {
215        let url = format!("{}/loki/api/v1/status/buildinfo", self.base_url);
216
217        log::debug!("Loki health check: {url}");
218
219        let (sender, promise) = promise_channel();
220        let ctx = ctx.clone();
221        let client = self.http_client.clone();
222
223        self.spawn(async move {
224            let result = match client.get(&url).send().await {
225                Ok(response) => {
226                    let status = response.status();
227                    if status.is_success() {
228                        match response.bytes().await {
229                            Ok(bytes) => parse_buildinfo_response(&bytes).map(|info| BackendInfo {
230                                backend_type: "loki".to_string(),
231                                version: info.version,
232                            }),
233                            Err(e) => Err(ClientError::NetworkError(e.to_string())),
234                        }
235                    } else {
236                        Err(ClientError::BackendError {
237                            status: status.as_u16(),
238                            message: status.canonical_reason().unwrap_or("Unknown").to_string(),
239                        })
240                    }
241                }
242                Err(e) => Err(ClientError::NetworkError(e.to_string())),
243            };
244            sender.send(result);
245            ctx.request_repaint();
246        });
247
248        promise
249    }
250}
251
252#[cfg(test)]
253mod tests {
254    use super::*;
255    use rustc_hash::FxHashMap;
256
257    /// Helper to create a tokio runtime for tests
258    fn with_runtime<F: FnOnce()>(f: F) {
259        let rt = tokio::runtime::Runtime::new().unwrap();
260        let _guard = rt.enter();
261        f();
262    }
263
264    #[test]
265    fn test_new_removes_trailing_slash() {
266        with_runtime(|| {
267            let client = LokiClient::new("http://localhost:3100/");
268            assert_eq!(client.base_url, "http://localhost:3100");
269        });
270    }
271
272    #[test]
273    fn test_new_adds_http_protocol() {
274        with_runtime(|| {
275            let client = LokiClient::new("localhost:3100");
276            assert_eq!(client.base_url, "http://localhost:3100");
277        });
278    }
279
280    #[test]
281    fn test_new_preserves_https() {
282        with_runtime(|| {
283            let client = LokiClient::new("https://loki.example.com");
284            assert_eq!(client.base_url, "https://loki.example.com");
285        });
286    }
287
288    #[test]
289    fn test_build_logql_with_labels() {
290        with_runtime(|| {
291            let client = LokiClient::new("http://localhost:3100");
292            let mut labels = FxHashMap::default();
293            labels.insert("app".to_string(), "myservice".to_string());
294            labels.insert("env".to_string(), "prod".to_string());
295
296            let query = LogsQuery {
297                query: None,
298                labels,
299                contains: None,
300                start_ns: 1000000000,
301                end_ns: 2000000000,
302                limit: 100,
303                direction: QueryDirection::Backward,
304            };
305
306            let logql = client.build_logql(&query);
307            // Labels may be in any order due to HashMap
308            assert!(logql.contains("app=\"myservice\""));
309            assert!(logql.contains("env=\"prod\""));
310            assert!(logql.starts_with('{'));
311            assert!(logql.ends_with('}'));
312        });
313    }
314
315    #[test]
316    fn test_build_logql_with_contains() {
317        with_runtime(|| {
318            let client = LokiClient::new("http://localhost:3100");
319            let mut labels = FxHashMap::default();
320            labels.insert("app".to_string(), "myservice".to_string());
321
322            let query = LogsQuery {
323                query: None,
324                labels,
325                contains: Some("SELECT".to_string()),
326                start_ns: 1000000000,
327                end_ns: 2000000000,
328                limit: 100,
329                direction: QueryDirection::Backward,
330            };
331
332            let logql = client.build_logql(&query);
333            assert!(logql.contains("|= \"SELECT\""));
334        });
335    }
336
337    #[test]
338    fn test_build_logql_raw_query() {
339        with_runtime(|| {
340            let client = LokiClient::new("http://localhost:3100");
341
342            let query = LogsQuery {
343                query: Some("{app=\"test\"} |~ \"error|warn\"".to_string()),
344                labels: FxHashMap::default(),
345                contains: None,
346                start_ns: 1000000000,
347                end_ns: 2000000000,
348                limit: 100,
349                direction: QueryDirection::Backward,
350            };
351
352            let logql = client.build_logql(&query);
353            assert_eq!(logql, "{app=\"test\"} |~ \"error|warn\"");
354        });
355    }
356
357    #[test]
358    fn test_build_query_url() {
359        with_runtime(|| {
360            let client = LokiClient::new("http://localhost:3100");
361
362            let query = LogsQuery::new(1000000000000, 2000000000000)
363                .with_label("app", "myservice")
364                .with_limit(500)
365                .with_direction(QueryDirection::Forward);
366
367            let url = client.build_query_url(&query);
368            assert!(url.starts_with("http://localhost:3100/loki/api/v1/query_range?"));
369            assert!(url.contains("start=1000"));
370            assert!(url.contains("end=2000"));
371            assert!(url.contains("limit=500"));
372            assert!(url.contains("direction=forward"));
373        });
374    }
375
376    #[test]
377    fn test_url_encode() {
378        assert_eq!(url_encode("simple"), "simple");
379        assert_eq!(url_encode("{app=\"test\"}"), "%7Bapp%3D%22test%22%7D");
380        assert_eq!(url_encode("a|b"), "a%7Cb");
381    }
382
383    #[test]
384    fn test_backend_type() {
385        with_runtime(|| {
386            let client = LokiClient::new("http://localhost:3100");
387            assert_eq!(client.backend_type(), "loki");
388        });
389    }
390}