xerv_core/schema/
registry.rs

1//! Enhanced schema registry with version history and persistence.
2//!
3//! Extends the base schema registry with version tracking, persistence,
4//! and schema family management.
5
6use super::version::SchemaVersion;
7use crate::error::{Result, XervError};
8use crate::traits::{FieldInfo, TypeInfo};
9use parking_lot::RwLock;
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12
13/// Enhanced schema registry with version history.
14///
15/// Provides storage and lookup for schema metadata with support for:
16/// - Version history tracking per schema family
17/// - Hash-based lookup for WAL replay
18/// - Optional persistence to JSON
19/// - Compatibility checking between versions
20#[derive(Default)]
21pub struct VersionedSchemaRegistry {
22    /// Schemas by full name (e.g., "OrderInput@v1").
23    schemas: RwLock<HashMap<String, TypeInfo>>,
24    /// Schemas by hash for fast lookup.
25    by_hash: RwLock<HashMap<u64, String>>,
26    /// Version history per schema family (short name -> sorted versions).
27    versions: RwLock<HashMap<String, Vec<(SchemaVersion, u64)>>>,
28    /// Persistence path (optional).
29    persistence_path: Option<PathBuf>,
30}
31
32impl VersionedSchemaRegistry {
33    /// Create a new empty registry.
34    pub fn new() -> Self {
35        Self {
36            schemas: RwLock::new(HashMap::new()),
37            by_hash: RwLock::new(HashMap::new()),
38            versions: RwLock::new(HashMap::new()),
39            persistence_path: None,
40        }
41    }
42
43    /// Create a registry with persistence.
44    pub fn with_persistence(path: impl AsRef<Path>) -> Self {
45        Self {
46            schemas: RwLock::new(HashMap::new()),
47            by_hash: RwLock::new(HashMap::new()),
48            versions: RwLock::new(HashMap::new()),
49            persistence_path: Some(path.as_ref().to_path_buf()),
50        }
51    }
52
53    /// Register a schema.
54    pub fn register(&self, info: TypeInfo) {
55        let version = SchemaVersion::new(info.version as u16, 0);
56        let hash = info.hash;
57
58        let mut schemas = self.schemas.write();
59        let mut by_hash = self.by_hash.write();
60        let mut versions = self.versions.write();
61
62        schemas.insert(info.name.clone(), info.clone());
63        by_hash.insert(hash, info.name.clone());
64
65        // Update version history
66        let family_versions = versions.entry(info.short_name.clone()).or_default();
67        if !family_versions.iter().any(|(v, _)| *v == version) {
68            family_versions.push((version, hash));
69            family_versions.sort_by_key(|(v, _)| *v);
70        }
71    }
72
73    /// Register a schema with explicit version.
74    pub fn register_versioned(&self, mut info: TypeInfo, version: SchemaVersion) {
75        // Update the name to include the version (use to_short_string for consistency)
76        info.name = format!("{}@{}", info.short_name, version.to_short_string());
77        info.version = version.major as u32;
78
79        self.register(info);
80    }
81
82    /// Get a schema by full name.
83    pub fn get(&self, name: &str) -> Option<TypeInfo> {
84        let schemas = self.schemas.read();
85        schemas.get(name).cloned()
86    }
87
88    /// Get a schema by hash.
89    pub fn get_by_hash(&self, hash: u64) -> Option<TypeInfo> {
90        let by_hash = self.by_hash.read();
91        let schemas = self.schemas.read();
92
93        by_hash
94            .get(&hash)
95            .and_then(|name| schemas.get(name).cloned())
96    }
97
98    /// Get all versions of a schema family.
99    pub fn get_versions(&self, short_name: &str) -> Vec<SchemaVersion> {
100        let versions = self.versions.read();
101        versions
102            .get(short_name)
103            .map(|v| v.iter().map(|(ver, _)| *ver).collect())
104            .unwrap_or_default()
105    }
106
107    /// Get the latest version of a schema.
108    pub fn get_latest(&self, short_name: &str) -> Option<TypeInfo> {
109        let versions = self.versions.read();
110        let schemas = self.schemas.read();
111
112        versions.get(short_name).and_then(|v| {
113            v.last().and_then(|(ver, _)| {
114                // Use to_short_string() to match the format used by TypeInfo::new()
115                let name = format!("{}@{}", short_name, ver.to_short_string());
116                schemas.get(&name).cloned()
117            })
118        })
119    }
120
121    /// Get a specific version of a schema.
122    pub fn get_version(&self, short_name: &str, version: SchemaVersion) -> Option<TypeInfo> {
123        // Use to_short_string() to match the format used by TypeInfo::new()
124        let name = format!("{}@{}", short_name, version.to_short_string());
125        self.get(&name)
126    }
127
128    /// Check if a schema is registered.
129    pub fn contains(&self, name: &str) -> bool {
130        let schemas = self.schemas.read();
131        schemas.contains_key(name)
132    }
133
134    /// Check if a schema family exists.
135    pub fn contains_family(&self, short_name: &str) -> bool {
136        let versions = self.versions.read();
137        versions.contains_key(short_name)
138    }
139
140    /// Get all registered schema names.
141    pub fn names(&self) -> Vec<String> {
142        let schemas = self.schemas.read();
143        schemas.keys().cloned().collect()
144    }
145
146    /// Get all schema families.
147    pub fn families(&self) -> Vec<String> {
148        let versions = self.versions.read();
149        versions.keys().cloned().collect()
150    }
151
152    /// Check compatibility between two schemas.
153    pub fn check_compatibility(&self, from: &str, to: &str) -> bool {
154        let schemas = self.schemas.read();
155        match (schemas.get(from), schemas.get(to)) {
156            (Some(from_info), Some(to_info)) => from_info.is_compatible_with(to_info),
157            _ => false,
158        }
159    }
160
161    /// Persist the registry to disk.
162    pub fn persist(&self) -> Result<()> {
163        let path = self
164            .persistence_path
165            .as_ref()
166            .ok_or_else(|| XervError::ConfigValue {
167                field: "persistence_path".to_string(),
168                cause: "No persistence path configured".to_string(),
169            })?;
170
171        let schemas = self.schemas.read();
172        let versions = self.versions.read();
173
174        let data = RegistryData {
175            schemas: schemas.values().map(|s| SchemaData::from(s)).collect(),
176            families: versions
177                .iter()
178                .map(|(name, vers)| FamilyData {
179                    name: name.clone(),
180                    versions: vers.iter().map(|(v, h)| (v.to_string(), *h)).collect(),
181                })
182                .collect(),
183        };
184
185        let json = serde_json::to_string_pretty(&data)
186            .map_err(|e| XervError::Serialization(e.to_string()))?;
187
188        std::fs::write(path, json).map_err(|e| XervError::Io {
189            path: path.clone(),
190            cause: e.to_string(),
191        })?;
192
193        Ok(())
194    }
195
196    /// Load the registry from disk.
197    pub fn load(path: impl AsRef<Path>) -> Result<Self> {
198        let path = path.as_ref();
199
200        let json = std::fs::read_to_string(path).map_err(|e| XervError::Io {
201            path: path.to_path_buf(),
202            cause: e.to_string(),
203        })?;
204
205        let data: RegistryData =
206            serde_json::from_str(&json).map_err(|e| XervError::Serialization(e.to_string()))?;
207
208        let registry = Self::with_persistence(path);
209
210        for schema_data in data.schemas {
211            let info = schema_data.into_type_info();
212            registry.register(info);
213        }
214
215        Ok(registry)
216    }
217
218    /// Clear all registered schemas.
219    pub fn clear(&self) {
220        let mut schemas = self.schemas.write();
221        let mut by_hash = self.by_hash.write();
222        let mut versions = self.versions.write();
223
224        schemas.clear();
225        by_hash.clear();
226        versions.clear();
227    }
228
229    /// Get the total number of registered schemas.
230    pub fn len(&self) -> usize {
231        let schemas = self.schemas.read();
232        schemas.len()
233    }
234
235    /// Check if the registry is empty.
236    pub fn is_empty(&self) -> bool {
237        let schemas = self.schemas.read();
238        schemas.is_empty()
239    }
240}
241
242/// Serializable schema data for persistence.
243#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
244struct SchemaData {
245    name: String,
246    short_name: String,
247    version: u32,
248    hash: u64,
249    size: usize,
250    alignment: usize,
251    fields: Vec<FieldData>,
252    stable_layout: bool,
253}
254
255impl SchemaData {
256    fn from(info: &TypeInfo) -> Self {
257        Self {
258            name: info.name.clone(),
259            short_name: info.short_name.clone(),
260            version: info.version,
261            hash: info.hash,
262            size: info.size,
263            alignment: info.alignment,
264            fields: info.fields.iter().map(FieldData::from).collect(),
265            stable_layout: info.stable_layout,
266        }
267    }
268
269    fn into_type_info(self) -> TypeInfo {
270        TypeInfo {
271            name: self.name,
272            short_name: self.short_name,
273            version: self.version,
274            hash: self.hash,
275            size: self.size,
276            alignment: self.alignment,
277            fields: self
278                .fields
279                .into_iter()
280                .map(|f| f.into_field_info())
281                .collect(),
282            stable_layout: self.stable_layout,
283        }
284    }
285}
286
287/// Serializable field data for persistence.
288#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
289struct FieldData {
290    name: String,
291    type_name: String,
292    offset: usize,
293    size: usize,
294    optional: bool,
295}
296
297impl FieldData {
298    fn from(info: &FieldInfo) -> Self {
299        Self {
300            name: info.name.clone(),
301            type_name: info.type_name.clone(),
302            offset: info.offset,
303            size: info.size,
304            optional: info.optional,
305        }
306    }
307
308    fn into_field_info(self) -> FieldInfo {
309        let mut info = FieldInfo::new(self.name, self.type_name)
310            .with_offset(self.offset)
311            .with_size(self.size);
312        if self.optional {
313            info = info.optional();
314        }
315        info
316    }
317}
318
319/// Serializable family data for persistence.
320#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
321struct FamilyData {
322    name: String,
323    versions: Vec<(String, u64)>,
324}
325
326/// Top-level registry persistence format.
327#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
328struct RegistryData {
329    schemas: Vec<SchemaData>,
330    families: Vec<FamilyData>,
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a 32-byte, stable-layout schema fixture with two fields
    /// (`id: String`, `value: i32`) under the given name, version and hash.
    fn create_test_schema(name: &str, version: u32, hash: u64) -> TypeInfo {
        TypeInfo::new(name, version)
            .with_hash(hash)
            .with_size(32)
            .with_fields(vec![
                FieldInfo::new("id", "String").with_offset(0).with_size(24),
                FieldInfo::new("value", "i32").with_offset(24).with_size(4),
            ])
            .stable()
    }

    #[test]
    fn register_and_get() {
        let registry = VersionedSchemaRegistry::new();
        registry.register(create_test_schema("Order", 1, 100));

        assert!(registry.contains("Order@v1"));
        assert!(!registry.contains("Order@v2"));

        let retrieved = registry.get("Order@v1").unwrap();
        assert_eq!(retrieved.hash, 100);
    }

    #[test]
    fn get_by_hash() {
        let registry = VersionedSchemaRegistry::new();
        registry.register(create_test_schema("Order", 1, 12345));

        let retrieved = registry.get_by_hash(12345).unwrap();
        assert_eq!(retrieved.short_name, "Order");
    }

    #[test]
    fn version_history() {
        let registry = VersionedSchemaRegistry::new();

        registry.register(create_test_schema("Order", 1, 100));
        registry.register(create_test_schema("Order", 2, 200));
        registry.register(create_test_schema("Order", 3, 300));

        let versions = registry.get_versions("Order");
        assert_eq!(versions.len(), 3);
        assert_eq!(versions[0], SchemaVersion::new(1, 0));
        assert_eq!(versions[1], SchemaVersion::new(2, 0));
        assert_eq!(versions[2], SchemaVersion::new(3, 0));
    }

    #[test]
    fn get_latest() {
        let registry = VersionedSchemaRegistry::new();

        registry.register(create_test_schema("Order", 1, 100));
        registry.register(create_test_schema("Order", 2, 200));

        let latest = registry.get_latest("Order").unwrap();
        assert_eq!(latest.version, 2);
        assert_eq!(latest.hash, 200);
    }

    #[test]
    fn get_version() {
        let registry = VersionedSchemaRegistry::new();

        registry.register(create_test_schema("Order", 1, 100));
        registry.register(create_test_schema("Order", 2, 200));

        let v1 = registry.get_version("Order", SchemaVersion::new(1, 0));
        assert!(v1.is_some());
        assert_eq!(v1.unwrap().hash, 100);

        let v3 = registry.get_version("Order", SchemaVersion::new(3, 0));
        assert!(v3.is_none());
    }

    #[test]
    fn families() {
        let registry = VersionedSchemaRegistry::new();

        registry.register(create_test_schema("Order", 1, 100));
        registry.register(create_test_schema("User", 1, 200));

        let families = registry.families();
        assert_eq!(families.len(), 2);
        assert!(families.contains(&"Order".to_string()));
        assert!(families.contains(&"User".to_string()));
    }

    #[test]
    fn contains_family() {
        let registry = VersionedSchemaRegistry::new();
        registry.register(create_test_schema("Order", 1, 100));

        assert!(registry.contains_family("Order"));
        assert!(!registry.contains_family("User"));
    }

    #[test]
    fn clear() {
        let registry = VersionedSchemaRegistry::new();

        registry.register(create_test_schema("Order", 1, 100));
        assert!(!registry.is_empty());

        registry.clear();
        assert!(registry.is_empty());
        assert!(registry.families().is_empty());
    }

    #[test]
    fn persistence_roundtrip() {
        // Include the process id in the file name so concurrent test processes
        // sharing the same temp directory do not overwrite each other's file.
        let path = std::env::temp_dir().join(format!(
            "test_schema_registry_{}.json",
            std::process::id()
        ));

        // Create and populate registry
        let registry = VersionedSchemaRegistry::with_persistence(&path);
        registry.register(create_test_schema("Order", 1, 100));
        registry.register(create_test_schema("Order", 2, 200));
        registry.register(create_test_schema("User", 1, 300));

        // Persist
        registry.persist().unwrap();

        // Load into new registry
        let loaded = VersionedSchemaRegistry::load(&path);

        // Cleanup before asserting, so a failing assertion does not leave the
        // file behind.
        let _ = std::fs::remove_file(&path);
        let loaded = loaded.unwrap();

        // Verify
        assert_eq!(loaded.len(), 3);
        assert!(loaded.contains("Order@v1"));
        assert!(loaded.contains("Order@v2"));
        assert!(loaded.contains("User@v1"));
        assert_eq!(loaded.get_by_hash(200).unwrap().short_name, "Order");
    }
}