// xerv_core/schema/migration.rs

//! Schema migration registry and transform logic.
//!
//! Provides infrastructure for registering and applying schema migrations.
//! Migrations transform data from one schema version to another.
5
6use super::version::SchemaVersion;
7use crate::arena::Arena;
8use crate::error::{Result, XervError};
9use crate::types::ArenaOffset;
10use parking_lot::RwLock;
11use std::collections::HashMap;
12use std::sync::Arc;
13
14/// Migration function signature.
15///
16/// A migration function reads data at a source offset, transforms it,
17/// and writes the result to the arena, returning the new offset.
18///
19/// # Arguments
20///
21/// * `arena` - The arena containing the source data
22/// * `src_offset` - The offset of the source data in the arena
23///
24/// # Returns
25///
26/// The offset of the transformed data in the arena, or an error.
27pub type MigrationFn = Arc<dyn Fn(&Arena, ArenaOffset) -> Result<ArenaOffset> + Send + Sync>;
28
29/// A registered migration between schema versions.
30#[derive(Clone)]
31pub struct Migration {
32    /// Source schema name with version (e.g., "OrderInput@v1").
33    pub from_schema: String,
34    /// Source schema hash.
35    pub from_hash: u64,
36    /// Target schema name with version (e.g., "OrderInput@v2").
37    pub to_schema: String,
38    /// Target schema hash.
39    pub to_hash: u64,
40    /// Source version.
41    pub from_version: SchemaVersion,
42    /// Target version.
43    pub to_version: SchemaVersion,
44    /// The transform function.
45    pub transform: MigrationFn,
46    /// Human-readable description of what this migration does.
47    pub description: String,
48}
49
50impl Migration {
51    /// Create a new migration.
52    pub fn new<F>(
53        from_schema: impl Into<String>,
54        from_hash: u64,
55        to_schema: impl Into<String>,
56        to_hash: u64,
57        transform: F,
58    ) -> Self
59    where
60        F: Fn(&Arena, ArenaOffset) -> Result<ArenaOffset> + Send + Sync + 'static,
61    {
62        let from = from_schema.into();
63        let to = to_schema.into();
64
65        // Extract versions from schema names
66        let from_version = extract_version(&from).unwrap_or_default();
67        let to_version = extract_version(&to).unwrap_or_default();
68
69        Self {
70            from_schema: from,
71            from_hash,
72            to_schema: to,
73            to_hash,
74            from_version,
75            to_version,
76            transform: Arc::new(transform),
77            description: String::new(),
78        }
79    }
80
81    /// Add a description to this migration.
82    pub fn with_description(mut self, description: impl Into<String>) -> Self {
83        self.description = description.into();
84        self
85    }
86
87    /// Apply this migration to data at the given offset.
88    pub fn apply(&self, arena: &Arena, offset: ArenaOffset) -> Result<ArenaOffset> {
89        (self.transform)(arena, offset)
90    }
91}
92
93impl std::fmt::Debug for Migration {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        f.debug_struct("Migration")
96            .field("from_schema", &self.from_schema)
97            .field("from_hash", &self.from_hash)
98            .field("to_schema", &self.to_schema)
99            .field("to_hash", &self.to_hash)
100            .field("description", &self.description)
101            .finish()
102    }
103}
104
/// Directed graph over schema hashes used to plan multi-hop migrations.
///
/// Nodes are schema hashes. Each edge carries the index of the migration
/// (in the owning registry's migration list) that performs that hop. Paths
/// are found with breadth-first search, so a returned path has the fewest hops.
struct MigrationGraph {
    /// Adjacency lists: `from_hash -> [(to_hash, migration_index)]`.
    edges: HashMap<u64, Vec<(u64, usize)>>,
}

impl MigrationGraph {
    /// Create an empty graph.
    fn new() -> Self {
        Self {
            edges: HashMap::new(),
        }
    }

    /// Add a directed edge `from_hash -> to_hash` labelled with `migration_index`.
    fn add_edge(&mut self, from_hash: u64, to_hash: u64, migration_index: usize) {
        self.edges
            .entry(from_hash)
            .or_default()
            .push((to_hash, migration_index));
    }

    /// Remove every edge `from_hash -> to_hash`.
    ///
    /// When `from_hash` is left with no outgoing edges, its adjacency entry is
    /// dropped so the map does not accumulate empty lists.
    fn remove_edges_for(&mut self, from_hash: u64, to_hash: u64) {
        let now_empty = match self.edges.get_mut(&from_hash) {
            Some(edges) => {
                edges.retain(|&(hash, _)| hash != to_hash);
                edges.is_empty()
            }
            None => false,
        };
        if now_empty {
            self.edges.remove(&from_hash);
        }
    }

    /// Find the shortest path from `from_hash` to `to_hash` using BFS.
    ///
    /// Returns the migration indices in application order, `Some(vec![])`
    /// when both hashes are equal, or `None` when the target is unreachable.
    /// Among equally short paths, the one whose edges were added first wins.
    fn shortest_path(&self, from_hash: u64, to_hash: u64) -> Option<Vec<usize>> {
        if from_hash == to_hash {
            return Some(Vec::new());
        }

        // For every node discovered so far (other than the start), the node it
        // was reached from and the migration used for that hop. It doubles as
        // the visited set, so no sentinel entry for the start is needed.
        let mut parents: HashMap<u64, (u64, usize)> = HashMap::new();
        let mut queue = std::collections::VecDeque::new();
        queue.push_back(from_hash);

        while let Some(current) = queue.pop_front() {
            let neighbors = match self.edges.get(&current) {
                Some(neighbors) => neighbors,
                None => continue,
            };

            for &(next_hash, migration_idx) in neighbors {
                // Already reached via a path that is at least as short.
                if next_hash == from_hash || parents.contains_key(&next_hash) {
                    continue;
                }
                parents.insert(next_hash, (current, migration_idx));

                if next_hash == to_hash {
                    return Some(Self::reconstruct_path(&parents, from_hash, to_hash));
                }
                queue.push_back(next_hash);
            }
        }

        None
    }

    /// Walk `parents` back from `to_hash` to `from_hash` and return the
    /// migration indices in forward (application) order.
    fn reconstruct_path(
        parents: &HashMap<u64, (u64, usize)>,
        from_hash: u64,
        to_hash: u64,
    ) -> Vec<usize> {
        let mut path = Vec::new();
        let mut node = to_hash;
        while node != from_hash {
            let &(parent, migration_idx) = parents
                .get(&node)
                .expect("every discovered node has a parent chain back to the start");
            path.push(migration_idx);
            node = parent;
        }
        path.reverse();
        path
    }
}
179
180/// Registry of schema migrations.
181///
182/// Stores migrations between schema versions and provides path-finding
183/// for multi-hop migrations (e.g., v1 → v2 → v3).
184pub struct MigrationRegistry {
185    /// All registered migrations.
186    migrations: RwLock<Vec<Migration>>,
187    /// Direct migrations keyed by (from_hash, to_hash).
188    direct: RwLock<HashMap<(u64, u64), usize>>,
189    /// Graph for path finding.
190    graph: RwLock<MigrationGraph>,
191}
192
193impl MigrationRegistry {
194    /// Create a new empty migration registry.
195    pub fn new() -> Self {
196        Self {
197            migrations: RwLock::new(Vec::new()),
198            direct: RwLock::new(HashMap::new()),
199            graph: RwLock::new(MigrationGraph::new()),
200        }
201    }
202
203    /// Register a migration.
204    ///
205    /// If a migration between the same schemas already exists, it is replaced.
206    pub fn register(&self, migration: Migration) -> Result<()> {
207        let key = (migration.from_hash, migration.to_hash);
208
209        let mut migrations = self.migrations.write();
210        let mut direct = self.direct.write();
211        let mut graph = self.graph.write();
212
213        // Check if migration already exists
214        if let Some(&existing_idx) = direct.get(&key) {
215            // Replace existing migration
216            migrations[existing_idx] = migration;
217        } else {
218            // Add new migration
219            let idx = migrations.len();
220            migrations.push(migration);
221            direct.insert(key, idx);
222            graph.add_edge(key.0, key.1, idx);
223        }
224
225        Ok(())
226    }
227
228    /// Unregister a migration.
229    pub fn unregister(&self, from_hash: u64, to_hash: u64) -> Option<Migration> {
230        let key = (from_hash, to_hash);
231
232        let migrations = self.migrations.read();
233        let mut direct = self.direct.write();
234        let mut graph = self.graph.write();
235
236        if let Some(idx) = direct.remove(&key) {
237            graph.remove_edges_for(from_hash, to_hash);
238            // We don't actually remove from the vec to keep indices stable
239            // Just return a clone
240            Some(migrations[idx].clone())
241        } else {
242            None
243        }
244    }
245
246    /// Find a direct migration from one schema to another.
247    pub fn find_direct(&self, from_hash: u64, to_hash: u64) -> Option<Migration> {
248        let direct = self.direct.read();
249        let migrations = self.migrations.read();
250
251        direct
252            .get(&(from_hash, to_hash))
253            .map(|&idx| migrations[idx].clone())
254    }
255
256    /// Find the migration path from one schema to another.
257    ///
258    /// Returns the migrations in order, or None if no path exists.
259    pub fn find_path(&self, from_hash: u64, to_hash: u64) -> Option<Vec<Migration>> {
260        if from_hash == to_hash {
261            return Some(Vec::new());
262        }
263
264        let graph = self.graph.read();
265        let migrations = self.migrations.read();
266
267        graph.shortest_path(from_hash, to_hash).map(|indices| {
268            indices
269                .into_iter()
270                .map(|idx| migrations[idx].clone())
271                .collect()
272        })
273    }
274
275    /// Check if a migration path exists.
276    pub fn has_path(&self, from_hash: u64, to_hash: u64) -> bool {
277        if from_hash == to_hash {
278            return true;
279        }
280
281        let graph = self.graph.read();
282        graph.shortest_path(from_hash, to_hash).is_some()
283    }
284
285    /// Apply migration to arena data.
286    ///
287    /// Finds the migration path and applies each migration in sequence.
288    pub fn migrate(
289        &self,
290        arena: &Arena,
291        offset: ArenaOffset,
292        from_hash: u64,
293        to_hash: u64,
294    ) -> Result<ArenaOffset> {
295        if from_hash == to_hash {
296            return Ok(offset);
297        }
298
299        let path =
300            self.find_path(from_hash, to_hash)
301                .ok_or_else(|| XervError::SchemaValidation {
302                    schema: format!("hash:{}", from_hash),
303                    cause: format!("No migration path to hash:{}", to_hash),
304                })?;
305
306        let mut current_offset = offset;
307        for migration in path {
308            current_offset = migration.apply(arena, current_offset)?;
309        }
310
311        Ok(current_offset)
312    }
313
314    /// Get all registered migrations.
315    pub fn list(&self) -> Vec<Migration> {
316        self.migrations.read().clone()
317    }
318
319    /// Get the number of registered migrations.
320    pub fn len(&self) -> usize {
321        self.migrations.read().len()
322    }
323
324    /// Check if the registry is empty.
325    pub fn is_empty(&self) -> bool {
326        self.migrations.read().is_empty()
327    }
328
329    /// Get migrations from a specific schema.
330    pub fn migrations_from(&self, from_hash: u64) -> Vec<Migration> {
331        let migrations = self.migrations.read();
332        let graph = self.graph.read();
333
334        graph
335            .edges
336            .get(&from_hash)
337            .map(|edges| {
338                edges
339                    .iter()
340                    .map(|&(_, idx)| migrations[idx].clone())
341                    .collect()
342            })
343            .unwrap_or_default()
344    }
345
346    /// Get migrations to a specific schema.
347    pub fn migrations_to(&self, to_hash: u64) -> Vec<Migration> {
348        let migrations = self.migrations.read();
349        let direct = self.direct.read();
350
351        direct
352            .iter()
353            .filter(|&(&(_, to), _)| to == to_hash)
354            .map(|(_, &idx)| migrations[idx].clone())
355            .collect()
356    }
357}
358
359impl Default for MigrationRegistry {
360    fn default() -> Self {
361        Self::new()
362    }
363}
364
365/// Extract version from a schema name like "OrderInput@v1".
366fn extract_version(schema_name: &str) -> Option<SchemaVersion> {
367    let version_part = schema_name.rsplit('@').next()?;
368    SchemaVersion::parse(version_part)
369}
370
371/// Compose multiple migration functions into one.
372///
373/// This is useful when you need to apply multiple migrations in sequence
374/// but want to treat them as a single operation.
375pub fn compose_migrations(migrations: Vec<Migration>) -> MigrationFn {
376    Arc::new(move |arena, offset| {
377        let mut current = offset;
378        for migration in &migrations {
379            current = migration.apply(arena, current)?;
380        }
381        Ok(current)
382    })
383}
384
#[cfg(test)]
mod tests {
    use super::*;

    fn mock_migration(from_hash: u64, to_hash: u64) -> Migration {
        Migration::new(
            format!("Test@v{}", from_hash),
            from_hash,
            format!("Test@v{}", to_hash),
            to_hash,
            move |_arena, offset| {
                // Mock: just return the same offset
                Ok(offset)
            },
        )
        .with_description(format!("Migrate {} to {}", from_hash, to_hash))
    }

    #[test]
    fn register_and_find_direct() {
        let registry = MigrationRegistry::new();

        let migration = mock_migration(100, 200);
        registry.register(migration).unwrap();

        assert!(registry.find_direct(100, 200).is_some());
        assert!(registry.find_direct(200, 100).is_none());
        assert!(registry.find_direct(100, 300).is_none());
    }

    #[test]
    fn find_path_direct() {
        let registry = MigrationRegistry::new();

        registry.register(mock_migration(100, 200)).unwrap();

        let path = registry.find_path(100, 200).unwrap();
        assert_eq!(path.len(), 1);
        assert_eq!(path[0].from_hash, 100);
        assert_eq!(path[0].to_hash, 200);
    }

    #[test]
    fn find_path_multi_hop() {
        let registry = MigrationRegistry::new();

        registry.register(mock_migration(100, 200)).unwrap();
        registry.register(mock_migration(200, 300)).unwrap();
        registry.register(mock_migration(300, 400)).unwrap();

        // Direct path
        let path = registry.find_path(100, 200).unwrap();
        assert_eq!(path.len(), 1);

        // Multi-hop path
        let path = registry.find_path(100, 400).unwrap();
        assert_eq!(path.len(), 3);
        assert_eq!(path[0].from_hash, 100);
        assert_eq!(path[1].from_hash, 200);
        assert_eq!(path[2].from_hash, 300);
    }

    #[test]
    fn find_path_shortest() {
        let registry = MigrationRegistry::new();

        // Long path: 100 -> 200 -> 300 -> 400
        registry.register(mock_migration(100, 200)).unwrap();
        registry.register(mock_migration(200, 300)).unwrap();
        registry.register(mock_migration(300, 400)).unwrap();

        // Short path: 100 -> 400
        registry.register(mock_migration(100, 400)).unwrap();

        // Should find the shortest path
        let path = registry.find_path(100, 400).unwrap();
        assert_eq!(path.len(), 1);
        assert_eq!(path[0].to_hash, 400);
    }

    #[test]
    fn no_path() {
        let registry = MigrationRegistry::new();

        registry.register(mock_migration(100, 200)).unwrap();
        registry.register(mock_migration(300, 400)).unwrap();

        // No path from 100 to 400
        assert!(registry.find_path(100, 400).is_none());
        assert!(!registry.has_path(100, 400));
    }

    #[test]
    fn same_version_empty_path() {
        let registry = MigrationRegistry::new();

        let path = registry.find_path(100, 100).unwrap();
        assert!(path.is_empty());
        assert!(registry.has_path(100, 100));
    }

    #[test]
    fn replace_migration() {
        let registry = MigrationRegistry::new();

        let m1 = mock_migration(100, 200).with_description("First");
        let m2 = mock_migration(100, 200).with_description("Second");

        registry.register(m1).unwrap();
        assert_eq!(registry.len(), 1);

        registry.register(m2).unwrap();
        assert_eq!(registry.len(), 1);

        let found = registry.find_direct(100, 200).unwrap();
        assert_eq!(found.description, "Second");
    }

    #[test]
    fn unregister_removes_migration_from_lookups() {
        let registry = MigrationRegistry::new();

        registry.register(mock_migration(100, 200)).unwrap();
        registry.register(mock_migration(200, 300)).unwrap();
        assert!(registry.has_path(100, 300));

        let removed = registry.unregister(200, 300).unwrap();
        assert_eq!(removed.from_hash, 200);
        assert_eq!(removed.to_hash, 300);

        assert!(registry.find_direct(200, 300).is_none());
        assert!(!registry.has_path(100, 300));
        // Unrelated migrations are untouched.
        assert!(registry.find_direct(100, 200).is_some());

        // Unregistering again is a no-op.
        assert!(registry.unregister(200, 300).is_none());

        // Re-registering restores the path.
        registry.register(mock_migration(200, 300)).unwrap();
        assert!(registry.find_direct(200, 300).is_some());
        assert!(registry.has_path(100, 300));
    }

    #[test]
    fn migrations_from_and_to() {
        let registry = MigrationRegistry::new();

        registry.register(mock_migration(100, 200)).unwrap();
        registry.register(mock_migration(100, 300)).unwrap();
        registry.register(mock_migration(200, 300)).unwrap();

        let from_100 = registry.migrations_from(100);
        assert_eq!(from_100.len(), 2);

        let to_300 = registry.migrations_to(300);
        assert_eq!(to_300.len(), 2);
    }

    #[test]
    fn extract_version_from_name() {
        assert_eq!(
            extract_version("OrderInput@v1"),
            Some(SchemaVersion::new(1, 0))
        );
        assert_eq!(
            extract_version("OrderInput@v2.1"),
            Some(SchemaVersion::new(2, 1))
        );
        assert_eq!(extract_version("OrderInput"), None);
    }
}