Skip to main content

rivven_schema/
config.rs

1//! Schema Registry configuration
2
3use crate::compatibility::CompatibilityLevel;
4use serde::{Deserialize, Serialize};
5
6/// Configuration for the Schema Registry
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct RegistryConfig {
9    /// Storage backend configuration
10    pub storage: StorageConfig,
11
12    /// Default compatibility level
13    #[serde(default)]
14    pub compatibility: CompatibilityLevel,
15
16    /// Enable schema normalization before storing
17    #[serde(default = "default_true")]
18    pub normalize_schemas: bool,
19
20    /// Cache configuration
21    #[serde(default)]
22    pub cache: CacheConfig,
23
24    /// Schema ID generation mode
25    #[serde(default)]
26    pub id_generation: IdGeneration,
27}
28
/// Serde default helper: yields `true` for boolean fields that are enabled
/// unless explicitly turned off.
fn default_true() -> bool {
    let enabled = true;
    enabled
}
32
33impl Default for RegistryConfig {
34    fn default() -> Self {
35        Self {
36            storage: StorageConfig::Memory,
37            compatibility: CompatibilityLevel::default(),
38            normalize_schemas: true,
39            cache: CacheConfig::default(),
40            id_generation: IdGeneration::default(),
41        }
42    }
43}
44
45impl RegistryConfig {
46    /// Create config with in-memory storage
47    pub fn memory() -> Self {
48        Self {
49            storage: StorageConfig::Memory,
50            ..Default::default()
51        }
52    }
53
54    /// Create config with broker-backed storage
55    pub fn broker(config: BrokerStorageConfig) -> Self {
56        Self {
57            storage: StorageConfig::Broker(config),
58            ..Default::default()
59        }
60    }
61
62    /// Set compatibility level
63    pub fn with_compatibility(mut self, level: CompatibilityLevel) -> Self {
64        self.compatibility = level;
65        self
66    }
67
68    /// Set schema normalization
69    pub fn with_normalize(mut self, normalize: bool) -> Self {
70        self.normalize_schemas = normalize;
71        self
72    }
73}
74
75/// Storage backend configuration
76///
77/// The schema registry supports multiple storage backends:
78/// - **Memory**: In-memory storage for development and testing
79/// - **Broker**: Durable storage in rivven broker topics (recommended for production)
80/// - **Glue**: AWS Glue Schema Registry for AWS-native deployments
81#[derive(Debug, Clone, Default, Serialize, Deserialize)]
82#[serde(tag = "type", rename_all = "lowercase")]
83pub enum StorageConfig {
84    /// In-memory storage (default for development)
85    #[default]
86    Memory,
87
88    /// Broker-backed storage (durable, replicated)
89    ///
90    /// Stores schemas in a compacted rivven topic (similar to Kafka's `_schemas` topic).
91    /// Provides durability and automatic replication across cluster nodes.
92    Broker(BrokerStorageConfig),
93
94    /// AWS Glue Schema Registry (external)
95    Glue {
96        region: String,
97        registry_name: Option<String>,
98    },
99}
100
101/// Configuration for broker-backed schema storage
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct BrokerStorageConfig {
104    /// Broker address(es) to connect to
105    /// Format: "host:port" or "host1:port1,host2:port2" for multiple brokers
106    pub brokers: String,
107
108    /// Topic name for storing schemas (default: "_schemas")
109    #[serde(default = "default_schema_topic")]
110    pub topic: String,
111
112    /// Replication factor for the schema topic (default: 3)
113    #[serde(default = "default_replication_factor")]
114    pub replication_factor: u16,
115
116    /// Number of partitions for the schema topic (default: 1)
117    /// Note: Using 1 partition ensures global ordering for schema IDs
118    #[serde(default = "default_partitions")]
119    pub partitions: u32,
120
121    /// TLS configuration for broker connection
122    #[serde(default)]
123    pub tls: Option<BrokerTlsConfig>,
124
125    /// Authentication configuration
126    #[serde(default)]
127    pub auth: Option<BrokerAuthConfig>,
128
129    /// Connection timeout in milliseconds (default: 10000)
130    #[serde(default = "default_timeout")]
131    pub connect_timeout_ms: u64,
132
133    /// Bootstrap timeout - how long to wait for initial schema load (default: 30000)
134    #[serde(default = "default_bootstrap_timeout")]
135    pub bootstrap_timeout_ms: u64,
136}
137
/// Serde default for `BrokerStorageConfig::topic`: the `_schemas` topic.
fn default_schema_topic() -> String {
    String::from("_schemas")
}
141
/// Serde default for `BrokerStorageConfig::replication_factor`.
fn default_replication_factor() -> u16 {
    3_u16
}
145
/// Serde default for `BrokerStorageConfig::partitions`.
fn default_partitions() -> u32 {
    1_u32
}
149
/// Serde default for `BrokerStorageConfig::connect_timeout_ms`, in milliseconds.
fn default_timeout() -> u64 {
    10_000
}
153
/// Serde default for `BrokerStorageConfig::bootstrap_timeout_ms`, in milliseconds.
fn default_bootstrap_timeout() -> u64 {
    30_000
}
157
158impl Default for BrokerStorageConfig {
159    fn default() -> Self {
160        Self {
161            brokers: "localhost:9092".to_string(),
162            topic: default_schema_topic(),
163            replication_factor: default_replication_factor(),
164            partitions: default_partitions(),
165            tls: None,
166            auth: None,
167            connect_timeout_ms: default_timeout(),
168            bootstrap_timeout_ms: default_bootstrap_timeout(),
169        }
170    }
171}
172
173impl BrokerStorageConfig {
174    /// Create broker storage config with default topic name
175    pub fn new(brokers: impl Into<String>) -> Self {
176        Self {
177            brokers: brokers.into(),
178            ..Default::default()
179        }
180    }
181
182    /// Set custom topic name
183    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
184        self.topic = topic.into();
185        self
186    }
187
188    /// Set replication factor
189    pub fn with_replication_factor(mut self, factor: u16) -> Self {
190        self.replication_factor = factor;
191        self
192    }
193
194    /// Enable TLS
195    pub fn with_tls(mut self, config: BrokerTlsConfig) -> Self {
196        self.tls = Some(config);
197        self
198    }
199
200    /// Set authentication
201    pub fn with_auth(mut self, config: BrokerAuthConfig) -> Self {
202        self.auth = Some(config);
203        self
204    }
205}
206
207/// TLS configuration for broker connection
208#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct BrokerTlsConfig {
210    /// Path to CA certificate file
211    pub ca_cert: Option<String>,
212
213    /// Path to client certificate file (for mTLS)
214    pub client_cert: Option<String>,
215
216    /// Path to client key file (for mTLS)
217    pub client_key: Option<String>,
218
219    /// Skip server certificate verification (NOT recommended for production)
220    #[serde(default)]
221    pub insecure_skip_verify: bool,
222}
223
224/// Authentication configuration for broker connection
225#[derive(Debug, Clone, Serialize, Deserialize)]
226#[serde(tag = "type", rename_all = "lowercase")]
227pub enum BrokerAuthConfig {
228    /// SASL/PLAIN authentication
229    Plain { username: String, password: String },
230
231    /// SASL/SCRAM-SHA-256 authentication
232    ScramSha256 { username: String, password: String },
233
234    /// SASL/SCRAM-SHA-512 authentication
235    ScramSha512 { username: String, password: String },
236}
237
238/// Cache configuration
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct CacheConfig {
241    /// Maximum number of schemas to cache
242    pub max_schemas: usize,
243
244    /// Maximum number of subjects to cache
245    pub max_subjects: usize,
246
247    /// TTL for cache entries in seconds (0 = no expiry)
248    pub ttl_seconds: u64,
249}
250
251impl Default for CacheConfig {
252    fn default() -> Self {
253        Self {
254            max_schemas: 10000,
255            max_subjects: 1000,
256            ttl_seconds: 0, // No expiry by default
257        }
258    }
259}
260
261/// Schema ID generation mode
262#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
263#[serde(rename_all = "lowercase")]
264pub enum IdGeneration {
265    /// Auto-increment IDs (default)
266    #[default]
267    AutoIncrement,
268
269    /// Hash-based IDs (deterministic)
270    Hash,
271
272    /// External ID provider
273    External,
274}
275
#[cfg(test)]
mod tests {
    use super::*;

    /// The default registry config uses in-memory storage, backward
    /// compatibility and schema normalization.
    #[test]
    fn test_default_config() {
        let config = RegistryConfig::default();
        assert!(matches!(config.storage, StorageConfig::Memory));
        assert_eq!(config.compatibility, CompatibilityLevel::Backward);
        assert!(config.normalize_schemas);
    }

    /// `RegistryConfig::memory()` selects the in-memory backend.
    #[test]
    fn test_memory_config() {
        let config = RegistryConfig::memory();
        assert!(matches!(config.storage, StorageConfig::Memory));
    }

    /// The normalization builder actually overrides the default (`true`).
    #[test]
    fn test_with_normalize_overrides_default() {
        let config = RegistryConfig::memory().with_normalize(false);
        assert!(!config.normalize_schemas);
    }

    /// Broker builders override the defaults.
    ///
    /// Every value here deliberately differs from its default so that a
    /// builder that silently did nothing would fail the test.
    #[test]
    fn test_broker_config() {
        let broker_config = BrokerStorageConfig::new("broker-1:9093")
            .with_topic("my_schemas")
            .with_replication_factor(5);

        let config = RegistryConfig::broker(broker_config);
        if let StorageConfig::Broker(bc) = config.storage {
            assert_eq!(bc.brokers, "broker-1:9093");
            assert_eq!(bc.topic, "my_schemas");
            assert_eq!(bc.replication_factor, 5);
            // Fields the builders did not touch keep their defaults.
            assert_eq!(bc.partitions, 1);
        } else {
            panic!("Expected Broker storage config");
        }
    }

    /// `BrokerStorageConfig::default()` matches the documented defaults,
    /// including both timeouts.
    #[test]
    fn test_broker_storage_defaults() {
        let config = BrokerStorageConfig::default();
        assert_eq!(config.brokers, "localhost:9092");
        assert_eq!(config.topic, "_schemas");
        assert_eq!(config.replication_factor, 3);
        assert_eq!(config.partitions, 1);
        assert!(config.tls.is_none());
        assert!(config.auth.is_none());
        assert_eq!(config.connect_timeout_ms, 10000);
        assert_eq!(config.bootstrap_timeout_ms, 30000);
    }
}