// data_model/registry.rs

1use std::collections::HashMap;
2
3use core_types::{ErrorCode, ErrorDomain, RtError};
4
5use crate::compat::CompatibilityPolicy;
6use crate::schema::{SchemaDescriptor, SchemaId, SchemaVersion};
7
8pub trait SchemaRegistry {
9    fn register(&mut self, descriptor: SchemaDescriptor) -> Result<(), RtError>;
10    fn get(&self, id: &SchemaId) -> Option<&SchemaDescriptor>;
11    fn is_compatible(
12        &self,
13        id: &SchemaId,
14        version: SchemaVersion,
15        policy: CompatibilityPolicy,
16    ) -> bool;
17}
18
19#[derive(Clone, Debug, Eq, PartialEq)]
20pub struct CodecDescriptor {
21    pub schema_id: SchemaId,
22    pub name: String,
23    pub zero_copy: bool,
24}
25
26pub trait CodecRegistry {
27    fn register(&mut self, descriptor: CodecDescriptor) -> Result<(), RtError>;
28    fn get(&self, schema_id: &SchemaId) -> Option<&CodecDescriptor>;
29}
30
31#[derive(Default)]
32pub struct SimpleSchemaRegistry {
33    schemas: HashMap<SchemaId, SchemaDescriptor>,
34}
35
36impl SchemaRegistry for SimpleSchemaRegistry {
37    fn register(&mut self, descriptor: SchemaDescriptor) -> Result<(), RtError> {
38        if self.schemas.contains_key(&descriptor.id) {
39            return Err(RtError::new(
40                ErrorCode::InvalidState,
41                ErrorDomain::DataModel,
42                false,
43                "schema already registered",
44            ));
45        }
46        self.schemas.insert(descriptor.id.clone(), descriptor);
47        Ok(())
48    }
49
50    fn get(&self, id: &SchemaId) -> Option<&SchemaDescriptor> {
51        self.schemas.get(id)
52    }
53
54    fn is_compatible(
55        &self,
56        id: &SchemaId,
57        version: SchemaVersion,
58        policy: CompatibilityPolicy,
59    ) -> bool {
60        self.get(id)
61            .map(|descriptor| policy.check(descriptor.version, version))
62            .unwrap_or(false)
63    }
64}
65
66#[derive(Default)]
67pub struct SimpleCodecRegistry {
68    codecs: HashMap<SchemaId, CodecDescriptor>,
69}
70
71impl CodecRegistry for SimpleCodecRegistry {
72    fn register(&mut self, descriptor: CodecDescriptor) -> Result<(), RtError> {
73        if self.codecs.contains_key(&descriptor.schema_id) {
74            return Err(RtError::new(
75                ErrorCode::InvalidState,
76                ErrorDomain::DataModel,
77                false,
78                "codec already registered",
79            ));
80        }
81        self.codecs.insert(descriptor.schema_id.clone(), descriptor);
82        Ok(())
83    }
84
85    fn get(&self, schema_id: &SchemaId) -> Option<&CodecDescriptor> {
86        self.codecs.get(schema_id)
87    }
88}