crabka_schema_serde/
cache.rs1use std::collections::HashMap;
5use std::sync::{Arc, Mutex};
6
7use crate::error::SchemaSerdeError;
8use crate::registry::RegistryClient;
9use crate::subject::{Role, SchemaKind, SubjectStrategy, TopicNameStrategy};
10
/// Selects how `SchemaCache::prewarm` obtains a registry id for each
/// interned schema.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RegisterMode {
    /// Register the local schema under its subject via `RegistryClient::register`.
    AutoRegister,
    /// Resolve the local schema's id via `RegistryClient::lookup`; no register
    /// call is made.
    LookupOnly,
    /// Take the id reported by `RegistryClient::latest_id` for the subject.
    UseLatest,
}
21
22#[derive(Debug, Clone)]
24pub struct CacheConfig {
25 pub mode: RegisterMode,
26}
27
28impl Default for CacheConfig {
29 fn default() -> Self {
30 Self {
31 mode: RegisterMode::AutoRegister,
32 }
33 }
34}
35
36#[derive(Debug, Clone)]
38struct Interned {
39 subject: String,
40 kind: SchemaKind,
41 schema: String,
42}
43
44#[derive(Default)]
45struct Inner {
46 subject_id: HashMap<String, u32>,
48 id_schema: HashMap<u32, String>,
50 interned: Vec<Interned>,
52 fetching: std::collections::HashSet<u32>,
54}
55
56pub struct SchemaCache {
58 client: RegistryClient,
59 config: CacheConfig,
60 strategy: Box<dyn SubjectStrategy>,
61 inner: Mutex<Inner>,
62}
63
64impl std::fmt::Debug for SchemaCache {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66 f.debug_struct("SchemaCache")
67 .field("config", &self.config)
68 .finish_non_exhaustive()
69 }
70}
71
72static DEFAULT_REGISTRY: std::sync::OnceLock<Arc<SchemaCache>> = std::sync::OnceLock::new();
77
78pub fn set_default_registry(cache: Arc<SchemaCache>) {
81 let _ = DEFAULT_REGISTRY.set(cache);
82}
83
84#[must_use]
86pub fn default_registry() -> Option<Arc<SchemaCache>> {
87 DEFAULT_REGISTRY.get().cloned()
88}
89
90impl SchemaCache {
91 #[must_use]
93 pub fn new(client: RegistryClient, config: CacheConfig) -> Arc<Self> {
94 Arc::new(Self {
95 client,
96 config,
97 strategy: Box::new(TopicNameStrategy),
98 inner: Mutex::new(Inner::default()),
99 })
100 }
101
102 #[must_use]
104 pub fn subject(&self, topic: &str, role: Role) -> String {
105 self.strategy.subject(topic, role)
106 }
107
108 pub fn intern(&self, subject: &str, kind: SchemaKind, schema: &str) {
110 let mut g = self.inner.lock().unwrap();
111 if g.interned.iter().any(|i| i.subject == subject) {
112 return;
113 }
114 g.interned.push(Interned {
115 subject: subject.to_string(),
116 kind,
117 schema: schema.to_string(),
118 });
119 }
120
121 pub async fn prewarm(&self) -> Result<(), SchemaSerdeError> {
124 let pending: Vec<Interned> = self.inner.lock().unwrap().interned.clone();
125 for i in pending {
126 let id = match self.config.mode {
127 RegisterMode::AutoRegister => {
128 self.client.register(&i.subject, i.kind, &i.schema).await?
129 }
130 RegisterMode::LookupOnly => {
131 self.client.lookup(&i.subject, i.kind, &i.schema).await?
132 }
133 RegisterMode::UseLatest => self.client.latest_id(&i.subject).await?,
134 };
135 let mut g = self.inner.lock().unwrap();
136 g.subject_id.insert(i.subject.clone(), id);
137 g.id_schema.insert(id, i.schema.clone());
138 }
139 Ok(())
140 }
141
142 #[must_use]
145 pub fn id_for_subject(&self, subject: &str) -> Option<u32> {
146 self.inner.lock().unwrap().subject_id.get(subject).copied()
147 }
148
149 pub fn writer_schema(self: &Arc<Self>, id: u32) -> Result<String, SchemaSerdeError> {
152 {
153 let mut g = self.inner.lock().unwrap();
154 if let Some(s) = g.id_schema.get(&id) {
155 return Ok(s.clone());
156 }
157 if g.fetching.insert(id) {
158 let this = Arc::clone(self);
159 tokio::spawn(async move {
160 let fetched = this.client.schema_by_id(id).await;
161 let mut g = this.inner.lock().unwrap();
162 g.fetching.remove(&id);
163 if let Ok(schema) = fetched {
164 g.id_schema.insert(id, schema);
165 }
166 });
167 }
168 }
169 Err(SchemaSerdeError::WriterSchemaPending(id))
170 }
171
172 pub fn seed_writer_schema(&self, id: u32, schema: impl Into<String>) {
174 self.inner
175 .lock()
176 .unwrap()
177 .id_schema
178 .insert(id, schema.into());
179 }
180
181 pub fn seed_subject_id(&self, subject: impl Into<String>, id: u32) {
183 self.inner
184 .lock()
185 .unwrap()
186 .subject_id
187 .insert(subject.into(), id);
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::registry::RegistryClient;
    use assert2::check;

    /// Builds a default-configured cache pointing at a placeholder registry URL.
    fn cache() -> Arc<SchemaCache> {
        SchemaCache::new(RegistryClient::new("http://unused"), CacheConfig::default())
    }

    #[test]
    fn intern_is_idempotent_per_subject() {
        let c = cache();
        c.intern("orders-value", SchemaKind::Avro, "a");
        c.intern("orders-value", SchemaKind::Avro, "a");
        check!(c.inner.lock().unwrap().interned.len() == 1);
    }

    #[test]
    fn intern_keeps_first_schema_for_subject() {
        let c = cache();
        c.intern("orders-value", SchemaKind::Avro, "first");
        c.intern("orders-value", SchemaKind::Avro, "second");
        let g = c.inner.lock().unwrap();
        check!(g.interned.len() == 1);
        check!(g.interned.first().map(|i| i.schema.as_str()) == Some("first"));
    }

    // Runs inside a runtime because a miss in `writer_schema` may start a
    // background fetch.
    #[tokio::test]
    async fn seeded_reads_are_synchronous() {
        let c = cache();
        c.seed_subject_id("orders-value", 42);
        c.seed_writer_schema(42, "schema-text");
        check!(c.id_for_subject("orders-value") == Some(42));
        check!(matches!(
            c.writer_schema(7),
            Err(SchemaSerdeError::WriterSchemaPending(7))
        ));
        check!(c.writer_schema(42).unwrap() == "schema-text");
    }

    #[test]
    fn default_mode_is_auto_register() {
        check!(CacheConfig::default().mode == RegisterMode::AutoRegister);
    }
}