1use std::collections::VecDeque;
8use std::sync::Arc;
9
10use parking_lot::RwLock;
11use rustc_hash::FxHashMap;
12
13use crate::logs::LogEntry;
14use crate::tracing::tempo::types::{Trace, TraceSummary};
15use crate::types::{MetricsBucket, MetricsGroup, QueryResponse, ResultType, Timestamp};
16
17#[derive(Debug, Clone)]
19pub struct StoreConfig {
20 pub max_traces: usize,
22 pub max_log_entries: usize,
24 pub max_metric_data_points: usize,
26}
27
28impl Default for StoreConfig {
29 fn default() -> Self {
30 Self {
31 max_traces: 1000,
32 max_log_entries: 50_000,
33 max_metric_data_points: 10_000,
34 }
35 }
36}
37
38pub struct TelemetryStore {
44 traces: RwLock<TraceStore>,
45 logs: RwLock<LogStore>,
46 metrics: RwLock<MetricsStore>,
47}
48
49impl TelemetryStore {
50 pub fn new(config: StoreConfig) -> Arc<Self> {
52 Arc::new(Self {
53 traces: RwLock::new(TraceStore::new(config.max_traces)),
54 logs: RwLock::new(LogStore::new(config.max_log_entries)),
55 metrics: RwLock::new(MetricsStore::new(config.max_metric_data_points)),
56 })
57 }
58
59 pub fn insert_trace(&self, trace: Trace) {
63 self.traces.write().insert(trace);
64 }
65
66 pub fn get_trace(&self, trace_id: &str) -> Option<Trace> {
68 self.traces.read().get(trace_id)
69 }
70
71 #[allow(clippy::too_many_arguments)]
73 pub fn search_traces(
74 &self,
75 service_name: Option<&str>,
76 operation_name: Option<&str>,
77 min_duration_us: Option<u64>,
78 max_duration_us: Option<u64>,
79 start_time_us: Option<u64>,
80 end_time_us: Option<u64>,
81 limit: usize,
82 ) -> Vec<TraceSummary> {
83 self.traces.read().search(
84 service_name,
85 operation_name,
86 min_duration_us,
87 max_duration_us,
88 start_time_us,
89 end_time_us,
90 limit,
91 )
92 }
93
94 pub fn trace_count(&self) -> usize {
96 self.traces.read().len()
97 }
98
99 pub fn insert_logs(&self, entries: Vec<LogEntry>) {
103 self.logs.write().insert_batch(entries);
104 }
105
106 pub fn query_logs(
108 &self,
109 start_ns: i64,
110 end_ns: i64,
111 labels: &FxHashMap<String, String>,
112 contains: Option<&str>,
113 limit: usize,
114 ) -> Vec<LogEntry> {
115 self.logs
116 .read()
117 .query(start_ns, end_ns, labels, contains, limit)
118 }
119
120 pub fn known_log_labels(&self) -> Vec<String> {
122 self.logs.read().known_labels()
123 }
124
125 pub fn log_count(&self) -> usize {
127 self.logs.read().len()
128 }
129
130 pub fn insert_metric_point(&self, point: MetricDataPoint) {
134 self.metrics.write().insert(point);
135 }
136
137 pub fn insert_metric_points(&self, points: Vec<MetricDataPoint>) {
139 let mut store = self.metrics.write();
140 for point in points {
141 store.insert(point);
142 }
143 }
144
145 pub fn metric_names(&self) -> Vec<String> {
147 self.metrics.read().metric_names()
148 }
149
150 pub fn metric_label_names(&self, metric: &str) -> Vec<String> {
152 self.metrics.read().label_names(metric)
153 }
154
155 pub fn metric_label_values(&self, metric: &str, label: &str) -> Vec<String> {
157 self.metrics.read().label_values(metric, label)
158 }
159
160 pub fn query_metric(
162 &self,
163 metric: &str,
164 labels: &FxHashMap<String, String>,
165 start_ns: u64,
166 end_ns: u64,
167 step_ns: u64,
168 ) -> QueryResponse {
169 self.metrics
170 .read()
171 .query(metric, labels, start_ns, end_ns, step_ns)
172 }
173
174 pub fn metric_series_count(&self) -> usize {
176 self.metrics.read().series_count()
177 }
178
179 pub fn metric_point_count(&self) -> usize {
181 self.metrics.read().point_count()
182 }
183}
184
185struct TraceStore {
190 traces: FxHashMap<String, Trace>,
192 order: VecDeque<String>,
194 max_traces: usize,
196}
197
198impl TraceStore {
199 fn new(max_traces: usize) -> Self {
200 Self {
201 traces: FxHashMap::default(),
202 order: VecDeque::new(),
203 max_traces,
204 }
205 }
206
207 fn insert(&mut self, trace: Trace) {
208 let trace_id = trace.trace_id.clone();
209
210 if let Some(existing) = self.traces.get_mut(&trace_id) {
212 *existing = trace;
213 return;
214 }
215
216 while self.traces.len() >= self.max_traces {
218 if let Some(old_id) = self.order.pop_front() {
219 self.traces.remove(&old_id);
220 } else {
221 break;
222 }
223 }
224
225 self.order.push_back(trace_id.clone());
226 self.traces.insert(trace_id, trace);
227 }
228
229 fn get(&self, trace_id: &str) -> Option<Trace> {
230 self.traces.get(trace_id).cloned()
231 }
232
233 fn len(&self) -> usize {
234 self.traces.len()
235 }
236
237 #[allow(clippy::too_many_arguments)]
238 fn search(
239 &self,
240 service_name: Option<&str>,
241 operation_name: Option<&str>,
242 min_duration_us: Option<u64>,
243 max_duration_us: Option<u64>,
244 start_time_us: Option<u64>,
245 end_time_us: Option<u64>,
246 limit: usize,
247 ) -> Vec<TraceSummary> {
248 let mut results: Vec<TraceSummary> = self
249 .traces
250 .values()
251 .filter(|trace| {
252 if let Some(start) = start_time_us {
254 if trace.start_time_us < start {
255 return false;
256 }
257 }
258 if let Some(end) = end_time_us {
259 if trace.start_time_us > end {
260 return false;
261 }
262 }
263 if let Some(min) = min_duration_us {
265 if trace.duration_us < min {
266 return false;
267 }
268 }
269 if let Some(max) = max_duration_us {
270 if trace.duration_us > max {
271 return false;
272 }
273 }
274 if let Some(svc) = service_name {
276 if !trace.services.iter().any(|s| s == svc) {
277 return false;
278 }
279 }
280 if let Some(op) = operation_name {
282 let has_op = trace.spans.iter().any(|s| s.operation_name == op);
283 if !has_op {
284 return false;
285 }
286 }
287 true
288 })
289 .map(|trace| {
290 let root = trace
291 .root_span_id
292 .as_ref()
293 .and_then(|id| trace.get_span(id));
294 let error_count = trace
295 .spans
296 .iter()
297 .filter(|s| s.status == crate::tracing::tempo::types::SpanStatus::Error)
298 .count();
299 TraceSummary {
300 trace_id: trace.trace_id.clone(),
301 root_service_name: root.map(|s| s.service_name.clone()).unwrap_or_default(),
302 root_operation_name: root.map(|s| s.operation_name.clone()).unwrap_or_default(),
303 start_time_us: trace.start_time_us,
304 duration_us: trace.duration_us,
305 span_count: trace.spans.len(),
306 error_count,
307 }
308 })
309 .collect();
310
311 results.sort_by(|a, b| b.start_time_us.cmp(&a.start_time_us));
313 results.truncate(limit);
314 results
315 }
316}
317
318#[derive(Debug, Clone)]
324pub struct MetricDataPoint {
325 pub name: String,
327 pub labels: FxHashMap<String, String>,
329 pub timestamp_ns: u64,
331 pub value: f64,
333}
334
335#[derive(Debug, Clone, PartialEq, Eq, Hash)]
337struct SeriesKey {
338 name: String,
339 labels_key: String,
341}
342
343#[derive(Debug, Clone)]
345struct StoredPoint {
346 timestamp_ns: u64,
347 value: f64,
348}
349
350struct MetricsStore {
351 series: FxHashMap<SeriesKey, TimeSeries>,
353 max_points_per_series: usize,
355}
356
357struct TimeSeries {
358 labels: FxHashMap<String, String>,
360 name: String,
362 points: VecDeque<StoredPoint>,
364}
365
366impl MetricsStore {
367 fn new(max_points_per_series: usize) -> Self {
368 Self {
369 series: FxHashMap::default(),
370 max_points_per_series,
371 }
372 }
373
374 fn make_key(name: &str, labels: &FxHashMap<String, String>) -> SeriesKey {
375 let mut pairs: Vec<_> = labels.iter().collect();
376 pairs.sort_by_key(|(k, _)| *k);
377 let labels_key = pairs
378 .iter()
379 .map(|(k, v)| format!("{k}={v}"))
380 .collect::<Vec<_>>()
381 .join(",");
382 SeriesKey {
383 name: name.to_string(),
384 labels_key,
385 }
386 }
387
388 fn insert(&mut self, point: MetricDataPoint) {
389 let key = Self::make_key(&point.name, &point.labels);
390 let series = self.series.entry(key).or_insert_with(|| TimeSeries {
391 labels: point.labels.clone(),
392 name: point.name.clone(),
393 points: VecDeque::new(),
394 });
395
396 let stored = StoredPoint {
398 timestamp_ns: point.timestamp_ns,
399 value: point.value,
400 };
401
402 if series
403 .points
404 .back()
405 .is_none_or(|last| last.timestamp_ns <= stored.timestamp_ns)
406 {
407 series.points.push_back(stored);
408 } else {
409 let pos = series
411 .points
412 .iter()
413 .position(|p| p.timestamp_ns > stored.timestamp_ns)
414 .unwrap_or(series.points.len());
415 series.points.insert(pos, stored);
416 }
417
418 while series.points.len() > self.max_points_per_series {
420 series.points.pop_front();
421 }
422 }
423
424 fn metric_names(&self) -> Vec<String> {
425 let mut names: rustc_hash::FxHashSet<String> = rustc_hash::FxHashSet::default();
426 for series in self.series.values() {
427 names.insert(series.name.clone());
428 }
429 let mut sorted: Vec<String> = names.into_iter().collect();
430 sorted.sort();
431 sorted
432 }
433
434 fn label_names(&self, metric: &str) -> Vec<String> {
435 let mut names: rustc_hash::FxHashSet<String> = rustc_hash::FxHashSet::default();
436 for series in self.series.values() {
437 if series.name == metric {
438 for key in series.labels.keys() {
439 names.insert(key.clone());
440 }
441 }
442 }
443 let mut sorted: Vec<String> = names.into_iter().collect();
444 sorted.sort();
445 sorted
446 }
447
448 fn label_values(&self, metric: &str, label: &str) -> Vec<String> {
449 let mut values: rustc_hash::FxHashSet<String> = rustc_hash::FxHashSet::default();
450 for series in self.series.values() {
451 if series.name == metric {
452 if let Some(v) = series.labels.get(label) {
453 values.insert(v.clone());
454 }
455 }
456 }
457 let mut sorted: Vec<String> = values.into_iter().collect();
458 sorted.sort();
459 sorted
460 }
461
462 fn query(
463 &self,
464 metric: &str,
465 label_filter: &FxHashMap<String, String>,
466 start_ns: u64,
467 end_ns: u64,
468 step_ns: u64,
469 ) -> QueryResponse {
470 let matching_series: Vec<&TimeSeries> = self
471 .series
472 .values()
473 .filter(|s| {
474 if s.name != metric {
475 return false;
476 }
477 for (k, v) in label_filter {
478 match s.labels.get(k) {
479 Some(sv) if sv == v => {}
480 _ => return false,
481 }
482 }
483 true
484 })
485 .collect();
486
487 let step_ns = if step_ns == 0 {
488 let range = end_ns.saturating_sub(start_ns);
490 (range / 200).max(1_000_000_000) } else {
492 step_ns
493 };
494
495 let groups: Vec<MetricsGroup> = matching_series
496 .iter()
497 .map(|series| {
498 let mut pairs: Vec<_> = series.labels.iter().collect();
500 pairs.sort_by_key(|(k, _)| *k);
501 let group = pairs
502 .iter()
503 .map(|(k, v)| format!("{k}:{v}"))
504 .collect::<Vec<_>>()
505 .join(", ");
506
507 let buckets = bucket_points(&series.points, start_ns, end_ns, step_ns);
509
510 MetricsGroup {
511 group: if group.is_empty() {
512 metric.to_string()
513 } else {
514 group
515 },
516 buckets,
517 }
518 })
519 .collect();
520
521 QueryResponse {
522 metric: metric.to_string(),
523 query: metric.to_string(),
524 parsed_agg: None,
525 parsed_filter: String::new(),
526 parsed_grouping: None,
527 parsed_time_range: None,
528 start: Some(start_ns as Timestamp),
529 end: Some(end_ns as Timestamp),
530 granularity_ns: step_ns as u128,
531 groups,
532 result_type: ResultType::Matrix,
533 }
534 }
535
536 fn series_count(&self) -> usize {
537 self.series.len()
538 }
539
540 fn point_count(&self) -> usize {
541 self.series.values().map(|s| s.points.len()).sum()
542 }
543}
544
545fn bucket_points(
550 points: &VecDeque<StoredPoint>,
551 start_ns: u64,
552 end_ns: u64,
553 step_ns: u64,
554) -> Vec<MetricsBucket> {
555 if points.is_empty() || start_ns >= end_ns || step_ns == 0 {
556 return Vec::new();
557 }
558
559 let num_buckets = (end_ns - start_ns).div_ceil(step_ns);
560 let mut bucket_data: Vec<Option<(f64, usize)>> = vec![None; num_buckets as usize];
561
562 for p in points.iter() {
563 if p.timestamp_ns < start_ns || p.timestamp_ns >= end_ns {
564 continue;
565 }
566 let idx = ((p.timestamp_ns - start_ns) / step_ns) as usize;
567 if idx < bucket_data.len() {
568 match &mut bucket_data[idx] {
569 Some((val, count)) => {
570 *val = p.value;
571 *count += 1;
572 }
573 slot @ None => {
574 *slot = Some((p.value, 1));
575 }
576 }
577 }
578 }
579
580 bucket_data
581 .into_iter()
582 .enumerate()
583 .filter_map(|(i, data)| {
584 let (value, count) = data?;
585 let bucket_start = start_ns + (i as u64) * step_ns;
586 let bucket_end = (bucket_start + step_ns).min(end_ns);
587 Some(MetricsBucket {
588 start: bucket_start as Timestamp,
589 end: bucket_end as Timestamp,
590 value,
591 count,
592 })
593 })
594 .collect()
595}
596
597struct LogStore {
602 entries: VecDeque<LogEntry>,
604 max_entries: usize,
606}
607
608impl LogStore {
609 fn new(max_entries: usize) -> Self {
610 Self {
611 entries: VecDeque::new(),
612 max_entries,
613 }
614 }
615
616 fn insert_batch(&mut self, entries: Vec<LogEntry>) {
617 for entry in entries {
618 if self.entries.len() >= self.max_entries {
619 self.entries.pop_front();
620 }
621 self.entries.push_back(entry);
622 }
623 }
624
625 fn query(
626 &self,
627 start_ns: i64,
628 end_ns: i64,
629 labels: &FxHashMap<String, String>,
630 contains: Option<&str>,
631 limit: usize,
632 ) -> Vec<LogEntry> {
633 let contains_lower = contains.map(|t| t.to_lowercase());
634 self.entries
635 .iter()
636 .filter(|entry| {
637 if entry.timestamp_ns < start_ns || entry.timestamp_ns > end_ns {
639 return false;
640 }
641 for (key, value) in labels {
643 match entry.labels.get(key) {
644 Some(v) if v == value => {}
645 _ => return false,
646 }
647 }
648 if let Some(ref pattern) = contains_lower {
650 if !entry.message.to_lowercase().contains(pattern) {
651 return false;
652 }
653 }
654 true
655 })
656 .rev() .take(limit)
658 .cloned()
659 .collect()
660 }
661
662 fn known_labels(&self) -> Vec<String> {
663 let mut labels: rustc_hash::FxHashSet<String> = rustc_hash::FxHashSet::default();
664 for entry in &self.entries {
665 for key in entry.labels.keys() {
666 labels.insert(key.clone());
667 }
668 }
669 let mut sorted: Vec<String> = labels.into_iter().collect();
670 sorted.sort();
671 sorted
672 }
673
674 fn len(&self) -> usize {
675 self.entries.len()
676 }
677}
678
679#[cfg(test)]
680mod tests {
681 use super::*;
682 use crate::tracing::tempo::types::{Span, SpanStatus};
683
684 fn make_trace(id: &str, service: &str, duration_us: u64) -> Trace {
685 let spans = vec![Span {
686 span_id: format!("{id}-span1"),
687 trace_id: id.to_string(),
688 parent_span_id: None,
689 operation_name: "root".to_string(),
690 service_name: service.to_string(),
691 start_time_us: 1_000_000,
692 duration_us,
693 status: SpanStatus::Ok,
694 tags: FxHashMap::default(),
695 logs: vec![],
696 depth: 0,
697 }];
698 Trace::from_spans(id.to_string(), spans)
699 }
700
701 #[test]
702 fn test_store_insert_and_get_trace() {
703 let store = TelemetryStore::new(StoreConfig::default());
704 let trace = make_trace("trace1", "svc-a", 5000);
705
706 store.insert_trace(trace);
707 assert_eq!(store.trace_count(), 1);
708
709 let fetched = store.get_trace("trace1").unwrap();
710 assert_eq!(fetched.trace_id, "trace1");
711 }
712
713 #[test]
714 fn test_store_evicts_oldest() {
715 let store = TelemetryStore::new(StoreConfig {
716 max_traces: 2,
717 ..Default::default()
718 });
719
720 store.insert_trace(make_trace("t1", "svc", 100));
721 store.insert_trace(make_trace("t2", "svc", 200));
722 store.insert_trace(make_trace("t3", "svc", 300));
723
724 assert_eq!(store.trace_count(), 2);
725 assert!(store.get_trace("t1").is_none()); assert!(store.get_trace("t2").is_some());
727 assert!(store.get_trace("t3").is_some());
728 }
729
730 #[test]
731 fn test_log_store_query() {
732 let store = TelemetryStore::new(StoreConfig::default());
733
734 let mut labels = FxHashMap::default();
735 labels.insert("service".to_string(), "api".to_string());
736
737 store.insert_logs(vec![
738 LogEntry {
739 timestamp_ns: 1000,
740 message: "hello world".to_string(),
741 labels: labels.clone(),
742 level: None,
743 },
744 LogEntry {
745 timestamp_ns: 2000,
746 message: "error occurred".to_string(),
747 labels: labels.clone(),
748 level: None,
749 },
750 ]);
751
752 let results = store.query_logs(0, 3000, &FxHashMap::default(), None, 100);
753 assert_eq!(results.len(), 2);
754
755 let results = store.query_logs(0, 3000, &FxHashMap::default(), Some("error"), 100);
757 assert_eq!(results.len(), 1);
758 assert!(results[0].message.contains("error"));
759 }
760
761 #[test]
762 fn test_search_by_service_name() {
763 let store = TelemetryStore::new(StoreConfig::default());
764 store.insert_trace(make_trace("t1", "api", 5000));
765 store.insert_trace(make_trace("t2", "worker", 3000));
766 store.insert_trace(make_trace("t3", "api", 7000));
767
768 let results = store.search_traces(Some("api"), None, None, None, None, None, 100);
769 assert_eq!(results.len(), 2);
770 assert!(results.iter().all(|r| r.root_service_name == "api"));
771 }
772
773 #[test]
774 fn test_search_by_duration_range() {
775 let store = TelemetryStore::new(StoreConfig::default());
776 store.insert_trace(make_trace("t1", "svc", 1000));
777 store.insert_trace(make_trace("t2", "svc", 5000));
778 store.insert_trace(make_trace("t3", "svc", 10000));
779
780 let results = store.search_traces(None, None, Some(4000), None, None, None, 100);
782 assert_eq!(results.len(), 2);
783
784 let results = store.search_traces(None, None, None, Some(6000), None, None, 100);
786 assert_eq!(results.len(), 2);
787
788 let results = store.search_traces(None, None, Some(2000), Some(8000), None, None, 100);
790 assert_eq!(results.len(), 1);
791 assert_eq!(results[0].trace_id, "t2");
792 }
793
794 #[test]
795 fn test_search_with_limit() {
796 let store = TelemetryStore::new(StoreConfig::default());
797 for i in 0..10 {
798 store.insert_trace(make_trace(&format!("t{i}"), "svc", 1000));
799 }
800
801 let results = store.search_traces(None, None, None, None, None, None, 3);
802 assert_eq!(results.len(), 3);
803 }
804
805 #[test]
806 fn test_search_returns_newest_first() {
807 let store = TelemetryStore::new(StoreConfig::default());
808
809 let mut spans = vec![Span {
812 span_id: "s1".to_string(),
813 trace_id: "early".to_string(),
814 parent_span_id: None,
815 operation_name: "root".to_string(),
816 service_name: "svc".to_string(),
817 start_time_us: 100,
818 duration_us: 1000,
819 status: SpanStatus::Ok,
820 tags: FxHashMap::default(),
821 logs: vec![],
822 depth: 0,
823 }];
824 store.insert_trace(Trace::from_spans("early".to_string(), spans.clone()));
825
826 spans[0].trace_id = "late".to_string();
827 spans[0].span_id = "s2".to_string();
828 spans[0].start_time_us = 999_000;
829 store.insert_trace(Trace::from_spans("late".to_string(), spans));
830
831 let results = store.search_traces(None, None, None, None, None, None, 100);
832 assert_eq!(results[0].trace_id, "late"); assert_eq!(results[1].trace_id, "early");
834 }
835
836 #[test]
837 fn test_trace_update_in_place() {
838 let store = TelemetryStore::new(StoreConfig::default());
839 store.insert_trace(make_trace("t1", "svc-v1", 1000));
840 store.insert_trace(make_trace("t1", "svc-v2", 2000));
841
842 assert_eq!(store.trace_count(), 1); let trace = store.get_trace("t1").unwrap();
844 assert_eq!(trace.duration_us, 2000);
845 }
846
847 #[test]
848 fn test_log_query_with_label_filter() {
849 let store = TelemetryStore::new(StoreConfig::default());
850
851 let mut api_labels = FxHashMap::default();
852 api_labels.insert("service".to_string(), "api".to_string());
853
854 let mut worker_labels = FxHashMap::default();
855 worker_labels.insert("service".to_string(), "worker".to_string());
856
857 store.insert_logs(vec![
858 LogEntry {
859 timestamp_ns: 1000,
860 message: "api log".to_string(),
861 labels: api_labels.clone(),
862 level: None,
863 },
864 LogEntry {
865 timestamp_ns: 2000,
866 message: "worker log".to_string(),
867 labels: worker_labels,
868 level: None,
869 },
870 ]);
871
872 let results = store.query_logs(0, 5000, &api_labels, None, 100);
873 assert_eq!(results.len(), 1);
874 assert_eq!(results[0].message, "api log");
875 }
876
877 #[test]
878 fn test_known_log_labels_sorted() {
879 let store = TelemetryStore::new(StoreConfig::default());
880
881 let mut labels = FxHashMap::default();
882 labels.insert("service".to_string(), "api".to_string());
883 labels.insert("env".to_string(), "prod".to_string());
884 labels.insert("app".to_string(), "web".to_string());
885
886 store.insert_logs(vec![LogEntry {
887 timestamp_ns: 1000,
888 message: "msg".to_string(),
889 labels,
890 level: None,
891 }]);
892
893 let known = store.known_log_labels();
894 assert_eq!(known, vec!["app", "env", "service"]);
895 }
896
897 #[test]
898 fn test_log_query_case_insensitive_contains() {
899 let store = TelemetryStore::new(StoreConfig::default());
900 store.insert_logs(vec![LogEntry {
901 timestamp_ns: 1000,
902 message: "ERROR: Something failed".to_string(),
903 labels: FxHashMap::default(),
904 level: None,
905 }]);
906
907 let results = store.query_logs(0, 5000, &FxHashMap::default(), Some("error"), 100);
908 assert_eq!(results.len(), 1);
909
910 let results = store.query_logs(0, 5000, &FxHashMap::default(), Some("ERROR"), 100);
911 assert_eq!(results.len(), 1);
912 }
913
914 #[test]
915 fn test_log_store_evicts() {
916 let store = TelemetryStore::new(StoreConfig {
917 max_log_entries: 2,
918 ..Default::default()
919 });
920
921 store.insert_logs(vec![
922 LogEntry {
923 timestamp_ns: 1000,
924 message: "first".to_string(),
925 labels: FxHashMap::default(),
926 level: None,
927 },
928 LogEntry {
929 timestamp_ns: 2000,
930 message: "second".to_string(),
931 labels: FxHashMap::default(),
932 level: None,
933 },
934 LogEntry {
935 timestamp_ns: 3000,
936 message: "third".to_string(),
937 labels: FxHashMap::default(),
938 level: None,
939 },
940 ]);
941
942 assert_eq!(store.log_count(), 2);
943 let results = store.query_logs(0, 5000, &FxHashMap::default(), None, 100);
944 assert_eq!(results.len(), 2);
945 assert!(results.iter().any(|e| e.message == "second"));
947 assert!(results.iter().any(|e| e.message == "third"));
948 }
949}