1use std::collections::HashMap;
2
3use core_types::{ErrorCode, ErrorDomain, RtError};
4
5use crate::compat::CompatibilityPolicy;
6use crate::schema::{SchemaDescriptor, SchemaId, SchemaVersion};
7
8pub trait SchemaRegistry {
9 fn register(&mut self, descriptor: SchemaDescriptor) -> Result<(), RtError>;
10 fn get(&self, id: &SchemaId) -> Option<&SchemaDescriptor>;
11 fn is_compatible(
12 &self,
13 id: &SchemaId,
14 version: SchemaVersion,
15 policy: CompatibilityPolicy,
16 ) -> bool;
17}
18
19#[derive(Clone, Debug, Eq, PartialEq)]
20pub struct CodecDescriptor {
21 pub schema_id: SchemaId,
22 pub name: String,
23 pub zero_copy: bool,
24}
25
26pub trait CodecRegistry {
27 fn register(&mut self, descriptor: CodecDescriptor) -> Result<(), RtError>;
28 fn get(&self, schema_id: &SchemaId) -> Option<&CodecDescriptor>;
29}
30
31#[derive(Default)]
32pub struct SimpleSchemaRegistry {
33 schemas: HashMap<SchemaId, SchemaDescriptor>,
34}
35
36impl SchemaRegistry for SimpleSchemaRegistry {
37 fn register(&mut self, descriptor: SchemaDescriptor) -> Result<(), RtError> {
38 if self.schemas.contains_key(&descriptor.id) {
39 return Err(RtError::new(
40 ErrorCode::InvalidState,
41 ErrorDomain::DataModel,
42 false,
43 "schema already registered",
44 ));
45 }
46 self.schemas.insert(descriptor.id.clone(), descriptor);
47 Ok(())
48 }
49
50 fn get(&self, id: &SchemaId) -> Option<&SchemaDescriptor> {
51 self.schemas.get(id)
52 }
53
54 fn is_compatible(
55 &self,
56 id: &SchemaId,
57 version: SchemaVersion,
58 policy: CompatibilityPolicy,
59 ) -> bool {
60 self.get(id)
61 .map(|descriptor| policy.check(descriptor.version, version))
62 .unwrap_or(false)
63 }
64}
65
66#[derive(Default)]
67pub struct SimpleCodecRegistry {
68 codecs: HashMap<SchemaId, CodecDescriptor>,
69}
70
71impl CodecRegistry for SimpleCodecRegistry {
72 fn register(&mut self, descriptor: CodecDescriptor) -> Result<(), RtError> {
73 if self.codecs.contains_key(&descriptor.schema_id) {
74 return Err(RtError::new(
75 ErrorCode::InvalidState,
76 ErrorDomain::DataModel,
77 false,
78 "codec already registered",
79 ));
80 }
81 self.codecs.insert(descriptor.schema_id.clone(), descriptor);
82 Ok(())
83 }
84
85 fn get(&self, schema_id: &SchemaId) -> Option<&CodecDescriptor> {
86 self.codecs.get(schema_id)
87 }
88}