Skip to main content

zelos_trace/
metadata.rs

1use std::{collections::HashMap, sync::Arc};
2
3use arc_swap::ArcSwap;
4use rpds::HashTrieMapSync;
5use uuid::Uuid;
6use zelos_trace_types::ipc;
7
8use crate::segment::TraceSegment;
9
10#[derive(Clone)]
11pub struct TraceMetadata {
12    segments: Arc<ArcSwap<HashTrieMapSync<Uuid, TraceSegment>>>,
13}
14
15impl TraceMetadata {
16    pub fn new() -> Self {
17        Self {
18            segments: Arc::new(ArcSwap::from_pointee(HashTrieMapSync::new_sync())),
19        }
20    }
21
22    pub fn from(msgs: impl IntoIterator<Item = ipc::IpcMessageWithId>) -> Self {
23        let mut segments: HashTrieMapSync<Uuid, TraceSegment> = HashTrieMapSync::new_sync();
24
25        for msg in msgs {
26            if let Some(seg) = segments.get_mut(&msg.segment_id) {
27                match &msg.msg {
28                    ipc::IpcMessage::TraceSegmentStart(_)
29                    | ipc::IpcMessage::TraceSegmentEnd(_)
30                    | ipc::IpcMessage::TraceEventSchema(_)
31                    | ipc::IpcMessage::TraceEventFieldNamedValues(_) => seg.update_mut(&msg.msg),
32                    ipc::IpcMessage::TraceEvent(_) => {
33                        // Do nothing
34                    }
35                }
36            } else {
37                let seg = if let ipc::IpcMessage::TraceSegmentStart(m) = &msg.msg {
38                    TraceSegment::from_ipc(msg.segment_id, m)
39                } else {
40                    TraceSegment::empty(msg.segment_id, msg.source_name)
41                };
42                segments.insert_mut(msg.segment_id, seg.update(&msg.msg));
43            }
44        }
45
46        Self {
47            segments: Arc::new(ArcSwap::from_pointee(segments)),
48        }
49    }
50
51    #[tracing::instrument(level = "trace", skip_all)]
52    pub fn update(&self, msg: &ipc::IpcMessageWithId) {
53        // Early return if we see messages we don't care about
54        if let ipc::IpcMessage::TraceEvent(_) = &msg.msg {
55            return;
56        }
57
58        let segments = self.segments.load();
59        if let Some(seg) = segments.get(&msg.segment_id) {
60            match &msg.msg {
61                ipc::IpcMessage::TraceSegmentStart(_)
62                | ipc::IpcMessage::TraceSegmentEnd(_)
63                | ipc::IpcMessage::TraceEventSchema(_)
64                | ipc::IpcMessage::TraceEventFieldNamedValues(_) => {
65                    let new = segments.insert(msg.segment_id, seg.update(&msg.msg));
66                    self.segments.store(Arc::new(new));
67                }
68                ipc::IpcMessage::TraceEvent(_) => {
69                    // Do nothing
70                }
71            }
72        } else {
73            let seg = if let ipc::IpcMessage::TraceSegmentStart(m) = &msg.msg {
74                TraceSegment::from_ipc(msg.segment_id, m)
75            } else {
76                TraceSegment::empty(msg.segment_id, msg.source_name.clone())
77            };
78            let new = segments.insert(msg.segment_id, seg.update(&msg.msg));
79            self.segments.store(Arc::new(new));
80        }
81    }
82
83    pub fn as_ipc(&self) -> Vec<ipc::IpcMessageWithId> {
84        let segments = self.segments.load();
85        segments
86            .iter()
87            .flat_map(|(id, seg)| {
88                seg.as_ipc()
89                    .into_iter()
90                    .map(move |msg| ipc::IpcMessageWithId {
91                        segment_id: *id,
92                        source_name: seg.source.clone(),
93                        msg,
94                    })
95            })
96            .collect()
97    }
98
99    /// Returns a clone of all trace segments
100    #[tracing::instrument(level = "trace", skip_all)]
101    pub fn segments(&self) -> HashMap<Uuid, TraceSegment> {
102        let segments = self.segments.load();
103        segments
104            .iter()
105            .map(|(id, seg)| (*id, seg.clone()))
106            .collect()
107    }
108
109    /// Returns an iterator of a clone of all trace segments
110    pub fn segments_iter(&self) -> impl Iterator<Item = TraceSegment> {
111        let segments = self.segments.load();
112        segments.values().cloned().collect::<Vec<_>>().into_iter()
113    }
114
115    /// Returns a clone of a single trace segment
116    #[tracing::instrument(level = "trace", skip_all)]
117    pub fn get_segment(&self, id: &Uuid) -> Option<TraceSegment> {
118        let segments = self.segments.load();
119        segments.get(id).cloned()
120    }
121
122    #[tracing::instrument(level = "trace", skip_all)]
123    pub fn remove_segment(&self, id: &Uuid) {
124        let segments = self.segments.load();
125        let new = segments.remove(id);
126        self.segments.store(Arc::new(new));
127    }
128}
129
130impl Default for TraceMetadata {
131    fn default() -> Self {
132        Self::new()
133    }
134}
135
136#[cfg(test)]
137mod test {
138    use chrono::DateTime;
139
140    use super::*;
141
142    #[test]
143    fn test_basic() {
144        let metadata = TraceMetadata::new();
145
146        let segment_id = Uuid::try_parse("0196c84d-6eb8-7c46-83b1-e4cac73ba9b6").unwrap();
147        let source_name = "src";
148        let start = ipc::TraceSegmentStart {
149            time_ns: 0,
150            source_name: source_name.to_string(),
151        };
152        metadata.update(&ipc::IpcMessageWithId {
153            segment_id,
154            source_name: source_name.to_string(),
155            msg: start.into(),
156        });
157
158        {
159            let seg = metadata.get_segment(&segment_id).unwrap();
160            assert_eq!(seg.id, segment_id);
161            assert_eq!(seg.source, "src");
162            assert_eq!(seg.start_time, Some(DateTime::from_timestamp_nanos(0)));
163            assert_eq!(seg.end_time, None);
164        }
165
166        let end = ipc::TraceSegmentEnd { time_ns: 1 };
167        metadata.update(&ipc::IpcMessageWithId {
168            segment_id,
169            source_name: source_name.to_string(),
170            msg: end.into(),
171        });
172
173        {
174            let seg = metadata.get_segment(&segment_id).unwrap();
175            assert_eq!(seg.end_time, Some(DateTime::from_timestamp_nanos(1)));
176        }
177    }
178}