1use poll_promise::Promise;
4
5use crate::error::ClientError;
6use crate::logs::{LogsClient, LogsQuery, LogsResult, QueryDirection, StreamsResult};
7use crate::normalize_url;
8use crate::promise::promise_channel;
9use crate::url_encode;
10use crate::{BackendInfo, HealthCheckResult};
11
12use super::response::{parse_buildinfo_response, parse_labels_response, parse_logs_response};
13
14pub struct LokiClient {
31 base_url: String,
32 http_client: reqwest::Client,
33 #[cfg(not(target_arch = "wasm32"))]
34 runtime_handle: tokio::runtime::Handle,
35}
36
37impl LokiClient {
38 #[must_use]
50 pub fn new(base_url: impl Into<String>) -> Self {
51 Self {
52 base_url: normalize_url(base_url),
53 http_client: reqwest::Client::new(),
54 #[cfg(not(target_arch = "wasm32"))]
55 runtime_handle: tokio::runtime::Handle::current(),
56 }
57 }
58
59 #[cfg(not(target_arch = "wasm32"))]
63 #[must_use]
64 pub fn with_runtime(base_url: impl Into<String>, handle: tokio::runtime::Handle) -> Self {
65 Self {
66 base_url: normalize_url(base_url),
67 http_client: reqwest::Client::new(),
68 runtime_handle: handle,
69 }
70 }
71
72 #[cfg(not(target_arch = "wasm32"))]
74 fn spawn<F>(&self, future: F)
75 where
76 F: std::future::Future<Output = ()> + Send + 'static,
77 {
78 self.runtime_handle.spawn(future);
79 }
80
81 #[cfg(target_arch = "wasm32")]
83 fn spawn<F>(&self, future: F)
84 where
85 F: std::future::Future<Output = ()> + 'static,
86 {
87 wasm_bindgen_futures::spawn_local(future);
88 }
89
90 fn build_query_url(&self, query: &LogsQuery) -> String {
92 let start_secs = query.start_ns / 1_000_000_000;
94 let end_secs = query.end_ns / 1_000_000_000;
95
96 let logql = self.build_logql(query);
98 let encoded_query = url_encode(&logql);
99
100 let direction = match query.direction {
101 QueryDirection::Forward => "forward",
102 QueryDirection::Backward => "backward",
103 };
104
105 format!(
106 "{}/loki/api/v1/query_range?query={}&start={}&end={}&limit={}&direction={}",
107 self.base_url, encoded_query, start_secs, end_secs, query.limit, direction
108 )
109 }
110
111 fn build_logql(&self, query: &LogsQuery) -> String {
113 if let Some(ref raw) = query.query {
115 return raw.clone();
116 }
117
118 let mut selector_parts: Vec<String> = query
120 .labels
121 .iter()
122 .map(|(k, v)| format!("{k}=\"{v}\""))
123 .collect();
124
125 if selector_parts.is_empty() {
127 selector_parts.push("__name__=~\".+\"".to_string());
128 }
129
130 let selector = format!("{{{}}}", selector_parts.join(", "));
131
132 if let Some(ref text) = query.contains {
134 format!("{selector} |= \"{text}\"")
135 } else {
136 selector
137 }
138 }
139}
140
141impl LogsClient for LokiClient {
142 fn query_logs(&self, query: LogsQuery, ctx: &egui::Context) -> Promise<LogsResult> {
143 let url = self.build_query_url(&query);
144
145 log::debug!("Loki query: {url}");
146
147 let (sender, promise) = promise_channel();
148 let ctx = ctx.clone();
149 let client = self.http_client.clone();
150
151 self.spawn(async move {
152 let result = match client.get(&url).send().await {
153 Ok(response) => {
154 let status = response.status();
155 if status.is_success() {
156 match response.bytes().await {
157 Ok(bytes) => parse_logs_response(&bytes),
158 Err(e) => Err(ClientError::NetworkError(e.to_string())),
159 }
160 } else {
161 Err(ClientError::BackendError {
162 status: status.as_u16(),
163 message: status.canonical_reason().unwrap_or("Unknown").to_string(),
164 })
165 }
166 }
167 Err(e) => Err(ClientError::NetworkError(e.to_string())),
168 };
169 sender.send(result);
170 ctx.request_repaint();
171 });
172
173 promise
174 }
175
176 fn fetch_streams(&self, ctx: &egui::Context) -> Promise<StreamsResult> {
177 let url = format!("{}/loki/api/v1/labels", self.base_url);
178
179 log::debug!("Loki fetch labels: {url}");
180
181 let (sender, promise) = promise_channel();
182 let ctx = ctx.clone();
183 let client = self.http_client.clone();
184
185 self.spawn(async move {
186 let result = match client.get(&url).send().await {
187 Ok(response) => {
188 let status = response.status();
189 if status.is_success() {
190 match response.bytes().await {
191 Ok(bytes) => parse_labels_response(&bytes),
192 Err(e) => Err(ClientError::NetworkError(e.to_string())),
193 }
194 } else {
195 Err(ClientError::BackendError {
196 status: status.as_u16(),
197 message: status.canonical_reason().unwrap_or("Unknown").to_string(),
198 })
199 }
200 }
201 Err(e) => Err(ClientError::NetworkError(e.to_string())),
202 };
203 sender.send(result);
204 ctx.request_repaint();
205 });
206
207 promise
208 }
209
210 fn backend_type(&self) -> &'static str {
211 "loki"
212 }
213
214 fn health_check(&self, ctx: &egui::Context) -> Promise<HealthCheckResult> {
215 let url = format!("{}/loki/api/v1/status/buildinfo", self.base_url);
216
217 log::debug!("Loki health check: {url}");
218
219 let (sender, promise) = promise_channel();
220 let ctx = ctx.clone();
221 let client = self.http_client.clone();
222
223 self.spawn(async move {
224 let result = match client.get(&url).send().await {
225 Ok(response) => {
226 let status = response.status();
227 if status.is_success() {
228 match response.bytes().await {
229 Ok(bytes) => parse_buildinfo_response(&bytes).map(|info| BackendInfo {
230 backend_type: "loki".to_string(),
231 version: info.version,
232 }),
233 Err(e) => Err(ClientError::NetworkError(e.to_string())),
234 }
235 } else {
236 Err(ClientError::BackendError {
237 status: status.as_u16(),
238 message: status.canonical_reason().unwrap_or("Unknown").to_string(),
239 })
240 }
241 }
242 Err(e) => Err(ClientError::NetworkError(e.to_string())),
243 };
244 sender.send(result);
245 ctx.request_repaint();
246 });
247
248 promise
249 }
250}
251
252#[cfg(test)]
253mod tests {
254 use super::*;
255 use rustc_hash::FxHashMap;
256
257 fn with_runtime<F: FnOnce()>(f: F) {
259 let rt = tokio::runtime::Runtime::new().unwrap();
260 let _guard = rt.enter();
261 f();
262 }
263
264 #[test]
265 fn test_new_removes_trailing_slash() {
266 with_runtime(|| {
267 let client = LokiClient::new("http://localhost:3100/");
268 assert_eq!(client.base_url, "http://localhost:3100");
269 });
270 }
271
272 #[test]
273 fn test_new_adds_http_protocol() {
274 with_runtime(|| {
275 let client = LokiClient::new("localhost:3100");
276 assert_eq!(client.base_url, "http://localhost:3100");
277 });
278 }
279
280 #[test]
281 fn test_new_preserves_https() {
282 with_runtime(|| {
283 let client = LokiClient::new("https://loki.example.com");
284 assert_eq!(client.base_url, "https://loki.example.com");
285 });
286 }
287
288 #[test]
289 fn test_build_logql_with_labels() {
290 with_runtime(|| {
291 let client = LokiClient::new("http://localhost:3100");
292 let mut labels = FxHashMap::default();
293 labels.insert("app".to_string(), "myservice".to_string());
294 labels.insert("env".to_string(), "prod".to_string());
295
296 let query = LogsQuery {
297 query: None,
298 labels,
299 contains: None,
300 start_ns: 1000000000,
301 end_ns: 2000000000,
302 limit: 100,
303 direction: QueryDirection::Backward,
304 };
305
306 let logql = client.build_logql(&query);
307 assert!(logql.contains("app=\"myservice\""));
309 assert!(logql.contains("env=\"prod\""));
310 assert!(logql.starts_with('{'));
311 assert!(logql.ends_with('}'));
312 });
313 }
314
315 #[test]
316 fn test_build_logql_with_contains() {
317 with_runtime(|| {
318 let client = LokiClient::new("http://localhost:3100");
319 let mut labels = FxHashMap::default();
320 labels.insert("app".to_string(), "myservice".to_string());
321
322 let query = LogsQuery {
323 query: None,
324 labels,
325 contains: Some("SELECT".to_string()),
326 start_ns: 1000000000,
327 end_ns: 2000000000,
328 limit: 100,
329 direction: QueryDirection::Backward,
330 };
331
332 let logql = client.build_logql(&query);
333 assert!(logql.contains("|= \"SELECT\""));
334 });
335 }
336
337 #[test]
338 fn test_build_logql_raw_query() {
339 with_runtime(|| {
340 let client = LokiClient::new("http://localhost:3100");
341
342 let query = LogsQuery {
343 query: Some("{app=\"test\"} |~ \"error|warn\"".to_string()),
344 labels: FxHashMap::default(),
345 contains: None,
346 start_ns: 1000000000,
347 end_ns: 2000000000,
348 limit: 100,
349 direction: QueryDirection::Backward,
350 };
351
352 let logql = client.build_logql(&query);
353 assert_eq!(logql, "{app=\"test\"} |~ \"error|warn\"");
354 });
355 }
356
357 #[test]
358 fn test_build_query_url() {
359 with_runtime(|| {
360 let client = LokiClient::new("http://localhost:3100");
361
362 let query = LogsQuery::new(1000000000000, 2000000000000)
363 .with_label("app", "myservice")
364 .with_limit(500)
365 .with_direction(QueryDirection::Forward);
366
367 let url = client.build_query_url(&query);
368 assert!(url.starts_with("http://localhost:3100/loki/api/v1/query_range?"));
369 assert!(url.contains("start=1000"));
370 assert!(url.contains("end=2000"));
371 assert!(url.contains("limit=500"));
372 assert!(url.contains("direction=forward"));
373 });
374 }
375
376 #[test]
377 fn test_url_encode() {
378 assert_eq!(url_encode("simple"), "simple");
379 assert_eq!(url_encode("{app=\"test\"}"), "%7Bapp%3D%22test%22%7D");
380 assert_eq!(url_encode("a|b"), "a%7Cb");
381 }
382
383 #[test]
384 fn test_backend_type() {
385 with_runtime(|| {
386 let client = LokiClient::new("http://localhost:3100");
387 assert_eq!(client.backend_type(), "loki");
388 });
389 }
390}