use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use crate::error::SchemaSerdeError;
use crate::registry::RegistryClient;
use crate::subject::{Role, SchemaKind, SubjectStrategy, TopicNameStrategy};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RegisterMode {
AutoRegister,
LookupOnly,
UseLatest,
}
#[derive(Debug, Clone)]
pub struct CacheConfig {
pub mode: RegisterMode,
}
impl Default for CacheConfig {
fn default() -> Self {
Self {
mode: RegisterMode::AutoRegister,
}
}
}
#[derive(Debug, Clone)]
struct Interned {
subject: String,
kind: SchemaKind,
schema: String,
}
#[derive(Default)]
struct Inner {
subject_id: HashMap<String, u32>,
id_schema: HashMap<u32, String>,
interned: Vec<Interned>,
fetching: std::collections::HashSet<u32>,
}
pub struct SchemaCache {
client: RegistryClient,
config: CacheConfig,
strategy: Box<dyn SubjectStrategy>,
inner: Mutex<Inner>,
}
impl std::fmt::Debug for SchemaCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SchemaCache")
.field("config", &self.config)
.finish_non_exhaustive()
}
}
static DEFAULT_REGISTRY: std::sync::OnceLock<Arc<SchemaCache>> = std::sync::OnceLock::new();
pub fn set_default_registry(cache: Arc<SchemaCache>) {
let _ = DEFAULT_REGISTRY.set(cache);
}
#[must_use]
pub fn default_registry() -> Option<Arc<SchemaCache>> {
DEFAULT_REGISTRY.get().cloned()
}
impl SchemaCache {
#[must_use]
pub fn new(client: RegistryClient, config: CacheConfig) -> Arc<Self> {
Arc::new(Self {
client,
config,
strategy: Box::new(TopicNameStrategy),
inner: Mutex::new(Inner::default()),
})
}
#[must_use]
pub fn subject(&self, topic: &str, role: Role) -> String {
self.strategy.subject(topic, role)
}
pub fn intern(&self, subject: &str, kind: SchemaKind, schema: &str) {
let mut g = self.inner.lock().unwrap();
if g.interned.iter().any(|i| i.subject == subject) {
return;
}
g.interned.push(Interned {
subject: subject.to_string(),
kind,
schema: schema.to_string(),
});
}
pub async fn prewarm(&self) -> Result<(), SchemaSerdeError> {
let pending: Vec<Interned> = self.inner.lock().unwrap().interned.clone();
for i in pending {
let id = match self.config.mode {
RegisterMode::AutoRegister => {
self.client.register(&i.subject, i.kind, &i.schema).await?
}
RegisterMode::LookupOnly => {
self.client.lookup(&i.subject, i.kind, &i.schema).await?
}
RegisterMode::UseLatest => self.client.latest_id(&i.subject).await?,
};
let mut g = self.inner.lock().unwrap();
g.subject_id.insert(i.subject.clone(), id);
g.id_schema.insert(id, i.schema.clone());
}
Ok(())
}
#[must_use]
pub fn id_for_subject(&self, subject: &str) -> Option<u32> {
self.inner.lock().unwrap().subject_id.get(subject).copied()
}
pub fn writer_schema(self: &Arc<Self>, id: u32) -> Result<String, SchemaSerdeError> {
{
let mut g = self.inner.lock().unwrap();
if let Some(s) = g.id_schema.get(&id) {
return Ok(s.clone());
}
if g.fetching.insert(id) {
let this = Arc::clone(self);
tokio::spawn(async move {
let fetched = this.client.schema_by_id(id).await;
let mut g = this.inner.lock().unwrap();
g.fetching.remove(&id);
if let Ok(schema) = fetched {
g.id_schema.insert(id, schema);
}
});
}
}
Err(SchemaSerdeError::WriterSchemaPending(id))
}
pub fn seed_writer_schema(&self, id: u32, schema: impl Into<String>) {
self.inner
.lock()
.unwrap()
.id_schema
.insert(id, schema.into());
}
pub fn seed_subject_id(&self, subject: impl Into<String>, id: u32) {
self.inner
.lock()
.unwrap()
.subject_id
.insert(subject.into(), id);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::registry::RegistryClient;
use assert2::check;
fn cache() -> Arc<SchemaCache> {
SchemaCache::new(RegistryClient::new("http://unused"), CacheConfig::default())
}
#[test]
fn intern_is_idempotent_per_subject() {
let c = cache();
c.intern("orders-value", SchemaKind::Avro, "a");
c.intern("orders-value", SchemaKind::Avro, "a");
check!(c.inner.lock().unwrap().interned.len() == 1);
}
#[tokio::test]
async fn seeded_reads_are_synchronous() {
let c = cache();
c.seed_subject_id("orders-value", 42);
c.seed_writer_schema(42, "schema-text");
check!(c.id_for_subject("orders-value") == Some(42));
check!(c.writer_schema(7).is_err());
check!(c.writer_schema(42).unwrap() == "schema-text");
}
#[test]
fn default_mode_is_auto_register() {
check!(CacheConfig::default().mode == RegisterMode::AutoRegister);
}
}