Skip to main content

dial9_trace_format/
schema.rs

1// Schema types and registry
2
3use crate::codec::WireTypeId;
4use crate::types::FieldType;
5use std::collections::HashMap;
6
7#[derive(Debug, Clone, PartialEq, Eq)]
8pub struct FieldDef {
9    pub name: String,
10    pub field_type: FieldType,
11}
12
13/// Describes the layout of an event type. Does not carry a wire type ID —
14/// the ID is assigned by the encoder and tracked externally by the registry.
15#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct SchemaEntry {
17    pub name: String,
18    /// Whether events of this type carry a packed u24 nanosecond timestamp in the event header.
19    pub has_timestamp: bool,
20    pub fields: Vec<FieldDef>,
21}
22
23#[derive(Debug, Default, Clone)]
24pub struct SchemaRegistry {
25    pub(crate) schemas: HashMap<WireTypeId, SchemaEntry>,
26    pub(crate) next_id: u16,
27}
28
29impl SchemaRegistry {
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    /// Register a schema under the given wire type ID.
35    pub fn register(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
36        if let Some(existing) = self.schemas.get(&type_id) {
37            if *existing == entry {
38                return Ok(());
39            }
40            return Err(format!(
41                "type_id {:?} already registered with different schema",
42                type_id
43            ));
44        }
45        self.schemas.insert(type_id, entry);
46        Ok(())
47    }
48
49    pub fn get(&self, type_id: WireTypeId) -> Option<&SchemaEntry> {
50        self.schemas.get(&type_id)
51    }
52
53    pub fn entries(&self) -> impl Iterator<Item = (WireTypeId, &SchemaEntry)> {
54        self.schemas.iter().map(|(&id, entry)| (id, entry))
55    }
56
57    /// Allocate the next wire type ID.
58    pub fn next_type_id(&mut self) -> WireTypeId {
59        let id = WireTypeId(self.next_id);
60        self.next_id += 1;
61        id
62    }
63
64    /// Advance `next_id` past all registered type IDs.
65    ///
66    /// Call this after bulk-inserting schemas (e.g. from a decoded trace) so
67    /// that [`next_type_id`](Self::next_type_id) won't collide.
68    pub fn sync_next_id(&mut self) {
69        for &id in self.schemas.keys() {
70            if id.0 >= self.next_id {
71                self.next_id = id.0 + 1;
72            }
73        }
74    }
75}
76
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a timestamped schema with the given name and no fields.
    fn empty_entry(name: &str) -> SchemaEntry {
        SchemaEntry {
            name: name.into(),
            has_timestamp: true,
            fields: vec![],
        }
    }

    #[test]
    fn register_and_lookup() {
        let mut reg = SchemaRegistry::new();
        let id = reg.next_type_id();
        let entry = SchemaEntry {
            name: "PollStart".into(),
            has_timestamp: true,
            fields: vec![
                FieldDef {
                    name: "timestamp_ns".into(),
                    field_type: FieldType::Varint,
                },
                FieldDef {
                    name: "worker".into(),
                    field_type: FieldType::Varint,
                },
            ],
        };
        reg.register(id, entry.clone()).unwrap();
        assert_eq!(reg.get(id), Some(&entry));
        assert_eq!(reg.get(WireTypeId(99)), None);
    }

    #[test]
    fn duplicate_type_id_same_schema_ok() {
        let mut reg = SchemaRegistry::new();
        let id = reg.next_type_id();
        let entry = empty_entry("A");
        reg.register(id, entry.clone()).unwrap();
        reg.register(id, entry).unwrap();
    }

    #[test]
    fn duplicate_type_id_different_schema_rejected() {
        let mut reg = SchemaRegistry::new();
        let id = reg.next_type_id();
        reg.register(id, empty_entry("A")).unwrap();
        assert!(reg.register(id, empty_entry("B")).is_err());
    }

    #[test]
    fn multiple_schemas() {
        let mut reg = SchemaRegistry::new();
        let id1 = reg.next_type_id();
        reg.register(id1, empty_entry("A")).unwrap();
        let id2 = reg.next_type_id();
        reg.register(id2, empty_entry("B")).unwrap();
        assert_eq!(reg.entries().count(), 2);
    }

    #[test]
    fn next_type_id_auto_increments() {
        let mut reg = SchemaRegistry::new();
        let id1 = reg.next_type_id();
        let id2 = reg.next_type_id();
        assert_ne!(id1, id2);
        assert_eq!(id1, WireTypeId(0));
        assert_eq!(id2, WireTypeId(1));
    }

    // Covers the bulk-insert path: IDs registered directly (not obtained from
    // `next_type_id`) must be skipped once `sync_next_id` has run.
    #[test]
    fn sync_next_id_skips_registered_ids() {
        let mut reg = SchemaRegistry::new();
        reg.register(WireTypeId(5), empty_entry("A")).unwrap();
        reg.sync_next_id();
        assert_eq!(reg.next_type_id(), WireTypeId(6));
    }
}