Skip to main content

force_pubsub/
schema_cache.rs

1//! DashMap-backed schema cache for Avro schemas fetched from the Pub/Sub API.
2
3use apache_avro::Schema;
4use dashmap::DashMap;
5use std::sync::Arc;
6use tonic::transport::Channel;
7
8use crate::error::{PubSubError, Result};
9use crate::proto::eventbus_v1::{SchemaRequest, pub_sub_client::PubSubClient};
10
11/// Fetches and caches Avro schemas by schema ID.
12///
13/// The cache is lock-free for reads via `DashMap`. Each schema is fetched
14/// once from the Pub/Sub `GetSchema` RPC and cached for the lifetime of
15/// the handler.
16#[derive(Debug, Clone)]
17pub struct SchemaCache {
18    inner: Arc<SchemaCacheInner>,
19}
20
21#[derive(Debug)]
22struct SchemaCacheInner {
23    cache: DashMap<String, Schema>,
24}
25
26impl SchemaCache {
27    /// Creates a new empty schema cache.
28    #[must_use]
29    pub fn new() -> Self {
30        Self {
31            inner: Arc::new(SchemaCacheInner {
32                cache: DashMap::new(),
33            }),
34        }
35    }
36
37    /// Insert a pre-parsed schema directly.
38    pub fn insert(&self, schema_id: String, schema: Schema) {
39        self.inner.cache.insert(schema_id, schema);
40    }
41
42    /// Returns the number of cached schemas.
43    #[must_use]
44    pub fn len(&self) -> usize {
45        self.inner.cache.len()
46    }
47
48    /// True if no schemas are cached.
49    #[must_use]
50    pub fn is_empty(&self) -> bool {
51        self.inner.cache.is_empty()
52    }
53
54    /// Get a schema by ID if it is already in cache.
55    #[must_use]
56    pub fn get(&self, schema_id: &str) -> Option<Schema> {
57        self.inner.cache.get(schema_id).map(|r| r.value().clone())
58    }
59
60    /// Parse Avro schema JSON and store it in the cache.
61    ///
62    /// Returns the parsed schema. Called after receiving `schema_json` from `GetSchema`.
63    pub fn parse_and_insert(&self, schema_id: String, schema_json: &str) -> Result<Schema> {
64        let schema = Schema::parse_str(schema_json)
65            .map_err(|e| PubSubError::Avro(format!("failed to parse schema {schema_id}: {e}")))?;
66        self.inner.cache.insert(schema_id, schema.clone());
67        Ok(schema)
68    }
69
70    /// Return the schema for `schema_id` from cache, or fetch it via the `GetSchema` RPC.
71    ///
72    /// On a cache miss, calls `GetSchema` using the provided gRPC client and
73    /// authentication metadata. The fetched schema is parsed, inserted into the
74    /// cache, and returned.
75    ///
76    /// # Errors
77    ///
78    /// - [`PubSubError::Transport`] if the `GetSchema` RPC fails (e.g. schema not found).
79    /// - [`PubSubError::Avro`] if the returned schema JSON cannot be parsed.
80    pub async fn get_or_fetch(
81        &self,
82        schema_id: &str,
83        channel: &Channel,
84        metadata: tonic::metadata::MetadataMap,
85    ) -> Result<Schema> {
86        // Fast path: lock-free cache hit.
87        if let Some(schema) = self.get(schema_id) {
88            return Ok(schema);
89        }
90
91        // Cache miss — call GetSchema RPC.
92        let mut req = tonic::Request::new(SchemaRequest {
93            schema_id: schema_id.to_string(),
94        });
95        *req.metadata_mut() = metadata;
96
97        let resp = PubSubClient::new(channel.clone())
98            .get_schema(req)
99            .await?
100            .into_inner();
101
102        self.parse_and_insert(resp.schema_id, &resp.schema_json)
103    }
104}
105
106impl Default for SchemaCache {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Minimal valid Avro record schema shared by the tests below.
    const SIMPLE_SCHEMA_JSON: &str = r#"
    {
        "type": "record",
        "name": "OrderEvent",
        "fields": [
            {"name": "order_id", "type": "string"},
            {"name": "total", "type": "double"}
        ]
    }
    "#;

    /// A freshly constructed cache holds nothing.
    #[test]
    fn test_new_cache_is_empty() {
        let fresh = SchemaCache::new();
        assert_eq!(fresh.len(), 0);
        assert!(fresh.is_empty());
    }

    /// Valid JSON is parsed and becomes retrievable under its ID.
    #[test]
    fn test_parse_and_insert() {
        let cache = SchemaCache::new();
        let parsed = cache.parse_and_insert("schema-001".to_string(), SIMPLE_SCHEMA_JSON);
        parsed.expect("valid schema JSON");

        assert!(!cache.is_empty());
        assert_eq!(cache.len(), 1);
        assert!(cache.get("schema-001").is_some());
    }

    /// Looking up an unknown ID yields `None`.
    #[test]
    fn test_get_miss_returns_none() {
        let cache = SchemaCache::new();
        let missing = cache.get("nonexistent");
        assert!(missing.is_none());
    }

    /// Invalid JSON surfaces as an Avro error and leaves the cache untouched.
    #[test]
    fn test_parse_invalid_schema_returns_error() {
        let cache = SchemaCache::new();
        match cache.parse_and_insert("bad".to_string(), "{ not valid avro }") {
            Err(err) => assert!(matches!(err, PubSubError::Avro(_))),
            Ok(_) => panic!("Expected an error"),
        }
        assert!(cache.is_empty());
    }

    /// Clones share one underlying map through the `Arc`.
    #[test]
    fn test_cache_is_cloneable_and_shared() {
        let original = SchemaCache::new();
        let handle = original.clone();

        original
            .parse_and_insert("shared".to_string(), SIMPLE_SCHEMA_JSON)
            .expect("valid");

        // The insert went through `original`, but must be visible via `handle`.
        assert!(handle.get("shared").is_some());
        assert_eq!(handle.len(), 1);
    }

    /// `Default` produces the same empty cache as `new`.
    #[test]
    fn test_default_is_empty() {
        let cache = SchemaCache::default();
        assert!(cache.is_empty());
    }
}