1use std::collections::HashMap;
50use std::sync::Arc;
51use std::sync::atomic::{AtomicU64, Ordering};
52
53use crate::error::{Result, SochDBError};
54use crate::soch::{SochRow, SochSchema, SochType, SochValue};
55
/// Version number of a schema (a plain `u64`).
pub type SchemaVersion = u64;

/// Identifies one specific version of a named schema.
///
/// Derives `Eq` and `Hash`, so it can serve as a hash-map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SchemaId {
    /// Name of the schema.
    pub name: String,
    /// Version of the schema.
    pub version: SchemaVersion,
}

impl SchemaId {
    /// Builds an identifier from anything convertible into a `String` and a version.
    pub fn new(name: impl Into<String>, version: SchemaVersion) -> Self {
        let name = name.into();
        SchemaId { name, version }
    }
}
74
/// A single structural change between two versions of a schema.
///
/// The same change list is applied both to schema definitions and to the
/// values of rows being migrated.
#[derive(Debug, Clone)]
pub enum SchemaChange {
    /// Adds a new column.
    AddColumn {
        /// Name of the new column.
        name: String,
        /// Declared type of the new column.
        column_type: SochType,
        /// Value placed in the new column when an existing row is migrated.
        default: SochValue,
        /// Index at which to insert the column; `None` or an index past the
        /// end appends it.
        position: Option<usize>,
    },
    /// Removes the column with the given name, if it exists.
    DropColumn { name: String },
    /// Renames a column; row values are left untouched.
    RenameColumn { old_name: String, new_name: String },
    /// Changes the type of a column, converting existing values.
    ChangeType {
        /// Name of the column whose type changes.
        name: String,
        /// Type the column has after the change.
        new_type: SochType,
        /// How existing values are converted to the new type.
        converter: TypeConverter,
    },
    /// Rearranges columns into the listed order; columns that are not listed
    /// are dropped.
    ReorderColumns { new_order: Vec<String> },
}
98
99#[derive(Debug, Clone)]
101pub enum TypeConverter {
102 Identity,
104 IntToText,
106 TextToInt,
108 FloatToInt,
110 IntToFloat,
112 Custom(usize),
114}
115
116impl TypeConverter {
117 pub fn convert(
119 &self,
120 value: &SochValue,
121 custom_converters: &[fn(&SochValue) -> SochValue],
122 ) -> Result<SochValue> {
123 match self {
124 TypeConverter::Identity => Ok(value.clone()),
125 TypeConverter::IntToText => match value {
126 SochValue::Int(i) => Ok(SochValue::Text(i.to_string())),
127 SochValue::UInt(u) => Ok(SochValue::Text(u.to_string())),
128 _ => Err(SochDBError::SchemaEvolution(format!(
129 "Cannot convert {:?} to text via IntToText",
130 value
131 ))),
132 },
133 TypeConverter::TextToInt => match value {
134 SochValue::Text(s) => s.parse::<i64>().map(SochValue::Int).map_err(|_| {
135 SochDBError::SchemaEvolution(format!("Cannot parse '{}' as integer", s))
136 }),
137 _ => Err(SochDBError::SchemaEvolution(format!(
138 "Cannot convert {:?} to int via TextToInt",
139 value
140 ))),
141 },
142 TypeConverter::FloatToInt => match value {
143 SochValue::Float(f) => Ok(SochValue::Int(*f as i64)),
144 _ => Err(SochDBError::SchemaEvolution(format!(
145 "Cannot convert {:?} to int via FloatToInt",
146 value
147 ))),
148 },
149 TypeConverter::IntToFloat => match value {
150 SochValue::Int(i) => Ok(SochValue::Float(*i as f64)),
151 SochValue::UInt(u) => Ok(SochValue::Float(*u as f64)),
152 _ => Err(SochDBError::SchemaEvolution(format!(
153 "Cannot convert {:?} to float via IntToFloat",
154 value
155 ))),
156 },
157 TypeConverter::Custom(idx) => {
158 if *idx < custom_converters.len() {
159 Ok(custom_converters[*idx](value))
160 } else {
161 Err(SochDBError::SchemaEvolution(format!(
162 "Custom converter index {} out of bounds",
163 idx
164 )))
165 }
166 }
167 }
168 }
169}
170
171#[derive(Debug, Clone)]
173pub struct Migration {
174 pub from_version: SchemaVersion,
175 pub to_version: SchemaVersion,
176 pub changes: Vec<SchemaChange>,
177}
178
179impl Migration {
180 pub fn new(from: SchemaVersion, to: SchemaVersion) -> Self {
181 Self {
182 from_version: from,
183 to_version: to,
184 changes: Vec::new(),
185 }
186 }
187
188 pub fn add_column(
189 mut self,
190 name: impl Into<String>,
191 column_type: SochType,
192 default: SochValue,
193 ) -> Self {
194 self.changes.push(SchemaChange::AddColumn {
195 name: name.into(),
196 column_type,
197 default,
198 position: None,
199 });
200 self
201 }
202
203 pub fn add_column_at(
204 mut self,
205 name: impl Into<String>,
206 column_type: SochType,
207 default: SochValue,
208 position: usize,
209 ) -> Self {
210 self.changes.push(SchemaChange::AddColumn {
211 name: name.into(),
212 column_type,
213 default,
214 position: Some(position),
215 });
216 self
217 }
218
219 pub fn drop_column(mut self, name: impl Into<String>) -> Self {
220 self.changes
221 .push(SchemaChange::DropColumn { name: name.into() });
222 self
223 }
224
225 pub fn rename_column(
226 mut self,
227 old_name: impl Into<String>,
228 new_name: impl Into<String>,
229 ) -> Self {
230 self.changes.push(SchemaChange::RenameColumn {
231 old_name: old_name.into(),
232 new_name: new_name.into(),
233 });
234 self
235 }
236
237 pub fn change_type(
238 mut self,
239 name: impl Into<String>,
240 new_type: SochType,
241 converter: TypeConverter,
242 ) -> Self {
243 self.changes.push(SchemaChange::ChangeType {
244 name: name.into(),
245 new_type,
246 converter,
247 });
248 self
249 }
250}
251
252#[derive(Debug, Clone)]
254pub struct VersionedRow {
255 pub version: SchemaVersion,
256 pub data: SochRow,
257}
258
259impl VersionedRow {
260 pub fn new(version: SchemaVersion, data: SochRow) -> Self {
261 Self { version, data }
262 }
263}
264
/// Atomic counters describing migration activity.
///
/// All updates and reads use `Ordering::Relaxed`.
#[derive(Debug, Default)]
pub struct EvolutionStats {
    /// Rows migrated, whether lazily or in the background.
    pub rows_migrated: AtomicU64,
    /// Individual migration steps applied.
    pub migrations_applied: AtomicU64,
    /// Errors recorded through [`EvolutionStats::record_error`].
    pub migration_errors: AtomicU64,
    /// Rows migrated lazily.
    pub lazy_migrations: AtomicU64,
    /// Rows migrated in the background.
    pub background_migrations: AtomicU64,
}

impl EvolutionStats {
    /// Adds `amount` to a single counter.
    fn bump(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, Ordering::Relaxed);
    }

    /// Counts one lazily migrated row.
    pub fn record_lazy_migration(&self) {
        Self::bump(&self.lazy_migrations, 1);
        Self::bump(&self.rows_migrated, 1);
    }

    /// Counts `count` rows migrated in the background.
    pub fn record_background_migration(&self, count: u64) {
        Self::bump(&self.background_migrations, count);
        Self::bump(&self.rows_migrated, count);
    }

    /// Counts one applied migration step.
    pub fn record_migration_applied(&self) {
        Self::bump(&self.migrations_applied, 1);
    }

    /// Counts one migration error.
    pub fn record_error(&self) {
        Self::bump(&self.migration_errors, 1);
    }

    /// Copies the current counter values into a plain snapshot.
    ///
    /// Each counter is loaded separately, so the snapshot is not an atomic
    /// view across all counters.
    pub fn snapshot(&self) -> EvolutionStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        let rows_migrated = read(&self.rows_migrated);
        let migrations_applied = read(&self.migrations_applied);
        let migration_errors = read(&self.migration_errors);
        let lazy_migrations = read(&self.lazy_migrations);
        let background_migrations = read(&self.background_migrations);
        EvolutionStatsSnapshot {
            rows_migrated,
            migrations_applied,
            migration_errors,
            lazy_migrations,
            background_migrations,
        }
    }
}

/// Plain-value copy of [`EvolutionStats`] taken at one point in time.
#[derive(Debug, Clone)]
pub struct EvolutionStatsSnapshot {
    /// Rows migrated, whether lazily or in the background.
    pub rows_migrated: u64,
    /// Individual migration steps applied.
    pub migrations_applied: u64,
    /// Errors recorded.
    pub migration_errors: u64,
    /// Rows migrated lazily.
    pub lazy_migrations: u64,
    /// Rows migrated in the background.
    pub background_migrations: u64,
}
314
315pub struct SchemaRegistry {
317 schemas: HashMap<SchemaId, SochSchema>,
319 current_versions: HashMap<String, SchemaVersion>,
321 migrations: HashMap<(String, SchemaVersion), Migration>,
323 custom_converters: Vec<fn(&SochValue) -> SochValue>,
325 stats: Arc<EvolutionStats>,
327}
328
329impl SchemaRegistry {
330 pub fn new() -> Self {
331 Self {
332 schemas: HashMap::new(),
333 current_versions: HashMap::new(),
334 migrations: HashMap::new(),
335 custom_converters: Vec::new(),
336 stats: Arc::new(EvolutionStats::default()),
337 }
338 }
339
340 pub fn register_schema(&mut self, schema: SochSchema, version: SchemaVersion) {
342 let name = schema.name.clone();
343 let id = SchemaId::new(&name, version);
344 self.schemas.insert(id, schema);
345
346 let current = self.current_versions.entry(name).or_insert(0);
348 if version > *current {
349 *current = version;
350 }
351 }
352
353 pub fn register_migration(
355 &mut self,
356 name: impl Into<String>,
357 migration: Migration,
358 ) -> Result<()> {
359 let name = name.into();
360
361 let key = (name.clone(), migration.from_version);
363
364 if self.migrations.contains_key(&key) {
365 return Err(SochDBError::SchemaEvolution(format!(
366 "Migration from version {} already exists for {}",
367 migration.from_version, name
368 )));
369 }
370
371 self.migrations.insert(key, migration);
372 Ok(())
373 }
374
375 pub fn register_converter(&mut self, converter: fn(&SochValue) -> SochValue) -> usize {
377 let idx = self.custom_converters.len();
378 self.custom_converters.push(converter);
379 idx
380 }
381
382 pub fn current_version(&self, name: &str) -> Option<SchemaVersion> {
384 self.current_versions.get(name).copied()
385 }
386
387 pub fn get_schema(&self, name: &str, version: SchemaVersion) -> Option<&SochSchema> {
389 self.schemas.get(&SchemaId::new(name, version))
390 }
391
392 pub fn current_schema(&self, name: &str) -> Option<&SochSchema> {
394 self.current_version(name)
395 .and_then(|v| self.get_schema(name, v))
396 }
397
398 pub fn migrate_row(&self, name: &str, row: VersionedRow) -> Result<VersionedRow> {
400 let current_version = self.current_version(name).ok_or_else(|| {
401 SochDBError::SchemaEvolution(format!("No schema registered for '{}'", name))
402 })?;
403
404 if row.version == current_version {
405 return Ok(row);
406 }
407
408 if row.version > current_version {
409 return Err(SochDBError::SchemaEvolution(format!(
410 "Row version {} is newer than current schema version {}",
411 row.version, current_version
412 )));
413 }
414
415 let mut current_row = row.data;
417 let mut version = row.version;
418
419 while version < current_version {
420 let migration = self
421 .migrations
422 .get(&(name.to_string(), version))
423 .ok_or_else(|| {
424 SochDBError::SchemaEvolution(format!(
425 "No migration path from version {} for '{}'",
426 version, name
427 ))
428 })?;
429
430 current_row = self.apply_migration(¤t_row, migration, name)?;
431 version = migration.to_version;
432 self.stats.record_migration_applied();
433 }
434
435 self.stats.record_lazy_migration();
436
437 Ok(VersionedRow::new(current_version, current_row))
438 }
439
440 fn apply_migration(
442 &self,
443 row: &SochRow,
444 migration: &Migration,
445 schema_name: &str,
446 ) -> Result<SochRow> {
447 let source_schema = self
449 .get_schema(schema_name, migration.from_version)
450 .ok_or_else(|| {
451 SochDBError::SchemaEvolution(format!(
452 "Source schema version {} not found",
453 migration.from_version
454 ))
455 })?;
456
457 let mut values = row.values.clone();
458 let mut column_names: Vec<String> = source_schema
459 .fields
460 .iter()
461 .map(|f| f.name.clone())
462 .collect();
463
464 for change in &migration.changes {
465 match change {
466 SchemaChange::AddColumn {
467 name,
468 default,
469 position,
470 ..
471 } => match position {
472 Some(pos) if *pos <= values.len() => {
473 values.insert(*pos, default.clone());
474 column_names.insert(*pos, name.clone());
475 }
476 _ => {
477 values.push(default.clone());
478 column_names.push(name.clone());
479 }
480 },
481 SchemaChange::DropColumn { name } => {
482 if let Some(idx) = column_names.iter().position(|n| n == name) {
483 values.remove(idx);
484 column_names.remove(idx);
485 }
486 }
487 SchemaChange::RenameColumn { old_name, new_name } => {
488 if let Some(idx) = column_names.iter().position(|n| n == old_name) {
489 column_names[idx] = new_name.clone();
490 }
491 }
492 SchemaChange::ChangeType {
493 name, converter, ..
494 } => {
495 if let Some(idx) = column_names.iter().position(|n| n == name) {
496 values[idx] = converter.convert(&values[idx], &self.custom_converters)?;
497 }
498 }
499 SchemaChange::ReorderColumns { new_order } => {
500 let mut new_values = Vec::with_capacity(new_order.len());
501 for col_name in new_order {
502 if let Some(idx) = column_names.iter().position(|n| n == col_name) {
503 new_values.push(values[idx].clone());
504 }
505 }
506 values = new_values;
507 column_names = new_order.clone();
508 }
509 }
510 }
511
512 Ok(SochRow::new(values))
513 }
514
515 pub fn stats(&self) -> Arc<EvolutionStats> {
517 Arc::clone(&self.stats)
518 }
519}
520
521impl Default for SchemaRegistry {
522 fn default() -> Self {
523 Self::new()
524 }
525}
526
527pub struct SchemaEvolutionManager {
529 registry: SchemaRegistry,
530 pending_migrations: Vec<(String, SchemaVersion, usize)>,
532}
533
534impl SchemaEvolutionManager {
535 pub fn new() -> Self {
536 Self {
537 registry: SchemaRegistry::new(),
538 pending_migrations: Vec::new(),
539 }
540 }
541
542 pub fn evolve_schema(
544 &mut self,
545 name: &str,
546 changes: Vec<SchemaChange>,
547 ) -> Result<SchemaVersion> {
548 let current_version = self.registry.current_version(name).ok_or_else(|| {
549 SochDBError::SchemaEvolution(format!("No schema registered for '{}'", name))
550 })?;
551
552 let current_schema = self
553 .registry
554 .current_schema(name)
555 .ok_or_else(|| {
556 SochDBError::SchemaEvolution(format!("Current schema not found for '{}'", name))
557 })?
558 .clone();
559
560 let new_version = current_version + 1;
562 let new_schema = self.apply_schema_changes(¤t_schema, &changes)?;
563
564 self.registry.register_schema(new_schema, new_version);
566
567 let migration = Migration {
568 from_version: current_version,
569 to_version: new_version,
570 changes,
571 };
572 self.registry.register_migration(name, migration)?;
573
574 Ok(new_version)
575 }
576
577 fn apply_schema_changes(
579 &self,
580 schema: &SochSchema,
581 changes: &[SchemaChange],
582 ) -> Result<SochSchema> {
583 let mut new_schema = schema.clone();
584
585 for change in changes {
586 match change {
587 SchemaChange::AddColumn {
588 name,
589 column_type,
590 position,
591 ..
592 } => {
593 let field = crate::soch::SochField {
594 name: name.clone(),
595 field_type: column_type.clone(),
596 nullable: true,
597 default: None,
598 };
599 match position {
600 Some(pos) if *pos <= new_schema.fields.len() => {
601 new_schema.fields.insert(*pos, field);
602 }
603 _ => {
604 new_schema.fields.push(field);
605 }
606 }
607 }
608 SchemaChange::DropColumn { name } => {
609 new_schema.fields.retain(|f| f.name != *name);
610 }
611 SchemaChange::RenameColumn { old_name, new_name } => {
612 for field in &mut new_schema.fields {
613 if field.name == *old_name {
614 field.name = new_name.clone();
615 }
616 }
617 }
618 SchemaChange::ChangeType { name, new_type, .. } => {
619 for field in &mut new_schema.fields {
620 if field.name == *name {
621 field.field_type = new_type.clone();
622 }
623 }
624 }
625 SchemaChange::ReorderColumns { new_order } => {
626 let mut new_fields = Vec::with_capacity(new_order.len());
627 for col_name in new_order {
628 if let Some(field) = new_schema.fields.iter().find(|f| &f.name == col_name)
629 {
630 new_fields.push(field.clone());
631 }
632 }
633 new_schema.fields = new_fields;
634 }
635 }
636 }
637
638 Ok(new_schema)
639 }
640
641 pub fn migrate_row(&self, name: &str, row: VersionedRow) -> Result<VersionedRow> {
643 self.registry.migrate_row(name, row)
644 }
645
646 pub fn schedule_background_migration(
648 &mut self,
649 name: impl Into<String>,
650 from_version: SchemaVersion,
651 row_count: usize,
652 ) {
653 self.pending_migrations
654 .push((name.into(), from_version, row_count));
655 }
656
657 pub fn pending_migrations(&self) -> &[(String, SchemaVersion, usize)] {
659 &self.pending_migrations
660 }
661
662 pub fn registry(&self) -> &SchemaRegistry {
664 &self.registry
665 }
666
667 pub fn registry_mut(&mut self) -> &mut SchemaRegistry {
669 &mut self.registry
670 }
671
672 pub fn stats(&self) -> Arc<EvolutionStats> {
674 self.registry.stats()
675 }
676}
677
678impl Default for SchemaEvolutionManager {
679 fn default() -> Self {
680 Self::new()
681 }
682}
683
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline `users` schema: `id` (UInt) followed by `name` (Text).
    fn create_test_schema() -> SochSchema {
        SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("name", SochType::Text)
    }

    /// Baseline `users` schema with a trailing `email` text column.
    fn users_with_email_schema() -> SochSchema {
        create_test_schema().field("email", SochType::Text)
    }

    /// Migration 1 -> 2 that appends an empty-text `email` column.
    fn add_email_migration() -> Migration {
        Migration::new(1, 2).add_column("email", SochType::Text, SochValue::Text(String::new()))
    }

    /// Two-column user row `(1, name)` stored under `version`.
    fn user_row(version: SchemaVersion, name: &str) -> VersionedRow {
        let values = vec![SochValue::UInt(1), SochValue::Text(name.to_string())];
        VersionedRow::new(version, SochRow::new(values))
    }

    /// Registry holding `users` v1, v2 (with `email`) and the 1 -> 2 migration.
    fn registry_with_email_migration() -> SchemaRegistry {
        let mut registry = SchemaRegistry::new();
        registry.register_schema(create_test_schema(), 1);
        registry.register_schema(users_with_email_schema(), 2);
        registry
            .register_migration("users", add_email_migration())
            .unwrap();
        registry
    }

    #[test]
    fn test_schema_registration() {
        let mut registry = SchemaRegistry::new();
        registry.register_schema(create_test_schema(), 1);

        assert_eq!(registry.current_version("users"), Some(1));
        assert!(registry.get_schema("users", 1).is_some());
    }

    #[test]
    fn test_add_column_migration() {
        let registry = registry_with_email_migration();

        let migrated = registry.migrate_row("users", user_row(1, "Alice")).unwrap();

        assert_eq!(migrated.version, 2);
        assert_eq!(migrated.data.values.len(), 3);
        assert_eq!(migrated.data.values[2], SochValue::Text("".into()));
    }

    #[test]
    fn test_drop_column_migration() {
        let mut registry = SchemaRegistry::new();

        let with_legacy = create_test_schema().field("legacy", SochType::Text);
        registry.register_schema(with_legacy, 1);
        registry.register_schema(create_test_schema(), 2);
        registry
            .register_migration("users", Migration::new(1, 2).drop_column("legacy"))
            .unwrap();

        let stored = VersionedRow::new(
            1,
            SochRow::new(vec![
                SochValue::UInt(1),
                SochValue::Text("Alice".into()),
                SochValue::Text("old_data".into()),
            ]),
        );

        let migrated = registry.migrate_row("users", stored).unwrap();

        assert_eq!(migrated.version, 2);
        assert_eq!(migrated.data.values.len(), 2);
    }

    #[test]
    fn test_rename_column_migration() {
        let mut registry = SchemaRegistry::new();

        registry.register_schema(create_test_schema(), 1);
        let renamed = SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("full_name", SochType::Text);
        registry.register_schema(renamed, 2);
        registry
            .register_migration(
                "users",
                Migration::new(1, 2).rename_column("name", "full_name"),
            )
            .unwrap();

        let migrated = registry.migrate_row("users", user_row(1, "Alice")).unwrap();

        assert_eq!(migrated.version, 2);
        assert_eq!(migrated.data.values.len(), 2);
        assert_eq!(migrated.data.values[1], SochValue::Text("Alice".into()));
    }

    #[test]
    fn test_type_conversion_migration() {
        let mut registry = SchemaRegistry::new();

        let int_priced = SochSchema::new("products")
            .field("id", SochType::UInt)
            .field("price", SochType::Int);
        registry.register_schema(int_priced, 1);

        let float_priced = SochSchema::new("products")
            .field("id", SochType::UInt)
            .field("price", SochType::Float);
        registry.register_schema(float_priced, 2);

        let to_float =
            Migration::new(1, 2).change_type("price", SochType::Float, TypeConverter::IntToFloat);
        registry.register_migration("products", to_float).unwrap();

        let stored = VersionedRow::new(
            1,
            SochRow::new(vec![SochValue::UInt(1), SochValue::Int(100)]),
        );

        let migrated = registry.migrate_row("products", stored).unwrap();

        assert_eq!(migrated.version, 2);
        assert_eq!(migrated.data.values[1], SochValue::Float(100.0));
    }

    #[test]
    fn test_multi_version_migration_chain() {
        let mut registry = registry_with_email_migration();

        let schema_v3 = SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("full_name", SochType::Text)
            .field("email", SochType::Text)
            .field("created_at", SochType::Int);
        registry.register_schema(schema_v3, 3);

        let migration_2_3 = Migration::new(2, 3)
            .rename_column("name", "full_name")
            .add_column("created_at", SochType::Int, SochValue::Int(0));
        registry.register_migration("users", migration_2_3).unwrap();

        let migrated = registry.migrate_row("users", user_row(1, "Alice")).unwrap();

        assert_eq!(migrated.version, 3);
        let expected = [
            SochValue::UInt(1),
            SochValue::Text("Alice".into()),
            SochValue::Text("".into()),
            SochValue::Int(0),
        ];
        assert_eq!(migrated.data.values.len(), 4);
        assert_eq!(migrated.data.values[0], expected[0]);
        assert_eq!(migrated.data.values[1], expected[1]);
        assert_eq!(migrated.data.values[2], expected[2]);
        assert_eq!(migrated.data.values[3], expected[3]);
    }

    #[test]
    fn test_evolve_schema() {
        let mut manager = SchemaEvolutionManager::new();
        manager
            .registry_mut()
            .register_schema(create_test_schema(), 1);

        let add_email = SchemaChange::AddColumn {
            name: "email".to_string(),
            column_type: SochType::Text,
            default: SochValue::Text("".into()),
            position: None,
        };

        let new_version = manager.evolve_schema("users", vec![add_email]).unwrap();

        assert_eq!(new_version, 2);
        assert_eq!(manager.registry().current_version("users"), Some(2));

        let current = manager.registry().current_schema("users").unwrap();
        assert_eq!(current.fields.len(), 3);
    }

    #[test]
    fn test_no_migration_needed_for_current_version() {
        let mut registry = SchemaRegistry::new();
        registry.register_schema(create_test_schema(), 1);

        let row = user_row(1, "Alice");

        let result = registry.migrate_row("users", row.clone()).unwrap();

        assert_eq!(result.version, row.version);
        assert_eq!(result.data.values, row.data.values);
    }

    #[test]
    fn test_stats_tracking() {
        let registry = registry_with_email_migration();

        registry.migrate_row("users", user_row(1, "Alice")).unwrap();

        let stats = registry.stats().snapshot();
        assert_eq!(stats.rows_migrated, 1);
        assert_eq!(stats.migrations_applied, 1);
        assert_eq!(stats.lazy_migrations, 1);
    }

    #[test]
    fn test_error_on_future_version() {
        let mut registry = SchemaRegistry::new();
        registry.register_schema(create_test_schema(), 1);

        let result = registry.migrate_row("users", user_row(99, "Alice"));
        assert!(result.is_err());
    }

    #[test]
    fn test_custom_type_converter() {
        let mut registry = SchemaRegistry::new();

        // Upper-cases text values and passes everything else through.
        let converter_idx = registry.register_converter(|value| match value {
            SochValue::Text(text) => SochValue::Text(text.to_uppercase()),
            other => other.clone(),
        });

        registry.register_schema(create_test_schema(), 1);
        registry.register_schema(create_test_schema(), 2);

        let upper_case_names = Migration::new(1, 2).change_type(
            "name",
            SochType::Text,
            TypeConverter::Custom(converter_idx),
        );
        registry
            .register_migration("users", upper_case_names)
            .unwrap();

        let migrated = registry.migrate_row("users", user_row(1, "alice")).unwrap();
        assert_eq!(migrated.data.values[1], SochValue::Text("ALICE".into()));
    }
}