schema_registry_storage/
lib.rs1pub mod cache_warmer;
7pub mod postgres;
8pub mod redis_cache;
9pub mod s3;
10
11use async_trait::async_trait;
12use schema_registry_core::{error::Result, schema::RegisteredSchema, traits::SchemaStorage, versioning::SemanticVersion};
13use uuid::Uuid;
14
/// Connection settings for one storage backend.
///
/// Each variant carries the settings for a single backend. A value of this
/// type is handed unchanged to that backend's constructor.
///
/// Values compare field-by-field (`PartialEq`/`Eq`), so two configs can be
/// checked for equality without destructuring them.
///
/// The derived `Debug` output prints every field verbatim, including
/// `connection_string` and `url`. Avoid logging a config whose strings embed
/// credentials.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageConfig {
    /// Settings for the PostgreSQL backend.
    Postgres {
        /// Connection string, e.g. `postgresql://localhost/test`.
        connection_string: String,
        /// Upper bound on connections.
        // NOTE(review): presumably the pool size limit — confirm in the
        // `postgres` module.
        max_connections: u32,
    },
    /// Settings for the Redis backend.
    Redis {
        /// Server URL, e.g. `redis://localhost:6379`.
        url: String,
    },
    /// Settings for the S3 backend.
    S3 {
        /// Name of the bucket.
        bucket: String,
        /// Region identifier, e.g. `us-east-1`.
        region: String,
    },
}
33
34pub struct MultiTierStorage {
36 postgres: postgres::PostgresStorage,
38 cache: redis_cache::RedisCache,
40 #[allow(dead_code)]
42 s3: s3::S3Storage,
43}
44
45impl MultiTierStorage {
46 pub async fn new(postgres_config: StorageConfig, redis_config: StorageConfig, s3_config: StorageConfig) -> Result<Self> {
48 Ok(Self {
49 postgres: postgres::PostgresStorage::new(postgres_config).await?,
50 cache: redis_cache::RedisCache::new(redis_config).await?,
51 s3: s3::S3Storage::new(s3_config).await?,
52 })
53 }
54}
55
56#[async_trait]
57impl SchemaStorage for MultiTierStorage {
58 async fn store(&self, schema: RegisteredSchema) -> Result<()> {
59 self.postgres.store(schema.clone()).await?;
61 self.cache.store(schema).await?;
63 Ok(())
64 }
65
66 async fn retrieve(&self, id: Uuid, version: Option<SemanticVersion>) -> Result<RegisteredSchema> {
67 if let Ok(schema) = self.cache.retrieve(id, version.clone()).await {
69 return Ok(schema);
70 }
71 let schema = self.postgres.retrieve(id, version).await?;
73 let _ = self.cache.store(schema.clone()).await;
75 Ok(schema)
76 }
77
78 async fn retrieve_by_hash(&self, content_hash: &str) -> Result<Option<RegisteredSchema>> {
79 self.postgres.retrieve_by_hash(content_hash).await
80 }
81
82 async fn update(&self, schema: RegisteredSchema) -> Result<()> {
83 self.postgres.update(schema.clone()).await?;
84 self.cache.store(schema).await?;
85 Ok(())
86 }
87
88 async fn delete(&self, id: Uuid, version: SemanticVersion) -> Result<()> {
89 self.postgres.delete(id, version).await?;
90 Ok(())
92 }
93
94 async fn list_versions(&self, id: Uuid) -> Result<Vec<SemanticVersion>> {
95 self.postgres.list_versions(id).await
96 }
97
98 async fn find_by_name(&self, namespace: &str, name: &str) -> Result<Vec<RegisteredSchema>> {
99 self.postgres.find_by_name(namespace, name).await
100 }
101}
102
#[cfg(test)]
mod tests {
    //! Unit tests for `StorageConfig`: construction, cloning and `Debug`.
    //!
    //! Every test that destructures a config panics on an unexpected variant,
    //! so a test can never pass without running its assertions.

    use super::*;

    /// A Postgres config keeps the values it was built with.
    #[test]
    fn test_storage_config_postgres_creation() {
        let config = StorageConfig::Postgres {
            connection_string: "postgresql://localhost/test".to_string(),
            max_connections: 10,
        };

        match config {
            StorageConfig::Postgres {
                connection_string,
                max_connections,
            } => {
                assert_eq!(connection_string, "postgresql://localhost/test");
                assert_eq!(max_connections, 10);
            }
            _ => panic!("Wrong variant"),
        }
    }

    /// A Redis config keeps the URL it was built with.
    #[test]
    fn test_storage_config_redis_creation() {
        let config = StorageConfig::Redis {
            url: "redis://localhost:6379".to_string(),
        };

        match config {
            StorageConfig::Redis { url } => {
                assert_eq!(url, "redis://localhost:6379");
            }
            _ => panic!("Wrong variant"),
        }
    }

    /// An S3 config keeps the bucket and region it was built with.
    #[test]
    fn test_storage_config_s3_creation() {
        let config = StorageConfig::S3 {
            bucket: "my-bucket".to_string(),
            region: "us-east-1".to_string(),
        };

        match config {
            StorageConfig::S3 { bucket, region } => {
                assert_eq!(bucket, "my-bucket");
                assert_eq!(region, "us-east-1");
            }
            _ => panic!("Wrong variant"),
        }
    }

    /// Cloning yields the same variant with equal field values.
    #[test]
    fn test_storage_config_clone() {
        let config = StorageConfig::Postgres {
            connection_string: "test".to_string(),
            max_connections: 5,
        };
        let cloned = config.clone();

        match (config, cloned) {
            (
                StorageConfig::Postgres {
                    connection_string: c1,
                    max_connections: m1,
                },
                StorageConfig::Postgres {
                    connection_string: c2,
                    max_connections: m2,
                },
            ) => {
                assert_eq!(c1, c2);
                assert_eq!(m1, m2);
            }
            _ => panic!("Clone failed"),
        }
    }

    /// The `Debug` output names both the variant and its field.
    #[test]
    fn test_storage_config_debug() {
        let config = StorageConfig::Redis {
            url: "redis://localhost".to_string(),
        };
        let debug_str = format!("{:?}", config);
        assert!(debug_str.contains("Redis"));
        assert!(debug_str.contains("url"));
    }

    /// Two Postgres configs can differ in `max_connections` alone.
    #[test]
    fn test_postgres_config_different_max_connections() {
        let config1 = StorageConfig::Postgres {
            connection_string: "test".to_string(),
            max_connections: 10,
        };
        let config2 = StorageConfig::Postgres {
            connection_string: "test".to_string(),
            max_connections: 20,
        };

        // A match with a panicking fallback, rather than a bare `if let`,
        // so the test fails loudly if destructuring ever stops matching.
        match (config1, config2) {
            (
                StorageConfig::Postgres {
                    max_connections: m1, ..
                },
                StorageConfig::Postgres {
                    max_connections: m2, ..
                },
            ) => assert_ne!(m1, m2),
            _ => panic!("Wrong variant"),
        }
    }

    /// Two S3 configs can differ in `region` alone.
    #[test]
    fn test_s3_config_different_regions() {
        let config1 = StorageConfig::S3 {
            bucket: "bucket".to_string(),
            region: "us-east-1".to_string(),
        };
        let config2 = StorageConfig::S3 {
            bucket: "bucket".to_string(),
            region: "eu-west-1".to_string(),
        };

        // Same reasoning as above: never let a failed pattern match pass.
        match (config1, config2) {
            (
                StorageConfig::S3 { region: r1, .. },
                StorageConfig::S3 { region: r2, .. },
            ) => assert_ne!(r1, r2),
            _ => panic!("Wrong variant"),
        }
    }
}