sochdb_core/
schema_evolution.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Schema Evolution - Online Schema Changes
16//!
17//! This module provides backwards-compatible schema evolution for SochDB,
18//! allowing schema changes without full table rewrites through:
19//!
20//! - **Schema Versioning**: Each schema has a monotonic version number
21//! - **Migration Registry**: Registered transformations between versions
22//! - **Lazy Migration**: Rows are migrated on-read when version mismatch detected
23//! - **Background Compaction**: Asynchronous migration during idle time
24//!
25//! # Design
26//!
27//! ```text
28//! ┌─────────────────────────────────────────────────────────────┐
29//! │                    Schema Version Graph                      │
30//! │                                                             │
31//! │  v1 ──────────────────────────────────────────────────────→ │
32//! │   ↓                                                         │
33//! │  v2 (add column "email")                                    │
34//! │   ↓                                                         │
35//! │  v3 (rename "name" → "full_name", add "created_at")        │
36//! │   ↓                                                         │
37//! │  v4 (drop "legacy_field")                                   │
38//! └─────────────────────────────────────────────────────────────┘
39//!
40//! Rows carry their schema version. On read:
41//! 1. Check row version vs current schema version
42//! 2. If mismatch, apply migration chain (v_row → v_current)
43//! 3. Return migrated row (optionally rewrite in background)
44//! ```
45
46use std::collections::HashMap;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicU64, Ordering};
49
50use crate::error::{Result, SochDBError};
51use crate::soch::{SochRow, SochSchema, SochType, SochValue};
52
/// Monotonic version number attached to every revision of a schema.
pub type SchemaVersion = u64;

/// Identifies one concrete schema revision: a table/collection name paired
/// with the version of that revision.
///
/// Derives `Eq` and `Hash`, so it can be used directly as a hash-map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SchemaId {
    /// Table or collection name.
    pub name: String,
    /// Version of the schema registered under `name`.
    pub version: SchemaVersion,
}

impl SchemaId {
    /// Builds an identifier from anything convertible into a `String` and a version.
    pub fn new(name: impl Into<String>, version: SchemaVersion) -> Self {
        let name = name.into();
        SchemaId { name, version }
    }
}
71
72/// Describes a single schema change operation
73#[derive(Debug, Clone)]
74pub enum SchemaChange {
75    /// Add a new column with default value
76    AddColumn {
77        name: String,
78        column_type: SochType,
79        default: SochValue,
80        position: Option<usize>, // None = append at end
81    },
82    /// Drop an existing column
83    DropColumn { name: String },
84    /// Rename a column
85    RenameColumn { old_name: String, new_name: String },
86    /// Change column type with conversion function
87    ChangeType {
88        name: String,
89        new_type: SochType,
90        converter: TypeConverter,
91    },
92    /// Reorder columns
93    ReorderColumns { new_order: Vec<String> },
94}
95
96/// Type conversion function for schema evolution
97#[derive(Debug, Clone)]
98pub enum TypeConverter {
99    /// Identity (no conversion needed, types are compatible)
100    Identity,
101    /// Convert integer to text
102    IntToText,
103    /// Convert text to integer (may fail)
104    TextToInt,
105    /// Convert float to integer (truncate)
106    FloatToInt,
107    /// Convert integer to float
108    IntToFloat,
109    /// Custom conversion with closure index (stored in registry)
110    Custom(usize),
111}
112
113impl TypeConverter {
114    /// Apply the type conversion
115    pub fn convert(
116        &self,
117        value: &SochValue,
118        custom_converters: &[fn(&SochValue) -> SochValue],
119    ) -> Result<SochValue> {
120        match self {
121            TypeConverter::Identity => Ok(value.clone()),
122            TypeConverter::IntToText => match value {
123                SochValue::Int(i) => Ok(SochValue::Text(i.to_string())),
124                SochValue::UInt(u) => Ok(SochValue::Text(u.to_string())),
125                _ => Err(SochDBError::SchemaEvolution(format!(
126                    "Cannot convert {:?} to text via IntToText",
127                    value
128                ))),
129            },
130            TypeConverter::TextToInt => match value {
131                SochValue::Text(s) => s.parse::<i64>().map(SochValue::Int).map_err(|_| {
132                    SochDBError::SchemaEvolution(format!("Cannot parse '{}' as integer", s))
133                }),
134                _ => Err(SochDBError::SchemaEvolution(format!(
135                    "Cannot convert {:?} to int via TextToInt",
136                    value
137                ))),
138            },
139            TypeConverter::FloatToInt => match value {
140                SochValue::Float(f) => Ok(SochValue::Int(*f as i64)),
141                _ => Err(SochDBError::SchemaEvolution(format!(
142                    "Cannot convert {:?} to int via FloatToInt",
143                    value
144                ))),
145            },
146            TypeConverter::IntToFloat => match value {
147                SochValue::Int(i) => Ok(SochValue::Float(*i as f64)),
148                SochValue::UInt(u) => Ok(SochValue::Float(*u as f64)),
149                _ => Err(SochDBError::SchemaEvolution(format!(
150                    "Cannot convert {:?} to float via IntToFloat",
151                    value
152                ))),
153            },
154            TypeConverter::Custom(idx) => {
155                if *idx < custom_converters.len() {
156                    Ok(custom_converters[*idx](value))
157                } else {
158                    Err(SochDBError::SchemaEvolution(format!(
159                        "Custom converter index {} out of bounds",
160                        idx
161                    )))
162                }
163            }
164        }
165    }
166}
167
168/// Migration between two schema versions
169#[derive(Debug, Clone)]
170pub struct Migration {
171    pub from_version: SchemaVersion,
172    pub to_version: SchemaVersion,
173    pub changes: Vec<SchemaChange>,
174}
175
176impl Migration {
177    pub fn new(from: SchemaVersion, to: SchemaVersion) -> Self {
178        Self {
179            from_version: from,
180            to_version: to,
181            changes: Vec::new(),
182        }
183    }
184
185    pub fn add_column(
186        mut self,
187        name: impl Into<String>,
188        column_type: SochType,
189        default: SochValue,
190    ) -> Self {
191        self.changes.push(SchemaChange::AddColumn {
192            name: name.into(),
193            column_type,
194            default,
195            position: None,
196        });
197        self
198    }
199
200    pub fn add_column_at(
201        mut self,
202        name: impl Into<String>,
203        column_type: SochType,
204        default: SochValue,
205        position: usize,
206    ) -> Self {
207        self.changes.push(SchemaChange::AddColumn {
208            name: name.into(),
209            column_type,
210            default,
211            position: Some(position),
212        });
213        self
214    }
215
216    pub fn drop_column(mut self, name: impl Into<String>) -> Self {
217        self.changes
218            .push(SchemaChange::DropColumn { name: name.into() });
219        self
220    }
221
222    pub fn rename_column(
223        mut self,
224        old_name: impl Into<String>,
225        new_name: impl Into<String>,
226    ) -> Self {
227        self.changes.push(SchemaChange::RenameColumn {
228            old_name: old_name.into(),
229            new_name: new_name.into(),
230        });
231        self
232    }
233
234    pub fn change_type(
235        mut self,
236        name: impl Into<String>,
237        new_type: SochType,
238        converter: TypeConverter,
239    ) -> Self {
240        self.changes.push(SchemaChange::ChangeType {
241            name: name.into(),
242            new_type,
243            converter,
244        });
245        self
246    }
247}
248
249/// Row with embedded schema version for lazy migration
250#[derive(Debug, Clone)]
251pub struct VersionedRow {
252    pub version: SchemaVersion,
253    pub data: SochRow,
254}
255
256impl VersionedRow {
257    pub fn new(version: SchemaVersion, data: SochRow) -> Self {
258        Self { version, data }
259    }
260}
261
/// Counters describing schema-evolution activity.
///
/// Every counter is an `AtomicU64` updated with relaxed ordering, so the
/// struct can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct EvolutionStats {
    /// Rows migrated in total (lazy plus background).
    pub rows_migrated: AtomicU64,
    /// Individual migration steps applied.
    pub migrations_applied: AtomicU64,
    /// Errors recorded via [`EvolutionStats::record_error`].
    pub migration_errors: AtomicU64,
    /// Rows migrated lazily.
    pub lazy_migrations: AtomicU64,
    /// Rows migrated by background work.
    pub background_migrations: AtomicU64,
}

impl EvolutionStats {
    /// Adds `by` to `counter` using relaxed ordering.
    fn bump(counter: &AtomicU64, by: u64) {
        counter.fetch_add(by, Ordering::Relaxed);
    }

    /// Records one lazily migrated row (also counted in `rows_migrated`).
    pub fn record_lazy_migration(&self) {
        Self::bump(&self.lazy_migrations, 1);
        Self::bump(&self.rows_migrated, 1);
    }

    /// Records `count` rows migrated in the background (also counted in `rows_migrated`).
    pub fn record_background_migration(&self, count: u64) {
        Self::bump(&self.background_migrations, count);
        Self::bump(&self.rows_migrated, count);
    }

    /// Records that one migration step was applied.
    pub fn record_migration_applied(&self) {
        Self::bump(&self.migrations_applied, 1);
    }

    /// Records one migration error.
    pub fn record_error(&self) {
        Self::bump(&self.migration_errors, 1);
    }

    /// Copies the current counter values into a plain struct.
    ///
    /// Each counter is loaded separately, so the snapshot is not a single
    /// atomic view across all counters.
    pub fn snapshot(&self) -> EvolutionStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        EvolutionStatsSnapshot {
            rows_migrated: read(&self.rows_migrated),
            migrations_applied: read(&self.migrations_applied),
            migration_errors: read(&self.migration_errors),
            lazy_migrations: read(&self.lazy_migrations),
            background_migrations: read(&self.background_migrations),
        }
    }
}

/// Plain-value copy of [`EvolutionStats`] taken at one point in time.
#[derive(Debug, Clone)]
pub struct EvolutionStatsSnapshot {
    pub rows_migrated: u64,
    pub migrations_applied: u64,
    pub migration_errors: u64,
    pub lazy_migrations: u64,
    pub background_migrations: u64,
}
311
312/// Schema version registry with migration graph
313pub struct SchemaRegistry {
314    /// Schemas by (name, version)
315    schemas: HashMap<SchemaId, SochSchema>,
316    /// Current version for each schema name
317    current_versions: HashMap<String, SchemaVersion>,
318    /// Migrations indexed by (name, from_version)
319    migrations: HashMap<(String, SchemaVersion), Migration>,
320    /// Custom type converters
321    custom_converters: Vec<fn(&SochValue) -> SochValue>,
322    /// Statistics
323    stats: Arc<EvolutionStats>,
324}
325
326impl SchemaRegistry {
327    pub fn new() -> Self {
328        Self {
329            schemas: HashMap::new(),
330            current_versions: HashMap::new(),
331            migrations: HashMap::new(),
332            custom_converters: Vec::new(),
333            stats: Arc::new(EvolutionStats::default()),
334        }
335    }
336
337    /// Register initial schema version
338    pub fn register_schema(&mut self, schema: SochSchema, version: SchemaVersion) {
339        let name = schema.name.clone();
340        let id = SchemaId::new(&name, version);
341        self.schemas.insert(id, schema);
342
343        // Update current version if this is newer
344        let current = self.current_versions.entry(name).or_insert(0);
345        if version > *current {
346            *current = version;
347        }
348    }
349
350    /// Register a migration between versions
351    pub fn register_migration(
352        &mut self,
353        name: impl Into<String>,
354        migration: Migration,
355    ) -> Result<()> {
356        let name = name.into();
357
358        // Validate versions exist or will exist
359        let key = (name.clone(), migration.from_version);
360
361        if self.migrations.contains_key(&key) {
362            return Err(SochDBError::SchemaEvolution(format!(
363                "Migration from version {} already exists for {}",
364                migration.from_version, name
365            )));
366        }
367
368        self.migrations.insert(key, migration);
369        Ok(())
370    }
371
372    /// Register a custom type converter
373    pub fn register_converter(&mut self, converter: fn(&SochValue) -> SochValue) -> usize {
374        let idx = self.custom_converters.len();
375        self.custom_converters.push(converter);
376        idx
377    }
378
379    /// Get current schema version for a name
380    pub fn current_version(&self, name: &str) -> Option<SchemaVersion> {
381        self.current_versions.get(name).copied()
382    }
383
384    /// Get schema by name and version
385    pub fn get_schema(&self, name: &str, version: SchemaVersion) -> Option<&SochSchema> {
386        self.schemas.get(&SchemaId::new(name, version))
387    }
388
389    /// Get current schema by name
390    pub fn current_schema(&self, name: &str) -> Option<&SochSchema> {
391        self.current_version(name)
392            .and_then(|v| self.get_schema(name, v))
393    }
394
395    /// Migrate a row from old version to current version
396    pub fn migrate_row(&self, name: &str, row: VersionedRow) -> Result<VersionedRow> {
397        let current_version = self.current_version(name).ok_or_else(|| {
398            SochDBError::SchemaEvolution(format!("No schema registered for '{}'", name))
399        })?;
400
401        if row.version == current_version {
402            return Ok(row);
403        }
404
405        if row.version > current_version {
406            return Err(SochDBError::SchemaEvolution(format!(
407                "Row version {} is newer than current schema version {}",
408                row.version, current_version
409            )));
410        }
411
412        // Build migration chain
413        let mut current_row = row.data;
414        let mut version = row.version;
415
416        while version < current_version {
417            let migration = self
418                .migrations
419                .get(&(name.to_string(), version))
420                .ok_or_else(|| {
421                    SochDBError::SchemaEvolution(format!(
422                        "No migration path from version {} for '{}'",
423                        version, name
424                    ))
425                })?;
426
427            current_row = self.apply_migration(&current_row, migration, name)?;
428            version = migration.to_version;
429            self.stats.record_migration_applied();
430        }
431
432        self.stats.record_lazy_migration();
433
434        Ok(VersionedRow::new(current_version, current_row))
435    }
436
437    /// Apply a single migration to a row
438    fn apply_migration(
439        &self,
440        row: &SochRow,
441        migration: &Migration,
442        schema_name: &str,
443    ) -> Result<SochRow> {
444        // Get source schema to understand column positions
445        let source_schema = self
446            .get_schema(schema_name, migration.from_version)
447            .ok_or_else(|| {
448                SochDBError::SchemaEvolution(format!(
449                    "Source schema version {} not found",
450                    migration.from_version
451                ))
452            })?;
453
454        let mut values = row.values.clone();
455        let mut column_names: Vec<String> = source_schema
456            .fields
457            .iter()
458            .map(|f| f.name.clone())
459            .collect();
460
461        for change in &migration.changes {
462            match change {
463                SchemaChange::AddColumn {
464                    name,
465                    default,
466                    position,
467                    ..
468                } => match position {
469                    Some(pos) if *pos <= values.len() => {
470                        values.insert(*pos, default.clone());
471                        column_names.insert(*pos, name.clone());
472                    }
473                    _ => {
474                        values.push(default.clone());
475                        column_names.push(name.clone());
476                    }
477                },
478                SchemaChange::DropColumn { name } => {
479                    if let Some(idx) = column_names.iter().position(|n| n == name) {
480                        values.remove(idx);
481                        column_names.remove(idx);
482                    }
483                }
484                SchemaChange::RenameColumn { old_name, new_name } => {
485                    if let Some(idx) = column_names.iter().position(|n| n == old_name) {
486                        column_names[idx] = new_name.clone();
487                    }
488                }
489                SchemaChange::ChangeType {
490                    name, converter, ..
491                } => {
492                    if let Some(idx) = column_names.iter().position(|n| n == name) {
493                        values[idx] = converter.convert(&values[idx], &self.custom_converters)?;
494                    }
495                }
496                SchemaChange::ReorderColumns { new_order } => {
497                    let mut new_values = Vec::with_capacity(new_order.len());
498                    for col_name in new_order {
499                        if let Some(idx) = column_names.iter().position(|n| n == col_name) {
500                            new_values.push(values[idx].clone());
501                        }
502                    }
503                    values = new_values;
504                    column_names = new_order.clone();
505                }
506            }
507        }
508
509        Ok(SochRow::new(values))
510    }
511
512    /// Get evolution statistics
513    pub fn stats(&self) -> Arc<EvolutionStats> {
514        Arc::clone(&self.stats)
515    }
516}
517
518impl Default for SchemaRegistry {
519    fn default() -> Self {
520        Self::new()
521    }
522}
523
524/// Schema evolution manager that wraps a registry with additional features
525pub struct SchemaEvolutionManager {
526    registry: SchemaRegistry,
527    /// Pending background migrations (schema_name, from_version, row_count)
528    pending_migrations: Vec<(String, SchemaVersion, usize)>,
529}
530
531impl SchemaEvolutionManager {
532    pub fn new() -> Self {
533        Self {
534            registry: SchemaRegistry::new(),
535            pending_migrations: Vec::new(),
536        }
537    }
538
539    /// Create a new schema version with changes from current
540    pub fn evolve_schema(
541        &mut self,
542        name: &str,
543        changes: Vec<SchemaChange>,
544    ) -> Result<SchemaVersion> {
545        let current_version = self.registry.current_version(name).ok_or_else(|| {
546            SochDBError::SchemaEvolution(format!("No schema registered for '{}'", name))
547        })?;
548
549        let current_schema = self
550            .registry
551            .current_schema(name)
552            .ok_or_else(|| {
553                SochDBError::SchemaEvolution(format!("Current schema not found for '{}'", name))
554            })?
555            .clone();
556
557        // Build new schema by applying changes
558        let new_version = current_version + 1;
559        let new_schema = self.apply_schema_changes(&current_schema, &changes)?;
560
561        // Register new schema and migration
562        self.registry.register_schema(new_schema, new_version);
563
564        let migration = Migration {
565            from_version: current_version,
566            to_version: new_version,
567            changes,
568        };
569        self.registry.register_migration(name, migration)?;
570
571        Ok(new_version)
572    }
573
574    /// Apply schema changes to create new schema definition
575    fn apply_schema_changes(
576        &self,
577        schema: &SochSchema,
578        changes: &[SchemaChange],
579    ) -> Result<SochSchema> {
580        let mut new_schema = schema.clone();
581
582        for change in changes {
583            match change {
584                SchemaChange::AddColumn {
585                    name,
586                    column_type,
587                    position,
588                    ..
589                } => {
590                    let field = crate::soch::SochField {
591                        name: name.clone(),
592                        field_type: column_type.clone(),
593                        nullable: true,
594                        default: None,
595                    };
596                    match position {
597                        Some(pos) if *pos <= new_schema.fields.len() => {
598                            new_schema.fields.insert(*pos, field);
599                        }
600                        _ => {
601                            new_schema.fields.push(field);
602                        }
603                    }
604                }
605                SchemaChange::DropColumn { name } => {
606                    new_schema.fields.retain(|f| f.name != *name);
607                }
608                SchemaChange::RenameColumn { old_name, new_name } => {
609                    for field in &mut new_schema.fields {
610                        if field.name == *old_name {
611                            field.name = new_name.clone();
612                        }
613                    }
614                }
615                SchemaChange::ChangeType { name, new_type, .. } => {
616                    for field in &mut new_schema.fields {
617                        if field.name == *name {
618                            field.field_type = new_type.clone();
619                        }
620                    }
621                }
622                SchemaChange::ReorderColumns { new_order } => {
623                    let mut new_fields = Vec::with_capacity(new_order.len());
624                    for col_name in new_order {
625                        if let Some(field) = new_schema.fields.iter().find(|f| &f.name == col_name)
626                        {
627                            new_fields.push(field.clone());
628                        }
629                    }
630                    new_schema.fields = new_fields;
631                }
632            }
633        }
634
635        Ok(new_schema)
636    }
637
638    /// Migrate a versioned row to current schema
639    pub fn migrate_row(&self, name: &str, row: VersionedRow) -> Result<VersionedRow> {
640        self.registry.migrate_row(name, row)
641    }
642
643    /// Schedule background migration for old rows
644    pub fn schedule_background_migration(
645        &mut self,
646        name: impl Into<String>,
647        from_version: SchemaVersion,
648        row_count: usize,
649    ) {
650        self.pending_migrations
651            .push((name.into(), from_version, row_count));
652    }
653
654    /// Get pending background migrations
655    pub fn pending_migrations(&self) -> &[(String, SchemaVersion, usize)] {
656        &self.pending_migrations
657    }
658
659    /// Access the underlying registry
660    pub fn registry(&self) -> &SchemaRegistry {
661        &self.registry
662    }
663
664    /// Access the underlying registry mutably
665    pub fn registry_mut(&mut self) -> &mut SchemaRegistry {
666        &mut self.registry
667    }
668
669    /// Get evolution statistics
670    pub fn stats(&self) -> Arc<EvolutionStats> {
671        self.registry.stats()
672    }
673}
674
675impl Default for SchemaEvolutionManager {
676    fn default() -> Self {
677        Self::new()
678    }
679}
680
681#[cfg(test)]
682mod tests {
683    use super::*;
684
685    fn create_test_schema() -> SochSchema {
686        SochSchema::new("users")
687            .field("id", SochType::UInt)
688            .field("name", SochType::Text)
689    }
690
691    #[test]
692    fn test_schema_registration() {
693        let mut registry = SchemaRegistry::new();
694        let schema = create_test_schema();
695
696        registry.register_schema(schema, 1);
697
698        assert_eq!(registry.current_version("users"), Some(1));
699        assert!(registry.get_schema("users", 1).is_some());
700    }
701
702    #[test]
703    fn test_add_column_migration() {
704        let mut registry = SchemaRegistry::new();
705
706        // Register v1 schema
707        let schema_v1 = create_test_schema();
708        registry.register_schema(schema_v1, 1);
709
710        // Register v2 schema with email column
711        let schema_v2 = SochSchema::new("users")
712            .field("id", SochType::UInt)
713            .field("name", SochType::Text)
714            .field("email", SochType::Text);
715        registry.register_schema(schema_v2, 2);
716
717        // Register migration
718        let migration =
719            Migration::new(1, 2).add_column("email", SochType::Text, SochValue::Text("".into()));
720        registry.register_migration("users", migration).unwrap();
721
722        // Create v1 row
723        let row_v1 = VersionedRow::new(
724            1,
725            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("Alice".into())]),
726        );
727
728        // Migrate to v2
729        let row_v2 = registry.migrate_row("users", row_v1).unwrap();
730
731        assert_eq!(row_v2.version, 2);
732        assert_eq!(row_v2.data.values.len(), 3);
733        assert_eq!(row_v2.data.values[2], SochValue::Text("".into()));
734    }
735
736    #[test]
737    fn test_drop_column_migration() {
738        let mut registry = SchemaRegistry::new();
739
740        // Register v1 schema with legacy field
741        let schema_v1 = SochSchema::new("users")
742            .field("id", SochType::UInt)
743            .field("name", SochType::Text)
744            .field("legacy", SochType::Text);
745        registry.register_schema(schema_v1, 1);
746
747        // Register v2 schema without legacy
748        let schema_v2 = SochSchema::new("users")
749            .field("id", SochType::UInt)
750            .field("name", SochType::Text);
751        registry.register_schema(schema_v2, 2);
752
753        // Register migration
754        let migration = Migration::new(1, 2).drop_column("legacy");
755        registry.register_migration("users", migration).unwrap();
756
757        // Create v1 row with legacy field
758        let row_v1 = VersionedRow::new(
759            1,
760            SochRow::new(vec![
761                SochValue::UInt(1),
762                SochValue::Text("Alice".into()),
763                SochValue::Text("old_data".into()),
764            ]),
765        );
766
767        // Migrate to v2
768        let row_v2 = registry.migrate_row("users", row_v1).unwrap();
769
770        assert_eq!(row_v2.version, 2);
771        assert_eq!(row_v2.data.values.len(), 2);
772    }
773
774    #[test]
775    fn test_rename_column_migration() {
776        let mut registry = SchemaRegistry::new();
777
778        let schema_v1 = SochSchema::new("users")
779            .field("id", SochType::UInt)
780            .field("name", SochType::Text);
781        registry.register_schema(schema_v1, 1);
782
783        let schema_v2 = SochSchema::new("users")
784            .field("id", SochType::UInt)
785            .field("full_name", SochType::Text);
786        registry.register_schema(schema_v2, 2);
787
788        let migration = Migration::new(1, 2).rename_column("name", "full_name");
789        registry.register_migration("users", migration).unwrap();
790
791        let row_v1 = VersionedRow::new(
792            1,
793            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("Alice".into())]),
794        );
795
796        let row_v2 = registry.migrate_row("users", row_v1).unwrap();
797
798        assert_eq!(row_v2.version, 2);
799        assert_eq!(row_v2.data.values.len(), 2);
800        assert_eq!(row_v2.data.values[1], SochValue::Text("Alice".into()));
801    }
802
803    #[test]
804    fn test_type_conversion_migration() {
805        let mut registry = SchemaRegistry::new();
806
807        let schema_v1 = SochSchema::new("products")
808            .field("id", SochType::UInt)
809            .field("price", SochType::Int);
810        registry.register_schema(schema_v1, 1);
811
812        let schema_v2 = SochSchema::new("products")
813            .field("id", SochType::UInt)
814            .field("price", SochType::Float);
815        registry.register_schema(schema_v2, 2);
816
817        let migration =
818            Migration::new(1, 2).change_type("price", SochType::Float, TypeConverter::IntToFloat);
819        registry.register_migration("products", migration).unwrap();
820
821        let row_v1 = VersionedRow::new(
822            1,
823            SochRow::new(vec![SochValue::UInt(1), SochValue::Int(100)]),
824        );
825
826        let row_v2 = registry.migrate_row("products", row_v1).unwrap();
827
828        assert_eq!(row_v2.version, 2);
829        assert_eq!(row_v2.data.values[1], SochValue::Float(100.0));
830    }
831
832    #[test]
833    fn test_multi_version_migration_chain() {
834        let mut registry = SchemaRegistry::new();
835
836        // v1: id, name
837        let schema_v1 = create_test_schema();
838        registry.register_schema(schema_v1, 1);
839
840        // v2: id, name, email
841        let schema_v2 = SochSchema::new("users")
842            .field("id", SochType::UInt)
843            .field("name", SochType::Text)
844            .field("email", SochType::Text);
845        registry.register_schema(schema_v2, 2);
846
847        // v3: id, full_name, email, created_at
848        let schema_v3 = SochSchema::new("users")
849            .field("id", SochType::UInt)
850            .field("full_name", SochType::Text)
851            .field("email", SochType::Text)
852            .field("created_at", SochType::Int);
853        registry.register_schema(schema_v3, 3);
854
855        // v1 -> v2: add email
856        let migration_1_2 =
857            Migration::new(1, 2).add_column("email", SochType::Text, SochValue::Text("".into()));
858        registry.register_migration("users", migration_1_2).unwrap();
859
860        // v2 -> v3: rename name -> full_name, add created_at
861        let migration_2_3 = Migration::new(2, 3)
862            .rename_column("name", "full_name")
863            .add_column("created_at", SochType::Int, SochValue::Int(0));
864        registry.register_migration("users", migration_2_3).unwrap();
865
866        // Migrate from v1 to v3
867        let row_v1 = VersionedRow::new(
868            1,
869            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("Alice".into())]),
870        );
871
872        let row_v3 = registry.migrate_row("users", row_v1).unwrap();
873
874        assert_eq!(row_v3.version, 3);
875        assert_eq!(row_v3.data.values.len(), 4);
876        assert_eq!(row_v3.data.values[0], SochValue::UInt(1));
877        assert_eq!(row_v3.data.values[1], SochValue::Text("Alice".into()));
878        assert_eq!(row_v3.data.values[2], SochValue::Text("".into()));
879        assert_eq!(row_v3.data.values[3], SochValue::Int(0));
880    }
881
882    #[test]
883    fn test_evolve_schema() {
884        let mut manager = SchemaEvolutionManager::new();
885
886        // Register initial schema
887        let schema = create_test_schema();
888        manager.registry_mut().register_schema(schema, 1);
889
890        // Evolve to add email
891        let changes = vec![SchemaChange::AddColumn {
892            name: "email".to_string(),
893            column_type: SochType::Text,
894            default: SochValue::Text("".into()),
895            position: None,
896        }];
897
898        let new_version = manager.evolve_schema("users", changes).unwrap();
899
900        assert_eq!(new_version, 2);
901        assert_eq!(manager.registry().current_version("users"), Some(2));
902
903        let current = manager.registry().current_schema("users").unwrap();
904        assert_eq!(current.fields.len(), 3);
905    }
906
907    #[test]
908    fn test_no_migration_needed_for_current_version() {
909        let mut registry = SchemaRegistry::new();
910        let schema = create_test_schema();
911        registry.register_schema(schema, 1);
912
913        let row = VersionedRow::new(
914            1,
915            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("Alice".into())]),
916        );
917
918        let result = registry.migrate_row("users", row.clone()).unwrap();
919
920        assert_eq!(result.version, row.version);
921        assert_eq!(result.data.values, row.data.values);
922    }
923
924    #[test]
925    fn test_stats_tracking() {
926        let mut registry = SchemaRegistry::new();
927
928        let schema_v1 = create_test_schema();
929        registry.register_schema(schema_v1, 1);
930
931        let schema_v2 = SochSchema::new("users")
932            .field("id", SochType::UInt)
933            .field("name", SochType::Text)
934            .field("email", SochType::Text);
935        registry.register_schema(schema_v2, 2);
936
937        let migration =
938            Migration::new(1, 2).add_column("email", SochType::Text, SochValue::Text("".into()));
939        registry.register_migration("users", migration).unwrap();
940
941        // Migrate a row
942        let row = VersionedRow::new(
943            1,
944            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("Alice".into())]),
945        );
946        registry.migrate_row("users", row).unwrap();
947
948        let stats = registry.stats().snapshot();
949        assert_eq!(stats.rows_migrated, 1);
950        assert_eq!(stats.migrations_applied, 1);
951        assert_eq!(stats.lazy_migrations, 1);
952    }
953
954    #[test]
955    fn test_error_on_future_version() {
956        let mut registry = SchemaRegistry::new();
957        let schema = create_test_schema();
958        registry.register_schema(schema, 1);
959
960        let row = VersionedRow::new(
961            99,
962            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("Alice".into())]),
963        );
964
965        let result = registry.migrate_row("users", row);
966        assert!(result.is_err());
967    }
968
969    #[test]
970    fn test_custom_type_converter() {
971        let mut registry = SchemaRegistry::new();
972
973        // Register custom converter that uppercases text
974        let converter_idx = registry.register_converter(|v| {
975            if let SochValue::Text(s) = v {
976                SochValue::Text(s.to_uppercase())
977            } else {
978                v.clone()
979            }
980        });
981
982        let schema_v1 = SochSchema::new("users")
983            .field("id", SochType::UInt)
984            .field("name", SochType::Text);
985        registry.register_schema(schema_v1, 1);
986
987        let schema_v2 = SochSchema::new("users")
988            .field("id", SochType::UInt)
989            .field("name", SochType::Text); // Same type, different format
990        registry.register_schema(schema_v2, 2);
991
992        let migration = Migration::new(1, 2).change_type(
993            "name",
994            SochType::Text,
995            TypeConverter::Custom(converter_idx),
996        );
997        registry.register_migration("users", migration).unwrap();
998
999        let row_v1 = VersionedRow::new(
1000            1,
1001            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("alice".into())]),
1002        );
1003
1004        let row_v2 = registry.migrate_row("users", row_v1).unwrap();
1005        assert_eq!(row_v2.data.values[1], SochValue::Text("ALICE".into()));
1006    }
1007}