1use crate::codec::WireTypeId;
7use crate::encoder::FxHashMap;
8use crate::types::FieldType;
9use std::borrow::Cow;
10
/// A key/value annotation tied to one field through that field's index.
///
/// All members are private; values are created with [`FieldAnnotation::new`] and read
/// back through the accessor methods.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FieldAnnotation {
    /// Index of the annotated field (presumably a position in the owning schema's field
    /// list -- confirm against callers).
    field_index: u16,
    /// Annotation key.
    key: Cow<'static, str>,
    /// Annotation value.
    value: Cow<'static, str>,
}

impl FieldAnnotation {
    /// Builds an annotation for the field at `field_index`.
    ///
    /// `key` and `value` are converted into `String`s and stored as owned data.
    pub fn new(field_index: u16, key: impl Into<String>, value: impl Into<String>) -> Self {
        let owned_key: String = key.into();
        let owned_value: String = value.into();
        Self {
            field_index,
            key: Cow::Owned(owned_key),
            value: Cow::Owned(owned_value),
        }
    }

    /// Index of the field this annotation is attached to.
    pub fn field_index(&self) -> u16 {
        self.field_index
    }

    /// The annotation key, borrowed from `self`.
    pub fn key(&self) -> &str {
        self.key.as_ref()
    }

    /// The annotation value, borrowed from `self`.
    pub fn value(&self) -> &str {
        self.value.as_ref()
    }
}
50
51#[non_exhaustive]
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct FieldDef {
55 pub(crate) name: String,
56 pub(crate) field_type: FieldType,
57}
58
59impl FieldDef {
60 pub fn new(name: impl Into<String>, field_type: FieldType) -> Self {
69 Self {
70 name: name.into(),
71 field_type,
72 }
73 }
74
75 pub fn name(&self) -> &str {
77 &self.name
78 }
79
80 pub fn field_type(&self) -> FieldType {
82 self.field_type
83 }
84}
85
86#[non_exhaustive]
89#[derive(Debug, Clone, PartialEq, Eq)]
90pub struct SchemaEntry {
91 pub(crate) name: String,
92 pub(crate) has_timestamp: bool,
93 pub(crate) fields: Vec<FieldDef>,
94 pub(crate) annotations: Vec<FieldAnnotation>,
95}
96
97impl SchemaEntry {
98 pub fn new(
100 name: impl Into<String>,
101 has_timestamp: bool,
102 fields: impl IntoIterator<Item = FieldDef>,
103 ) -> Self {
104 Self {
105 name: name.into(),
106 has_timestamp,
107 fields: fields.into_iter().collect(),
108 annotations: Vec::new(),
109 }
110 }
111
112 pub fn with_annotations(
114 name: impl Into<String>,
115 has_timestamp: bool,
116 fields: impl IntoIterator<Item = FieldDef>,
117 annotations: impl IntoIterator<Item = FieldAnnotation>,
118 ) -> Self {
119 Self {
120 name: name.into(),
121 has_timestamp,
122 fields: fields.into_iter().collect(),
123 annotations: annotations.into_iter().collect(),
124 }
125 }
126
127 pub fn name(&self) -> &str {
129 &self.name
130 }
131
132 pub fn has_timestamp(&self) -> bool {
134 self.has_timestamp
135 }
136
137 pub fn fields(&self) -> &[FieldDef] {
139 &self.fields
140 }
141
142 pub fn annotations(&self) -> &[FieldAnnotation] {
144 &self.annotations
145 }
146}
147
148#[derive(Debug, Default, Clone)]
149pub struct SchemaRegistry {
150 pub(crate) schemas: FxHashMap<WireTypeId, SchemaEntry>,
151 pub(crate) next_id: u16,
152}
153
154impl SchemaRegistry {
155 pub fn new() -> Self {
156 Self::default()
157 }
158
159 pub fn clear(&mut self) {
161 self.next_id = 0;
162 self.schemas.clear();
163 }
164
165 pub fn register(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
167 if let Some(existing) = self.schemas.get(&type_id) {
168 if *existing == entry {
169 return Ok(());
170 }
171 return Err(format!(
172 "type_id {:?} already registered with different schema",
173 type_id
174 ));
175 }
176 self.schemas.insert(type_id, entry);
177 Ok(())
178 }
179
180 pub fn get(&self, type_id: WireTypeId) -> Option<&SchemaEntry> {
181 self.schemas.get(&type_id)
182 }
183
184 pub fn entries(&self) -> impl Iterator<Item = (WireTypeId, &SchemaEntry)> {
185 self.schemas.iter().map(|(&id, entry)| (id, entry))
186 }
187
188 pub fn next_type_id(&mut self) -> WireTypeId {
190 let id = WireTypeId(self.next_id);
191 self.next_id += 1;
192 id
193 }
194
195 pub fn sync_next_id(&mut self) {
200 for &id in self.schemas.keys() {
201 if id.0 >= self.next_id {
202 self.next_id = id.0 + 1;
203 }
204 }
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a schema entry named `name` with the timestamp flag set, no fields and no
    /// annotations.
    fn bare_entry(name: &str) -> SchemaEntry {
        SchemaEntry::new(name, true, Vec::<FieldDef>::new())
    }

    /// A registered entry can be fetched by its id; an unknown id yields `None`.
    #[test]
    fn register_and_lookup() {
        let mut registry = SchemaRegistry::new();
        let type_id = registry.next_type_id();
        let poll_start = SchemaEntry::new(
            "PollStart",
            true,
            vec![
                FieldDef::new("timestamp_ns", FieldType::Varint),
                FieldDef::new("worker", FieldType::Varint),
            ],
        );

        registry.register(type_id, poll_start.clone()).unwrap();

        assert_eq!(registry.get(type_id), Some(&poll_start));
        assert_eq!(registry.get(WireTypeId(99)), None);
    }

    /// Registering an identical entry twice under the same id succeeds both times.
    #[test]
    fn duplicate_type_id_same_schema_ok() {
        let mut registry = SchemaRegistry::new();
        let type_id = registry.next_type_id();
        let schema = bare_entry("A");

        registry.register(type_id, schema.clone()).unwrap();
        registry.register(type_id, schema).unwrap();
    }

    /// Registering a different entry under an id that is already taken is an error.
    #[test]
    fn duplicate_type_id_different_schema_rejected() {
        let mut registry = SchemaRegistry::new();
        let type_id = registry.next_type_id();
        registry.register(type_id, bare_entry("A")).unwrap();

        let conflict = registry.register(type_id, bare_entry("B"));

        assert!(conflict.is_err());
    }

    /// Two entries registered under two fresh ids are both listed by `entries`.
    #[test]
    fn multiple_schemas() {
        let mut registry = SchemaRegistry::new();
        for name in &["A", "B"] {
            let type_id = registry.next_type_id();
            registry.register(type_id, bare_entry(name)).unwrap();
        }

        assert_eq!(registry.entries().count(), 2);
    }

    /// Fresh ids start at 0 and increase by one per call.
    #[test]
    fn next_type_id_auto_increments() {
        let mut registry = SchemaRegistry::new();
        let first = registry.next_type_id();
        let second = registry.next_type_id();

        assert_eq!(first, WireTypeId(0));
        assert_eq!(second, WireTypeId(1));
        assert_ne!(first, second);
    }
}
315}