Skip to main content

force_pubsub/
schema_cache.rs

1//! DashMap-backed schema cache for Avro schemas fetched from the Pub/Sub API.
2
3use apache_avro::Schema;
4use dashmap::DashMap;
5use std::sync::Arc;
6use tonic::transport::Channel;
7
8use crate::error::{PubSubError, Result};
9use crate::proto::eventbus_v1::{SchemaRequest, pub_sub_client::PubSubClient};
10
11/// Fetches and caches Avro schemas by schema ID.
12///
13/// The cache is lock-free for reads via `DashMap`. Each schema is fetched
14/// once from the Pub/Sub `GetSchema` RPC and cached for the lifetime of
15/// the handler.
16#[derive(Debug, Clone)]
17pub struct SchemaCache {
18    inner: Arc<SchemaCacheInner>,
19}
20
21#[derive(Debug)]
22struct SchemaCacheInner {
23    cache: DashMap<String, Schema>,
24}
25
26impl SchemaCache {
27    /// Creates a new empty schema cache.
28    #[must_use]
29    pub fn new() -> Self {
30        Self {
31            inner: Arc::new(SchemaCacheInner {
32                cache: DashMap::new(),
33            }),
34        }
35    }
36
37    /// Insert a pre-parsed schema directly.
38    pub fn insert(&self, schema_id: String, schema: Schema) {
39        self.inner.cache.insert(schema_id, schema);
40    }
41
42    /// Returns the number of cached schemas.
43    #[must_use]
44    pub fn len(&self) -> usize {
45        self.inner.cache.len()
46    }
47
48    /// True if no schemas are cached.
49    #[must_use]
50    pub fn is_empty(&self) -> bool {
51        self.inner.cache.is_empty()
52    }
53
54    /// Get a schema by ID if it is already in cache.
55    #[must_use]
56    pub fn get(&self, schema_id: &str) -> Option<Schema> {
57        self.inner.cache.get(schema_id).map(|r| r.value().clone())
58    }
59
60    /// Parse Avro schema JSON and store it in the cache.
61    ///
62    /// Returns the parsed schema. Called after receiving `schema_json` from `GetSchema`.
63    pub fn parse_and_insert(&self, schema_id: String, schema_json: &str) -> Result<Schema> {
64        let schema = Schema::parse_str(schema_json)
65            .map_err(|e| PubSubError::Avro(format!("failed to parse schema {schema_id}: {e}")))?;
66        self.inner.cache.insert(schema_id, schema.clone());
67        Ok(schema)
68    }
69
70    /// Return the schema for `schema_id` from cache, or fetch it via the `GetSchema` RPC.
71    ///
72    /// On a cache miss, calls `GetSchema` using the provided gRPC client and
73    /// authentication metadata. The fetched schema is parsed, inserted into the
74    /// cache, and returned.
75    ///
76    /// # Errors
77    ///
78    /// - [`PubSubError::Transport`] if the `GetSchema` RPC fails (e.g. schema not found).
79    /// - [`PubSubError::Avro`] if the returned schema JSON cannot be parsed.
80    pub async fn get_or_fetch(
81        &self,
82        schema_id: &str,
83        channel: &Channel,
84        metadata: tonic::metadata::MetadataMap,
85    ) -> Result<Schema> {
86        // Fast path: lock-free cache hit.
87        if let Some(schema) = self.get(schema_id) {
88            return Ok(schema);
89        }
90
91        // Cache miss — call GetSchema RPC.
92        let mut req = tonic::Request::new(SchemaRequest {
93            schema_id: schema_id.to_string(),
94        });
95        *req.metadata_mut() = metadata;
96
97        let resp = PubSubClient::new(channel.clone())
98            .get_schema(req)
99            .await?
100            .into_inner();
101
102        self.parse_and_insert(resp.schema_id, &resp.schema_json)
103    }
104}
105
106impl Default for SchemaCache {
107    fn default() -> Self {
108        Self::new()
109    }
110}
111
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal valid Avro record schema used as a fixture by the tests below.
    const SIMPLE_SCHEMA_JSON: &str = r#"
    {
        "type": "record",
        "name": "OrderEvent",
        "fields": [
            {"name": "order_id", "type": "string"},
            {"name": "total", "type": "double"}
        ]
    }
    "#;

    /// A freshly constructed cache holds nothing.
    #[test]
    fn test_new_cache_is_empty() {
        let fresh = SchemaCache::new();
        assert_eq!(fresh.len(), 0);
        assert!(fresh.is_empty());
    }

    /// Parsing valid JSON stores exactly one retrievable entry.
    #[test]
    fn test_parse_and_insert() {
        let cache = SchemaCache::new();
        let outcome = cache.parse_and_insert("schema-001".to_string(), SIMPLE_SCHEMA_JSON);
        assert!(outcome.is_ok(), "valid schema JSON");

        assert!(!cache.is_empty());
        assert_eq!(cache.len(), 1);
        assert!(cache.get("schema-001").is_some());
    }

    /// Looking up an unknown ID yields `None`.
    #[test]
    fn test_get_miss_returns_none() {
        let cache = SchemaCache::new();
        let lookup = cache.get("nonexistent");
        assert!(lookup.is_none());
    }

    /// Invalid JSON surfaces as an Avro error and leaves the cache untouched.
    #[test]
    fn test_parse_invalid_schema_returns_error() {
        let cache = SchemaCache::new();
        match cache.parse_and_insert("bad".to_string(), "{ not valid avro }") {
            Err(err) => assert!(matches!(err, PubSubError::Avro(_))),
            Ok(_) => panic!("Expected an error"),
        }
        assert!(cache.is_empty());
    }

    /// Clones share one underlying map, so writes through one are seen by the other.
    #[test]
    fn test_cache_is_cloneable_and_shared() {
        let writer = SchemaCache::new();
        let reader = writer.clone();

        let outcome = writer.parse_and_insert("shared".to_string(), SIMPLE_SCHEMA_JSON);
        assert!(outcome.is_ok(), "valid");

        // The clone observes the insert because both handles share the same Arc.
        assert!(reader.get("shared").is_some());
        assert_eq!(reader.len(), 1);
    }

    /// `Default` produces an empty cache.
    #[test]
    fn test_default_is_empty() {
        let cache = SchemaCache::default();
        assert!(cache.is_empty());
    }
}