Skip to main content

dial9_trace_format/
schema.rs

//! Schema types describing event layouts.
//!
//! A [`SchemaEntry`] defines the name and fields of an event type. The
//! [`SchemaRegistry`] tracks all registered schemas and assigns wire type IDs.

6use crate::codec::WireTypeId;
7use crate::encoder::FxHashMap;
8use crate::types::FieldType;
9
10/// A single field within a schema: a name and a [`FieldType`].
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct FieldDef {
13    /// Field name (e.g. `"worker_id"`).
14    pub name: String,
15    /// Wire type used to encode this field. Optional variants (e.g.
16    /// `FieldType::OptionalPooledString`) indicate the field uses the
17    /// high-bit optional encoding on the wire.
18    pub field_type: FieldType,
19}
20
21/// Describes the layout of an event type. Does not carry a wire type ID —
22/// the ID is assigned by the encoder and tracked externally by the registry.
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct SchemaEntry {
25    /// Event type name (e.g. `"PollStart"`).
26    pub name: String,
27    /// Whether events of this type carry a packed u24 nanosecond timestamp in the event header.
28    pub has_timestamp: bool,
29    /// Ordered list of fields (excluding the timestamp, which is in the header).
30    pub fields: Vec<FieldDef>,
31}
32
33#[derive(Debug, Default, Clone)]
34pub struct SchemaRegistry {
35    pub(crate) schemas: FxHashMap<WireTypeId, SchemaEntry>,
36    pub(crate) next_id: u16,
37}
38
39impl SchemaRegistry {
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    /// Resets the schema registry to a blank slate without releasing the allocations
45    pub fn clear(&mut self) {
46        self.next_id = 0;
47        self.schemas.clear();
48    }
49
50    /// Register a schema under the given wire type ID.
51    pub fn register(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
52        if let Some(existing) = self.schemas.get(&type_id) {
53            if *existing == entry {
54                return Ok(());
55            }
56            return Err(format!(
57                "type_id {:?} already registered with different schema",
58                type_id
59            ));
60        }
61        self.schemas.insert(type_id, entry);
62        Ok(())
63    }
64
65    pub fn get(&self, type_id: WireTypeId) -> Option<&SchemaEntry> {
66        self.schemas.get(&type_id)
67    }
68
69    pub fn entries(&self) -> impl Iterator<Item = (WireTypeId, &SchemaEntry)> {
70        self.schemas.iter().map(|(&id, entry)| (id, entry))
71    }
72
73    /// Allocate the next wire type ID.
74    pub fn next_type_id(&mut self) -> WireTypeId {
75        let id = WireTypeId(self.next_id);
76        self.next_id += 1;
77        id
78    }
79
80    /// Advance `next_id` past all registered type IDs.
81    ///
82    /// Call this after bulk-inserting schemas (e.g. from a decoded trace) so
83    /// that [`next_type_id`](Self::next_type_id) won't collide.
84    pub fn sync_next_id(&mut self) {
85        for &id in self.schemas.keys() {
86            if id.0 >= self.next_id {
87                self.next_id = id.0 + 1;
88            }
89        }
90    }
91}
92
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a timestamped schema with no fields, enough for ID bookkeeping tests.
    fn empty_schema(name: &str) -> SchemaEntry {
        SchemaEntry {
            name: name.into(),
            has_timestamp: true,
            fields: vec![],
        }
    }

    #[test]
    fn register_and_lookup() {
        let mut reg = SchemaRegistry::new();
        let id = reg.next_type_id();
        // The timestamp lives in the event header (`has_timestamp`), so it is
        // deliberately not listed among the fields.
        let entry = SchemaEntry {
            name: "PollStart".into(),
            has_timestamp: true,
            fields: vec![
                FieldDef {
                    name: "worker".into(),
                    field_type: FieldType::Varint,
                },
                FieldDef {
                    name: "task_id".into(),
                    field_type: FieldType::Varint,
                },
            ],
        };
        reg.register(id, entry.clone()).unwrap();
        assert_eq!(reg.get(id), Some(&entry));
        assert_eq!(reg.get(WireTypeId(99)), None);
    }

    #[test]
    fn duplicate_type_id_same_schema_ok() {
        let mut reg = SchemaRegistry::new();
        let id = reg.next_type_id();
        let entry = empty_schema("A");
        reg.register(id, entry.clone()).unwrap();
        reg.register(id, entry).unwrap();
        assert_eq!(reg.entries().count(), 1);
    }

    #[test]
    fn duplicate_type_id_different_schema_rejected() {
        let mut reg = SchemaRegistry::new();
        let id = reg.next_type_id();
        reg.register(id, empty_schema("A")).unwrap();
        assert!(reg.register(id, empty_schema("B")).is_err());
        // A rejected registration must not clobber the existing schema.
        assert_eq!(reg.get(id), Some(&empty_schema("A")));
    }

    #[test]
    fn multiple_schemas() {
        let mut reg = SchemaRegistry::new();
        let id1 = reg.next_type_id();
        reg.register(id1, empty_schema("A")).unwrap();
        let id2 = reg.next_type_id();
        reg.register(id2, empty_schema("B")).unwrap();
        assert_eq!(reg.entries().count(), 2);
    }

    #[test]
    fn next_type_id_auto_increments() {
        let mut reg = SchemaRegistry::new();
        let id1 = reg.next_type_id();
        let id2 = reg.next_type_id();
        assert_ne!(id1, id2);
        assert_eq!(id1, WireTypeId(0));
        assert_eq!(id2, WireTypeId(1));
    }

    #[test]
    fn sync_next_id_skips_registered_ids() {
        let mut reg = SchemaRegistry::new();
        reg.register(WireTypeId(7), empty_schema("A")).unwrap();
        reg.register(WireTypeId(3), empty_schema("B")).unwrap();
        reg.sync_next_id();
        assert_eq!(reg.next_type_id(), WireTypeId(8));
    }

    #[test]
    fn sync_next_id_never_moves_backwards() {
        let mut reg = SchemaRegistry::new();
        for _ in 0..10 {
            reg.next_type_id();
        }
        reg.register(WireTypeId(2), empty_schema("A")).unwrap();
        reg.sync_next_id();
        assert_eq!(reg.next_type_id(), WireTypeId(10));
    }

    #[test]
    fn clear_resets_schemas_and_ids() {
        let mut reg = SchemaRegistry::new();
        let id = reg.next_type_id();
        reg.register(id, empty_schema("A")).unwrap();
        reg.clear();
        assert_eq!(reg.entries().count(), 0);
        assert_eq!(reg.get(id), None);
        assert_eq!(reg.next_type_id(), WireTypeId(0));
    }
}