Skip to main content

dial9_trace_format/
schema.rs

1//! Schema types describing event layouts.
2//!
3//! A [`SchemaEntry`] defines the name and fields of an event type. The
4//! [`SchemaRegistry`] tracks all registered schemas and assigns wire type IDs.
5
6use crate::codec::WireTypeId;
7use crate::encoder::FxHashMap;
8use crate::types::FieldType;
9use std::borrow::Cow;
10
/// Arbitrary key/value metadata attached to one field of a schema.
///
/// Annotations travel in their own frame (`TAG_SCHEMA_ANNOTATIONS`), written
/// after the schema frame they describe. Typical payloads are units, display
/// hints, or semantic-convention labels.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FieldAnnotation {
    field_index: u16,
    key: Cow<'static, str>,
    value: Cow<'static, str>,
}

impl FieldAnnotation {
    /// Build an annotation for the field at position `field_index`.
    ///
    /// Both `key` and `value` are converted into owned strings.
    pub fn new(field_index: u16, key: impl Into<String>, value: impl Into<String>) -> Self {
        let key = Cow::from(key.into());
        let value = Cow::from(value.into());
        Self {
            field_index,
            key,
            value,
        }
    }

    /// Zero-based position of the annotated field, counted in the same order
    /// as [`SchemaEntry::fields`].
    pub fn field_index(&self) -> u16 {
        self.field_index
    }

    /// The annotation's key, e.g. `"metrique.unit"`.
    pub fn key(&self) -> &str {
        &*self.key
    }

    /// The annotation's value, e.g. `"microseconds"`.
    pub fn value(&self) -> &str {
        &*self.value
    }
}
50
51/// A single field within a schema: a name and a [`FieldType`].
52#[non_exhaustive]
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct FieldDef {
55    pub(crate) name: String,
56    pub(crate) field_type: FieldType,
57}
58
59impl FieldDef {
60    /// Construct a field definition with the given name and type.
61    ///
62    /// ```
63    /// # use dial9_trace_format::schema::FieldDef;
64    /// # use dial9_trace_format::types::FieldType;
65    /// FieldDef::new("worker_id", FieldType::Varint);
66    /// FieldDef::new("tags", FieldType::DynamicList);
67    /// ```
68    pub fn new(name: impl Into<String>, field_type: FieldType) -> Self {
69        Self {
70            name: name.into(),
71            field_type,
72        }
73    }
74
75    /// Field name (e.g. `"worker_id"`).
76    pub fn name(&self) -> &str {
77        &self.name
78    }
79
80    /// Wire type used to encode this field.
81    pub fn field_type(&self) -> FieldType {
82        self.field_type
83    }
84}
85
86/// Describes the layout of an event type. Does not carry a wire type ID —
87/// the ID is assigned by the encoder and tracked externally by the registry.
88#[non_exhaustive]
89#[derive(Debug, Clone, PartialEq, Eq)]
90pub struct SchemaEntry {
91    pub(crate) name: String,
92    pub(crate) has_timestamp: bool,
93    pub(crate) fields: Vec<FieldDef>,
94    pub(crate) annotations: Vec<FieldAnnotation>,
95}
96
97impl SchemaEntry {
98    /// Construct a new schema entry.
99    pub fn new(
100        name: impl Into<String>,
101        has_timestamp: bool,
102        fields: impl IntoIterator<Item = FieldDef>,
103    ) -> Self {
104        Self {
105            name: name.into(),
106            has_timestamp,
107            fields: fields.into_iter().collect(),
108            annotations: Vec::new(),
109        }
110    }
111
112    /// Construct a schema entry with annotations.
113    pub fn with_annotations(
114        name: impl Into<String>,
115        has_timestamp: bool,
116        fields: impl IntoIterator<Item = FieldDef>,
117        annotations: impl IntoIterator<Item = FieldAnnotation>,
118    ) -> Self {
119        Self {
120            name: name.into(),
121            has_timestamp,
122            fields: fields.into_iter().collect(),
123            annotations: annotations.into_iter().collect(),
124        }
125    }
126
127    /// Event type name (e.g. `"PollStart"`).
128    pub fn name(&self) -> &str {
129        &self.name
130    }
131
132    /// Whether events of this type carry a packed timestamp in the event header.
133    pub fn has_timestamp(&self) -> bool {
134        self.has_timestamp
135    }
136
137    /// Ordered list of fields (excluding the timestamp).
138    pub fn fields(&self) -> &[FieldDef] {
139        &self.fields
140    }
141
142    /// Per-field annotations.
143    pub fn annotations(&self) -> &[FieldAnnotation] {
144        &self.annotations
145    }
146}
147
148#[derive(Debug, Default, Clone)]
149pub struct SchemaRegistry {
150    pub(crate) schemas: FxHashMap<WireTypeId, SchemaEntry>,
151    pub(crate) next_id: u16,
152}
153
154impl SchemaRegistry {
155    pub fn new() -> Self {
156        Self::default()
157    }
158
159    /// Resets the schema registry to a blank slate without releasing the allocations
160    pub fn clear(&mut self) {
161        self.next_id = 0;
162        self.schemas.clear();
163    }
164
165    /// Register a schema under the given wire type ID.
166    pub fn register(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
167        if let Some(existing) = self.schemas.get(&type_id) {
168            if *existing == entry {
169                return Ok(());
170            }
171            return Err(format!(
172                "type_id {:?} already registered with different schema",
173                type_id
174            ));
175        }
176        self.schemas.insert(type_id, entry);
177        Ok(())
178    }
179
180    pub fn get(&self, type_id: WireTypeId) -> Option<&SchemaEntry> {
181        self.schemas.get(&type_id)
182    }
183
184    pub fn entries(&self) -> impl Iterator<Item = (WireTypeId, &SchemaEntry)> {
185        self.schemas.iter().map(|(&id, entry)| (id, entry))
186    }
187
188    /// Allocate the next wire type ID.
189    pub fn next_type_id(&mut self) -> WireTypeId {
190        let id = WireTypeId(self.next_id);
191        self.next_id += 1;
192        id
193    }
194
195    /// Advance `next_id` past all registered type IDs.
196    ///
197    /// Call this after bulk-inserting schemas (e.g. from a decoded trace) so
198    /// that [`next_type_id`](Self::next_type_id) won't collide.
199    pub fn sync_next_id(&mut self) {
200        for &id in self.schemas.keys() {
201            if id.0 >= self.next_id {
202                self.next_id = id.0 + 1;
203            }
204        }
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamped schema named `name` with no fields and no annotations.
    fn bare_entry(name: &str) -> SchemaEntry {
        SchemaEntry::new(name, true, Vec::new())
    }

    #[test]
    fn register_and_lookup() {
        let mut reg = SchemaRegistry::new();
        let id = reg.next_type_id();
        let entry = SchemaEntry::new(
            "PollStart",
            true,
            vec![
                FieldDef::new("timestamp_ns", FieldType::Varint),
                FieldDef::new("worker", FieldType::Varint),
            ],
        );
        reg.register(id, entry.clone()).unwrap();
        assert_eq!(reg.get(id), Some(&entry));
        assert_eq!(reg.get(WireTypeId(99)), None);
    }

    #[test]
    fn duplicate_type_id_same_schema_ok() {
        let mut reg = SchemaRegistry::new();
        let id = reg.next_type_id();
        reg.register(id, bare_entry("A")).unwrap();
        reg.register(id, bare_entry("A")).unwrap();
        assert_eq!(reg.entries().count(), 1);
    }

    #[test]
    fn duplicate_type_id_different_schema_rejected() {
        let mut reg = SchemaRegistry::new();
        let id = reg.next_type_id();
        reg.register(id, bare_entry("A")).unwrap();
        assert!(reg.register(id, bare_entry("B")).is_err());
        // A rejected registration must not overwrite the existing schema.
        assert_eq!(reg.get(id), Some(&bare_entry("A")));
    }

    #[test]
    fn annotations_participate_in_schema_equality() {
        let mut reg = SchemaRegistry::new();
        let id = reg.next_type_id();
        let fields = || vec![FieldDef::new("latency", FieldType::Varint)];
        reg.register(id, SchemaEntry::new("A", true, fields()))
            .unwrap();
        let annotated = SchemaEntry::with_annotations(
            "A",
            true,
            fields(),
            vec![FieldAnnotation::new(0, "metrique.unit", "microseconds")],
        );
        assert!(reg.register(id, annotated).is_err());
    }

    #[test]
    fn multiple_schemas() {
        let mut reg = SchemaRegistry::new();
        let id1 = reg.next_type_id();
        reg.register(id1, bare_entry("A")).unwrap();
        let id2 = reg.next_type_id();
        reg.register(id2, bare_entry("B")).unwrap();
        assert_eq!(reg.entries().count(), 2);
        let mut ids: Vec<u16> = reg.entries().map(|(id, _)| id.0).collect();
        ids.sort_unstable();
        assert_eq!(ids, vec![id1.0, id2.0]);
    }

    #[test]
    fn next_type_id_auto_increments() {
        let mut reg = SchemaRegistry::new();
        let id1 = reg.next_type_id();
        let id2 = reg.next_type_id();
        assert_ne!(id1, id2);
        assert_eq!(id1, WireTypeId(0));
        assert_eq!(id2, WireTypeId(1));
    }

    #[test]
    fn sync_next_id_moves_past_highest_registered_id() {
        let mut reg = SchemaRegistry::new();
        reg.register(WireTypeId(7), bare_entry("A")).unwrap();
        reg.register(WireTypeId(3), bare_entry("B")).unwrap();
        reg.sync_next_id();
        assert_eq!(reg.next_type_id(), WireTypeId(8));
    }

    #[test]
    fn sync_next_id_never_moves_backwards() {
        let mut reg = SchemaRegistry::new();
        for _ in 0..5 {
            let _ = reg.next_type_id();
        }
        reg.register(WireTypeId(1), bare_entry("A")).unwrap();
        reg.sync_next_id();
        assert_eq!(reg.next_type_id(), WireTypeId(5));
    }

    #[test]
    fn sync_next_id_on_empty_registry_is_noop() {
        let mut reg = SchemaRegistry::new();
        reg.sync_next_id();
        assert_eq!(reg.next_type_id(), WireTypeId(0));
    }

    #[test]
    fn clear_resets_entries_and_id_allocation() {
        let mut reg = SchemaRegistry::new();
        let id = reg.next_type_id();
        reg.register(id, bare_entry("A")).unwrap();
        reg.clear();
        assert_eq!(reg.entries().count(), 0);
        assert_eq!(reg.get(id), None);
        assert_eq!(reg.next_type_id(), WireTypeId(0));
    }
}