Skip to main content

enya_client/otlp/
store.rs

1//! In-memory telemetry store for OTLP data.
2//!
3//! Provides thread-safe bounded storage for traces and logs received via OTLP.
4//! Uses `parking_lot::RwLock` for low-contention concurrent access between
5//! the OTLP receiver (writer) and the editor clients (readers).
6
7use std::collections::VecDeque;
8use std::sync::Arc;
9
10use parking_lot::RwLock;
11use rustc_hash::FxHashMap;
12
13use crate::logs::LogEntry;
14use crate::tracing::tempo::types::{Trace, TraceSummary};
15use crate::types::{MetricsBucket, MetricsGroup, QueryResponse, ResultType, Timestamp};
16
/// Configuration for the telemetry store's bounded capacity.
///
/// Each limit applies to its own collection independently; once a cap is
/// reached, the oldest data is evicted first.
#[derive(Debug, Clone)]
pub struct StoreConfig {
    /// Maximum number of traces to retain (default: 1000).
    pub max_traces: usize,
    /// Maximum number of log entries to retain (default: 50_000).
    pub max_log_entries: usize,
    /// Maximum number of data points per metric time series (default: 10_000).
    pub max_metric_data_points: usize,
}
27
28impl Default for StoreConfig {
29    fn default() -> Self {
30        Self {
31            max_traces: 1000,
32            max_log_entries: 50_000,
33            max_metric_data_points: 10_000,
34        }
35    }
36}
37
/// Thread-safe in-memory telemetry store.
///
/// Stores recently-received OTLP data in bounded collections.
/// The OTLP receiver writes to this store, and the OTLP client trait
/// implementations read from it.
///
/// Each signal type has its own `RwLock`, so writes to one signal
/// (e.g. logs) do not contend with reads of another (e.g. traces).
pub struct TelemetryStore {
    /// Bounded trace storage, evicted oldest-insertion-first.
    traces: RwLock<TraceStore>,
    /// Bounded ring buffer of log entries.
    logs: RwLock<LogStore>,
    /// Bounded per-series metric storage.
    metrics: RwLock<MetricsStore>,
}
48
impl TelemetryStore {
    /// Create a new telemetry store with the given configuration.
    ///
    /// Returned as an `Arc` so the receiver and readers can share it.
    pub fn new(config: StoreConfig) -> Arc<Self> {
        Arc::new(Self {
            traces: RwLock::new(TraceStore::new(config.max_traces)),
            logs: RwLock::new(LogStore::new(config.max_log_entries)),
            metrics: RwLock::new(MetricsStore::new(config.max_metric_data_points)),
        })
    }

    // === Trace operations ===

    /// Insert or update a trace in the store.
    ///
    /// Holds the trace write lock for the duration of the call.
    pub fn insert_trace(&self, trace: Trace) {
        self.traces.write().insert(trace);
    }

    /// Get a trace by its ID. Returns a clone of the stored trace.
    pub fn get_trace(&self, trace_id: &str) -> Option<Trace> {
        self.traces.read().get(trace_id)
    }

    /// Search traces matching the given criteria. Returns summaries.
    ///
    /// All filters are optional; results are sorted newest-first and
    /// truncated to `limit`.
    #[allow(clippy::too_many_arguments)]
    pub fn search_traces(
        &self,
        service_name: Option<&str>,
        operation_name: Option<&str>,
        min_duration_us: Option<u64>,
        max_duration_us: Option<u64>,
        start_time_us: Option<u64>,
        end_time_us: Option<u64>,
        limit: usize,
    ) -> Vec<TraceSummary> {
        self.traces.read().search(
            service_name,
            operation_name,
            min_duration_us,
            max_duration_us,
            start_time_us,
            end_time_us,
            limit,
        )
    }

    /// Get the number of stored traces.
    pub fn trace_count(&self) -> usize {
        self.traces.read().len()
    }

    // === Log operations ===

    /// Insert log entries into the store.
    ///
    /// Holds the log write lock once for the whole batch.
    pub fn insert_logs(&self, entries: Vec<LogEntry>) {
        self.logs.write().insert_batch(entries);
    }

    /// Query logs matching the given filters.
    ///
    /// Time bounds are inclusive; `labels` must all match; `contains` is a
    /// case-insensitive substring match. Returns newest entries first, up
    /// to `limit`.
    pub fn query_logs(
        &self,
        start_ns: i64,
        end_ns: i64,
        labels: &FxHashMap<String, String>,
        contains: Option<&str>,
        limit: usize,
    ) -> Vec<LogEntry> {
        self.logs
            .read()
            .query(start_ns, end_ns, labels, contains, limit)
    }

    /// Get all known label keys from stored logs, sorted alphabetically.
    pub fn known_log_labels(&self) -> Vec<String> {
        self.logs.read().known_labels()
    }

    /// Get the number of stored log entries.
    pub fn log_count(&self) -> usize {
        self.logs.read().len()
    }

    // === Metrics operations ===

    /// Insert a metric data point into the store.
    pub fn insert_metric_point(&self, point: MetricDataPoint) {
        self.metrics.write().insert(point);
    }

    /// Insert a batch of metric data points.
    ///
    /// Acquires the metrics write lock once for the whole batch rather
    /// than once per point.
    pub fn insert_metric_points(&self, points: Vec<MetricDataPoint>) {
        let mut store = self.metrics.write();
        for point in points {
            store.insert(point);
        }
    }

    /// Get all known metric names, sorted alphabetically.
    pub fn metric_names(&self) -> Vec<String> {
        self.metrics.read().metric_names()
    }

    /// Get all known label names for a specific metric, sorted.
    pub fn metric_label_names(&self, metric: &str) -> Vec<String> {
        self.metrics.read().label_names(metric)
    }

    /// Get all known label values for a specific metric and label name, sorted.
    pub fn metric_label_values(&self, metric: &str, label: &str) -> Vec<String> {
        self.metrics.read().label_values(metric, label)
    }

    /// Query a metric and return a QueryResponse compatible with the Prometheus format.
    ///
    /// A `step_ns` of 0 asks the store to auto-pick a bucket width.
    pub fn query_metric(
        &self,
        metric: &str,
        labels: &FxHashMap<String, String>,
        start_ns: u64,
        end_ns: u64,
        step_ns: u64,
    ) -> QueryResponse {
        self.metrics
            .read()
            .query(metric, labels, start_ns, end_ns, step_ns)
    }

    /// Get the number of stored metric time series.
    pub fn metric_series_count(&self) -> usize {
        self.metrics.read().series_count()
    }

    /// Get the total number of stored metric data points.
    pub fn metric_point_count(&self) -> usize {
        self.metrics.read().point_count()
    }
}
184
185// ============================================================================
186// TraceStore: bounded trace storage with LRU eviction
187// ============================================================================
188
/// Bounded trace storage, evicted in insertion order (oldest first).
struct TraceStore {
    /// Traces indexed by trace_id.
    traces: FxHashMap<String, Trace>,
    /// Insertion order for eviction (oldest first). Updating an existing
    /// trace does not refresh its position, so eviction is
    /// first-inserted-first-out rather than true LRU.
    order: VecDeque<String>,
    /// Maximum number of traces to retain.
    max_traces: usize,
}
197
198impl TraceStore {
199    fn new(max_traces: usize) -> Self {
200        Self {
201            traces: FxHashMap::default(),
202            order: VecDeque::new(),
203            max_traces,
204        }
205    }
206
207    fn insert(&mut self, trace: Trace) {
208        let trace_id = trace.trace_id.clone();
209
210        // If trace already exists, update it in place (don't change order)
211        if let Some(existing) = self.traces.get_mut(&trace_id) {
212            *existing = trace;
213            return;
214        }
215
216        // Evict oldest if at capacity
217        while self.traces.len() >= self.max_traces {
218            if let Some(old_id) = self.order.pop_front() {
219                self.traces.remove(&old_id);
220            } else {
221                break;
222            }
223        }
224
225        self.order.push_back(trace_id.clone());
226        self.traces.insert(trace_id, trace);
227    }
228
229    fn get(&self, trace_id: &str) -> Option<Trace> {
230        self.traces.get(trace_id).cloned()
231    }
232
233    fn len(&self) -> usize {
234        self.traces.len()
235    }
236
237    #[allow(clippy::too_many_arguments)]
238    fn search(
239        &self,
240        service_name: Option<&str>,
241        operation_name: Option<&str>,
242        min_duration_us: Option<u64>,
243        max_duration_us: Option<u64>,
244        start_time_us: Option<u64>,
245        end_time_us: Option<u64>,
246        limit: usize,
247    ) -> Vec<TraceSummary> {
248        let mut results: Vec<TraceSummary> = self
249            .traces
250            .values()
251            .filter(|trace| {
252                // Filter by time range
253                if let Some(start) = start_time_us {
254                    if trace.start_time_us < start {
255                        return false;
256                    }
257                }
258                if let Some(end) = end_time_us {
259                    if trace.start_time_us > end {
260                        return false;
261                    }
262                }
263                // Filter by duration
264                if let Some(min) = min_duration_us {
265                    if trace.duration_us < min {
266                        return false;
267                    }
268                }
269                if let Some(max) = max_duration_us {
270                    if trace.duration_us > max {
271                        return false;
272                    }
273                }
274                // Filter by service name
275                if let Some(svc) = service_name {
276                    if !trace.services.iter().any(|s| s == svc) {
277                        return false;
278                    }
279                }
280                // Filter by operation name (check root span)
281                if let Some(op) = operation_name {
282                    let has_op = trace.spans.iter().any(|s| s.operation_name == op);
283                    if !has_op {
284                        return false;
285                    }
286                }
287                true
288            })
289            .map(|trace| {
290                let root = trace
291                    .root_span_id
292                    .as_ref()
293                    .and_then(|id| trace.get_span(id));
294                let error_count = trace
295                    .spans
296                    .iter()
297                    .filter(|s| s.status == crate::tracing::tempo::types::SpanStatus::Error)
298                    .count();
299                TraceSummary {
300                    trace_id: trace.trace_id.clone(),
301                    root_service_name: root.map(|s| s.service_name.clone()).unwrap_or_default(),
302                    root_operation_name: root.map(|s| s.operation_name.clone()).unwrap_or_default(),
303                    start_time_us: trace.start_time_us,
304                    duration_us: trace.duration_us,
305                    span_count: trace.spans.len(),
306                    error_count,
307                }
308            })
309            .collect();
310
311        // Sort by start time descending (newest first)
312        results.sort_by(|a, b| b.start_time_us.cmp(&a.start_time_us));
313        results.truncate(limit);
314        results
315    }
316}
317
318// ============================================================================
319// MetricsStore: bounded time-series storage for OTLP metrics
320// ============================================================================
321
/// A single metric data point to be inserted into the store.
///
/// Points sharing the same `name` and label set are appended to the same
/// time series.
#[derive(Debug, Clone)]
pub struct MetricDataPoint {
    /// Metric name (e.g., "http_requests_total").
    pub name: String,
    /// Labels/attributes (e.g., {"method": "GET", "service": "api"}).
    pub labels: FxHashMap<String, String>,
    /// Timestamp in nanoseconds since epoch.
    pub timestamp_ns: u64,
    /// The metric value.
    pub value: f64,
}
334
/// A unique time series identified by metric name + sorted label set.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SeriesKey {
    /// Metric name this series belongs to.
    name: String,
    /// Canonical encoding of the sorted label set; built by
    /// `MetricsStore::make_key` and used only for map-key identity.
    labels_key: String,
}
342
/// A time-ordered data point within a series.
#[derive(Debug, Clone)]
struct StoredPoint {
    /// Nanoseconds since epoch.
    timestamp_ns: u64,
    /// Recorded metric value.
    value: f64,
}
349
/// Bounded time-series storage for OTLP metric data points.
struct MetricsStore {
    /// Time series indexed by their unique key.
    series: FxHashMap<SeriesKey, TimeSeries>,
    /// Maximum data points per series; oldest points are dropped first.
    max_points_per_series: usize,
}
356
/// One metric time series: a label set plus its time-ordered points.
struct TimeSeries {
    /// The original labels for this series.
    labels: FxHashMap<String, String>,
    /// Metric name.
    name: String,
    /// Data points sorted by timestamp (oldest first).
    points: VecDeque<StoredPoint>,
}
365
366impl MetricsStore {
367    fn new(max_points_per_series: usize) -> Self {
368        Self {
369            series: FxHashMap::default(),
370            max_points_per_series,
371        }
372    }
373
374    fn make_key(name: &str, labels: &FxHashMap<String, String>) -> SeriesKey {
375        let mut pairs: Vec<_> = labels.iter().collect();
376        pairs.sort_by_key(|(k, _)| *k);
377        let labels_key = pairs
378            .iter()
379            .map(|(k, v)| format!("{k}={v}"))
380            .collect::<Vec<_>>()
381            .join(",");
382        SeriesKey {
383            name: name.to_string(),
384            labels_key,
385        }
386    }
387
388    fn insert(&mut self, point: MetricDataPoint) {
389        let key = Self::make_key(&point.name, &point.labels);
390        let series = self.series.entry(key).or_insert_with(|| TimeSeries {
391            labels: point.labels.clone(),
392            name: point.name.clone(),
393            points: VecDeque::new(),
394        });
395
396        // Insert in timestamp order (usually appending)
397        let stored = StoredPoint {
398            timestamp_ns: point.timestamp_ns,
399            value: point.value,
400        };
401
402        if series
403            .points
404            .back()
405            .is_none_or(|last| last.timestamp_ns <= stored.timestamp_ns)
406        {
407            series.points.push_back(stored);
408        } else {
409            // Out-of-order: find insertion point
410            let pos = series
411                .points
412                .iter()
413                .position(|p| p.timestamp_ns > stored.timestamp_ns)
414                .unwrap_or(series.points.len());
415            series.points.insert(pos, stored);
416        }
417
418        // Evict oldest if over capacity
419        while series.points.len() > self.max_points_per_series {
420            series.points.pop_front();
421        }
422    }
423
424    fn metric_names(&self) -> Vec<String> {
425        let mut names: rustc_hash::FxHashSet<String> = rustc_hash::FxHashSet::default();
426        for series in self.series.values() {
427            names.insert(series.name.clone());
428        }
429        let mut sorted: Vec<String> = names.into_iter().collect();
430        sorted.sort();
431        sorted
432    }
433
434    fn label_names(&self, metric: &str) -> Vec<String> {
435        let mut names: rustc_hash::FxHashSet<String> = rustc_hash::FxHashSet::default();
436        for series in self.series.values() {
437            if series.name == metric {
438                for key in series.labels.keys() {
439                    names.insert(key.clone());
440                }
441            }
442        }
443        let mut sorted: Vec<String> = names.into_iter().collect();
444        sorted.sort();
445        sorted
446    }
447
448    fn label_values(&self, metric: &str, label: &str) -> Vec<String> {
449        let mut values: rustc_hash::FxHashSet<String> = rustc_hash::FxHashSet::default();
450        for series in self.series.values() {
451            if series.name == metric {
452                if let Some(v) = series.labels.get(label) {
453                    values.insert(v.clone());
454                }
455            }
456        }
457        let mut sorted: Vec<String> = values.into_iter().collect();
458        sorted.sort();
459        sorted
460    }
461
462    fn query(
463        &self,
464        metric: &str,
465        label_filter: &FxHashMap<String, String>,
466        start_ns: u64,
467        end_ns: u64,
468        step_ns: u64,
469    ) -> QueryResponse {
470        let matching_series: Vec<&TimeSeries> = self
471            .series
472            .values()
473            .filter(|s| {
474                if s.name != metric {
475                    return false;
476                }
477                for (k, v) in label_filter {
478                    match s.labels.get(k) {
479                        Some(sv) if sv == v => {}
480                        _ => return false,
481                    }
482                }
483                true
484            })
485            .collect();
486
487        let step_ns = if step_ns == 0 {
488            // Auto-calculate: aim for ~200 data points
489            let range = end_ns.saturating_sub(start_ns);
490            (range / 200).max(1_000_000_000) // at least 1 second
491        } else {
492            step_ns
493        };
494
495        let groups: Vec<MetricsGroup> = matching_series
496            .iter()
497            .map(|series| {
498                // Build group label string
499                let mut pairs: Vec<_> = series.labels.iter().collect();
500                pairs.sort_by_key(|(k, _)| *k);
501                let group = pairs
502                    .iter()
503                    .map(|(k, v)| format!("{k}:{v}"))
504                    .collect::<Vec<_>>()
505                    .join(", ");
506
507                // Bucket the data points
508                let buckets = bucket_points(&series.points, start_ns, end_ns, step_ns);
509
510                MetricsGroup {
511                    group: if group.is_empty() {
512                        metric.to_string()
513                    } else {
514                        group
515                    },
516                    buckets,
517                }
518            })
519            .collect();
520
521        QueryResponse {
522            metric: metric.to_string(),
523            query: metric.to_string(),
524            parsed_agg: None,
525            parsed_filter: String::new(),
526            parsed_grouping: None,
527            parsed_time_range: None,
528            start: Some(start_ns as Timestamp),
529            end: Some(end_ns as Timestamp),
530            granularity_ns: step_ns as u128,
531            groups,
532            result_type: ResultType::Matrix,
533        }
534    }
535
536    fn series_count(&self) -> usize {
537        self.series.len()
538    }
539
540    fn point_count(&self) -> usize {
541        self.series.values().map(|s| s.points.len()).sum()
542    }
543}
544
545/// Bucket data points into fixed-width time windows, taking the last value per bucket.
546///
547/// Single-pass O(N) algorithm: since points are sorted by timestamp, we advance
548/// through points once, assigning each to its bucket.
549fn bucket_points(
550    points: &VecDeque<StoredPoint>,
551    start_ns: u64,
552    end_ns: u64,
553    step_ns: u64,
554) -> Vec<MetricsBucket> {
555    if points.is_empty() || start_ns >= end_ns || step_ns == 0 {
556        return Vec::new();
557    }
558
559    let num_buckets = (end_ns - start_ns).div_ceil(step_ns);
560    let mut bucket_data: Vec<Option<(f64, usize)>> = vec![None; num_buckets as usize];
561
562    for p in points.iter() {
563        if p.timestamp_ns < start_ns || p.timestamp_ns >= end_ns {
564            continue;
565        }
566        let idx = ((p.timestamp_ns - start_ns) / step_ns) as usize;
567        if idx < bucket_data.len() {
568            match &mut bucket_data[idx] {
569                Some((val, count)) => {
570                    *val = p.value;
571                    *count += 1;
572                }
573                slot @ None => {
574                    *slot = Some((p.value, 1));
575                }
576            }
577        }
578    }
579
580    bucket_data
581        .into_iter()
582        .enumerate()
583        .filter_map(|(i, data)| {
584            let (value, count) = data?;
585            let bucket_start = start_ns + (i as u64) * step_ns;
586            let bucket_end = (bucket_start + step_ns).min(end_ns);
587            Some(MetricsBucket {
588                start: bucket_start as Timestamp,
589                end: bucket_end as Timestamp,
590                value,
591                count,
592            })
593        })
594        .collect()
595}
596
597// ============================================================================
598// LogStore: bounded log entry ring buffer
599// ============================================================================
600
/// Bounded ring buffer of log entries, evicted oldest-first.
struct LogStore {
    /// Log entries (oldest first).
    entries: VecDeque<LogEntry>,
    /// Maximum number of entries to retain.
    max_entries: usize,
}
607
608impl LogStore {
609    fn new(max_entries: usize) -> Self {
610        Self {
611            entries: VecDeque::new(),
612            max_entries,
613        }
614    }
615
616    fn insert_batch(&mut self, entries: Vec<LogEntry>) {
617        for entry in entries {
618            if self.entries.len() >= self.max_entries {
619                self.entries.pop_front();
620            }
621            self.entries.push_back(entry);
622        }
623    }
624
625    fn query(
626        &self,
627        start_ns: i64,
628        end_ns: i64,
629        labels: &FxHashMap<String, String>,
630        contains: Option<&str>,
631        limit: usize,
632    ) -> Vec<LogEntry> {
633        let contains_lower = contains.map(|t| t.to_lowercase());
634        self.entries
635            .iter()
636            .filter(|entry| {
637                // Time range filter
638                if entry.timestamp_ns < start_ns || entry.timestamp_ns > end_ns {
639                    return false;
640                }
641                // Label filter
642                for (key, value) in labels {
643                    match entry.labels.get(key) {
644                        Some(v) if v == value => {}
645                        _ => return false,
646                    }
647                }
648                // Text search
649                if let Some(ref pattern) = contains_lower {
650                    if !entry.message.to_lowercase().contains(pattern) {
651                        return false;
652                    }
653                }
654                true
655            })
656            .rev() // Newest first
657            .take(limit)
658            .cloned()
659            .collect()
660    }
661
662    fn known_labels(&self) -> Vec<String> {
663        let mut labels: rustc_hash::FxHashSet<String> = rustc_hash::FxHashSet::default();
664        for entry in &self.entries {
665            for key in entry.labels.keys() {
666                labels.insert(key.clone());
667            }
668        }
669        let mut sorted: Vec<String> = labels.into_iter().collect();
670        sorted.sort();
671        sorted
672    }
673
674    fn len(&self) -> usize {
675        self.entries.len()
676    }
677}
678
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tracing::tempo::types::{Span, SpanStatus};

    /// Build a single-span trace with the given id, service, and duration.
    /// All traces from this helper share start_time_us = 1_000_000.
    fn make_trace(id: &str, service: &str, duration_us: u64) -> Trace {
        let spans = vec![Span {
            span_id: format!("{id}-span1"),
            trace_id: id.to_string(),
            parent_span_id: None,
            operation_name: "root".to_string(),
            service_name: service.to_string(),
            start_time_us: 1_000_000,
            duration_us,
            status: SpanStatus::Ok,
            tags: FxHashMap::default(),
            logs: vec![],
            depth: 0,
        }];
        Trace::from_spans(id.to_string(), spans)
    }

    #[test]
    fn test_store_insert_and_get_trace() {
        let store = TelemetryStore::new(StoreConfig::default());
        let trace = make_trace("trace1", "svc-a", 5000);

        store.insert_trace(trace);
        assert_eq!(store.trace_count(), 1);

        let fetched = store.get_trace("trace1").unwrap();
        assert_eq!(fetched.trace_id, "trace1");
    }

    #[test]
    fn test_store_evicts_oldest() {
        let store = TelemetryStore::new(StoreConfig {
            max_traces: 2,
            ..Default::default()
        });

        store.insert_trace(make_trace("t1", "svc", 100));
        store.insert_trace(make_trace("t2", "svc", 200));
        store.insert_trace(make_trace("t3", "svc", 300));

        assert_eq!(store.trace_count(), 2);
        assert!(store.get_trace("t1").is_none()); // evicted
        assert!(store.get_trace("t2").is_some());
        assert!(store.get_trace("t3").is_some());
    }

    #[test]
    fn test_log_store_query() {
        let store = TelemetryStore::new(StoreConfig::default());

        let mut labels = FxHashMap::default();
        labels.insert("service".to_string(), "api".to_string());

        store.insert_logs(vec![
            LogEntry {
                timestamp_ns: 1000,
                message: "hello world".to_string(),
                labels: labels.clone(),
                level: None,
            },
            LogEntry {
                timestamp_ns: 2000,
                message: "error occurred".to_string(),
                labels: labels.clone(),
                level: None,
            },
        ]);

        let results = store.query_logs(0, 3000, &FxHashMap::default(), None, 100);
        assert_eq!(results.len(), 2);

        // Filter by text
        let results = store.query_logs(0, 3000, &FxHashMap::default(), Some("error"), 100);
        assert_eq!(results.len(), 1);
        assert!(results[0].message.contains("error"));
    }

    #[test]
    fn test_search_by_service_name() {
        let store = TelemetryStore::new(StoreConfig::default());
        store.insert_trace(make_trace("t1", "api", 5000));
        store.insert_trace(make_trace("t2", "worker", 3000));
        store.insert_trace(make_trace("t3", "api", 7000));

        let results = store.search_traces(Some("api"), None, None, None, None, None, 100);
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.root_service_name == "api"));
    }

    #[test]
    fn test_search_by_duration_range() {
        let store = TelemetryStore::new(StoreConfig::default());
        store.insert_trace(make_trace("t1", "svc", 1000));
        store.insert_trace(make_trace("t2", "svc", 5000));
        store.insert_trace(make_trace("t3", "svc", 10000));

        // Min duration filter
        let results = store.search_traces(None, None, Some(4000), None, None, None, 100);
        assert_eq!(results.len(), 2);

        // Max duration filter
        let results = store.search_traces(None, None, None, Some(6000), None, None, 100);
        assert_eq!(results.len(), 2);

        // Both min and max
        let results = store.search_traces(None, None, Some(2000), Some(8000), None, None, 100);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].trace_id, "t2");
    }

    #[test]
    fn test_search_with_limit() {
        let store = TelemetryStore::new(StoreConfig::default());
        for i in 0..10 {
            store.insert_trace(make_trace(&format!("t{i}"), "svc", 1000));
        }

        let results = store.search_traces(None, None, None, None, None, None, 3);
        assert_eq!(results.len(), 3);
    }

    #[test]
    fn test_search_returns_newest_first() {
        let store = TelemetryStore::new(StoreConfig::default());

        // Build two traces by hand with distinct start times (100 vs
        // 999_000) so the newest-first ordering is deterministic.
        let mut spans = vec![Span {
            span_id: "s1".to_string(),
            trace_id: "early".to_string(),
            parent_span_id: None,
            operation_name: "root".to_string(),
            service_name: "svc".to_string(),
            start_time_us: 100,
            duration_us: 1000,
            status: SpanStatus::Ok,
            tags: FxHashMap::default(),
            logs: vec![],
            depth: 0,
        }];
        store.insert_trace(Trace::from_spans("early".to_string(), spans.clone()));

        spans[0].trace_id = "late".to_string();
        spans[0].span_id = "s2".to_string();
        spans[0].start_time_us = 999_000;
        store.insert_trace(Trace::from_spans("late".to_string(), spans));

        let results = store.search_traces(None, None, None, None, None, None, 100);
        assert_eq!(results[0].trace_id, "late"); // newest first
        assert_eq!(results[1].trace_id, "early");
    }

    #[test]
    fn test_trace_update_in_place() {
        let store = TelemetryStore::new(StoreConfig::default());
        store.insert_trace(make_trace("t1", "svc-v1", 1000));
        store.insert_trace(make_trace("t1", "svc-v2", 2000));

        assert_eq!(store.trace_count(), 1); // same trace_id, updated in place
        let trace = store.get_trace("t1").unwrap();
        assert_eq!(trace.duration_us, 2000);
    }

    #[test]
    fn test_log_query_with_label_filter() {
        let store = TelemetryStore::new(StoreConfig::default());

        let mut api_labels = FxHashMap::default();
        api_labels.insert("service".to_string(), "api".to_string());

        let mut worker_labels = FxHashMap::default();
        worker_labels.insert("service".to_string(), "worker".to_string());

        store.insert_logs(vec![
            LogEntry {
                timestamp_ns: 1000,
                message: "api log".to_string(),
                labels: api_labels.clone(),
                level: None,
            },
            LogEntry {
                timestamp_ns: 2000,
                message: "worker log".to_string(),
                labels: worker_labels,
                level: None,
            },
        ]);

        let results = store.query_logs(0, 5000, &api_labels, None, 100);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].message, "api log");
    }

    #[test]
    fn test_known_log_labels_sorted() {
        let store = TelemetryStore::new(StoreConfig::default());

        let mut labels = FxHashMap::default();
        labels.insert("service".to_string(), "api".to_string());
        labels.insert("env".to_string(), "prod".to_string());
        labels.insert("app".to_string(), "web".to_string());

        store.insert_logs(vec![LogEntry {
            timestamp_ns: 1000,
            message: "msg".to_string(),
            labels,
            level: None,
        }]);

        // Keys come back sorted regardless of insertion order.
        let known = store.known_log_labels();
        assert_eq!(known, vec!["app", "env", "service"]);
    }

    #[test]
    fn test_log_query_case_insensitive_contains() {
        let store = TelemetryStore::new(StoreConfig::default());
        store.insert_logs(vec![LogEntry {
            timestamp_ns: 1000,
            message: "ERROR: Something failed".to_string(),
            labels: FxHashMap::default(),
            level: None,
        }]);

        let results = store.query_logs(0, 5000, &FxHashMap::default(), Some("error"), 100);
        assert_eq!(results.len(), 1);

        let results = store.query_logs(0, 5000, &FxHashMap::default(), Some("ERROR"), 100);
        assert_eq!(results.len(), 1);
    }

    #[test]
    fn test_log_store_evicts() {
        let store = TelemetryStore::new(StoreConfig {
            max_log_entries: 2,
            ..Default::default()
        });

        store.insert_logs(vec![
            LogEntry {
                timestamp_ns: 1000,
                message: "first".to_string(),
                labels: FxHashMap::default(),
                level: None,
            },
            LogEntry {
                timestamp_ns: 2000,
                message: "second".to_string(),
                labels: FxHashMap::default(),
                level: None,
            },
            LogEntry {
                timestamp_ns: 3000,
                message: "third".to_string(),
                labels: FxHashMap::default(),
                level: None,
            },
        ]);

        assert_eq!(store.log_count(), 2);
        let results = store.query_logs(0, 5000, &FxHashMap::default(), None, 100);
        assert_eq!(results.len(), 2);
        // "first" was evicted, should have "second" and "third"
        assert!(results.iter().any(|e| e.message == "second"));
        assert!(results.iter().any(|e| e.message == "third"));
    }
}
949}