1use super::version::SchemaVersion;
7use crate::arena::Arena;
8use crate::error::{Result, XervError};
9use crate::types::ArenaOffset;
10use parking_lot::RwLock;
11use std::collections::HashMap;
12use std::sync::Arc;
13
14pub type MigrationFn = Arc<dyn Fn(&Arena, ArenaOffset) -> Result<ArenaOffset> + Send + Sync>;
28
29#[derive(Clone)]
31pub struct Migration {
32 pub from_schema: String,
34 pub from_hash: u64,
36 pub to_schema: String,
38 pub to_hash: u64,
40 pub from_version: SchemaVersion,
42 pub to_version: SchemaVersion,
44 pub transform: MigrationFn,
46 pub description: String,
48}
49
50impl Migration {
51 pub fn new<F>(
53 from_schema: impl Into<String>,
54 from_hash: u64,
55 to_schema: impl Into<String>,
56 to_hash: u64,
57 transform: F,
58 ) -> Self
59 where
60 F: Fn(&Arena, ArenaOffset) -> Result<ArenaOffset> + Send + Sync + 'static,
61 {
62 let from = from_schema.into();
63 let to = to_schema.into();
64
65 let from_version = extract_version(&from).unwrap_or_default();
67 let to_version = extract_version(&to).unwrap_or_default();
68
69 Self {
70 from_schema: from,
71 from_hash,
72 to_schema: to,
73 to_hash,
74 from_version,
75 to_version,
76 transform: Arc::new(transform),
77 description: String::new(),
78 }
79 }
80
81 pub fn with_description(mut self, description: impl Into<String>) -> Self {
83 self.description = description.into();
84 self
85 }
86
87 pub fn apply(&self, arena: &Arena, offset: ArenaOffset) -> Result<ArenaOffset> {
89 (self.transform)(arena, offset)
90 }
91}
92
93impl std::fmt::Debug for Migration {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 f.debug_struct("Migration")
96 .field("from_schema", &self.from_schema)
97 .field("from_hash", &self.from_hash)
98 .field("to_schema", &self.to_schema)
99 .field("to_hash", &self.to_hash)
100 .field("description", &self.description)
101 .finish()
102 }
103}
104
/// Directed graph over schema hashes used to search for migration chains.
struct MigrationGraph {
    /// Adjacency list: source hash -> `(target hash, migration index)` pairs.
    edges: HashMap<u64, Vec<(u64, usize)>>,
}

impl MigrationGraph {
    /// Creates a graph with no edges.
    fn new() -> Self {
        let edges = HashMap::new();
        Self { edges }
    }

    /// Appends an edge `from_hash -> to_hash` labelled with `migration_index`.
    fn add_edge(&mut self, from_hash: u64, to_hash: u64, migration_index: usize) {
        let outgoing = self.edges.entry(from_hash).or_default();
        outgoing.push((to_hash, migration_index));
    }

    /// Drops every edge `from_hash -> to_hash`.
    ///
    /// The (possibly now empty) adjacency entry for `from_hash` is kept.
    fn remove_edges_for(&mut self, from_hash: u64, to_hash: u64) {
        if let Some(outgoing) = self.edges.get_mut(&from_hash) {
            outgoing.retain(|&(target, _)| target != to_hash);
        }
    }

    /// Breadth-first search for a path with the fewest edges.
    ///
    /// Returns the migration indices along the path in application order,
    /// an empty vector when `from_hash == to_hash`, or `None` when `to_hash`
    /// cannot be reached.
    fn shortest_path(&self, from_hash: u64, to_hash: u64) -> Option<Vec<usize>> {
        if from_hash == to_hash {
            return Some(Vec::new());
        }

        // For every discovered node: how we got there. `None` marks the root.
        let mut came_from: HashMap<u64, Option<(u64, usize)>> = HashMap::new();
        came_from.insert(from_hash, None);

        let mut frontier = std::collections::VecDeque::new();
        frontier.push_back(from_hash);

        while let Some(current) = frontier.pop_front() {
            let neighbors = match self.edges.get(&current) {
                Some(list) => list,
                None => continue,
            };

            for &(next_hash, migration_idx) in neighbors {
                if next_hash == to_hash {
                    // Walk predecessor links back to the root, collecting the
                    // edge labels in reverse order.
                    let mut path = vec![migration_idx];
                    let mut node = current;
                    while let Some(&Some((parent, idx))) = came_from.get(&node) {
                        path.push(idx);
                        node = parent;
                    }
                    path.reverse();
                    return Some(path);
                }

                // Only the first discovery of a node is recorded and enqueued.
                if let std::collections::hash_map::Entry::Vacant(slot) =
                    came_from.entry(next_hash)
                {
                    slot.insert(Some((current, migration_idx)));
                    frontier.push_back(next_hash);
                }
            }
        }

        None
    }
}
179
180pub struct MigrationRegistry {
185 migrations: RwLock<Vec<Migration>>,
187 direct: RwLock<HashMap<(u64, u64), usize>>,
189 graph: RwLock<MigrationGraph>,
191}
192
193impl MigrationRegistry {
194 pub fn new() -> Self {
196 Self {
197 migrations: RwLock::new(Vec::new()),
198 direct: RwLock::new(HashMap::new()),
199 graph: RwLock::new(MigrationGraph::new()),
200 }
201 }
202
203 pub fn register(&self, migration: Migration) -> Result<()> {
207 let key = (migration.from_hash, migration.to_hash);
208
209 let mut migrations = self.migrations.write();
210 let mut direct = self.direct.write();
211 let mut graph = self.graph.write();
212
213 if let Some(&existing_idx) = direct.get(&key) {
215 migrations[existing_idx] = migration;
217 } else {
218 let idx = migrations.len();
220 migrations.push(migration);
221 direct.insert(key, idx);
222 graph.add_edge(key.0, key.1, idx);
223 }
224
225 Ok(())
226 }
227
228 pub fn unregister(&self, from_hash: u64, to_hash: u64) -> Option<Migration> {
230 let key = (from_hash, to_hash);
231
232 let migrations = self.migrations.read();
233 let mut direct = self.direct.write();
234 let mut graph = self.graph.write();
235
236 if let Some(idx) = direct.remove(&key) {
237 graph.remove_edges_for(from_hash, to_hash);
238 Some(migrations[idx].clone())
241 } else {
242 None
243 }
244 }
245
246 pub fn find_direct(&self, from_hash: u64, to_hash: u64) -> Option<Migration> {
248 let direct = self.direct.read();
249 let migrations = self.migrations.read();
250
251 direct
252 .get(&(from_hash, to_hash))
253 .map(|&idx| migrations[idx].clone())
254 }
255
256 pub fn find_path(&self, from_hash: u64, to_hash: u64) -> Option<Vec<Migration>> {
260 if from_hash == to_hash {
261 return Some(Vec::new());
262 }
263
264 let graph = self.graph.read();
265 let migrations = self.migrations.read();
266
267 graph.shortest_path(from_hash, to_hash).map(|indices| {
268 indices
269 .into_iter()
270 .map(|idx| migrations[idx].clone())
271 .collect()
272 })
273 }
274
275 pub fn has_path(&self, from_hash: u64, to_hash: u64) -> bool {
277 if from_hash == to_hash {
278 return true;
279 }
280
281 let graph = self.graph.read();
282 graph.shortest_path(from_hash, to_hash).is_some()
283 }
284
285 pub fn migrate(
289 &self,
290 arena: &Arena,
291 offset: ArenaOffset,
292 from_hash: u64,
293 to_hash: u64,
294 ) -> Result<ArenaOffset> {
295 if from_hash == to_hash {
296 return Ok(offset);
297 }
298
299 let path =
300 self.find_path(from_hash, to_hash)
301 .ok_or_else(|| XervError::SchemaValidation {
302 schema: format!("hash:{}", from_hash),
303 cause: format!("No migration path to hash:{}", to_hash),
304 })?;
305
306 let mut current_offset = offset;
307 for migration in path {
308 current_offset = migration.apply(arena, current_offset)?;
309 }
310
311 Ok(current_offset)
312 }
313
314 pub fn list(&self) -> Vec<Migration> {
316 self.migrations.read().clone()
317 }
318
319 pub fn len(&self) -> usize {
321 self.migrations.read().len()
322 }
323
324 pub fn is_empty(&self) -> bool {
326 self.migrations.read().is_empty()
327 }
328
329 pub fn migrations_from(&self, from_hash: u64) -> Vec<Migration> {
331 let migrations = self.migrations.read();
332 let graph = self.graph.read();
333
334 graph
335 .edges
336 .get(&from_hash)
337 .map(|edges| {
338 edges
339 .iter()
340 .map(|&(_, idx)| migrations[idx].clone())
341 .collect()
342 })
343 .unwrap_or_default()
344 }
345
346 pub fn migrations_to(&self, to_hash: u64) -> Vec<Migration> {
348 let migrations = self.migrations.read();
349 let direct = self.direct.read();
350
351 direct
352 .iter()
353 .filter(|&(&(_, to), _)| to == to_hash)
354 .map(|(_, &idx)| migrations[idx].clone())
355 .collect()
356 }
357}
358
359impl Default for MigrationRegistry {
360 fn default() -> Self {
361 Self::new()
362 }
363}
364
365fn extract_version(schema_name: &str) -> Option<SchemaVersion> {
367 let version_part = schema_name.rsplit('@').next()?;
368 SchemaVersion::parse(version_part)
369}
370
371pub fn compose_migrations(migrations: Vec<Migration>) -> MigrationFn {
376 Arc::new(move |arena, offset| {
377 let mut current = offset;
378 for migration in &migrations {
379 current = migration.apply(arena, current)?;
380 }
381 Ok(current)
382 })
383}
384
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an identity migration between two hashes; the hashes double as
    /// the version numbers in the schema names (`Test@v<hash>`).
    fn mock_migration(from_hash: u64, to_hash: u64) -> Migration {
        Migration::new(
            format!("Test@v{}", from_hash),
            from_hash,
            format!("Test@v{}", to_hash),
            to_hash,
            move |_arena, offset| Ok(offset),
        )
        .with_description(format!("Migrate {} to {}", from_hash, to_hash))
    }

    #[test]
    fn register_and_find_direct() {
        let registry = MigrationRegistry::new();

        let migration = mock_migration(100, 200);
        registry.register(migration).unwrap();

        assert!(registry.find_direct(100, 200).is_some());
        // Lookups are directional and exact.
        assert!(registry.find_direct(200, 100).is_none());
        assert!(registry.find_direct(100, 300).is_none());
    }

    #[test]
    fn find_path_direct() {
        let registry = MigrationRegistry::new();

        registry.register(mock_migration(100, 200)).unwrap();

        let path = registry.find_path(100, 200).unwrap();
        assert_eq!(path.len(), 1);
        assert_eq!(path[0].from_hash, 100);
        assert_eq!(path[0].to_hash, 200);
    }

    #[test]
    fn find_path_multi_hop() {
        let registry = MigrationRegistry::new();

        registry.register(mock_migration(100, 200)).unwrap();
        registry.register(mock_migration(200, 300)).unwrap();
        registry.register(mock_migration(300, 400)).unwrap();

        // A single hop is still found directly.
        let path = registry.find_path(100, 200).unwrap();
        assert_eq!(path.len(), 1);

        // The full chain is returned in application order.
        let path = registry.find_path(100, 400).unwrap();
        assert_eq!(path.len(), 3);
        assert_eq!(path[0].from_hash, 100);
        assert_eq!(path[1].from_hash, 200);
        assert_eq!(path[2].from_hash, 300);
    }

    #[test]
    fn find_path_shortest() {
        let registry = MigrationRegistry::new();

        // Long route: 100 -> 200 -> 300 -> 400.
        registry.register(mock_migration(100, 200)).unwrap();
        registry.register(mock_migration(200, 300)).unwrap();
        registry.register(mock_migration(300, 400)).unwrap();

        // Shortcut: 100 -> 400.
        registry.register(mock_migration(100, 400)).unwrap();

        // The shortcut must win.
        let path = registry.find_path(100, 400).unwrap();
        assert_eq!(path.len(), 1);
        assert_eq!(path[0].to_hash, 400);
    }

    #[test]
    fn no_path() {
        let registry = MigrationRegistry::new();

        // Two disconnected components.
        registry.register(mock_migration(100, 200)).unwrap();
        registry.register(mock_migration(300, 400)).unwrap();

        assert!(registry.find_path(100, 400).is_none());
        assert!(!registry.has_path(100, 400));
    }

    #[test]
    fn same_version_empty_path() {
        let registry = MigrationRegistry::new();

        let path = registry.find_path(100, 100).unwrap();
        assert!(path.is_empty());
        assert!(registry.has_path(100, 100));
    }

    #[test]
    fn replace_migration() {
        let registry = MigrationRegistry::new();

        let m1 = mock_migration(100, 200).with_description("First");
        let m2 = mock_migration(100, 200).with_description("Second");

        registry.register(m1).unwrap();
        assert_eq!(registry.len(), 1);

        // Same endpoints: the entry is replaced, not duplicated.
        registry.register(m2).unwrap();
        assert_eq!(registry.len(), 1);

        let found = registry.find_direct(100, 200).unwrap();
        assert_eq!(found.description, "Second");
    }

    #[test]
    fn unregister_removes_migration() {
        let registry = MigrationRegistry::new();

        registry.register(mock_migration(100, 200)).unwrap();
        registry.register(mock_migration(200, 300)).unwrap();

        let removed = registry.unregister(100, 200).unwrap();
        assert_eq!(removed.from_hash, 100);
        assert_eq!(removed.to_hash, 200);

        // The removed step is gone from direct lookup and from path search.
        assert!(registry.find_direct(100, 200).is_none());
        assert!(!registry.has_path(100, 200));
        assert!(!registry.has_path(100, 300));
        assert!(registry.migrations_from(100).is_empty());
        assert!(registry.migrations_to(200).is_empty());

        // Unrelated migrations are untouched.
        assert!(registry.find_direct(200, 300).is_some());

        // Removing it a second time reports that nothing was there.
        assert!(registry.unregister(100, 200).is_none());
    }

    #[test]
    fn migrations_from_and_to() {
        let registry = MigrationRegistry::new();

        registry.register(mock_migration(100, 200)).unwrap();
        registry.register(mock_migration(100, 300)).unwrap();
        registry.register(mock_migration(200, 300)).unwrap();

        let from_100 = registry.migrations_from(100);
        assert_eq!(from_100.len(), 2);

        let to_300 = registry.migrations_to(300);
        assert_eq!(to_300.len(), 2);
    }

    #[test]
    fn extract_version_from_name() {
        assert_eq!(
            extract_version("OrderInput@v1"),
            Some(SchemaVersion::new(1, 0))
        );
        assert_eq!(
            extract_version("OrderInput@v2.1"),
            Some(SchemaVersion::new(2, 1))
        );
        assert_eq!(extract_version("OrderInput"), None);
    }
}