schema_registry_storage/
lib.rs

1//! # Schema Registry Storage
2//!
3//! Storage abstraction layer for PostgreSQL, Redis, and S3.
4//! Implements the SchemaStorage trait from schema-registry-core.
5
6pub mod cache_warmer;
7pub mod postgres;
8pub mod redis_cache;
9pub mod s3;
10
11use async_trait::async_trait;
12use schema_registry_core::{error::Result, schema::RegisteredSchema, traits::SchemaStorage, versioning::SemanticVersion};
13use uuid::Uuid;
14
/// Connection settings for one storage backend.
///
/// Each variant carries only the parameters that its backend needs. The type
/// derives `PartialEq`/`Eq` so two configurations can be compared directly
/// (`assert_eq!(a, b)`, change detection) instead of being destructured field
/// by field.
///
/// NOTE(review): the derived `Debug` output prints every field verbatim,
/// including `connection_string` and `url`; avoid logging a config if those
/// values may embed credentials.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageConfig {
    /// PostgreSQL configuration
    Postgres {
        /// Connection string, e.g. `postgresql://localhost/test`.
        connection_string: String,
        /// Connection limit handed to the PostgreSQL backend
        /// (presumably the pool size — confirm in `postgres.rs`).
        max_connections: u32,
    },
    /// Redis configuration
    Redis {
        /// Redis endpoint, e.g. `redis://localhost:6379`.
        url: String,
    },
    /// S3 configuration
    S3 {
        /// Name of the bucket, e.g. `my-bucket`.
        bucket: String,
        /// Region identifier, e.g. `us-east-1`.
        region: String,
    },
}
33
34/// Multi-tier storage implementation
35pub struct MultiTierStorage {
36    // Primary storage (PostgreSQL)
37    postgres: postgres::PostgresStorage,
38    // Cache layer (Redis)
39    cache: redis_cache::RedisCache,
40    // Archive storage (S3)
41    #[allow(dead_code)]
42    s3: s3::S3Storage,
43}
44
45impl MultiTierStorage {
46    /// Create a new multi-tier storage instance
47    pub async fn new(postgres_config: StorageConfig, redis_config: StorageConfig, s3_config: StorageConfig) -> Result<Self> {
48        Ok(Self {
49            postgres: postgres::PostgresStorage::new(postgres_config).await?,
50            cache: redis_cache::RedisCache::new(redis_config).await?,
51            s3: s3::S3Storage::new(s3_config).await?,
52        })
53    }
54}
55
56#[async_trait]
57impl SchemaStorage for MultiTierStorage {
58    async fn store(&self, schema: RegisteredSchema) -> Result<()> {
59        // Store in PostgreSQL (primary)
60        self.postgres.store(schema.clone()).await?;
61        // Update cache
62        self.cache.store(schema).await?;
63        Ok(())
64    }
65
66    async fn retrieve(&self, id: Uuid, version: Option<SemanticVersion>) -> Result<RegisteredSchema> {
67        // Try cache first
68        if let Ok(schema) = self.cache.retrieve(id, version.clone()).await {
69            return Ok(schema);
70        }
71        // Fallback to PostgreSQL
72        let schema = self.postgres.retrieve(id, version).await?;
73        // Update cache
74        let _ = self.cache.store(schema.clone()).await;
75        Ok(schema)
76    }
77
78    async fn retrieve_by_hash(&self, content_hash: &str) -> Result<Option<RegisteredSchema>> {
79        self.postgres.retrieve_by_hash(content_hash).await
80    }
81
82    async fn update(&self, schema: RegisteredSchema) -> Result<()> {
83        self.postgres.update(schema.clone()).await?;
84        self.cache.store(schema).await?;
85        Ok(())
86    }
87
88    async fn delete(&self, id: Uuid, version: SemanticVersion) -> Result<()> {
89        self.postgres.delete(id, version).await?;
90        // Invalidate cache
91        Ok(())
92    }
93
94    async fn list_versions(&self, id: Uuid) -> Result<Vec<SemanticVersion>> {
95        self.postgres.list_versions(id).await
96    }
97
98    async fn find_by_name(&self, namespace: &str, name: &str) -> Result<Vec<RegisteredSchema>> {
99        self.postgres.find_by_name(namespace, name).await
100    }
101}
102
#[cfg(test)]
mod tests {
    //! Unit tests for [`StorageConfig`]: construction, `Clone` and `Debug`.
    //!
    //! Every test fails loudly when a value is not the expected variant, so
    //! no assertion can be skipped silently.

    use super::*;

    /// Builds a `Postgres` config from borrowed test data.
    fn postgres(connection_string: &str, max_connections: u32) -> StorageConfig {
        StorageConfig::Postgres {
            connection_string: connection_string.to_string(),
            max_connections,
        }
    }

    /// Builds a `Redis` config from borrowed test data.
    fn redis(url: &str) -> StorageConfig {
        StorageConfig::Redis {
            url: url.to_string(),
        }
    }

    /// Builds an `S3` config from borrowed test data.
    fn s3(bucket: &str, region: &str) -> StorageConfig {
        StorageConfig::S3 {
            bucket: bucket.to_string(),
            region: region.to_string(),
        }
    }

    #[test]
    fn test_storage_config_postgres_creation() {
        match postgres("postgresql://localhost/test", 10) {
            StorageConfig::Postgres {
                connection_string,
                max_connections,
            } => {
                assert_eq!(connection_string, "postgresql://localhost/test");
                assert_eq!(max_connections, 10);
            }
            other => panic!("Wrong variant: {:?}", other),
        }
    }

    #[test]
    fn test_storage_config_redis_creation() {
        match redis("redis://localhost:6379") {
            StorageConfig::Redis { url } => assert_eq!(url, "redis://localhost:6379"),
            other => panic!("Wrong variant: {:?}", other),
        }
    }

    #[test]
    fn test_storage_config_s3_creation() {
        match s3("my-bucket", "us-east-1") {
            StorageConfig::S3 { bucket, region } => {
                assert_eq!(bucket, "my-bucket");
                assert_eq!(region, "us-east-1");
            }
            other => panic!("Wrong variant: {:?}", other),
        }
    }

    #[test]
    fn test_storage_config_clone() {
        let original = postgres("test", 5);
        let copy = original.clone();

        match (original, copy) {
            (
                StorageConfig::Postgres {
                    connection_string: c1,
                    max_connections: m1,
                },
                StorageConfig::Postgres {
                    connection_string: c2,
                    max_connections: m2,
                },
            ) => {
                assert_eq!(c1, c2);
                assert_eq!(m1, m2);
            }
            other => panic!("Clone failed: {:?}", other),
        }
    }

    #[test]
    fn test_storage_config_debug() {
        let rendered = format!("{:?}", redis("redis://localhost"));
        assert!(rendered.contains("Redis"));
        assert!(rendered.contains("url"));
    }

    #[test]
    fn test_postgres_config_different_max_connections() {
        // A `match` with a panicking fallback (rather than a bare `if let`)
        // guarantees the assertion actually runs.
        match (postgres("test", 10), postgres("test", 20)) {
            (
                StorageConfig::Postgres {
                    max_connections: m1, ..
                },
                StorageConfig::Postgres {
                    max_connections: m2, ..
                },
            ) => assert_ne!(m1, m2),
            other => panic!("Expected two Postgres configs, got {:?}", other),
        }
    }

    #[test]
    fn test_s3_config_different_regions() {
        // Same reasoning as above: never let a pattern mismatch pass silently.
        match (s3("bucket", "us-east-1"), s3("bucket", "eu-west-1")) {
            (
                StorageConfig::S3 { region: r1, .. },
                StorageConfig::S3 { region: r2, .. },
            ) => assert_ne!(r1, r2),
            other => panic!("Expected two S3 configs, got {:?}", other),
        }
    }
}