Skip to main content

otlp_embedded/
state.rs

1use std::{collections::BTreeSet, sync::Arc};
2
3use crate::{
4    limiter::MyLimiter,
5    proto::trace::v1::ResourceSpans,
6    trace::{SpanValue, Trace, TraceId},
7};
8use schnellru::LruMap;
9use tokio::sync::RwLock;
10
/// Configuration for the [`State`].
///
/// When either the maximum number of traces or the maximum memory usage
/// is reached, the oldest traces will be evicted.
#[derive(Debug, Clone)]
pub struct Config {
    /// The maximum number of traces to keep.
    pub max_length: u32,

    /// The maximum memory usage of the traces in bytes.
    ///
    /// The memory usage is estimated and the actual value may be higher.
    pub max_memory_usage: usize,
}
24
/// In-memory state that maintains the most recent traces.
///
/// Old traces that are no longer updated or accessed will be evicted
/// when the capacity is reached.
pub struct State {
    /// Traces keyed by their ID, stored in an LRU map whose capacity is
    /// governed by a [`MyLimiter`].
    traces: LruMap<TraceId, Trace, MyLimiter>,
}
32
/// A shared reference to the [`State`].
///
/// The state is reference-counted through an [`Arc`] and guarded by a
/// Tokio [`RwLock`]. This is the type returned by [`State::new`].
pub type StateRef = Arc<RwLock<State>>;
35
36impl State {
37    /// Create a new [`State`] with the given configuration.
38    pub fn new(
39        Config {
40            max_length,
41            max_memory_usage,
42        }: Config,
43    ) -> StateRef {
44        let this = Self {
45            traces: LruMap::new(MyLimiter::new(max_memory_usage, max_length)),
46        };
47
48        Arc::new(RwLock::new(this))
49    }
50
51    fn add_value(&mut self, value: SpanValue) {
52        // Use a pair of `remove` and `insert` to maintain the memory usage correctly.
53        let mut trace = self.traces.remove(&value.span.trace_id).unwrap_or_default();
54        let id = value.span.trace_id.clone();
55        trace.add_value(value);
56
57        self.traces.insert(id, trace);
58    }
59
60    pub(crate) fn apply(&mut self, resource_spans: ResourceSpans) {
61        let ResourceSpans {
62            resource,
63            scope_spans,
64            schema_url: _,
65        } = resource_spans;
66
67        let resource = Arc::new(resource.unwrap_or_default());
68
69        for span in scope_spans.into_iter().flat_map(|s| s.spans) {
70            let value = SpanValue {
71                span: Box::new(span),
72                resource: resource.clone(),
73            };
74            self.add_value(value);
75        }
76    }
77
78    /// Get the number of traces in the state.
79    #[allow(clippy::len_without_is_empty)]
80    pub fn len(&self) -> usize {
81        self.traces.len()
82    }
83
84    /// Get the estimated memory usage of the state in bytes.
85    pub fn estimated_memory_usage(&self) -> usize {
86        self.traces.limiter().estimated_memory_usage()
87    }
88
89    /// Get a trace by its ID.
90    ///
91    /// The trace will be promoted to the most recent.
92    pub fn get_by_id(&mut self, id: &[u8]) -> Option<Trace> {
93        self.traces.get(id).cloned()
94    }
95
96    /// Get an iterator over all traces that are complete.
97    pub fn get_all_complete(&self) -> impl Iterator<Item = Trace> + '_ {
98        self.traces.iter().filter_map(|(_, trace)| {
99            if trace.is_complete() {
100                Some(trace.clone())
101            } else {
102                None
103            }
104        })
105    }
106
107    /// Get a set of all services.
108    pub fn get_all_services(&self) -> BTreeSet<&str> {
109        self.traces
110            .iter()
111            .filter_map(|(_, t)| t.root_span())
112            .map(|v| v.service_name())
113            .collect()
114    }
115
116    /// Get a set of all operations for the given service.
117    pub fn get_operations(&self, service_name: &str) -> BTreeSet<&str> {
118        self.traces
119            .iter()
120            .filter_map(|(_, t)| t.root_span())
121            .filter(|v| v.service_name() == service_name)
122            .map(|v| v.operation())
123            .collect()
124    }
125}