1use std::{collections::HashMap, sync::Arc};
2
3use arc_swap::ArcSwap;
4use rpds::HashTrieMapSync;
5use uuid::Uuid;
6use zelos_trace_types::ipc;
7
8use crate::segment::TraceSegment;
9
10#[derive(Clone)]
11pub struct TraceMetadata {
12 segments: Arc<ArcSwap<HashTrieMapSync<Uuid, TraceSegment>>>,
13}
14
15impl TraceMetadata {
16 pub fn new() -> Self {
17 Self {
18 segments: Arc::new(ArcSwap::from_pointee(HashTrieMapSync::new_sync())),
19 }
20 }
21
22 pub fn from(msgs: impl IntoIterator<Item = ipc::IpcMessageWithId>) -> Self {
23 let mut segments: HashTrieMapSync<Uuid, TraceSegment> = HashTrieMapSync::new_sync();
24
25 for msg in msgs {
26 if let Some(seg) = segments.get_mut(&msg.segment_id) {
27 match &msg.msg {
28 ipc::IpcMessage::TraceSegmentStart(_)
29 | ipc::IpcMessage::TraceSegmentEnd(_)
30 | ipc::IpcMessage::TraceEventSchema(_)
31 | ipc::IpcMessage::TraceEventFieldNamedValues(_) => seg.update_mut(&msg.msg),
32 ipc::IpcMessage::TraceEvent(_) => {
33 }
35 }
36 } else {
37 let seg = if let ipc::IpcMessage::TraceSegmentStart(m) = &msg.msg {
38 TraceSegment::from_ipc(msg.segment_id, m)
39 } else {
40 TraceSegment::empty(msg.segment_id, msg.source_name)
41 };
42 segments.insert_mut(msg.segment_id, seg.update(&msg.msg));
43 }
44 }
45
46 Self {
47 segments: Arc::new(ArcSwap::from_pointee(segments)),
48 }
49 }
50
51 #[tracing::instrument(level = "trace", skip_all)]
52 pub fn update(&self, msg: &ipc::IpcMessageWithId) {
53 if let ipc::IpcMessage::TraceEvent(_) = &msg.msg {
55 return;
56 }
57
58 let segments = self.segments.load();
59 if let Some(seg) = segments.get(&msg.segment_id) {
60 match &msg.msg {
61 ipc::IpcMessage::TraceSegmentStart(_)
62 | ipc::IpcMessage::TraceSegmentEnd(_)
63 | ipc::IpcMessage::TraceEventSchema(_)
64 | ipc::IpcMessage::TraceEventFieldNamedValues(_) => {
65 let new = segments.insert(msg.segment_id, seg.update(&msg.msg));
66 self.segments.store(Arc::new(new));
67 }
68 ipc::IpcMessage::TraceEvent(_) => {
69 }
71 }
72 } else {
73 let seg = if let ipc::IpcMessage::TraceSegmentStart(m) = &msg.msg {
74 TraceSegment::from_ipc(msg.segment_id, m)
75 } else {
76 TraceSegment::empty(msg.segment_id, msg.source_name.clone())
77 };
78 let new = segments.insert(msg.segment_id, seg.update(&msg.msg));
79 self.segments.store(Arc::new(new));
80 }
81 }
82
83 pub fn as_ipc(&self) -> Vec<ipc::IpcMessageWithId> {
84 let segments = self.segments.load();
85 segments
86 .iter()
87 .flat_map(|(id, seg)| {
88 seg.as_ipc()
89 .into_iter()
90 .map(move |msg| ipc::IpcMessageWithId {
91 segment_id: *id,
92 source_name: seg.source.clone(),
93 msg,
94 })
95 })
96 .collect()
97 }
98
99 #[tracing::instrument(level = "trace", skip_all)]
101 pub fn segments(&self) -> HashMap<Uuid, TraceSegment> {
102 let segments = self.segments.load();
103 segments
104 .iter()
105 .map(|(id, seg)| (*id, seg.clone()))
106 .collect()
107 }
108
109 pub fn segments_iter(&self) -> impl Iterator<Item = TraceSegment> {
111 let segments = self.segments.load();
112 segments.values().cloned().collect::<Vec<_>>().into_iter()
113 }
114
115 #[tracing::instrument(level = "trace", skip_all)]
117 pub fn get_segment(&self, id: &Uuid) -> Option<TraceSegment> {
118 let segments = self.segments.load();
119 segments.get(id).cloned()
120 }
121
122 #[tracing::instrument(level = "trace", skip_all)]
123 pub fn remove_segment(&self, id: &Uuid) {
124 let segments = self.segments.load();
125 let new = segments.remove(id);
126 self.segments.store(Arc::new(new));
127 }
128}
129
130impl Default for TraceMetadata {
131 fn default() -> Self {
132 Self::new()
133 }
134}
135
#[cfg(test)]
mod test {
    use chrono::DateTime;

    use super::*;

    /// A start message registers the segment; a later end message for the
    /// same id fills in its end time.
    #[test]
    fn test_basic() {
        let metadata = TraceMetadata::new();

        let segment_id = Uuid::try_parse("0196c84d-6eb8-7c46-83b1-e4cac73ba9b6").unwrap();
        let source_name = "src";

        // Wraps a bare message with the id and source shared by this test.
        let envelope = |msg: ipc::IpcMessage| ipc::IpcMessageWithId {
            segment_id,
            source_name: source_name.to_string(),
            msg,
        };

        let start = ipc::TraceSegmentStart {
            time_ns: 0,
            source_name: source_name.to_string(),
        };
        metadata.update(&envelope(start.into()));

        let after_start = metadata.get_segment(&segment_id).unwrap();
        assert_eq!(after_start.id, segment_id);
        assert_eq!(after_start.source, "src");
        assert_eq!(
            after_start.start_time,
            Some(DateTime::from_timestamp_nanos(0))
        );
        assert_eq!(after_start.end_time, None);

        let end = ipc::TraceSegmentEnd { time_ns: 1 };
        metadata.update(&envelope(end.into()));

        let after_end = metadata.get_segment(&segment_id).unwrap();
        assert_eq!(after_end.end_time, Some(DateTime::from_timestamp_nanos(1)));
    }
}