// force_pubsub/schema_cache.rs
use apache_avro::Schema;
4use dashmap::DashMap;
5use std::sync::Arc;
6use tonic::transport::Channel;
7
8use crate::error::{PubSubError, Result};
9use crate::proto::eventbus_v1::{SchemaRequest, pub_sub_client::PubSubClient};
10
/// Thread-safe, cheaply cloneable cache of parsed Avro schemas keyed by
/// schema id.
///
/// `Clone` is an `Arc` bump: every clone shares the same underlying map,
/// so an insert through one handle is visible through all others.
#[derive(Debug, Clone)]
pub struct SchemaCache {
    inner: Arc<SchemaCacheInner>,
}
20
/// Shared interior of [`SchemaCache`], held behind an `Arc` so that clones
/// of the outer handle observe the same entries.
#[derive(Debug)]
struct SchemaCacheInner {
    // DashMap provides sharded concurrent access, so no external mutex is
    // needed around reads/writes.
    cache: DashMap<String, Schema>,
}
25
26impl SchemaCache {
27 #[must_use]
29 pub fn new() -> Self {
30 Self {
31 inner: Arc::new(SchemaCacheInner {
32 cache: DashMap::new(),
33 }),
34 }
35 }
36
37 pub fn insert(&self, schema_id: String, schema: Schema) {
39 self.inner.cache.insert(schema_id, schema);
40 }
41
42 #[must_use]
44 pub fn len(&self) -> usize {
45 self.inner.cache.len()
46 }
47
48 #[must_use]
50 pub fn is_empty(&self) -> bool {
51 self.inner.cache.is_empty()
52 }
53
54 #[must_use]
56 pub fn get(&self, schema_id: &str) -> Option<Schema> {
57 self.inner.cache.get(schema_id).map(|r| r.value().clone())
58 }
59
60 pub fn parse_and_insert(&self, schema_id: String, schema_json: &str) -> Result<Schema> {
64 let schema = Schema::parse_str(schema_json)
65 .map_err(|e| PubSubError::Avro(format!("failed to parse schema {schema_id}: {e}")))?;
66 self.inner.cache.insert(schema_id, schema.clone());
67 Ok(schema)
68 }
69
70 pub async fn get_or_fetch(
81 &self,
82 schema_id: &str,
83 channel: &Channel,
84 metadata: tonic::metadata::MetadataMap,
85 ) -> Result<Schema> {
86 if let Some(schema) = self.get(schema_id) {
88 return Ok(schema);
89 }
90
91 let mut req = tonic::Request::new(SchemaRequest {
93 schema_id: schema_id.to_string(),
94 });
95 *req.metadata_mut() = metadata;
96
97 let resp = PubSubClient::new(channel.clone())
98 .get_schema(req)
99 .await?
100 .into_inner();
101
102 self.parse_and_insert(resp.schema_id, &resp.schema_json)
103 }
104}
105
106impl Default for SchemaCache {
107 fn default() -> Self {
108 Self::new()
109 }
110}
111
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal two-field Avro record used by the tests below.
    const SIMPLE_SCHEMA_JSON: &str = r#"
    {
        "type": "record",
        "name": "OrderEvent",
        "fields": [
            {"name": "order_id", "type": "string"},
            {"name": "total", "type": "double"}
        ]
    }
    "#;

    #[test]
    fn test_new_cache_is_empty() {
        let cache = SchemaCache::new();
        assert!(cache.is_empty());
        assert_eq!(cache.len(), 0);
    }

    #[test]
    fn test_parse_and_insert() {
        let cache = SchemaCache::new();
        // `expect` replaces the manual `let Ok(_) = … else { panic!() }`
        // form: same failure semantics, clearer intent.
        cache
            .parse_and_insert("schema-001".to_string(), SIMPLE_SCHEMA_JSON)
            .expect("valid schema JSON");
        assert_eq!(cache.len(), 1);
        assert!(!cache.is_empty());
        assert!(cache.get("schema-001").is_some());
    }

    #[test]
    fn test_get_miss_returns_none() {
        let cache = SchemaCache::new();
        assert!(cache.get("nonexistent").is_none());
    }

    #[test]
    fn test_parse_invalid_schema_returns_error() {
        let cache = SchemaCache::new();
        let err = cache
            .parse_and_insert("bad".to_string(), "{ not valid avro }")
            .expect_err("Expected an error");
        assert!(matches!(err, PubSubError::Avro(_)));
        // A failed parse must not leave a partial entry behind.
        assert!(cache.is_empty());
    }

    #[test]
    fn test_cache_is_cloneable_and_shared() {
        let cache = SchemaCache::new();
        let cache2 = cache.clone();

        cache
            .parse_and_insert("shared".to_string(), SIMPLE_SCHEMA_JSON)
            .expect("valid");

        // The clone shares the same underlying map.
        assert_eq!(cache2.len(), 1);
        assert!(cache2.get("shared").is_some());
    }

    #[test]
    fn test_default_is_empty() {
        let cache = SchemaCache::default();
        assert!(cache.is_empty());
    }
}