Skip to main content

crabka_schema_serde/
cache.rs

//! Shared schema cache backed by a schema registry. Hot-path reads are synchronous;
//! registry I/O happens only at pre-warm and in background fetches triggered by cache misses.
3
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex};
6
7use crate::error::SchemaSerdeError;
8use crate::registry::RegistryClient;
9use crate::subject::{Role, SchemaKind, SubjectStrategy, TopicNameStrategy};
10
/// Strategy for obtaining the schema id a serializer writes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RegisterMode {
    /// At pre-warm, register the local schema and use the id the registry
    /// assigns (the Confluent default).
    AutoRegister,
    /// At pre-warm, look up the id of the local schema; never register it.
    LookupOnly,
    /// At pre-warm, take the id of the subject's latest registered version.
    UseLatest,
}
21
22/// Cache configuration.
23#[derive(Debug, Clone)]
24pub struct CacheConfig {
25    pub mode: RegisterMode,
26}
27
28impl Default for CacheConfig {
29    fn default() -> Self {
30        Self {
31            mode: RegisterMode::AutoRegister,
32        }
33    }
34}
35
36/// An interned local schema awaiting pre-warm resolution.
37#[derive(Debug, Clone)]
38struct Interned {
39    subject: String,
40    kind: SchemaKind,
41    schema: String,
42}
43
44#[derive(Default)]
45struct Inner {
46    /// subject ⇒ resolved id (serialize path).
47    subject_id: HashMap<String, u32>,
48    /// id ⇒ writer schema text (deserialize path).
49    id_schema: HashMap<u32, String>,
50    /// Local schemas to resolve on pre-warm.
51    interned: Vec<Interned>,
52    /// ids whose fetch is in flight (dedup background fetches).
53    fetching: std::collections::HashSet<u32>,
54}
55
56/// `Arc`-shared cache wiring serdes to a registry.
57pub struct SchemaCache {
58    client: RegistryClient,
59    config: CacheConfig,
60    strategy: Box<dyn SubjectStrategy>,
61    inner: Mutex<Inner>,
62}
63
64impl std::fmt::Debug for SchemaCache {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.debug_struct("SchemaCache")
67            .field("config", &self.config)
68            .finish_non_exhaustive()
69    }
70}
71
72/// Process-wide default registry cache, read by the `Default` impls of the
73/// format serdes (so a type can declare a default schema serde without
74/// threading a cache to the call site — analogous to Confluent serdes reading
75/// `schema.registry.url` from config). Set once at application startup.
76static DEFAULT_REGISTRY: std::sync::OnceLock<Arc<SchemaCache>> = std::sync::OnceLock::new();
77
78/// Install the process-wide default registry cache (first call wins). Required
79/// before constructing any default (`Default::default()`) format serde.
80pub fn set_default_registry(cache: Arc<SchemaCache>) {
81    let _ = DEFAULT_REGISTRY.set(cache);
82}
83
84/// The process-wide default registry cache, if [`set_default_registry`] ran.
85#[must_use]
86pub fn default_registry() -> Option<Arc<SchemaCache>> {
87    DEFAULT_REGISTRY.get().cloned()
88}
89
90impl SchemaCache {
91    /// Build a cache from a registry client and config, using `TopicNameStrategy`.
92    #[must_use]
93    pub fn new(client: RegistryClient, config: CacheConfig) -> Arc<Self> {
94        Arc::new(Self {
95            client,
96            config,
97            strategy: Box::new(TopicNameStrategy),
98            inner: Mutex::new(Inner::default()),
99        })
100    }
101
102    /// Resolve the subject for `(topic, role)` under the active strategy.
103    #[must_use]
104    pub fn subject(&self, topic: &str, role: Role) -> String {
105        self.strategy.subject(topic, role)
106    }
107
108    /// Register a local `(subject, kind, schema)` for pre-warm. Idempotent.
109    pub fn intern(&self, subject: &str, kind: SchemaKind, schema: &str) {
110        let mut g = self.inner.lock().unwrap();
111        if g.interned.iter().any(|i| i.subject == subject) {
112            return;
113        }
114        g.interned.push(Interned {
115            subject: subject.to_string(),
116            kind,
117            schema: schema.to_string(),
118        });
119    }
120
121    /// Resolve every interned subject's id (register/lookup/latest per mode).
122    /// Called once at client/membership start.
123    pub async fn prewarm(&self) -> Result<(), SchemaSerdeError> {
124        let pending: Vec<Interned> = self.inner.lock().unwrap().interned.clone();
125        for i in pending {
126            let id = match self.config.mode {
127                RegisterMode::AutoRegister => {
128                    self.client.register(&i.subject, i.kind, &i.schema).await?
129                }
130                RegisterMode::LookupOnly => {
131                    self.client.lookup(&i.subject, i.kind, &i.schema).await?
132                }
133                RegisterMode::UseLatest => self.client.latest_id(&i.subject).await?,
134            };
135            let mut g = self.inner.lock().unwrap();
136            g.subject_id.insert(i.subject.clone(), id);
137            g.id_schema.insert(id, i.schema.clone());
138        }
139        Ok(())
140    }
141
142    /// Synchronous hot-path read: the id bound to `subject`, or `None` if
143    /// pre-warm has not resolved it.
144    #[must_use]
145    pub fn id_for_subject(&self, subject: &str) -> Option<u32> {
146        self.inner.lock().unwrap().subject_id.get(subject).copied()
147    }
148
149    /// Synchronous hot-path read of a writer schema by id. On a miss, spawns a
150    /// background fetch and returns `WriterSchemaPending` (retriable).
151    pub fn writer_schema(self: &Arc<Self>, id: u32) -> Result<String, SchemaSerdeError> {
152        {
153            let mut g = self.inner.lock().unwrap();
154            if let Some(s) = g.id_schema.get(&id) {
155                return Ok(s.clone());
156            }
157            if g.fetching.insert(id) {
158                let this = Arc::clone(self);
159                tokio::spawn(async move {
160                    let fetched = this.client.schema_by_id(id).await;
161                    let mut g = this.inner.lock().unwrap();
162                    g.fetching.remove(&id);
163                    if let Ok(schema) = fetched {
164                        g.id_schema.insert(id, schema);
165                    }
166                });
167            }
168        }
169        Err(SchemaSerdeError::WriterSchemaPending(id))
170    }
171
172    /// Test/seed hook: install an id→schema mapping directly.
173    pub fn seed_writer_schema(&self, id: u32, schema: impl Into<String>) {
174        self.inner
175            .lock()
176            .unwrap()
177            .id_schema
178            .insert(id, schema.into());
179    }
180
181    /// Test/seed hook: install a subject→id mapping directly.
182    pub fn seed_subject_id(&self, subject: impl Into<String>, id: u32) {
183        self.inner
184            .lock()
185            .unwrap()
186            .subject_id
187            .insert(subject.into(), id);
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::registry::RegistryClient;
    use assert2::check;

    /// A cache pointed at a registry URL these tests never reach.
    fn offline_cache() -> Arc<SchemaCache> {
        let client = RegistryClient::new("http://unused");
        SchemaCache::new(client, CacheConfig::default())
    }

    #[test]
    fn intern_is_idempotent_per_subject() {
        let cache = offline_cache();
        for _ in 0..2 {
            cache.intern("orders-value", SchemaKind::Avro, "a");
        }
        let queued = cache.inner.lock().unwrap().interned.len();
        check!(queued == 1);
    }

    #[tokio::test]
    async fn seeded_reads_are_synchronous() {
        let cache = offline_cache();
        cache.seed_subject_id("orders-value", 42);
        cache.seed_writer_schema(42, "schema-text");

        check!(cache.id_for_subject("orders-value") == Some(42));
        // Id 7 was never seeded: the read reports pending and starts a
        // background fetch, which is why this test needs a Tokio runtime.
        let unknown = cache.writer_schema(7);
        check!(unknown.is_err());
        let known = cache.writer_schema(42).unwrap();
        check!(known == "schema-text");
    }

    #[test]
    fn default_mode_is_auto_register() {
        let config = CacheConfig::default();
        check!(config.mode == RegisterMode::AutoRegister);
    }
}