Skip to main content

sochdb_core/
schema_evolution.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Schema Evolution - Online Schema Changes
19//!
20//! This module provides backwards-compatible schema evolution for SochDB,
21//! allowing schema changes without full table rewrites through:
22//!
23//! - **Schema Versioning**: Each schema has a monotonic version number
24//! - **Migration Registry**: Registered transformations between versions
25//! - **Lazy Migration**: Rows are migrated on-read when version mismatch detected
26//! - **Background Compaction**: Asynchronous migration during idle time
27//!
28//! # Design
29//!
30//! ```text
31//! ┌─────────────────────────────────────────────────────────────┐
32//! │                    Schema Version Graph                      │
33//! │                                                             │
34//! │  v1 ──────────────────────────────────────────────────────→ │
35//! │   ↓                                                         │
36//! │  v2 (add column "email")                                    │
37//! │   ↓                                                         │
38//! │  v3 (rename "name" → "full_name", add "created_at")        │
39//! │   ↓                                                         │
40//! │  v4 (drop "legacy_field")                                   │
41//! └─────────────────────────────────────────────────────────────┘
42//!
43//! Rows carry their schema version. On read:
44//! 1. Check row version vs current schema version
45//! 2. If mismatch, apply migration chain (v_row → v_current)
46//! 3. Return migrated row (optionally rewrite in background)
47//! ```
48
49use std::collections::HashMap;
50use std::sync::Arc;
51use std::sync::atomic::{AtomicU64, Ordering};
52
53use crate::error::{Result, SochDBError};
54use crate::soch::{SochRow, SochSchema, SochType, SochValue};
55
/// Version number attached to a schema revision.
pub type SchemaVersion = u64;

/// Identifies one concrete revision of a schema: the table/collection name
/// paired with the version of that schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SchemaId {
    /// Table or collection name.
    pub name: String,
    /// Revision of the named schema.
    pub version: SchemaVersion,
}

impl SchemaId {
    /// Builds an identifier from any value convertible into an owned name
    /// plus the version it refers to.
    pub fn new(name: impl Into<String>, version: SchemaVersion) -> Self {
        let name: String = name.into();
        SchemaId { name, version }
    }
}
74
75/// Describes a single schema change operation
76#[derive(Debug, Clone)]
77pub enum SchemaChange {
78    /// Add a new column with default value
79    AddColumn {
80        name: String,
81        column_type: SochType,
82        default: SochValue,
83        position: Option<usize>, // None = append at end
84    },
85    /// Drop an existing column
86    DropColumn { name: String },
87    /// Rename a column
88    RenameColumn { old_name: String, new_name: String },
89    /// Change column type with conversion function
90    ChangeType {
91        name: String,
92        new_type: SochType,
93        converter: TypeConverter,
94    },
95    /// Reorder columns
96    ReorderColumns { new_order: Vec<String> },
97}
98
99/// Type conversion function for schema evolution
100#[derive(Debug, Clone)]
101pub enum TypeConverter {
102    /// Identity (no conversion needed, types are compatible)
103    Identity,
104    /// Convert integer to text
105    IntToText,
106    /// Convert text to integer (may fail)
107    TextToInt,
108    /// Convert float to integer (truncate)
109    FloatToInt,
110    /// Convert integer to float
111    IntToFloat,
112    /// Custom conversion with closure index (stored in registry)
113    Custom(usize),
114}
115
116impl TypeConverter {
117    /// Apply the type conversion
118    pub fn convert(
119        &self,
120        value: &SochValue,
121        custom_converters: &[fn(&SochValue) -> SochValue],
122    ) -> Result<SochValue> {
123        match self {
124            TypeConverter::Identity => Ok(value.clone()),
125            TypeConverter::IntToText => match value {
126                SochValue::Int(i) => Ok(SochValue::Text(i.to_string())),
127                SochValue::UInt(u) => Ok(SochValue::Text(u.to_string())),
128                _ => Err(SochDBError::SchemaEvolution(format!(
129                    "Cannot convert {:?} to text via IntToText",
130                    value
131                ))),
132            },
133            TypeConverter::TextToInt => match value {
134                SochValue::Text(s) => s.parse::<i64>().map(SochValue::Int).map_err(|_| {
135                    SochDBError::SchemaEvolution(format!("Cannot parse '{}' as integer", s))
136                }),
137                _ => Err(SochDBError::SchemaEvolution(format!(
138                    "Cannot convert {:?} to int via TextToInt",
139                    value
140                ))),
141            },
142            TypeConverter::FloatToInt => match value {
143                SochValue::Float(f) => Ok(SochValue::Int(*f as i64)),
144                _ => Err(SochDBError::SchemaEvolution(format!(
145                    "Cannot convert {:?} to int via FloatToInt",
146                    value
147                ))),
148            },
149            TypeConverter::IntToFloat => match value {
150                SochValue::Int(i) => Ok(SochValue::Float(*i as f64)),
151                SochValue::UInt(u) => Ok(SochValue::Float(*u as f64)),
152                _ => Err(SochDBError::SchemaEvolution(format!(
153                    "Cannot convert {:?} to float via IntToFloat",
154                    value
155                ))),
156            },
157            TypeConverter::Custom(idx) => {
158                if *idx < custom_converters.len() {
159                    Ok(custom_converters[*idx](value))
160                } else {
161                    Err(SochDBError::SchemaEvolution(format!(
162                        "Custom converter index {} out of bounds",
163                        idx
164                    )))
165                }
166            }
167        }
168    }
169}
170
171/// Migration between two schema versions
172#[derive(Debug, Clone)]
173pub struct Migration {
174    pub from_version: SchemaVersion,
175    pub to_version: SchemaVersion,
176    pub changes: Vec<SchemaChange>,
177}
178
179impl Migration {
180    pub fn new(from: SchemaVersion, to: SchemaVersion) -> Self {
181        Self {
182            from_version: from,
183            to_version: to,
184            changes: Vec::new(),
185        }
186    }
187
188    pub fn add_column(
189        mut self,
190        name: impl Into<String>,
191        column_type: SochType,
192        default: SochValue,
193    ) -> Self {
194        self.changes.push(SchemaChange::AddColumn {
195            name: name.into(),
196            column_type,
197            default,
198            position: None,
199        });
200        self
201    }
202
203    pub fn add_column_at(
204        mut self,
205        name: impl Into<String>,
206        column_type: SochType,
207        default: SochValue,
208        position: usize,
209    ) -> Self {
210        self.changes.push(SchemaChange::AddColumn {
211            name: name.into(),
212            column_type,
213            default,
214            position: Some(position),
215        });
216        self
217    }
218
219    pub fn drop_column(mut self, name: impl Into<String>) -> Self {
220        self.changes
221            .push(SchemaChange::DropColumn { name: name.into() });
222        self
223    }
224
225    pub fn rename_column(
226        mut self,
227        old_name: impl Into<String>,
228        new_name: impl Into<String>,
229    ) -> Self {
230        self.changes.push(SchemaChange::RenameColumn {
231            old_name: old_name.into(),
232            new_name: new_name.into(),
233        });
234        self
235    }
236
237    pub fn change_type(
238        mut self,
239        name: impl Into<String>,
240        new_type: SochType,
241        converter: TypeConverter,
242    ) -> Self {
243        self.changes.push(SchemaChange::ChangeType {
244            name: name.into(),
245            new_type,
246            converter,
247        });
248        self
249    }
250}
251
252/// Row with embedded schema version for lazy migration
253#[derive(Debug, Clone)]
254pub struct VersionedRow {
255    pub version: SchemaVersion,
256    pub data: SochRow,
257}
258
259impl VersionedRow {
260    pub fn new(version: SchemaVersion, data: SochRow) -> Self {
261        Self { version, data }
262    }
263}
264
/// Atomic counters describing schema evolution activity.
///
/// Every counter is updated and read with `Ordering::Relaxed`; each counter
/// is loaded independently when a snapshot is taken.
#[derive(Debug, Default)]
pub struct EvolutionStats {
    pub rows_migrated: AtomicU64,
    pub migrations_applied: AtomicU64,
    pub migration_errors: AtomicU64,
    pub lazy_migrations: AtomicU64,
    pub background_migrations: AtomicU64,
}

impl EvolutionStats {
    /// Adds `amount` to `counter`.
    fn bump(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, Ordering::Relaxed);
    }

    /// Counts one lazily migrated row (also counted in `rows_migrated`).
    pub fn record_lazy_migration(&self) {
        Self::bump(&self.lazy_migrations, 1);
        Self::bump(&self.rows_migrated, 1);
    }

    /// Counts `count` rows migrated in the background (also added to
    /// `rows_migrated`).
    pub fn record_background_migration(&self, count: u64) {
        Self::bump(&self.background_migrations, count);
        Self::bump(&self.rows_migrated, count);
    }

    /// Counts one applied migration step.
    pub fn record_migration_applied(&self) {
        Self::bump(&self.migrations_applied, 1);
    }

    /// Counts one migration error.
    pub fn record_error(&self) {
        Self::bump(&self.migration_errors, 1);
    }

    /// Copies the current counter values into a plain-data snapshot.
    pub fn snapshot(&self) -> EvolutionStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        EvolutionStatsSnapshot {
            rows_migrated: read(&self.rows_migrated),
            migrations_applied: read(&self.migrations_applied),
            migration_errors: read(&self.migration_errors),
            lazy_migrations: read(&self.lazy_migrations),
            background_migrations: read(&self.background_migrations),
        }
    }
}

/// Plain-value copy of [`EvolutionStats`] taken at one point in time.
#[derive(Debug, Clone)]
pub struct EvolutionStatsSnapshot {
    pub rows_migrated: u64,
    pub migrations_applied: u64,
    pub migration_errors: u64,
    pub lazy_migrations: u64,
    pub background_migrations: u64,
}
314
315/// Schema version registry with migration graph
316pub struct SchemaRegistry {
317    /// Schemas by (name, version)
318    schemas: HashMap<SchemaId, SochSchema>,
319    /// Current version for each schema name
320    current_versions: HashMap<String, SchemaVersion>,
321    /// Migrations indexed by (name, from_version)
322    migrations: HashMap<(String, SchemaVersion), Migration>,
323    /// Custom type converters
324    custom_converters: Vec<fn(&SochValue) -> SochValue>,
325    /// Statistics
326    stats: Arc<EvolutionStats>,
327}
328
329impl SchemaRegistry {
330    pub fn new() -> Self {
331        Self {
332            schemas: HashMap::new(),
333            current_versions: HashMap::new(),
334            migrations: HashMap::new(),
335            custom_converters: Vec::new(),
336            stats: Arc::new(EvolutionStats::default()),
337        }
338    }
339
340    /// Register initial schema version
341    pub fn register_schema(&mut self, schema: SochSchema, version: SchemaVersion) {
342        let name = schema.name.clone();
343        let id = SchemaId::new(&name, version);
344        self.schemas.insert(id, schema);
345
346        // Update current version if this is newer
347        let current = self.current_versions.entry(name).or_insert(0);
348        if version > *current {
349            *current = version;
350        }
351    }
352
353    /// Register a migration between versions
354    pub fn register_migration(
355        &mut self,
356        name: impl Into<String>,
357        migration: Migration,
358    ) -> Result<()> {
359        let name = name.into();
360
361        // Validate versions exist or will exist
362        let key = (name.clone(), migration.from_version);
363
364        if self.migrations.contains_key(&key) {
365            return Err(SochDBError::SchemaEvolution(format!(
366                "Migration from version {} already exists for {}",
367                migration.from_version, name
368            )));
369        }
370
371        self.migrations.insert(key, migration);
372        Ok(())
373    }
374
375    /// Register a custom type converter
376    pub fn register_converter(&mut self, converter: fn(&SochValue) -> SochValue) -> usize {
377        let idx = self.custom_converters.len();
378        self.custom_converters.push(converter);
379        idx
380    }
381
382    /// Get current schema version for a name
383    pub fn current_version(&self, name: &str) -> Option<SchemaVersion> {
384        self.current_versions.get(name).copied()
385    }
386
387    /// Get schema by name and version
388    pub fn get_schema(&self, name: &str, version: SchemaVersion) -> Option<&SochSchema> {
389        self.schemas.get(&SchemaId::new(name, version))
390    }
391
392    /// Get current schema by name
393    pub fn current_schema(&self, name: &str) -> Option<&SochSchema> {
394        self.current_version(name)
395            .and_then(|v| self.get_schema(name, v))
396    }
397
398    /// Migrate a row from old version to current version
399    pub fn migrate_row(&self, name: &str, row: VersionedRow) -> Result<VersionedRow> {
400        let current_version = self.current_version(name).ok_or_else(|| {
401            SochDBError::SchemaEvolution(format!("No schema registered for '{}'", name))
402        })?;
403
404        if row.version == current_version {
405            return Ok(row);
406        }
407
408        if row.version > current_version {
409            return Err(SochDBError::SchemaEvolution(format!(
410                "Row version {} is newer than current schema version {}",
411                row.version, current_version
412            )));
413        }
414
415        // Build migration chain
416        let mut current_row = row.data;
417        let mut version = row.version;
418
419        while version < current_version {
420            let migration = self
421                .migrations
422                .get(&(name.to_string(), version))
423                .ok_or_else(|| {
424                    SochDBError::SchemaEvolution(format!(
425                        "No migration path from version {} for '{}'",
426                        version, name
427                    ))
428                })?;
429
430            current_row = self.apply_migration(&current_row, migration, name)?;
431            version = migration.to_version;
432            self.stats.record_migration_applied();
433        }
434
435        self.stats.record_lazy_migration();
436
437        Ok(VersionedRow::new(current_version, current_row))
438    }
439
440    /// Apply a single migration to a row
441    fn apply_migration(
442        &self,
443        row: &SochRow,
444        migration: &Migration,
445        schema_name: &str,
446    ) -> Result<SochRow> {
447        // Get source schema to understand column positions
448        let source_schema = self
449            .get_schema(schema_name, migration.from_version)
450            .ok_or_else(|| {
451                SochDBError::SchemaEvolution(format!(
452                    "Source schema version {} not found",
453                    migration.from_version
454                ))
455            })?;
456
457        let mut values = row.values.clone();
458        let mut column_names: Vec<String> = source_schema
459            .fields
460            .iter()
461            .map(|f| f.name.clone())
462            .collect();
463
464        for change in &migration.changes {
465            match change {
466                SchemaChange::AddColumn {
467                    name,
468                    default,
469                    position,
470                    ..
471                } => match position {
472                    Some(pos) if *pos <= values.len() => {
473                        values.insert(*pos, default.clone());
474                        column_names.insert(*pos, name.clone());
475                    }
476                    _ => {
477                        values.push(default.clone());
478                        column_names.push(name.clone());
479                    }
480                },
481                SchemaChange::DropColumn { name } => {
482                    if let Some(idx) = column_names.iter().position(|n| n == name) {
483                        values.remove(idx);
484                        column_names.remove(idx);
485                    }
486                }
487                SchemaChange::RenameColumn { old_name, new_name } => {
488                    if let Some(idx) = column_names.iter().position(|n| n == old_name) {
489                        column_names[idx] = new_name.clone();
490                    }
491                }
492                SchemaChange::ChangeType {
493                    name, converter, ..
494                } => {
495                    if let Some(idx) = column_names.iter().position(|n| n == name) {
496                        values[idx] = converter.convert(&values[idx], &self.custom_converters)?;
497                    }
498                }
499                SchemaChange::ReorderColumns { new_order } => {
500                    let mut new_values = Vec::with_capacity(new_order.len());
501                    for col_name in new_order {
502                        if let Some(idx) = column_names.iter().position(|n| n == col_name) {
503                            new_values.push(values[idx].clone());
504                        }
505                    }
506                    values = new_values;
507                    column_names = new_order.clone();
508                }
509            }
510        }
511
512        Ok(SochRow::new(values))
513    }
514
515    /// Get evolution statistics
516    pub fn stats(&self) -> Arc<EvolutionStats> {
517        Arc::clone(&self.stats)
518    }
519}
520
521impl Default for SchemaRegistry {
522    fn default() -> Self {
523        Self::new()
524    }
525}
526
527/// Schema evolution manager that wraps a registry with additional features
528pub struct SchemaEvolutionManager {
529    registry: SchemaRegistry,
530    /// Pending background migrations (schema_name, from_version, row_count)
531    pending_migrations: Vec<(String, SchemaVersion, usize)>,
532}
533
534impl SchemaEvolutionManager {
535    pub fn new() -> Self {
536        Self {
537            registry: SchemaRegistry::new(),
538            pending_migrations: Vec::new(),
539        }
540    }
541
542    /// Create a new schema version with changes from current
543    pub fn evolve_schema(
544        &mut self,
545        name: &str,
546        changes: Vec<SchemaChange>,
547    ) -> Result<SchemaVersion> {
548        let current_version = self.registry.current_version(name).ok_or_else(|| {
549            SochDBError::SchemaEvolution(format!("No schema registered for '{}'", name))
550        })?;
551
552        let current_schema = self
553            .registry
554            .current_schema(name)
555            .ok_or_else(|| {
556                SochDBError::SchemaEvolution(format!("Current schema not found for '{}'", name))
557            })?
558            .clone();
559
560        // Build new schema by applying changes
561        let new_version = current_version + 1;
562        let new_schema = self.apply_schema_changes(&current_schema, &changes)?;
563
564        // Register new schema and migration
565        self.registry.register_schema(new_schema, new_version);
566
567        let migration = Migration {
568            from_version: current_version,
569            to_version: new_version,
570            changes,
571        };
572        self.registry.register_migration(name, migration)?;
573
574        Ok(new_version)
575    }
576
577    /// Apply schema changes to create new schema definition
578    fn apply_schema_changes(
579        &self,
580        schema: &SochSchema,
581        changes: &[SchemaChange],
582    ) -> Result<SochSchema> {
583        let mut new_schema = schema.clone();
584
585        for change in changes {
586            match change {
587                SchemaChange::AddColumn {
588                    name,
589                    column_type,
590                    position,
591                    ..
592                } => {
593                    let field = crate::soch::SochField {
594                        name: name.clone(),
595                        field_type: column_type.clone(),
596                        nullable: true,
597                        default: None,
598                    };
599                    match position {
600                        Some(pos) if *pos <= new_schema.fields.len() => {
601                            new_schema.fields.insert(*pos, field);
602                        }
603                        _ => {
604                            new_schema.fields.push(field);
605                        }
606                    }
607                }
608                SchemaChange::DropColumn { name } => {
609                    new_schema.fields.retain(|f| f.name != *name);
610                }
611                SchemaChange::RenameColumn { old_name, new_name } => {
612                    for field in &mut new_schema.fields {
613                        if field.name == *old_name {
614                            field.name = new_name.clone();
615                        }
616                    }
617                }
618                SchemaChange::ChangeType { name, new_type, .. } => {
619                    for field in &mut new_schema.fields {
620                        if field.name == *name {
621                            field.field_type = new_type.clone();
622                        }
623                    }
624                }
625                SchemaChange::ReorderColumns { new_order } => {
626                    let mut new_fields = Vec::with_capacity(new_order.len());
627                    for col_name in new_order {
628                        if let Some(field) = new_schema.fields.iter().find(|f| &f.name == col_name)
629                        {
630                            new_fields.push(field.clone());
631                        }
632                    }
633                    new_schema.fields = new_fields;
634                }
635            }
636        }
637
638        Ok(new_schema)
639    }
640
641    /// Migrate a versioned row to current schema
642    pub fn migrate_row(&self, name: &str, row: VersionedRow) -> Result<VersionedRow> {
643        self.registry.migrate_row(name, row)
644    }
645
646    /// Schedule background migration for old rows
647    pub fn schedule_background_migration(
648        &mut self,
649        name: impl Into<String>,
650        from_version: SchemaVersion,
651        row_count: usize,
652    ) {
653        self.pending_migrations
654            .push((name.into(), from_version, row_count));
655    }
656
657    /// Get pending background migrations
658    pub fn pending_migrations(&self) -> &[(String, SchemaVersion, usize)] {
659        &self.pending_migrations
660    }
661
662    /// Access the underlying registry
663    pub fn registry(&self) -> &SchemaRegistry {
664        &self.registry
665    }
666
667    /// Access the underlying registry mutably
668    pub fn registry_mut(&mut self) -> &mut SchemaRegistry {
669        &mut self.registry
670    }
671
672    /// Get evolution statistics
673    pub fn stats(&self) -> Arc<EvolutionStats> {
674        self.registry.stats()
675    }
676}
677
678impl Default for SchemaEvolutionManager {
679    fn default() -> Self {
680        Self::new()
681    }
682}
683
684#[cfg(test)]
685mod tests {
686    use super::*;
687
688    fn create_test_schema() -> SochSchema {
689        SochSchema::new("users")
690            .field("id", SochType::UInt)
691            .field("name", SochType::Text)
692    }
693
694    #[test]
695    fn test_schema_registration() {
696        let mut registry = SchemaRegistry::new();
697        let schema = create_test_schema();
698
699        registry.register_schema(schema, 1);
700
701        assert_eq!(registry.current_version("users"), Some(1));
702        assert!(registry.get_schema("users", 1).is_some());
703    }
704
705    #[test]
706    fn test_add_column_migration() {
707        let mut registry = SchemaRegistry::new();
708
709        // Register v1 schema
710        let schema_v1 = create_test_schema();
711        registry.register_schema(schema_v1, 1);
712
713        // Register v2 schema with email column
714        let schema_v2 = SochSchema::new("users")
715            .field("id", SochType::UInt)
716            .field("name", SochType::Text)
717            .field("email", SochType::Text);
718        registry.register_schema(schema_v2, 2);
719
720        // Register migration
721        let migration =
722            Migration::new(1, 2).add_column("email", SochType::Text, SochValue::Text("".into()));
723        registry.register_migration("users", migration).unwrap();
724
725        // Create v1 row
726        let row_v1 = VersionedRow::new(
727            1,
728            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("Alice".into())]),
729        );
730
731        // Migrate to v2
732        let row_v2 = registry.migrate_row("users", row_v1).unwrap();
733
734        assert_eq!(row_v2.version, 2);
735        assert_eq!(row_v2.data.values.len(), 3);
736        assert_eq!(row_v2.data.values[2], SochValue::Text("".into()));
737    }
738
739    #[test]
740    fn test_drop_column_migration() {
741        let mut registry = SchemaRegistry::new();
742
743        // Register v1 schema with legacy field
744        let schema_v1 = SochSchema::new("users")
745            .field("id", SochType::UInt)
746            .field("name", SochType::Text)
747            .field("legacy", SochType::Text);
748        registry.register_schema(schema_v1, 1);
749
750        // Register v2 schema without legacy
751        let schema_v2 = SochSchema::new("users")
752            .field("id", SochType::UInt)
753            .field("name", SochType::Text);
754        registry.register_schema(schema_v2, 2);
755
756        // Register migration
757        let migration = Migration::new(1, 2).drop_column("legacy");
758        registry.register_migration("users", migration).unwrap();
759
760        // Create v1 row with legacy field
761        let row_v1 = VersionedRow::new(
762            1,
763            SochRow::new(vec![
764                SochValue::UInt(1),
765                SochValue::Text("Alice".into()),
766                SochValue::Text("old_data".into()),
767            ]),
768        );
769
770        // Migrate to v2
771        let row_v2 = registry.migrate_row("users", row_v1).unwrap();
772
773        assert_eq!(row_v2.version, 2);
774        assert_eq!(row_v2.data.values.len(), 2);
775    }
776
777    #[test]
778    fn test_rename_column_migration() {
779        let mut registry = SchemaRegistry::new();
780
781        let schema_v1 = SochSchema::new("users")
782            .field("id", SochType::UInt)
783            .field("name", SochType::Text);
784        registry.register_schema(schema_v1, 1);
785
786        let schema_v2 = SochSchema::new("users")
787            .field("id", SochType::UInt)
788            .field("full_name", SochType::Text);
789        registry.register_schema(schema_v2, 2);
790
791        let migration = Migration::new(1, 2).rename_column("name", "full_name");
792        registry.register_migration("users", migration).unwrap();
793
794        let row_v1 = VersionedRow::new(
795            1,
796            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("Alice".into())]),
797        );
798
799        let row_v2 = registry.migrate_row("users", row_v1).unwrap();
800
801        assert_eq!(row_v2.version, 2);
802        assert_eq!(row_v2.data.values.len(), 2);
803        assert_eq!(row_v2.data.values[1], SochValue::Text("Alice".into()));
804    }
805
806    #[test]
807    fn test_type_conversion_migration() {
808        let mut registry = SchemaRegistry::new();
809
810        let schema_v1 = SochSchema::new("products")
811            .field("id", SochType::UInt)
812            .field("price", SochType::Int);
813        registry.register_schema(schema_v1, 1);
814
815        let schema_v2 = SochSchema::new("products")
816            .field("id", SochType::UInt)
817            .field("price", SochType::Float);
818        registry.register_schema(schema_v2, 2);
819
820        let migration =
821            Migration::new(1, 2).change_type("price", SochType::Float, TypeConverter::IntToFloat);
822        registry.register_migration("products", migration).unwrap();
823
824        let row_v1 = VersionedRow::new(
825            1,
826            SochRow::new(vec![SochValue::UInt(1), SochValue::Int(100)]),
827        );
828
829        let row_v2 = registry.migrate_row("products", row_v1).unwrap();
830
831        assert_eq!(row_v2.version, 2);
832        assert_eq!(row_v2.data.values[1], SochValue::Float(100.0));
833    }
834
835    #[test]
836    fn test_multi_version_migration_chain() {
837        let mut registry = SchemaRegistry::new();
838
839        // v1: id, name
840        let schema_v1 = create_test_schema();
841        registry.register_schema(schema_v1, 1);
842
843        // v2: id, name, email
844        let schema_v2 = SochSchema::new("users")
845            .field("id", SochType::UInt)
846            .field("name", SochType::Text)
847            .field("email", SochType::Text);
848        registry.register_schema(schema_v2, 2);
849
850        // v3: id, full_name, email, created_at
851        let schema_v3 = SochSchema::new("users")
852            .field("id", SochType::UInt)
853            .field("full_name", SochType::Text)
854            .field("email", SochType::Text)
855            .field("created_at", SochType::Int);
856        registry.register_schema(schema_v3, 3);
857
858        // v1 -> v2: add email
859        let migration_1_2 =
860            Migration::new(1, 2).add_column("email", SochType::Text, SochValue::Text("".into()));
861        registry.register_migration("users", migration_1_2).unwrap();
862
863        // v2 -> v3: rename name -> full_name, add created_at
864        let migration_2_3 = Migration::new(2, 3)
865            .rename_column("name", "full_name")
866            .add_column("created_at", SochType::Int, SochValue::Int(0));
867        registry.register_migration("users", migration_2_3).unwrap();
868
869        // Migrate from v1 to v3
870        let row_v1 = VersionedRow::new(
871            1,
872            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("Alice".into())]),
873        );
874
875        let row_v3 = registry.migrate_row("users", row_v1).unwrap();
876
877        assert_eq!(row_v3.version, 3);
878        assert_eq!(row_v3.data.values.len(), 4);
879        assert_eq!(row_v3.data.values[0], SochValue::UInt(1));
880        assert_eq!(row_v3.data.values[1], SochValue::Text("Alice".into()));
881        assert_eq!(row_v3.data.values[2], SochValue::Text("".into()));
882        assert_eq!(row_v3.data.values[3], SochValue::Int(0));
883    }
884
885    #[test]
886    fn test_evolve_schema() {
887        let mut manager = SchemaEvolutionManager::new();
888
889        // Register initial schema
890        let schema = create_test_schema();
891        manager.registry_mut().register_schema(schema, 1);
892
893        // Evolve to add email
894        let changes = vec![SchemaChange::AddColumn {
895            name: "email".to_string(),
896            column_type: SochType::Text,
897            default: SochValue::Text("".into()),
898            position: None,
899        }];
900
901        let new_version = manager.evolve_schema("users", changes).unwrap();
902
903        assert_eq!(new_version, 2);
904        assert_eq!(manager.registry().current_version("users"), Some(2));
905
906        let current = manager.registry().current_schema("users").unwrap();
907        assert_eq!(current.fields.len(), 3);
908    }
909
910    #[test]
911    fn test_no_migration_needed_for_current_version() {
912        let mut registry = SchemaRegistry::new();
913        let schema = create_test_schema();
914        registry.register_schema(schema, 1);
915
916        let row = VersionedRow::new(
917            1,
918            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("Alice".into())]),
919        );
920
921        let result = registry.migrate_row("users", row.clone()).unwrap();
922
923        assert_eq!(result.version, row.version);
924        assert_eq!(result.data.values, row.data.values);
925    }
926
927    #[test]
928    fn test_stats_tracking() {
929        let mut registry = SchemaRegistry::new();
930
931        let schema_v1 = create_test_schema();
932        registry.register_schema(schema_v1, 1);
933
934        let schema_v2 = SochSchema::new("users")
935            .field("id", SochType::UInt)
936            .field("name", SochType::Text)
937            .field("email", SochType::Text);
938        registry.register_schema(schema_v2, 2);
939
940        let migration =
941            Migration::new(1, 2).add_column("email", SochType::Text, SochValue::Text("".into()));
942        registry.register_migration("users", migration).unwrap();
943
944        // Migrate a row
945        let row = VersionedRow::new(
946            1,
947            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("Alice".into())]),
948        );
949        registry.migrate_row("users", row).unwrap();
950
951        let stats = registry.stats().snapshot();
952        assert_eq!(stats.rows_migrated, 1);
953        assert_eq!(stats.migrations_applied, 1);
954        assert_eq!(stats.lazy_migrations, 1);
955    }
956
957    #[test]
958    fn test_error_on_future_version() {
959        let mut registry = SchemaRegistry::new();
960        let schema = create_test_schema();
961        registry.register_schema(schema, 1);
962
963        let row = VersionedRow::new(
964            99,
965            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("Alice".into())]),
966        );
967
968        let result = registry.migrate_row("users", row);
969        assert!(result.is_err());
970    }
971
972    #[test]
973    fn test_custom_type_converter() {
974        let mut registry = SchemaRegistry::new();
975
976        // Register custom converter that uppercases text
977        let converter_idx = registry.register_converter(|v| {
978            if let SochValue::Text(s) = v {
979                SochValue::Text(s.to_uppercase())
980            } else {
981                v.clone()
982            }
983        });
984
985        let schema_v1 = SochSchema::new("users")
986            .field("id", SochType::UInt)
987            .field("name", SochType::Text);
988        registry.register_schema(schema_v1, 1);
989
990        let schema_v2 = SochSchema::new("users")
991            .field("id", SochType::UInt)
992            .field("name", SochType::Text); // Same type, different format
993        registry.register_schema(schema_v2, 2);
994
995        let migration = Migration::new(1, 2).change_type(
996            "name",
997            SochType::Text,
998            TypeConverter::Custom(converter_idx),
999        );
1000        registry.register_migration("users", migration).unwrap();
1001
1002        let row_v1 = VersionedRow::new(
1003            1,
1004            SochRow::new(vec![SochValue::UInt(1), SochValue::Text("alice".into())]),
1005        );
1006
1007        let row_v2 = registry.migrate_row("users", row_v1).unwrap();
1008        assert_eq!(row_v2.data.values[1], SochValue::Text("ALICE".into()));
1009    }
1010}