use crate::codec::WireTypeId;
use crate::encoder::FxHashMap;
use crate::types::FieldType;
use std::borrow::Cow;
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FieldAnnotation {
field_index: u16,
key: Cow<'static, str>,
value: Cow<'static, str>,
}
impl FieldAnnotation {
pub fn new(field_index: u16, key: impl Into<String>, value: impl Into<String>) -> Self {
Self {
field_index,
key: Cow::Owned(key.into()),
value: Cow::Owned(value.into()),
}
}
pub fn field_index(&self) -> u16 {
self.field_index
}
pub fn key(&self) -> &str {
&self.key
}
pub fn value(&self) -> &str {
&self.value
}
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FieldDef {
pub(crate) name: String,
pub(crate) field_type: FieldType,
}
impl FieldDef {
pub fn new(name: impl Into<String>, field_type: FieldType) -> Self {
Self {
name: name.into(),
field_type,
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn field_type(&self) -> FieldType {
self.field_type
}
}
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaEntry {
pub(crate) name: String,
pub(crate) has_timestamp: bool,
pub(crate) fields: Vec<FieldDef>,
pub(crate) annotations: Vec<FieldAnnotation>,
}
impl SchemaEntry {
pub fn new(
name: impl Into<String>,
has_timestamp: bool,
fields: impl IntoIterator<Item = FieldDef>,
) -> Self {
Self {
name: name.into(),
has_timestamp,
fields: fields.into_iter().collect(),
annotations: Vec::new(),
}
}
pub fn with_annotations(
name: impl Into<String>,
has_timestamp: bool,
fields: impl IntoIterator<Item = FieldDef>,
annotations: impl IntoIterator<Item = FieldAnnotation>,
) -> Self {
Self {
name: name.into(),
has_timestamp,
fields: fields.into_iter().collect(),
annotations: annotations.into_iter().collect(),
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn has_timestamp(&self) -> bool {
self.has_timestamp
}
pub fn fields(&self) -> &[FieldDef] {
&self.fields
}
pub fn annotations(&self) -> &[FieldAnnotation] {
&self.annotations
}
}
#[derive(Debug, Default, Clone)]
pub struct SchemaRegistry {
pub(crate) schemas: FxHashMap<WireTypeId, SchemaEntry>,
pub(crate) next_id: u16,
}
impl SchemaRegistry {
pub fn new() -> Self {
Self::default()
}
pub fn clear(&mut self) {
self.next_id = 0;
self.schemas.clear();
}
pub fn register(&mut self, type_id: WireTypeId, entry: SchemaEntry) -> Result<(), String> {
if let Some(existing) = self.schemas.get(&type_id) {
if *existing == entry {
return Ok(());
}
return Err(format!(
"type_id {:?} already registered with different schema",
type_id
));
}
self.schemas.insert(type_id, entry);
Ok(())
}
pub fn get(&self, type_id: WireTypeId) -> Option<&SchemaEntry> {
self.schemas.get(&type_id)
}
pub fn entries(&self) -> impl Iterator<Item = (WireTypeId, &SchemaEntry)> {
self.schemas.iter().map(|(&id, entry)| (id, entry))
}
pub fn next_type_id(&mut self) -> WireTypeId {
let id = WireTypeId(self.next_id);
self.next_id += 1;
id
}
pub fn sync_next_id(&mut self) {
for &id in self.schemas.keys() {
if id.0 >= self.next_id {
self.next_id = id.0 + 1;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn register_and_lookup() {
let mut reg = SchemaRegistry::new();
let id = reg.next_type_id();
let entry = SchemaEntry {
name: "PollStart".into(),
has_timestamp: true,
fields: vec![
FieldDef {
name: "timestamp_ns".into(),
field_type: FieldType::Varint,
},
FieldDef {
name: "worker".into(),
field_type: FieldType::Varint,
},
],
annotations: Vec::new(),
};
reg.register(id, entry.clone()).unwrap();
assert_eq!(reg.get(id), Some(&entry));
assert_eq!(reg.get(WireTypeId(99)), None);
}
#[test]
fn duplicate_type_id_same_schema_ok() {
let mut reg = SchemaRegistry::new();
let id = reg.next_type_id();
let entry = SchemaEntry {
name: "A".into(),
has_timestamp: true,
fields: vec![],
annotations: Vec::new(),
};
reg.register(id, entry.clone()).unwrap();
reg.register(id, entry).unwrap();
}
#[test]
fn duplicate_type_id_different_schema_rejected() {
let mut reg = SchemaRegistry::new();
let id = reg.next_type_id();
reg.register(
id,
SchemaEntry {
name: "A".into(),
has_timestamp: true,
fields: vec![],
annotations: Vec::new(),
},
)
.unwrap();
assert!(
reg.register(
id,
SchemaEntry {
name: "B".into(),
has_timestamp: true,
fields: vec![],
annotations: Vec::new(),
}
)
.is_err()
);
}
#[test]
fn multiple_schemas() {
let mut reg = SchemaRegistry::new();
let id1 = reg.next_type_id();
reg.register(
id1,
SchemaEntry {
name: "A".into(),
has_timestamp: true,
fields: vec![],
annotations: Vec::new(),
},
)
.unwrap();
let id2 = reg.next_type_id();
reg.register(
id2,
SchemaEntry {
name: "B".into(),
has_timestamp: true,
fields: vec![],
annotations: Vec::new(),
},
)
.unwrap();
assert_eq!(reg.entries().count(), 2);
}
#[test]
fn next_type_id_auto_increments() {
let mut reg = SchemaRegistry::new();
let id1 = reg.next_type_id();
let id2 = reg.next_type_id();
assert_ne!(id1, id2);
assert_eq!(id1, WireTypeId(0));
assert_eq!(id2, WireTypeId(1));
}
}