Skip to main content

rivven_schema/
config.rs

//! Schema Registry configuration
2
3use crate::compatibility::CompatibilityLevel;
4use serde::{Deserialize, Serialize};
5
6/// Configuration for the Schema Registry
7#[derive(Debug, Clone, Serialize, Deserialize)]
8pub struct RegistryConfig {
9    /// Storage backend configuration
10    pub storage: StorageConfig,
11
12    /// Default compatibility level
13    #[serde(default)]
14    pub compatibility: CompatibilityLevel,
15
16    /// Enable schema normalization before storing
17    #[serde(default = "default_true")]
18    pub normalize_schemas: bool,
19
20    /// Schema ID generation mode
21    #[serde(default)]
22    pub id_generation: IdGeneration,
23
24    /// Maximum number of schemas to cache in memory (0 = unlimited).
25    /// When the limit is reached, the oldest entries are evicted.
26    #[serde(default = "default_max_cache_size")]
27    pub max_cache_size: usize,
28}
29
/// Serde default helper for boolean fields that are switched on when omitted.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
33
/// Serde default helper: cache capacity used when `max_cache_size` is omitted.
fn default_max_cache_size() -> usize {
    const DEFAULT_MAX_CACHE_SIZE: usize = 10_000;
    DEFAULT_MAX_CACHE_SIZE
}
37
38impl Default for RegistryConfig {
39    fn default() -> Self {
40        Self {
41            storage: StorageConfig::Memory,
42            compatibility: CompatibilityLevel::default(),
43            normalize_schemas: true,
44            id_generation: IdGeneration::default(),
45            max_cache_size: default_max_cache_size(),
46        }
47    }
48}
49
50impl RegistryConfig {
51    /// Create config with in-memory storage
52    pub fn memory() -> Self {
53        Self {
54            storage: StorageConfig::Memory,
55            ..Default::default()
56        }
57    }
58
59    /// Create config with broker-backed storage
60    pub fn broker(config: BrokerStorageConfig) -> Self {
61        Self {
62            storage: StorageConfig::Broker(config),
63            ..Default::default()
64        }
65    }
66
67    /// Set compatibility level
68    pub fn with_compatibility(mut self, level: CompatibilityLevel) -> Self {
69        self.compatibility = level;
70        self
71    }
72
73    /// Set schema normalization
74    pub fn with_normalize(mut self, normalize: bool) -> Self {
75        self.normalize_schemas = normalize;
76        self
77    }
78}
79
80/// Storage backend configuration
81///
82/// The schema registry supports multiple storage backends:
83/// - **Memory**: In-memory storage for development and testing
84/// - **Broker**: Durable storage in rivven broker topics (recommended for production)
85/// - **Glue**: AWS Glue Schema Registry for AWS-native deployments
86#[derive(Debug, Clone, Default, Serialize, Deserialize)]
87#[serde(tag = "type", rename_all = "lowercase")]
88pub enum StorageConfig {
89    /// In-memory storage (default for development)
90    #[default]
91    Memory,
92
93    /// Broker-backed storage (durable, replicated)
94    ///
95    /// Stores schemas in a compacted rivven topic (similar to Kafka's `_schemas` topic).
96    /// Provides durability and automatic replication across cluster nodes.
97    Broker(BrokerStorageConfig),
98
99    /// AWS Glue Schema Registry (external)
100    Glue {
101        region: String,
102        registry_name: Option<String>,
103    },
104}
105
106/// Configuration for broker-backed schema storage
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct BrokerStorageConfig {
109    /// Broker address(es) to connect to
110    /// Format: "host:port" or "host1:port1,host2:port2" for multiple brokers
111    pub brokers: String,
112
113    /// Topic name for storing schemas (default: "_schemas")
114    #[serde(default = "default_schema_topic")]
115    pub topic: String,
116
117    /// Replication factor for the schema topic (default: 3)
118    #[serde(default = "default_replication_factor")]
119    pub replication_factor: u16,
120
121    /// Number of partitions for the schema topic (default: 1)
122    /// Note: Using 1 partition ensures global ordering for schema IDs
123    #[serde(default = "default_partitions")]
124    pub partitions: u32,
125
126    /// TLS configuration for broker connection
127    #[serde(default)]
128    pub tls: Option<BrokerTlsConfig>,
129
130    /// Authentication configuration
131    #[serde(default)]
132    pub auth: Option<BrokerAuthConfig>,
133
134    /// Connection timeout in milliseconds (default: 10000)
135    #[serde(default = "default_timeout")]
136    pub connect_timeout_ms: u64,
137
138    /// Bootstrap timeout - how long to wait for initial schema load (default: 30000)
139    #[serde(default = "default_bootstrap_timeout")]
140    pub bootstrap_timeout_ms: u64,
141}
142
/// Serde default helper: name of the schema topic when `topic` is omitted.
fn default_schema_topic() -> String {
    String::from("_schemas")
}
146
/// Serde default helper: replication factor when `replication_factor` is omitted.
fn default_replication_factor() -> u16 {
    const DEFAULT_REPLICATION_FACTOR: u16 = 3;
    DEFAULT_REPLICATION_FACTOR
}
150
/// Serde default helper: partition count when `partitions` is omitted.
fn default_partitions() -> u32 {
    const DEFAULT_PARTITIONS: u32 = 1;
    DEFAULT_PARTITIONS
}
154
/// Serde default helper: connection timeout in milliseconds when
/// `connect_timeout_ms` is omitted.
fn default_timeout() -> u64 {
    10_000
}
158
/// Serde default helper: bootstrap timeout in milliseconds when
/// `bootstrap_timeout_ms` is omitted.
fn default_bootstrap_timeout() -> u64 {
    30_000
}
162
163impl Default for BrokerStorageConfig {
164    fn default() -> Self {
165        Self {
166            brokers: "localhost:9092".to_string(),
167            topic: default_schema_topic(),
168            replication_factor: default_replication_factor(),
169            partitions: default_partitions(),
170            tls: None,
171            auth: None,
172            connect_timeout_ms: default_timeout(),
173            bootstrap_timeout_ms: default_bootstrap_timeout(),
174        }
175    }
176}
177
178impl BrokerStorageConfig {
179    /// Create broker storage config with default topic name
180    pub fn new(brokers: impl Into<String>) -> Self {
181        Self {
182            brokers: brokers.into(),
183            ..Default::default()
184        }
185    }
186
187    /// Set custom topic name
188    pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
189        self.topic = topic.into();
190        self
191    }
192
193    /// Set replication factor
194    pub fn with_replication_factor(mut self, factor: u16) -> Self {
195        self.replication_factor = factor;
196        self
197    }
198
199    /// Enable TLS
200    pub fn with_tls(mut self, config: BrokerTlsConfig) -> Self {
201        self.tls = Some(config);
202        self
203    }
204
205    /// Set authentication
206    pub fn with_auth(mut self, config: BrokerAuthConfig) -> Self {
207        self.auth = Some(config);
208        self
209    }
210}
211
212/// TLS configuration for broker connection
213#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct BrokerTlsConfig {
215    /// Path to CA certificate file
216    pub ca_cert: Option<String>,
217
218    /// Path to client certificate file (for mTLS)
219    pub client_cert: Option<String>,
220
221    /// Path to client key file (for mTLS)
222    pub client_key: Option<String>,
223
224    /// Skip server certificate verification (NOT recommended for production)
225    #[serde(default)]
226    pub insecure_skip_verify: bool,
227}
228
229/// Authentication configuration for broker connection
230#[derive(Clone, Serialize, Deserialize)]
231#[serde(tag = "type", rename_all = "lowercase")]
232pub enum BrokerAuthConfig {
233    /// SASL/PLAIN authentication
234    Plain { username: String, password: String },
235
236    /// SASL/SCRAM-SHA-256 authentication
237    ScramSha256 { username: String, password: String },
238
239    /// SASL/SCRAM-SHA-512 authentication
240    ScramSha512 { username: String, password: String },
241}
242
243impl std::fmt::Debug for BrokerAuthConfig {
244    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245        match self {
246            Self::Plain { username, .. } => f
247                .debug_struct("Plain")
248                .field("username", username)
249                .field("password", &"[REDACTED]")
250                .finish(),
251            Self::ScramSha256 { username, .. } => f
252                .debug_struct("ScramSha256")
253                .field("username", username)
254                .field("password", &"[REDACTED]")
255                .finish(),
256            Self::ScramSha512 { username, .. } => f
257                .debug_struct("ScramSha512")
258                .field("username", username)
259                .field("password", &"[REDACTED]")
260                .finish(),
261        }
262    }
263}
264
265/// Schema ID generation mode
266#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
267#[serde(rename_all = "lowercase")]
268pub enum IdGeneration {
269    /// Auto-increment IDs (default)
270    #[default]
271    AutoIncrement,
272
273    /// Hash-based IDs (deterministic)
274    Hash,
275
276    /// External ID provider
277    External,
278}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// The default registry config uses in-memory storage and the documented
    /// defaults for every tunable.
    #[test]
    fn test_default_config() {
        let config = RegistryConfig::default();
        assert!(matches!(config.storage, StorageConfig::Memory));
        assert_eq!(config.compatibility, CompatibilityLevel::Backward);
        assert!(config.normalize_schemas);
        assert_eq!(config.id_generation, IdGeneration::AutoIncrement);
        assert_eq!(config.max_cache_size, 10_000);
    }

    /// `memory()` selects the in-memory backend.
    #[test]
    fn test_memory_config() {
        let config = RegistryConfig::memory();
        assert!(matches!(config.storage, StorageConfig::Memory));
    }

    /// `with_normalize` overrides the default without touching other fields.
    #[test]
    fn test_with_normalize_override() {
        let config = RegistryConfig::memory().with_normalize(false);
        assert!(!config.normalize_schemas);
        assert!(matches!(config.storage, StorageConfig::Memory));
        assert_eq!(config.max_cache_size, 10_000);
    }

    /// `broker()` wraps the supplied broker settings unchanged.
    #[test]
    fn test_broker_config() {
        let broker_config = BrokerStorageConfig::new("localhost:9092")
            .with_topic("my_schemas")
            .with_replication_factor(3);

        let config = RegistryConfig::broker(broker_config);
        if let StorageConfig::Broker(bc) = config.storage {
            assert_eq!(bc.brokers, "localhost:9092");
            assert_eq!(bc.topic, "my_schemas");
            assert_eq!(bc.replication_factor, 3);
        } else {
            panic!("Expected Broker storage config");
        }
    }

    /// Broker storage defaults match the values documented on the struct.
    #[test]
    fn test_broker_storage_defaults() {
        let config = BrokerStorageConfig::default();
        assert_eq!(config.brokers, "localhost:9092");
        assert_eq!(config.topic, "_schemas");
        assert_eq!(config.replication_factor, 3);
        assert_eq!(config.partitions, 1);
        assert!(config.tls.is_none());
        assert!(config.auth.is_none());
        assert_eq!(config.connect_timeout_ms, 10_000);
        assert_eq!(config.bootstrap_timeout_ms, 30_000);
    }

    /// `with_tls` / `with_auth` populate the optional sections.
    #[test]
    fn test_broker_tls_and_auth_builders() {
        let config = BrokerStorageConfig::new("broker:9092")
            .with_tls(BrokerTlsConfig {
                ca_cert: Some("/etc/ca.pem".to_string()),
                client_cert: None,
                client_key: None,
                insecure_skip_verify: false,
            })
            .with_auth(BrokerAuthConfig::Plain {
                username: "alice".to_string(),
                password: "s3cret".to_string(),
            });

        let tls = config.tls.expect("TLS config should be set");
        assert_eq!(tls.ca_cert.as_deref(), Some("/etc/ca.pem"));
        assert!(!tls.insecure_skip_verify);
        assert!(matches!(
            config.auth,
            Some(BrokerAuthConfig::Plain { .. })
        ));
    }

    /// The hand-written `Debug` impl must never leak the password.
    #[test]
    fn test_auth_debug_redacts_password() {
        let variants = [
            BrokerAuthConfig::Plain {
                username: "alice".to_string(),
                password: "s3cret".to_string(),
            },
            BrokerAuthConfig::ScramSha256 {
                username: "alice".to_string(),
                password: "s3cret".to_string(),
            },
            BrokerAuthConfig::ScramSha512 {
                username: "alice".to_string(),
                password: "s3cret".to_string(),
            },
        ];

        for auth in &variants {
            let rendered = format!("{:?}", auth);
            assert!(rendered.contains("alice"));
            assert!(rendered.contains("[REDACTED]"));
            assert!(!rendered.contains("s3cret"));
        }
    }
}