1use std::collections::HashMap;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicU64, Ordering};
49
50use crate::error::{Result, SochDBError};
51use crate::soch::{SochRow, SochSchema, SochType, SochValue};
52
/// Numeric version tag attached to a schema revision.
pub type SchemaVersion = u64;

/// Key identifying one specific revision of a named schema.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SchemaId {
    /// Name of the schema.
    pub name: String,
    /// Revision of the schema this identifier points at.
    pub version: SchemaVersion,
}

impl SchemaId {
    /// Builds an identifier from any string-like `name` and a `version`.
    pub fn new(name: impl Into<String>, version: SchemaVersion) -> Self {
        let name: String = name.into();
        SchemaId { name, version }
    }
}
71
/// One structural change that a migration applies to a schema and to rows stored under it.
#[derive(Debug, Clone)]
pub enum SchemaChange {
    /// Add a new column.
    AddColumn {
        /// Name of the new column.
        name: String,
        /// Declared type of the new column.
        column_type: SochType,
        /// Value written into rows that predate the column.
        default: SochValue,
        /// Index to insert at; `None` appends the column at the end.
        position: Option<usize>,
    },
    /// Remove the column called `name`.
    DropColumn { name: String },
    /// Rename column `old_name` to `new_name`; stored values are untouched.
    RenameColumn { old_name: String, new_name: String },
    /// Change the declared type of a column and convert its stored values.
    ChangeType {
        /// Column whose type changes.
        name: String,
        /// Type the column has after the change.
        new_type: SochType,
        /// Conversion applied to each existing value of the column.
        converter: TypeConverter,
    },
    /// Rearrange columns into the order given by `new_order` (column names).
    ReorderColumns { new_order: Vec<String> },
}
95
96#[derive(Debug, Clone)]
98pub enum TypeConverter {
99 Identity,
101 IntToText,
103 TextToInt,
105 FloatToInt,
107 IntToFloat,
109 Custom(usize),
111}
112
113impl TypeConverter {
114 pub fn convert(
116 &self,
117 value: &SochValue,
118 custom_converters: &[fn(&SochValue) -> SochValue],
119 ) -> Result<SochValue> {
120 match self {
121 TypeConverter::Identity => Ok(value.clone()),
122 TypeConverter::IntToText => match value {
123 SochValue::Int(i) => Ok(SochValue::Text(i.to_string())),
124 SochValue::UInt(u) => Ok(SochValue::Text(u.to_string())),
125 _ => Err(SochDBError::SchemaEvolution(format!(
126 "Cannot convert {:?} to text via IntToText",
127 value
128 ))),
129 },
130 TypeConverter::TextToInt => match value {
131 SochValue::Text(s) => s.parse::<i64>().map(SochValue::Int).map_err(|_| {
132 SochDBError::SchemaEvolution(format!("Cannot parse '{}' as integer", s))
133 }),
134 _ => Err(SochDBError::SchemaEvolution(format!(
135 "Cannot convert {:?} to int via TextToInt",
136 value
137 ))),
138 },
139 TypeConverter::FloatToInt => match value {
140 SochValue::Float(f) => Ok(SochValue::Int(*f as i64)),
141 _ => Err(SochDBError::SchemaEvolution(format!(
142 "Cannot convert {:?} to int via FloatToInt",
143 value
144 ))),
145 },
146 TypeConverter::IntToFloat => match value {
147 SochValue::Int(i) => Ok(SochValue::Float(*i as f64)),
148 SochValue::UInt(u) => Ok(SochValue::Float(*u as f64)),
149 _ => Err(SochDBError::SchemaEvolution(format!(
150 "Cannot convert {:?} to float via IntToFloat",
151 value
152 ))),
153 },
154 TypeConverter::Custom(idx) => {
155 if *idx < custom_converters.len() {
156 Ok(custom_converters[*idx](value))
157 } else {
158 Err(SochDBError::SchemaEvolution(format!(
159 "Custom converter index {} out of bounds",
160 idx
161 )))
162 }
163 }
164 }
165 }
166}
167
168#[derive(Debug, Clone)]
170pub struct Migration {
171 pub from_version: SchemaVersion,
172 pub to_version: SchemaVersion,
173 pub changes: Vec<SchemaChange>,
174}
175
176impl Migration {
177 pub fn new(from: SchemaVersion, to: SchemaVersion) -> Self {
178 Self {
179 from_version: from,
180 to_version: to,
181 changes: Vec::new(),
182 }
183 }
184
185 pub fn add_column(
186 mut self,
187 name: impl Into<String>,
188 column_type: SochType,
189 default: SochValue,
190 ) -> Self {
191 self.changes.push(SchemaChange::AddColumn {
192 name: name.into(),
193 column_type,
194 default,
195 position: None,
196 });
197 self
198 }
199
200 pub fn add_column_at(
201 mut self,
202 name: impl Into<String>,
203 column_type: SochType,
204 default: SochValue,
205 position: usize,
206 ) -> Self {
207 self.changes.push(SchemaChange::AddColumn {
208 name: name.into(),
209 column_type,
210 default,
211 position: Some(position),
212 });
213 self
214 }
215
216 pub fn drop_column(mut self, name: impl Into<String>) -> Self {
217 self.changes
218 .push(SchemaChange::DropColumn { name: name.into() });
219 self
220 }
221
222 pub fn rename_column(
223 mut self,
224 old_name: impl Into<String>,
225 new_name: impl Into<String>,
226 ) -> Self {
227 self.changes.push(SchemaChange::RenameColumn {
228 old_name: old_name.into(),
229 new_name: new_name.into(),
230 });
231 self
232 }
233
234 pub fn change_type(
235 mut self,
236 name: impl Into<String>,
237 new_type: SochType,
238 converter: TypeConverter,
239 ) -> Self {
240 self.changes.push(SchemaChange::ChangeType {
241 name: name.into(),
242 new_type,
243 converter,
244 });
245 self
246 }
247}
248
249#[derive(Debug, Clone)]
251pub struct VersionedRow {
252 pub version: SchemaVersion,
253 pub data: SochRow,
254}
255
256impl VersionedRow {
257 pub fn new(version: SchemaVersion, data: SochRow) -> Self {
258 Self { version, data }
259 }
260}
261
/// Atomic counters describing migration activity; updated through `&self`.
#[derive(Debug, Default)]
pub struct EvolutionStats {
    /// Total rows migrated, lazily or in the background.
    pub rows_migrated: AtomicU64,
    /// Number of individual migration steps applied.
    pub migrations_applied: AtomicU64,
    /// Number of recorded migration errors.
    pub migration_errors: AtomicU64,
    /// Rows counted via `record_lazy_migration`.
    pub lazy_migrations: AtomicU64,
    /// Rows counted via `record_background_migration`.
    pub background_migrations: AtomicU64,
}

impl EvolutionStats {
    /// Adds `amount` to `counter` with relaxed ordering.
    fn bump(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, Ordering::Relaxed);
    }

    /// Counts one lazily migrated row (also increments the total row counter).
    pub fn record_lazy_migration(&self) {
        Self::bump(&self.lazy_migrations, 1);
        Self::bump(&self.rows_migrated, 1);
    }

    /// Counts `count` rows migrated in the background (also added to the total row counter).
    pub fn record_background_migration(&self, count: u64) {
        Self::bump(&self.background_migrations, count);
        Self::bump(&self.rows_migrated, count);
    }

    /// Counts one applied migration step.
    pub fn record_migration_applied(&self) {
        Self::bump(&self.migrations_applied, 1);
    }

    /// Counts one migration error.
    pub fn record_error(&self) {
        Self::bump(&self.migration_errors, 1);
    }

    /// Copies the current counter values into a plain-data snapshot.
    ///
    /// Each counter is loaded independently, so the snapshot is not a single atomic view
    /// across all fields.
    pub fn snapshot(&self) -> EvolutionStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        let rows_migrated = read(&self.rows_migrated);
        let migrations_applied = read(&self.migrations_applied);
        let migration_errors = read(&self.migration_errors);
        let lazy_migrations = read(&self.lazy_migrations);
        let background_migrations = read(&self.background_migrations);
        EvolutionStatsSnapshot {
            rows_migrated,
            migrations_applied,
            migration_errors,
            lazy_migrations,
            background_migrations,
        }
    }
}

/// Plain-value copy of [`EvolutionStats`] taken at one point in time.
#[derive(Debug, Clone)]
pub struct EvolutionStatsSnapshot {
    /// Total rows migrated.
    pub rows_migrated: u64,
    /// Migration steps applied.
    pub migrations_applied: u64,
    /// Migration errors recorded.
    pub migration_errors: u64,
    /// Rows migrated lazily.
    pub lazy_migrations: u64,
    /// Rows migrated in the background.
    pub background_migrations: u64,
}
311
312pub struct SchemaRegistry {
314 schemas: HashMap<SchemaId, SochSchema>,
316 current_versions: HashMap<String, SchemaVersion>,
318 migrations: HashMap<(String, SchemaVersion), Migration>,
320 custom_converters: Vec<fn(&SochValue) -> SochValue>,
322 stats: Arc<EvolutionStats>,
324}
325
326impl SchemaRegistry {
327 pub fn new() -> Self {
328 Self {
329 schemas: HashMap::new(),
330 current_versions: HashMap::new(),
331 migrations: HashMap::new(),
332 custom_converters: Vec::new(),
333 stats: Arc::new(EvolutionStats::default()),
334 }
335 }
336
337 pub fn register_schema(&mut self, schema: SochSchema, version: SchemaVersion) {
339 let name = schema.name.clone();
340 let id = SchemaId::new(&name, version);
341 self.schemas.insert(id, schema);
342
343 let current = self.current_versions.entry(name).or_insert(0);
345 if version > *current {
346 *current = version;
347 }
348 }
349
350 pub fn register_migration(
352 &mut self,
353 name: impl Into<String>,
354 migration: Migration,
355 ) -> Result<()> {
356 let name = name.into();
357
358 let key = (name.clone(), migration.from_version);
360
361 if self.migrations.contains_key(&key) {
362 return Err(SochDBError::SchemaEvolution(format!(
363 "Migration from version {} already exists for {}",
364 migration.from_version, name
365 )));
366 }
367
368 self.migrations.insert(key, migration);
369 Ok(())
370 }
371
372 pub fn register_converter(&mut self, converter: fn(&SochValue) -> SochValue) -> usize {
374 let idx = self.custom_converters.len();
375 self.custom_converters.push(converter);
376 idx
377 }
378
379 pub fn current_version(&self, name: &str) -> Option<SchemaVersion> {
381 self.current_versions.get(name).copied()
382 }
383
384 pub fn get_schema(&self, name: &str, version: SchemaVersion) -> Option<&SochSchema> {
386 self.schemas.get(&SchemaId::new(name, version))
387 }
388
389 pub fn current_schema(&self, name: &str) -> Option<&SochSchema> {
391 self.current_version(name)
392 .and_then(|v| self.get_schema(name, v))
393 }
394
395 pub fn migrate_row(&self, name: &str, row: VersionedRow) -> Result<VersionedRow> {
397 let current_version = self.current_version(name).ok_or_else(|| {
398 SochDBError::SchemaEvolution(format!("No schema registered for '{}'", name))
399 })?;
400
401 if row.version == current_version {
402 return Ok(row);
403 }
404
405 if row.version > current_version {
406 return Err(SochDBError::SchemaEvolution(format!(
407 "Row version {} is newer than current schema version {}",
408 row.version, current_version
409 )));
410 }
411
412 let mut current_row = row.data;
414 let mut version = row.version;
415
416 while version < current_version {
417 let migration = self
418 .migrations
419 .get(&(name.to_string(), version))
420 .ok_or_else(|| {
421 SochDBError::SchemaEvolution(format!(
422 "No migration path from version {} for '{}'",
423 version, name
424 ))
425 })?;
426
427 current_row = self.apply_migration(¤t_row, migration, name)?;
428 version = migration.to_version;
429 self.stats.record_migration_applied();
430 }
431
432 self.stats.record_lazy_migration();
433
434 Ok(VersionedRow::new(current_version, current_row))
435 }
436
437 fn apply_migration(
439 &self,
440 row: &SochRow,
441 migration: &Migration,
442 schema_name: &str,
443 ) -> Result<SochRow> {
444 let source_schema = self
446 .get_schema(schema_name, migration.from_version)
447 .ok_or_else(|| {
448 SochDBError::SchemaEvolution(format!(
449 "Source schema version {} not found",
450 migration.from_version
451 ))
452 })?;
453
454 let mut values = row.values.clone();
455 let mut column_names: Vec<String> = source_schema
456 .fields
457 .iter()
458 .map(|f| f.name.clone())
459 .collect();
460
461 for change in &migration.changes {
462 match change {
463 SchemaChange::AddColumn {
464 name,
465 default,
466 position,
467 ..
468 } => match position {
469 Some(pos) if *pos <= values.len() => {
470 values.insert(*pos, default.clone());
471 column_names.insert(*pos, name.clone());
472 }
473 _ => {
474 values.push(default.clone());
475 column_names.push(name.clone());
476 }
477 },
478 SchemaChange::DropColumn { name } => {
479 if let Some(idx) = column_names.iter().position(|n| n == name) {
480 values.remove(idx);
481 column_names.remove(idx);
482 }
483 }
484 SchemaChange::RenameColumn { old_name, new_name } => {
485 if let Some(idx) = column_names.iter().position(|n| n == old_name) {
486 column_names[idx] = new_name.clone();
487 }
488 }
489 SchemaChange::ChangeType {
490 name, converter, ..
491 } => {
492 if let Some(idx) = column_names.iter().position(|n| n == name) {
493 values[idx] = converter.convert(&values[idx], &self.custom_converters)?;
494 }
495 }
496 SchemaChange::ReorderColumns { new_order } => {
497 let mut new_values = Vec::with_capacity(new_order.len());
498 for col_name in new_order {
499 if let Some(idx) = column_names.iter().position(|n| n == col_name) {
500 new_values.push(values[idx].clone());
501 }
502 }
503 values = new_values;
504 column_names = new_order.clone();
505 }
506 }
507 }
508
509 Ok(SochRow::new(values))
510 }
511
512 pub fn stats(&self) -> Arc<EvolutionStats> {
514 Arc::clone(&self.stats)
515 }
516}
517
518impl Default for SchemaRegistry {
519 fn default() -> Self {
520 Self::new()
521 }
522}
523
524pub struct SchemaEvolutionManager {
526 registry: SchemaRegistry,
527 pending_migrations: Vec<(String, SchemaVersion, usize)>,
529}
530
531impl SchemaEvolutionManager {
532 pub fn new() -> Self {
533 Self {
534 registry: SchemaRegistry::new(),
535 pending_migrations: Vec::new(),
536 }
537 }
538
539 pub fn evolve_schema(
541 &mut self,
542 name: &str,
543 changes: Vec<SchemaChange>,
544 ) -> Result<SchemaVersion> {
545 let current_version = self.registry.current_version(name).ok_or_else(|| {
546 SochDBError::SchemaEvolution(format!("No schema registered for '{}'", name))
547 })?;
548
549 let current_schema = self
550 .registry
551 .current_schema(name)
552 .ok_or_else(|| {
553 SochDBError::SchemaEvolution(format!("Current schema not found for '{}'", name))
554 })?
555 .clone();
556
557 let new_version = current_version + 1;
559 let new_schema = self.apply_schema_changes(¤t_schema, &changes)?;
560
561 self.registry.register_schema(new_schema, new_version);
563
564 let migration = Migration {
565 from_version: current_version,
566 to_version: new_version,
567 changes,
568 };
569 self.registry.register_migration(name, migration)?;
570
571 Ok(new_version)
572 }
573
574 fn apply_schema_changes(
576 &self,
577 schema: &SochSchema,
578 changes: &[SchemaChange],
579 ) -> Result<SochSchema> {
580 let mut new_schema = schema.clone();
581
582 for change in changes {
583 match change {
584 SchemaChange::AddColumn {
585 name,
586 column_type,
587 position,
588 ..
589 } => {
590 let field = crate::soch::SochField {
591 name: name.clone(),
592 field_type: column_type.clone(),
593 nullable: true,
594 default: None,
595 };
596 match position {
597 Some(pos) if *pos <= new_schema.fields.len() => {
598 new_schema.fields.insert(*pos, field);
599 }
600 _ => {
601 new_schema.fields.push(field);
602 }
603 }
604 }
605 SchemaChange::DropColumn { name } => {
606 new_schema.fields.retain(|f| f.name != *name);
607 }
608 SchemaChange::RenameColumn { old_name, new_name } => {
609 for field in &mut new_schema.fields {
610 if field.name == *old_name {
611 field.name = new_name.clone();
612 }
613 }
614 }
615 SchemaChange::ChangeType { name, new_type, .. } => {
616 for field in &mut new_schema.fields {
617 if field.name == *name {
618 field.field_type = new_type.clone();
619 }
620 }
621 }
622 SchemaChange::ReorderColumns { new_order } => {
623 let mut new_fields = Vec::with_capacity(new_order.len());
624 for col_name in new_order {
625 if let Some(field) = new_schema.fields.iter().find(|f| &f.name == col_name)
626 {
627 new_fields.push(field.clone());
628 }
629 }
630 new_schema.fields = new_fields;
631 }
632 }
633 }
634
635 Ok(new_schema)
636 }
637
638 pub fn migrate_row(&self, name: &str, row: VersionedRow) -> Result<VersionedRow> {
640 self.registry.migrate_row(name, row)
641 }
642
643 pub fn schedule_background_migration(
645 &mut self,
646 name: impl Into<String>,
647 from_version: SchemaVersion,
648 row_count: usize,
649 ) {
650 self.pending_migrations
651 .push((name.into(), from_version, row_count));
652 }
653
654 pub fn pending_migrations(&self) -> &[(String, SchemaVersion, usize)] {
656 &self.pending_migrations
657 }
658
659 pub fn registry(&self) -> &SchemaRegistry {
661 &self.registry
662 }
663
664 pub fn registry_mut(&mut self) -> &mut SchemaRegistry {
666 &mut self.registry
667 }
668
669 pub fn stats(&self) -> Arc<EvolutionStats> {
671 self.registry.stats()
672 }
673}
674
675impl Default for SchemaEvolutionManager {
676 fn default() -> Self {
677 Self::new()
678 }
679}
680
#[cfg(test)]
mod tests {
    use super::*;

    /// Two-column `users` schema (`id`, `name`) used as version 1 in most tests.
    fn create_test_schema() -> SochSchema {
        SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("name", SochType::Text)
    }

    /// `users` schema with `id`, `name` and a trailing `email` text column.
    fn users_schema_with_email() -> SochSchema {
        SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("name", SochType::Text)
            .field("email", SochType::Text)
    }

    /// Migration 1 -> 2 that appends an `email` column defaulting to the empty string.
    fn add_email_migration() -> Migration {
        Migration::new(1, 2).add_column("email", SochType::Text, SochValue::Text("".into()))
    }

    /// Row `(1, name)` tagged with schema `version`.
    fn user_row(version: SchemaVersion, name: &str) -> VersionedRow {
        let values = vec![SochValue::UInt(1), SochValue::Text(name.into())];
        VersionedRow::new(version, SochRow::new(values))
    }

    #[test]
    fn test_schema_registration() {
        let mut registry = SchemaRegistry::new();
        registry.register_schema(create_test_schema(), 1);

        assert_eq!(registry.current_version("users"), Some(1));
        assert!(registry.get_schema("users", 1).is_some());
    }

    #[test]
    fn test_add_column_migration() {
        let mut registry = SchemaRegistry::new();
        registry.register_schema(create_test_schema(), 1);
        registry.register_schema(users_schema_with_email(), 2);
        registry
            .register_migration("users", add_email_migration())
            .unwrap();

        let migrated = registry
            .migrate_row("users", user_row(1, "Alice"))
            .unwrap();

        assert_eq!(migrated.version, 2);
        assert_eq!(migrated.data.values.len(), 3);
        assert_eq!(migrated.data.values[2], SochValue::Text("".into()));
    }

    #[test]
    fn test_drop_column_migration() {
        let mut registry = SchemaRegistry::new();

        let with_legacy = SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("name", SochType::Text)
            .field("legacy", SochType::Text);
        registry.register_schema(with_legacy, 1);
        registry.register_schema(create_test_schema(), 2);

        registry
            .register_migration("users", Migration::new(1, 2).drop_column("legacy"))
            .unwrap();

        let old_row = VersionedRow::new(
            1,
            SochRow::new(vec![
                SochValue::UInt(1),
                SochValue::Text("Alice".into()),
                SochValue::Text("old_data".into()),
            ]),
        );

        let migrated = registry.migrate_row("users", old_row).unwrap();

        assert_eq!(migrated.version, 2);
        assert_eq!(migrated.data.values.len(), 2);
    }

    #[test]
    fn test_rename_column_migration() {
        let mut registry = SchemaRegistry::new();
        registry.register_schema(create_test_schema(), 1);

        let renamed = SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("full_name", SochType::Text);
        registry.register_schema(renamed, 2);

        registry
            .register_migration(
                "users",
                Migration::new(1, 2).rename_column("name", "full_name"),
            )
            .unwrap();

        let migrated = registry
            .migrate_row("users", user_row(1, "Alice"))
            .unwrap();

        assert_eq!(migrated.version, 2);
        assert_eq!(migrated.data.values.len(), 2);
        assert_eq!(migrated.data.values[1], SochValue::Text("Alice".into()));
    }

    #[test]
    fn test_type_conversion_migration() {
        let mut registry = SchemaRegistry::new();

        let int_price = SochSchema::new("products")
            .field("id", SochType::UInt)
            .field("price", SochType::Int);
        registry.register_schema(int_price, 1);

        let float_price = SochSchema::new("products")
            .field("id", SochType::UInt)
            .field("price", SochType::Float);
        registry.register_schema(float_price, 2);

        let retype =
            Migration::new(1, 2).change_type("price", SochType::Float, TypeConverter::IntToFloat);
        registry.register_migration("products", retype).unwrap();

        let old_row = VersionedRow::new(
            1,
            SochRow::new(vec![SochValue::UInt(1), SochValue::Int(100)]),
        );

        let migrated = registry.migrate_row("products", old_row).unwrap();

        assert_eq!(migrated.version, 2);
        assert_eq!(migrated.data.values[1], SochValue::Float(100.0));
    }

    #[test]
    fn test_multi_version_migration_chain() {
        let mut registry = SchemaRegistry::new();
        registry.register_schema(create_test_schema(), 1);
        registry.register_schema(users_schema_with_email(), 2);

        let schema_v3 = SochSchema::new("users")
            .field("id", SochType::UInt)
            .field("full_name", SochType::Text)
            .field("email", SochType::Text)
            .field("created_at", SochType::Int);
        registry.register_schema(schema_v3, 3);

        registry
            .register_migration("users", add_email_migration())
            .unwrap();

        let second_step = Migration::new(2, 3)
            .rename_column("name", "full_name")
            .add_column("created_at", SochType::Int, SochValue::Int(0));
        registry.register_migration("users", second_step).unwrap();

        let migrated = registry
            .migrate_row("users", user_row(1, "Alice"))
            .unwrap();

        assert_eq!(migrated.version, 3);
        assert_eq!(migrated.data.values.len(), 4);
        assert_eq!(migrated.data.values[0], SochValue::UInt(1));
        assert_eq!(migrated.data.values[1], SochValue::Text("Alice".into()));
        assert_eq!(migrated.data.values[2], SochValue::Text("".into()));
        assert_eq!(migrated.data.values[3], SochValue::Int(0));
    }

    #[test]
    fn test_evolve_schema() {
        let mut manager = SchemaEvolutionManager::new();
        manager
            .registry_mut()
            .register_schema(create_test_schema(), 1);

        let add_email = SchemaChange::AddColumn {
            name: "email".to_string(),
            column_type: SochType::Text,
            default: SochValue::Text("".into()),
            position: None,
        };

        let new_version = manager.evolve_schema("users", vec![add_email]).unwrap();

        assert_eq!(new_version, 2);
        assert_eq!(manager.registry().current_version("users"), Some(2));

        let current = manager.registry().current_schema("users").unwrap();
        assert_eq!(current.fields.len(), 3);
    }

    #[test]
    fn test_no_migration_needed_for_current_version() {
        let mut registry = SchemaRegistry::new();
        registry.register_schema(create_test_schema(), 1);

        let row = user_row(1, "Alice");
        let result = registry.migrate_row("users", row.clone()).unwrap();

        assert_eq!(result.version, row.version);
        assert_eq!(result.data.values, row.data.values);
    }

    #[test]
    fn test_stats_tracking() {
        let mut registry = SchemaRegistry::new();
        registry.register_schema(create_test_schema(), 1);
        registry.register_schema(users_schema_with_email(), 2);
        registry
            .register_migration("users", add_email_migration())
            .unwrap();

        registry
            .migrate_row("users", user_row(1, "Alice"))
            .unwrap();

        let stats = registry.stats().snapshot();
        assert_eq!(stats.rows_migrated, 1);
        assert_eq!(stats.migrations_applied, 1);
        assert_eq!(stats.lazy_migrations, 1);
    }

    #[test]
    fn test_error_on_future_version() {
        let mut registry = SchemaRegistry::new();
        registry.register_schema(create_test_schema(), 1);

        let result = registry.migrate_row("users", user_row(99, "Alice"));
        assert!(result.is_err());
    }

    #[test]
    fn test_custom_type_converter() {
        /// Upper-cases text values and passes everything else through unchanged.
        fn upper_case_text(v: &SochValue) -> SochValue {
            match v {
                SochValue::Text(s) => SochValue::Text(s.to_uppercase()),
                _ => v.clone(),
            }
        }

        let mut registry = SchemaRegistry::new();
        let converter_idx = registry.register_converter(upper_case_text);

        // Both versions share the same shape; only the stored values change.
        registry.register_schema(create_test_schema(), 1);
        registry.register_schema(create_test_schema(), 2);

        let migration = Migration::new(1, 2).change_type(
            "name",
            SochType::Text,
            TypeConverter::Custom(converter_idx),
        );
        registry.register_migration("users", migration).unwrap();

        let migrated = registry
            .migrate_row("users", user_row(1, "alice"))
            .unwrap();
        assert_eq!(migrated.data.values[1], SochValue::Text("ALICE".into()));
    }
}