1pub mod demo;
36pub mod error;
37pub mod logs;
38pub mod otlp;
39pub mod prometheus;
40pub mod promise;
41pub mod request;
42pub mod tracing;
43pub mod types;
44
45pub use demo::DemoMetricsClient;
46pub use error::ClientError;
47pub use poll_promise::Promise;
48pub use promise::promise_channel;
49pub use request::QueryRequest;
50pub use types::{MetricsBucket, MetricsGroup, QueryResponse, ResultType, Timestamp};
51
52pub use prometheus::response::MetricLabels;
54
55pub use logs::{
57 DemoLogsClient, LogEntry, LogLevel, LogsClient, LogsQuery, LogsResponse, LogsResult,
58 LokiClient, QueryDirection, StreamsResult,
59};
60
61#[inline]
64pub fn now_unix_secs() -> u64 {
65 #[cfg(target_arch = "wasm32")]
66 {
67 use web_time::SystemTime;
68 SystemTime::now()
69 .duration_since(SystemTime::UNIX_EPOCH)
70 .map(|d| d.as_secs())
71 .unwrap_or(0)
72 }
73 #[cfg(not(target_arch = "wasm32"))]
74 {
75 #[allow(clippy::disallowed_types)]
76 std::time::SystemTime::now()
77 .duration_since(std::time::UNIX_EPOCH)
78 .map(|d| d.as_secs())
79 .unwrap_or(0)
80 }
81}
82
83pub type QueryResult = Result<QueryResponse, ClientError>;
85
86pub type LabelsResult = Result<Vec<String>, ClientError>;
88
89pub type MetricLabelsResult = Result<MetricLabels, ClientError>;
91
92pub type HealthCheckResult = Result<BackendInfo, ClientError>;
94
95#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
97pub struct BackendInfo {
98 pub backend_type: String,
100 pub version: String,
102}
103
104pub trait MetricsClient {
109 fn query(&self, request: QueryRequest, ctx: &egui::Context) -> Promise<QueryResult>;
114
115 fn fetch_label_names(&self, ctx: &egui::Context) -> Promise<LabelsResult>;
119
120 fn fetch_label_values(&self, label: &str, ctx: &egui::Context) -> Promise<LabelsResult>;
124
125 fn fetch_metric_names(&self, ctx: &egui::Context) -> Promise<LabelsResult>;
129
130 fn fetch_metric_labels(&self, metric: &str, ctx: &egui::Context)
135 -> Promise<MetricLabelsResult>;
136
137 fn backend_type(&self) -> &'static str;
139
140 fn health_check(&self, ctx: &egui::Context) -> Promise<HealthCheckResult>;
145}
146
147pub fn normalize_url(url: impl Into<String>) -> String {
149 let mut url = url.into();
150 if !url.starts_with("http://") && !url.starts_with("https://") {
151 url = format!("http://{url}");
152 }
153 if url.ends_with('/') {
154 url.pop();
155 }
156 url
157}
158
159pub fn url_encode(s: &str) -> String {
164 let mut result = String::with_capacity(s.len() * 2);
165 for c in s.chars() {
166 match c {
167 ' ' => result.push_str("%20"),
168 '"' => result.push_str("%22"),
169 '#' => result.push_str("%23"),
170 '%' => result.push_str("%25"),
171 '&' => result.push_str("%26"),
172 '+' => result.push_str("%2B"),
173 '=' => result.push_str("%3D"),
174 '{' => result.push_str("%7B"),
175 '}' => result.push_str("%7D"),
176 '[' => result.push_str("%5B"),
177 ']' => result.push_str("%5D"),
178 '|' => result.push_str("%7C"),
179 '~' => result.push_str("%7E"),
180 _ => result.push(c),
181 }
182 }
183 result
184}
185
186pub const DEFAULT_QUERY_TIMEOUT_SECS: u64 = 30;
189
190struct PendingQuery {
192 promise: Promise<QueryResult>,
194 started_at: u64,
196}
197
198pub struct QueryManager {
220 pending: rustc_hash::FxHashMap<usize, PendingQuery>,
222 timeout_secs: u64,
224}
225
226impl Default for QueryManager {
227 fn default() -> Self {
228 Self::new()
229 }
230}
231
232impl QueryManager {
233 #[must_use]
235 pub fn new() -> Self {
236 Self {
237 pending: rustc_hash::FxHashMap::default(),
238 timeout_secs: DEFAULT_QUERY_TIMEOUT_SECS,
239 }
240 }
241
242 #[must_use]
244 pub fn with_timeout(timeout_secs: u64) -> Self {
245 Self {
246 pending: rustc_hash::FxHashMap::default(),
247 timeout_secs,
248 }
249 }
250
251 #[must_use]
253 pub fn is_querying(&self) -> bool {
254 !self.pending.is_empty()
255 }
256
257 #[must_use]
259 pub fn is_querying_id(&self, id: usize) -> bool {
260 self.pending.contains_key(&id)
261 }
262
263 #[must_use]
265 pub fn pending_count(&self) -> usize {
266 self.pending.len()
267 }
268
269 pub fn execute<C: MetricsClient + ?Sized>(
274 &mut self,
275 id: usize,
276 client: &C,
277 request: QueryRequest,
278 ctx: &egui::Context,
279 ) {
280 let promise = client.query(request, ctx);
281 self.pending.insert(
282 id,
283 PendingQuery {
284 promise,
285 started_at: now_unix_secs(),
286 },
287 );
288 }
289
290 pub fn poll_all(&mut self) -> Vec<(usize, QueryResult)> {
295 let now = now_unix_secs();
296 let mut completed = Vec::new();
297 let mut to_remove = Vec::new();
298
299 for (&id, pending) in &self.pending {
300 if let Some(result) = pending.promise.ready() {
302 completed.push((id, result.clone()));
303 to_remove.push(id);
304 continue;
305 }
306
307 let elapsed = now.saturating_sub(pending.started_at);
309 if elapsed >= self.timeout_secs {
310 log::warn!(
311 "Query {id} timed out after {elapsed} seconds (timeout: {}s)",
312 self.timeout_secs
313 );
314 completed.push((
315 id,
316 Err(ClientError::Timeout {
317 elapsed_secs: elapsed,
318 timeout_secs: self.timeout_secs,
319 }),
320 ));
321 to_remove.push(id);
322 }
323 }
324
325 for id in to_remove {
327 self.pending.remove(&id);
328 }
329
330 completed
331 }
332
333 pub fn cancel(&mut self, id: usize) {
338 self.pending.remove(&id);
339 }
340
341 pub fn cancel_all(&mut self) {
343 self.pending.clear();
344 }
345}
346
347pub struct LabelsManager {
369 promise: Option<Promise<LabelsResult>>,
371}
372
373impl Default for LabelsManager {
374 fn default() -> Self {
375 Self::new()
376 }
377}
378
379impl LabelsManager {
380 #[must_use]
382 pub fn new() -> Self {
383 Self { promise: None }
384 }
385
386 #[must_use]
388 pub fn is_fetching(&self) -> bool {
389 self.promise.is_some()
390 }
391
392 pub fn fetch_label_names<C: MetricsClient + ?Sized>(
396 &mut self,
397 client: &C,
398 ctx: &egui::Context,
399 ) {
400 if self.promise.is_some() {
401 return;
402 }
403
404 self.promise = Some(client.fetch_label_names(ctx));
405 }
406
407 pub fn fetch_label_values<C: MetricsClient + ?Sized>(
411 &mut self,
412 client: &C,
413 label: &str,
414 ctx: &egui::Context,
415 ) {
416 if self.promise.is_some() {
417 return;
418 }
419
420 self.promise = Some(client.fetch_label_values(label, ctx));
421 }
422
423 pub fn fetch_metric_names<C: MetricsClient + ?Sized>(
427 &mut self,
428 client: &C,
429 ctx: &egui::Context,
430 ) {
431 if self.promise.is_some() {
432 return;
433 }
434
435 self.promise = Some(client.fetch_metric_names(ctx));
436 }
437
438 pub fn poll(&mut self) -> Option<LabelsResult> {
442 let promise = self.promise.as_ref()?;
443 if let Some(result) = promise.ready() {
444 let result = result.clone();
445 self.promise = None;
446 Some(result)
447 } else {
448 None
449 }
450 }
451
452 pub fn cancel(&mut self) {
454 self.promise = None;
455 }
456}
457
458pub struct MetricLabelsManager {
463 promise: Option<Promise<MetricLabelsResult>>,
465 metric: Option<String>,
467}
468
469impl Default for MetricLabelsManager {
470 fn default() -> Self {
471 Self::new()
472 }
473}
474
475impl MetricLabelsManager {
476 #[must_use]
478 pub fn new() -> Self {
479 Self {
480 promise: None,
481 metric: None,
482 }
483 }
484
485 #[must_use]
487 pub fn is_fetching(&self) -> bool {
488 self.promise.is_some()
489 }
490
491 #[must_use]
493 pub fn fetching_metric(&self) -> Option<&str> {
494 self.metric.as_deref()
495 }
496
497 pub fn fetch<C: MetricsClient + ?Sized>(
501 &mut self,
502 client: &C,
503 metric: &str,
504 ctx: &egui::Context,
505 ) {
506 if self.metric.as_deref() == Some(metric) && self.promise.is_some() {
508 return;
509 }
510
511 self.cancel();
513
514 self.metric = Some(metric.to_string());
515 self.promise = Some(client.fetch_metric_labels(metric, ctx));
516 }
517
518 pub fn poll(&mut self) -> Option<(String, MetricLabelsResult)> {
522 let promise = self.promise.as_ref()?;
523 if let Some(result) = promise.ready() {
524 let result = result.clone();
525 let metric = self.metric.take().unwrap_or_default();
526 self.promise = None;
527 Some((metric, result))
528 } else {
529 None
530 }
531 }
532
533 pub fn cancel(&mut self) {
535 self.promise = None;
536 self.metric = None;
537 }
538}
539
540pub struct HealthCheckManager {
545 promise: Option<Promise<HealthCheckResult>>,
547}
548
549impl Default for HealthCheckManager {
550 fn default() -> Self {
551 Self::new()
552 }
553}
554
555impl HealthCheckManager {
556 #[must_use]
558 pub fn new() -> Self {
559 Self { promise: None }
560 }
561
562 #[must_use]
564 pub fn is_checking(&self) -> bool {
565 self.promise.is_some()
566 }
567
568 pub fn check<C: MetricsClient + ?Sized>(&mut self, client: &C, ctx: &egui::Context) {
572 if self.promise.is_some() {
573 return;
574 }
575
576 self.promise = Some(client.health_check(ctx));
577 }
578
579 pub fn poll(&mut self) -> Option<HealthCheckResult> {
583 let promise = self.promise.as_ref()?;
584 if let Some(result) = promise.ready() {
585 let result = result.clone();
586 self.promise = None;
587 Some(result)
588 } else {
589 None
590 }
591 }
592
593 pub fn cancel(&mut self) {
595 self.promise = None;
596 }
597}
598
599struct PendingLogsQuery {
601 promise: Promise<LogsResult>,
603 started_at: u64,
605}
606
607pub struct LogsQueryManager {
629 pending: rustc_hash::FxHashMap<usize, PendingLogsQuery>,
631 timeout_secs: u64,
633}
634
635impl Default for LogsQueryManager {
636 fn default() -> Self {
637 Self::new()
638 }
639}
640
641impl LogsQueryManager {
642 #[must_use]
644 pub fn new() -> Self {
645 Self {
646 pending: rustc_hash::FxHashMap::default(),
647 timeout_secs: DEFAULT_QUERY_TIMEOUT_SECS,
648 }
649 }
650
651 #[must_use]
653 pub fn with_timeout(timeout_secs: u64) -> Self {
654 Self {
655 pending: rustc_hash::FxHashMap::default(),
656 timeout_secs,
657 }
658 }
659
660 #[must_use]
662 pub fn is_querying(&self) -> bool {
663 !self.pending.is_empty()
664 }
665
666 #[must_use]
668 pub fn is_querying_id(&self, id: usize) -> bool {
669 self.pending.contains_key(&id)
670 }
671
672 #[must_use]
674 pub fn pending_count(&self) -> usize {
675 self.pending.len()
676 }
677
678 pub fn execute<C: LogsClient + ?Sized>(
683 &mut self,
684 id: usize,
685 client: &C,
686 query: LogsQuery,
687 ctx: &egui::Context,
688 ) {
689 let promise = client.query_logs(query, ctx);
690 self.pending.insert(
691 id,
692 PendingLogsQuery {
693 promise,
694 started_at: now_unix_secs(),
695 },
696 );
697 }
698
699 pub fn poll_all(&mut self) -> Vec<(usize, LogsResult)> {
704 let now = now_unix_secs();
705 let mut completed = Vec::new();
706 let mut to_remove = Vec::new();
707
708 for (&id, pending) in &self.pending {
709 if let Some(result) = pending.promise.ready() {
711 completed.push((id, result.clone()));
712 to_remove.push(id);
713 continue;
714 }
715
716 let elapsed = now.saturating_sub(pending.started_at);
718 if elapsed >= self.timeout_secs {
719 log::warn!(
720 "Logs query {id} timed out after {elapsed} seconds (timeout: {}s)",
721 self.timeout_secs
722 );
723 completed.push((
724 id,
725 Err(ClientError::Timeout {
726 elapsed_secs: elapsed,
727 timeout_secs: self.timeout_secs,
728 }),
729 ));
730 to_remove.push(id);
731 }
732 }
733
734 for id in to_remove {
736 self.pending.remove(&id);
737 }
738
739 completed
740 }
741
742 pub fn cancel(&mut self, id: usize) {
747 self.pending.remove(&id);
748 }
749
750 pub fn cancel_all(&mut self) {
752 self.pending.clear();
753 }
754}
755
756#[cfg(test)]
757mod tests {
758 use super::*;
759
760 #[test]
761 fn test_query_request_builder() {
762 let request = QueryRequest::new("cpu_usage", "sum(env:prod)")
763 .with_step(30)
764 .with_range(1000, 2000);
765
766 assert_eq!(request.metric, "cpu_usage");
767 assert_eq!(request.query, "sum(env:prod)");
768 assert_eq!(request.step_secs, 30);
769 assert_eq!(request.start, Some(1000));
770 assert_eq!(request.end, Some(2000));
771 }
772
773 #[test]
774 fn test_query_manager_initial_state() {
775 let manager = QueryManager::new();
776 assert!(!manager.is_querying());
777 }
778
779 #[test]
780 fn test_labels_manager_initial_state() {
781 let manager = LabelsManager::new();
782 assert!(!manager.is_fetching());
783 }
784
785 #[test]
786 fn test_logs_query_manager_initial_state() {
787 let manager = LogsQueryManager::new();
788 assert!(!manager.is_querying());
789 assert_eq!(manager.pending_count(), 0);
790 }
791
792 #[test]
793 fn test_normalize_url_adds_http() {
794 assert_eq!(normalize_url("localhost:9090"), "http://localhost:9090");
795 }
796
797 #[test]
798 fn test_normalize_url_preserves_https() {
799 assert_eq!(normalize_url("https://example.com"), "https://example.com");
800 }
801
802 #[test]
803 fn test_normalize_url_strips_trailing_slash() {
804 assert_eq!(
805 normalize_url("http://localhost:9090/"),
806 "http://localhost:9090"
807 );
808 }
809
810 #[test]
811 fn test_normalize_url_no_change_needed() {
812 assert_eq!(
813 normalize_url("http://localhost:9090"),
814 "http://localhost:9090"
815 );
816 }
817
818 #[test]
819 fn test_url_encode_simple() {
820 assert_eq!(url_encode("simple"), "simple");
821 assert_eq!(url_encode("hello world"), "hello%20world");
822 }
823
824 #[test]
825 fn test_url_encode_special_chars() {
826 assert_eq!(url_encode("{app=\"test\"}"), "%7Bapp%3D%22test%22%7D");
827 assert_eq!(url_encode("a&b=c"), "a%26b%3Dc");
828 assert_eq!(url_encode("[1,2]"), "%5B1,2%5D");
829 assert_eq!(url_encode("a+b"), "a%2Bb");
830 assert_eq!(url_encode("a|b"), "a%7Cb");
831 assert_eq!(url_encode("100%"), "100%25");
832 }
833
834 #[test]
835 fn test_client_error_display() {
836 let err = ClientError::TranslationError("OR not supported".to_string());
837 assert_eq!(
838 err.to_string(),
839 "query translation failed: OR not supported"
840 );
841
842 let err = ClientError::BackendError {
843 status: 400,
844 message: "bad query".to_string(),
845 };
846 assert_eq!(err.to_string(), "backend error (HTTP 400): bad query");
847 }
848}