1use super::version::SchemaVersion;
7use crate::error::{Result, XervError};
8use crate::traits::{FieldInfo, TypeInfo};
9use parking_lot::RwLock;
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12
13#[derive(Default)]
21pub struct VersionedSchemaRegistry {
22 schemas: RwLock<HashMap<String, TypeInfo>>,
24 by_hash: RwLock<HashMap<u64, String>>,
26 versions: RwLock<HashMap<String, Vec<(SchemaVersion, u64)>>>,
28 persistence_path: Option<PathBuf>,
30}
31
32impl VersionedSchemaRegistry {
33 pub fn new() -> Self {
35 Self {
36 schemas: RwLock::new(HashMap::new()),
37 by_hash: RwLock::new(HashMap::new()),
38 versions: RwLock::new(HashMap::new()),
39 persistence_path: None,
40 }
41 }
42
43 pub fn with_persistence(path: impl AsRef<Path>) -> Self {
45 Self {
46 schemas: RwLock::new(HashMap::new()),
47 by_hash: RwLock::new(HashMap::new()),
48 versions: RwLock::new(HashMap::new()),
49 persistence_path: Some(path.as_ref().to_path_buf()),
50 }
51 }
52
53 pub fn register(&self, info: TypeInfo) {
55 let version = SchemaVersion::new(info.version as u16, 0);
56 let hash = info.hash;
57
58 let mut schemas = self.schemas.write();
59 let mut by_hash = self.by_hash.write();
60 let mut versions = self.versions.write();
61
62 schemas.insert(info.name.clone(), info.clone());
63 by_hash.insert(hash, info.name.clone());
64
65 let family_versions = versions.entry(info.short_name.clone()).or_default();
67 if !family_versions.iter().any(|(v, _)| *v == version) {
68 family_versions.push((version, hash));
69 family_versions.sort_by_key(|(v, _)| *v);
70 }
71 }
72
73 pub fn register_versioned(&self, mut info: TypeInfo, version: SchemaVersion) {
75 info.name = format!("{}@{}", info.short_name, version.to_short_string());
77 info.version = version.major as u32;
78
79 self.register(info);
80 }
81
82 pub fn get(&self, name: &str) -> Option<TypeInfo> {
84 let schemas = self.schemas.read();
85 schemas.get(name).cloned()
86 }
87
88 pub fn get_by_hash(&self, hash: u64) -> Option<TypeInfo> {
90 let by_hash = self.by_hash.read();
91 let schemas = self.schemas.read();
92
93 by_hash
94 .get(&hash)
95 .and_then(|name| schemas.get(name).cloned())
96 }
97
98 pub fn get_versions(&self, short_name: &str) -> Vec<SchemaVersion> {
100 let versions = self.versions.read();
101 versions
102 .get(short_name)
103 .map(|v| v.iter().map(|(ver, _)| *ver).collect())
104 .unwrap_or_default()
105 }
106
107 pub fn get_latest(&self, short_name: &str) -> Option<TypeInfo> {
109 let versions = self.versions.read();
110 let schemas = self.schemas.read();
111
112 versions.get(short_name).and_then(|v| {
113 v.last().and_then(|(ver, _)| {
114 let name = format!("{}@{}", short_name, ver.to_short_string());
116 schemas.get(&name).cloned()
117 })
118 })
119 }
120
121 pub fn get_version(&self, short_name: &str, version: SchemaVersion) -> Option<TypeInfo> {
123 let name = format!("{}@{}", short_name, version.to_short_string());
125 self.get(&name)
126 }
127
128 pub fn contains(&self, name: &str) -> bool {
130 let schemas = self.schemas.read();
131 schemas.contains_key(name)
132 }
133
134 pub fn contains_family(&self, short_name: &str) -> bool {
136 let versions = self.versions.read();
137 versions.contains_key(short_name)
138 }
139
140 pub fn names(&self) -> Vec<String> {
142 let schemas = self.schemas.read();
143 schemas.keys().cloned().collect()
144 }
145
146 pub fn families(&self) -> Vec<String> {
148 let versions = self.versions.read();
149 versions.keys().cloned().collect()
150 }
151
152 pub fn check_compatibility(&self, from: &str, to: &str) -> bool {
154 let schemas = self.schemas.read();
155 match (schemas.get(from), schemas.get(to)) {
156 (Some(from_info), Some(to_info)) => from_info.is_compatible_with(to_info),
157 _ => false,
158 }
159 }
160
161 pub fn persist(&self) -> Result<()> {
163 let path = self
164 .persistence_path
165 .as_ref()
166 .ok_or_else(|| XervError::ConfigValue {
167 field: "persistence_path".to_string(),
168 cause: "No persistence path configured".to_string(),
169 })?;
170
171 let schemas = self.schemas.read();
172 let versions = self.versions.read();
173
174 let data = RegistryData {
175 schemas: schemas.values().map(|s| SchemaData::from(s)).collect(),
176 families: versions
177 .iter()
178 .map(|(name, vers)| FamilyData {
179 name: name.clone(),
180 versions: vers.iter().map(|(v, h)| (v.to_string(), *h)).collect(),
181 })
182 .collect(),
183 };
184
185 let json = serde_json::to_string_pretty(&data)
186 .map_err(|e| XervError::Serialization(e.to_string()))?;
187
188 std::fs::write(path, json).map_err(|e| XervError::Io {
189 path: path.clone(),
190 cause: e.to_string(),
191 })?;
192
193 Ok(())
194 }
195
196 pub fn load(path: impl AsRef<Path>) -> Result<Self> {
198 let path = path.as_ref();
199
200 let json = std::fs::read_to_string(path).map_err(|e| XervError::Io {
201 path: path.to_path_buf(),
202 cause: e.to_string(),
203 })?;
204
205 let data: RegistryData =
206 serde_json::from_str(&json).map_err(|e| XervError::Serialization(e.to_string()))?;
207
208 let registry = Self::with_persistence(path);
209
210 for schema_data in data.schemas {
211 let info = schema_data.into_type_info();
212 registry.register(info);
213 }
214
215 Ok(registry)
216 }
217
218 pub fn clear(&self) {
220 let mut schemas = self.schemas.write();
221 let mut by_hash = self.by_hash.write();
222 let mut versions = self.versions.write();
223
224 schemas.clear();
225 by_hash.clear();
226 versions.clear();
227 }
228
229 pub fn len(&self) -> usize {
231 let schemas = self.schemas.read();
232 schemas.len()
233 }
234
235 pub fn is_empty(&self) -> bool {
237 let schemas = self.schemas.read();
238 schemas.is_empty()
239 }
240}
241
242#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
244struct SchemaData {
245 name: String,
246 short_name: String,
247 version: u32,
248 hash: u64,
249 size: usize,
250 alignment: usize,
251 fields: Vec<FieldData>,
252 stable_layout: bool,
253}
254
255impl SchemaData {
256 fn from(info: &TypeInfo) -> Self {
257 Self {
258 name: info.name.clone(),
259 short_name: info.short_name.clone(),
260 version: info.version,
261 hash: info.hash,
262 size: info.size,
263 alignment: info.alignment,
264 fields: info.fields.iter().map(FieldData::from).collect(),
265 stable_layout: info.stable_layout,
266 }
267 }
268
269 fn into_type_info(self) -> TypeInfo {
270 TypeInfo {
271 name: self.name,
272 short_name: self.short_name,
273 version: self.version,
274 hash: self.hash,
275 size: self.size,
276 alignment: self.alignment,
277 fields: self
278 .fields
279 .into_iter()
280 .map(|f| f.into_field_info())
281 .collect(),
282 stable_layout: self.stable_layout,
283 }
284 }
285}
286
287#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
289struct FieldData {
290 name: String,
291 type_name: String,
292 offset: usize,
293 size: usize,
294 optional: bool,
295}
296
297impl FieldData {
298 fn from(info: &FieldInfo) -> Self {
299 Self {
300 name: info.name.clone(),
301 type_name: info.type_name.clone(),
302 offset: info.offset,
303 size: info.size,
304 optional: info.optional,
305 }
306 }
307
308 fn into_field_info(self) -> FieldInfo {
309 let mut info = FieldInfo::new(self.name, self.type_name)
310 .with_offset(self.offset)
311 .with_size(self.size);
312 if self.optional {
313 info = info.optional();
314 }
315 info
316 }
317}
318
319#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
321struct FamilyData {
322 name: String,
323 versions: Vec<(String, u64)>,
324}
325
326#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
328struct RegistryData {
329 schemas: Vec<SchemaData>,
330 families: Vec<FamilyData>,
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a stable-layout schema with two fields for the family `name`.
    fn create_test_schema(name: &str, version: u32, hash: u64) -> TypeInfo {
        let fields = vec![
            FieldInfo::new("id", "String").with_offset(0).with_size(24),
            FieldInfo::new("value", "i32").with_offset(24).with_size(4),
        ];

        TypeInfo::new(name, version)
            .with_hash(hash)
            .with_size(32)
            .with_fields(fields)
            .stable()
    }

    /// Builds a registry pre-populated with the given `(name, version, hash)`
    /// schemas, registered in order.
    fn registry_with(schemas: &[(&str, u32, u64)]) -> VersionedSchemaRegistry {
        let registry = VersionedSchemaRegistry::new();
        for &(name, version, hash) in schemas {
            registry.register(create_test_schema(name, version, hash));
        }
        registry
    }

    #[test]
    fn register_and_get() {
        let registry = registry_with(&[("Order", 1, 100)]);

        assert!(registry.contains("Order@v1"));
        assert!(!registry.contains("Order@v2"));

        let retrieved = registry.get("Order@v1").unwrap();
        assert_eq!(retrieved.hash, 100);
    }

    #[test]
    fn get_by_hash() {
        let registry = registry_with(&[("Order", 1, 12345)]);

        let retrieved = registry.get_by_hash(12345).unwrap();
        assert_eq!(retrieved.short_name, "Order");
    }

    #[test]
    fn version_history() {
        let registry = registry_with(&[("Order", 1, 100), ("Order", 2, 200), ("Order", 3, 300)]);

        let versions = registry.get_versions("Order");
        assert_eq!(versions.len(), 3);
        assert_eq!(versions[0], SchemaVersion::new(1, 0));
        assert_eq!(versions[1], SchemaVersion::new(2, 0));
        assert_eq!(versions[2], SchemaVersion::new(3, 0));
    }

    #[test]
    fn get_latest() {
        let registry = registry_with(&[("Order", 1, 100), ("Order", 2, 200)]);

        let latest = registry.get_latest("Order").unwrap();
        assert_eq!(latest.version, 2);
        assert_eq!(latest.hash, 200);
    }

    #[test]
    fn get_version() {
        let registry = registry_with(&[("Order", 1, 100), ("Order", 2, 200)]);

        let v1 = registry.get_version("Order", SchemaVersion::new(1, 0));
        assert!(v1.is_some());
        assert_eq!(v1.unwrap().hash, 100);

        let v3 = registry.get_version("Order", SchemaVersion::new(3, 0));
        assert!(v3.is_none());
    }

    #[test]
    fn families() {
        let registry = registry_with(&[("Order", 1, 100), ("User", 1, 200)]);

        let families = registry.families();
        assert_eq!(families.len(), 2);
        assert!(families.contains(&"Order".to_string()));
        assert!(families.contains(&"User".to_string()));
    }

    #[test]
    fn contains_family() {
        let registry = registry_with(&[("Order", 1, 100)]);

        assert!(registry.contains_family("Order"));
        assert!(!registry.contains_family("User"));
    }

    #[test]
    fn clear() {
        let registry = registry_with(&[("Order", 1, 100)]);
        assert!(!registry.is_empty());

        registry.clear();
        assert!(registry.is_empty());
        assert!(registry.families().is_empty());
    }

    #[test]
    fn duplicate_registration_keeps_single_version() {
        let registry = registry_with(&[("Order", 1, 100), ("Order", 1, 100)]);

        assert_eq!(registry.len(), 1);
        assert_eq!(registry.get_versions("Order").len(), 1);
    }

    #[test]
    fn check_compatibility_with_unknown_names_is_false() {
        let registry = registry_with(&[("Order", 1, 100)]);

        assert!(!registry.check_compatibility("Order@v1", "Order@v2"));
        assert!(!registry.check_compatibility("Missing@v1", "Order@v1"));
    }

    #[test]
    fn persist_without_path_is_an_error() {
        let registry = registry_with(&[("Order", 1, 100)]);

        assert!(registry.persist().is_err());
    }

    #[test]
    fn persistence_roundtrip() {
        // The process id keeps concurrently running test processes from
        // clobbering each other's file in the shared temp directory.
        let file_name = format!("test_schema_registry_{}.json", std::process::id());
        let path = std::env::temp_dir().join(file_name);

        let registry = VersionedSchemaRegistry::with_persistence(&path);
        registry.register(create_test_schema("Order", 1, 100));
        registry.register(create_test_schema("Order", 2, 200));
        registry.register(create_test_schema("User", 1, 300));

        let persisted = registry.persist();
        let loaded = VersionedSchemaRegistry::load(&path);

        // `load` has already read the whole file, so clean up before any
        // assertion can fail and leave the file behind.
        let _ = std::fs::remove_file(&path);

        persisted.unwrap();
        let loaded = loaded.unwrap();

        assert_eq!(loaded.len(), 3);
        assert!(loaded.contains("Order@v1"));
        assert!(loaded.contains("Order@v2"));
        assert!(loaded.contains("User@v1"));
        assert_eq!(loaded.get_by_hash(200).unwrap().short_name, "Order");
    }
}