Skip to main content

reddb_server/storage/schema/
registry.rs

//! Schema Registry
//!
//! Manages table definitions and schema versioning.
//! The registry holds table metadata, records each schema change in a migration
//! history, and can be serialized to and from bytes for storage.
5
6use super::table::{TableDef, TableDefError};
7use std::collections::HashMap;
8use std::fmt;
9
10/// Schema registry for managing table definitions
11pub struct SchemaRegistry {
12    /// Table definitions by name
13    tables: HashMap<String, TableDef>,
14    /// Schema version
15    version: u32,
16    /// Migration history
17    migrations: Vec<Migration>,
18}
19
20impl SchemaRegistry {
21    /// Create a new empty schema registry
22    pub fn new() -> Self {
23        Self {
24            tables: HashMap::new(),
25            version: 1,
26            migrations: Vec::new(),
27        }
28    }
29
30    /// Get the current schema version
31    pub fn version(&self) -> u32 {
32        self.version
33    }
34
35    /// Create a new table
36    pub fn create_table(&mut self, table: TableDef) -> Result<(), SchemaError> {
37        // Validate table definition
38        table.validate().map_err(SchemaError::TableDef)?;
39
40        // Check if table already exists
41        if self.tables.contains_key(&table.name) {
42            return Err(SchemaError::TableExists(table.name.clone()));
43        }
44
45        // Record migration
46        self.migrations.push(Migration {
47            version: self.version,
48            operation: MigrationOp::CreateTable(table.name.clone()),
49            timestamp: current_timestamp(),
50        });
51
52        self.tables.insert(table.name.clone(), table);
53        self.version += 1;
54
55        Ok(())
56    }
57
58    /// Drop a table
59    pub fn drop_table(&mut self, name: &str) -> Result<TableDef, SchemaError> {
60        let table = self
61            .tables
62            .remove(name)
63            .ok_or_else(|| SchemaError::TableNotFound(name.to_string()))?;
64
65        // Record migration
66        self.migrations.push(Migration {
67            version: self.version,
68            operation: MigrationOp::DropTable(name.to_string()),
69            timestamp: current_timestamp(),
70        });
71
72        self.version += 1;
73
74        Ok(table)
75    }
76
77    /// Get a table definition by name
78    pub fn get_table(&self, name: &str) -> Option<&TableDef> {
79        self.tables.get(name)
80    }
81
82    /// Get a mutable table definition by name
83    pub fn get_table_mut(&mut self, name: &str) -> Option<&mut TableDef> {
84        self.tables.get_mut(name)
85    }
86
87    /// List all table names
88    pub fn list_tables(&self) -> Vec<&str> {
89        self.tables.keys().map(|s| s.as_str()).collect()
90    }
91
92    /// Get number of tables
93    pub fn table_count(&self) -> usize {
94        self.tables.len()
95    }
96
97    /// Check if a table exists
98    pub fn table_exists(&self, name: &str) -> bool {
99        self.tables.contains_key(name)
100    }
101
102    /// Add a column to an existing table
103    pub fn add_column(
104        &mut self,
105        table_name: &str,
106        column: super::table::ColumnDef,
107    ) -> Result<(), SchemaError> {
108        let table = self
109            .tables
110            .get_mut(table_name)
111            .ok_or_else(|| SchemaError::TableNotFound(table_name.to_string()))?;
112
113        // Check if column already exists
114        if table.get_column(&column.name).is_some() {
115            return Err(SchemaError::ColumnExists(column.name.clone()));
116        }
117
118        // Record migration
119        self.migrations.push(Migration {
120            version: self.version,
121            operation: MigrationOp::AddColumn {
122                table: table_name.to_string(),
123                column: column.name.clone(),
124            },
125            timestamp: current_timestamp(),
126        });
127
128        table.columns.push(column);
129        table.updated_at = current_timestamp();
130        self.version += 1;
131
132        Ok(())
133    }
134
135    /// Drop a column from an existing table
136    pub fn drop_column(&mut self, table_name: &str, column_name: &str) -> Result<(), SchemaError> {
137        let table = self
138            .tables
139            .get_mut(table_name)
140            .ok_or_else(|| SchemaError::TableNotFound(table_name.to_string()))?;
141
142        // Check if column exists
143        let idx = table
144            .column_index(column_name)
145            .ok_or_else(|| SchemaError::ColumnNotFound(column_name.to_string()))?;
146
147        // Can't drop primary key columns
148        if table.is_primary_key_column(column_name) {
149            return Err(SchemaError::CannotDropPrimaryKey(column_name.to_string()));
150        }
151
152        // Record migration
153        self.migrations.push(Migration {
154            version: self.version,
155            operation: MigrationOp::DropColumn {
156                table: table_name.to_string(),
157                column: column_name.to_string(),
158            },
159            timestamp: current_timestamp(),
160        });
161
162        table.columns.remove(idx);
163        table.updated_at = current_timestamp();
164        self.version += 1;
165
166        Ok(())
167    }
168
169    /// Create an index on a table
170    pub fn create_index(
171        &mut self,
172        table_name: &str,
173        index: super::table::IndexDef,
174    ) -> Result<(), SchemaError> {
175        let table = self
176            .tables
177            .get_mut(table_name)
178            .ok_or_else(|| SchemaError::TableNotFound(table_name.to_string()))?;
179
180        // Check if index name already exists
181        if table.indexes.iter().any(|i| i.name == index.name) {
182            return Err(SchemaError::IndexExists(index.name.clone()));
183        }
184
185        // Validate index columns exist
186        for col in &index.columns {
187            if table.get_column(col).is_none() {
188                return Err(SchemaError::ColumnNotFound(col.clone()));
189            }
190        }
191
192        // Record migration
193        self.migrations.push(Migration {
194            version: self.version,
195            operation: MigrationOp::CreateIndex {
196                table: table_name.to_string(),
197                index: index.name.clone(),
198            },
199            timestamp: current_timestamp(),
200        });
201
202        table.indexes.push(index);
203        table.updated_at = current_timestamp();
204        self.version += 1;
205
206        Ok(())
207    }
208
209    /// Drop an index from a table
210    pub fn drop_index(&mut self, table_name: &str, index_name: &str) -> Result<(), SchemaError> {
211        let table = self
212            .tables
213            .get_mut(table_name)
214            .ok_or_else(|| SchemaError::TableNotFound(table_name.to_string()))?;
215
216        let idx = table
217            .indexes
218            .iter()
219            .position(|i| i.name == index_name)
220            .ok_or_else(|| SchemaError::IndexNotFound(index_name.to_string()))?;
221
222        // Record migration
223        self.migrations.push(Migration {
224            version: self.version,
225            operation: MigrationOp::DropIndex {
226                table: table_name.to_string(),
227                index: index_name.to_string(),
228            },
229            timestamp: current_timestamp(),
230        });
231
232        table.indexes.remove(idx);
233        table.updated_at = current_timestamp();
234        self.version += 1;
235
236        Ok(())
237    }
238
239    /// Rename a table
240    pub fn rename_table(&mut self, old_name: &str, new_name: &str) -> Result<(), SchemaError> {
241        if !self.tables.contains_key(old_name) {
242            return Err(SchemaError::TableNotFound(old_name.to_string()));
243        }
244
245        if self.tables.contains_key(new_name) {
246            return Err(SchemaError::TableExists(new_name.to_string()));
247        }
248
249        let mut table = self.tables.remove(old_name).unwrap();
250        table.name = new_name.to_string();
251        table.updated_at = current_timestamp();
252
253        // Record migration
254        self.migrations.push(Migration {
255            version: self.version,
256            operation: MigrationOp::RenameTable {
257                old_name: old_name.to_string(),
258                new_name: new_name.to_string(),
259            },
260            timestamp: current_timestamp(),
261        });
262
263        self.tables.insert(new_name.to_string(), table);
264        self.version += 1;
265
266        Ok(())
267    }
268
269    /// Get migration history
270    pub fn migrations(&self) -> &[Migration] {
271        &self.migrations
272    }
273
274    /// Serialize the schema registry to bytes
275    pub fn to_bytes(&self) -> Vec<u8> {
276        let mut buf = Vec::new();
277
278        // Magic bytes
279        buf.extend_from_slice(b"RSCH");
280
281        // Version
282        buf.extend_from_slice(&self.version.to_le_bytes());
283
284        // Number of tables
285        write_varint(&mut buf, self.tables.len() as u64);
286
287        // Serialize each table
288        for table in self.tables.values() {
289            let table_bytes = table.to_bytes();
290            write_varint(&mut buf, table_bytes.len() as u64);
291            buf.extend_from_slice(&table_bytes);
292        }
293
294        // Number of migrations
295        write_varint(&mut buf, self.migrations.len() as u64);
296
297        // Serialize migrations
298        for migration in &self.migrations {
299            migration.write_to(&mut buf);
300        }
301
302        buf
303    }
304
305    /// Deserialize from bytes
306    pub fn from_bytes(data: &[u8]) -> Result<Self, SchemaError> {
307        if data.len() < 4 {
308            return Err(SchemaError::TruncatedData);
309        }
310
311        // Check magic
312        if &data[0..4] != b"RSCH" {
313            return Err(SchemaError::InvalidMagic);
314        }
315
316        let mut offset = 4;
317
318        // Version
319        if data.len() < offset + 4 {
320            return Err(SchemaError::TruncatedData);
321        }
322        let version = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
323        offset += 4;
324
325        // Number of tables
326        let (table_count, varint_len) = read_varint(&data[offset..])?;
327        offset += varint_len;
328
329        let mut tables = HashMap::with_capacity(table_count as usize);
330
331        for _ in 0..table_count {
332            let (table_len, varint_len) = read_varint(&data[offset..])?;
333            offset += varint_len;
334
335            if data.len() < offset + table_len as usize {
336                return Err(SchemaError::TruncatedData);
337            }
338
339            let table = TableDef::from_bytes(&data[offset..offset + table_len as usize])
340                .map_err(SchemaError::TableDef)?;
341            offset += table_len as usize;
342
343            tables.insert(table.name.clone(), table);
344        }
345
346        // Number of migrations
347        let (migration_count, varint_len) = read_varint(&data[offset..])?;
348        offset += varint_len;
349
350        let mut migrations = Vec::with_capacity(migration_count as usize);
351
352        for _ in 0..migration_count {
353            let (migration, migration_len) = Migration::read_from(&data[offset..])?;
354            offset += migration_len;
355            migrations.push(migration);
356        }
357
358        Ok(Self {
359            tables,
360            version,
361            migrations,
362        })
363    }
364
365    /// Clear all tables (for testing)
366    #[cfg(test)]
367    pub fn clear(&mut self) {
368        self.tables.clear();
369        self.version = 1;
370        self.migrations.clear();
371    }
372}
373
374impl Default for SchemaRegistry {
375    fn default() -> Self {
376        Self::new()
377    }
378}
379
380impl fmt::Display for SchemaRegistry {
381    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
382        writeln!(f, "Schema Registry v{}", self.version)?;
383        writeln!(f, "Tables: {}", self.tables.len())?;
384        for table in self.tables.values() {
385            writeln!(f, "  - {} ({} columns)", table.name, table.columns.len())?;
386        }
387        Ok(())
388    }
389}
390
391/// Migration record
392#[derive(Debug, Clone)]
393pub struct Migration {
394    /// Schema version at migration time
395    pub version: u32,
396    /// Migration operation
397    pub operation: MigrationOp,
398    /// Timestamp
399    pub timestamp: u64,
400}
401
402impl Migration {
403    fn write_to(&self, buf: &mut Vec<u8>) {
404        buf.extend_from_slice(&self.version.to_le_bytes());
405        buf.extend_from_slice(&self.timestamp.to_le_bytes());
406        self.operation.write_to(buf);
407    }
408
409    fn read_from(data: &[u8]) -> Result<(Self, usize), SchemaError> {
410        if data.len() < 12 {
411            return Err(SchemaError::TruncatedData);
412        }
413
414        let version = u32::from_le_bytes(data[0..4].try_into().unwrap());
415        let timestamp = u64::from_le_bytes(data[4..12].try_into().unwrap());
416
417        let (operation, op_len) = MigrationOp::read_from(&data[12..])?;
418
419        Ok((
420            Self {
421                version,
422                operation,
423                timestamp,
424            },
425            12 + op_len,
426        ))
427    }
428}
429
/// The kind of schema change a [`Migration`] describes, identified by the
/// names of the objects involved.
#[derive(Debug, Clone)]
pub enum MigrationOp {
    /// A table with this name was created.
    CreateTable(String),
    /// The table with this name was dropped.
    DropTable(String),
    /// `column` was added to `table`.
    AddColumn { table: String, column: String },
    /// `column` was removed from `table`.
    DropColumn { table: String, column: String },
    /// `index` was created on `table`.
    CreateIndex { table: String, index: String },
    /// `index` was removed from `table`.
    DropIndex { table: String, index: String },
    /// The table `old_name` was renamed to `new_name`.
    RenameTable { old_name: String, new_name: String },
}
448
449impl MigrationOp {
450    fn write_to(&self, buf: &mut Vec<u8>) {
451        match self {
452            MigrationOp::CreateTable(name) => {
453                buf.push(1);
454                write_string(buf, name);
455            }
456            MigrationOp::DropTable(name) => {
457                buf.push(2);
458                write_string(buf, name);
459            }
460            MigrationOp::AddColumn { table, column } => {
461                buf.push(3);
462                write_string(buf, table);
463                write_string(buf, column);
464            }
465            MigrationOp::DropColumn { table, column } => {
466                buf.push(4);
467                write_string(buf, table);
468                write_string(buf, column);
469            }
470            MigrationOp::CreateIndex { table, index } => {
471                buf.push(5);
472                write_string(buf, table);
473                write_string(buf, index);
474            }
475            MigrationOp::DropIndex { table, index } => {
476                buf.push(6);
477                write_string(buf, table);
478                write_string(buf, index);
479            }
480            MigrationOp::RenameTable { old_name, new_name } => {
481                buf.push(7);
482                write_string(buf, old_name);
483                write_string(buf, new_name);
484            }
485        }
486    }
487
488    fn read_from(data: &[u8]) -> Result<(Self, usize), SchemaError> {
489        if data.is_empty() {
490            return Err(SchemaError::TruncatedData);
491        }
492
493        let op_type = data[0];
494        let mut offset = 1;
495
496        let op = match op_type {
497            1 => {
498                let (name, len) = read_string(&data[offset..])?;
499                offset += len;
500                MigrationOp::CreateTable(name)
501            }
502            2 => {
503                let (name, len) = read_string(&data[offset..])?;
504                offset += len;
505                MigrationOp::DropTable(name)
506            }
507            3 => {
508                let (table, len) = read_string(&data[offset..])?;
509                offset += len;
510                let (column, len) = read_string(&data[offset..])?;
511                offset += len;
512                MigrationOp::AddColumn { table, column }
513            }
514            4 => {
515                let (table, len) = read_string(&data[offset..])?;
516                offset += len;
517                let (column, len) = read_string(&data[offset..])?;
518                offset += len;
519                MigrationOp::DropColumn { table, column }
520            }
521            5 => {
522                let (table, len) = read_string(&data[offset..])?;
523                offset += len;
524                let (index, len) = read_string(&data[offset..])?;
525                offset += len;
526                MigrationOp::CreateIndex { table, index }
527            }
528            6 => {
529                let (table, len) = read_string(&data[offset..])?;
530                offset += len;
531                let (index, len) = read_string(&data[offset..])?;
532                offset += len;
533                MigrationOp::DropIndex { table, index }
534            }
535            7 => {
536                let (old_name, len) = read_string(&data[offset..])?;
537                offset += len;
538                let (new_name, len) = read_string(&data[offset..])?;
539                offset += len;
540                MigrationOp::RenameTable { old_name, new_name }
541            }
542            _ => return Err(SchemaError::InvalidMigrationOp),
543        };
544
545        Ok((op, offset))
546    }
547}
548
549/// Schema errors
550#[derive(Debug)]
551pub enum SchemaError {
552    /// Table already exists
553    TableExists(String),
554    /// Table not found
555    TableNotFound(String),
556    /// Column already exists
557    ColumnExists(String),
558    /// Column not found
559    ColumnNotFound(String),
560    /// Index already exists
561    IndexExists(String),
562    /// Index not found
563    IndexNotFound(String),
564    /// Cannot drop primary key column
565    CannotDropPrimaryKey(String),
566    /// Table definition error
567    TableDef(TableDefError),
568    /// Truncated data
569    TruncatedData,
570    /// Invalid magic bytes
571    InvalidMagic,
572    /// Invalid migration operation
573    InvalidMigrationOp,
574    /// Varint overflow
575    VarintOverflow,
576}
577
578impl fmt::Display for SchemaError {
579    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
580        match self {
581            SchemaError::TableExists(name) => write!(f, "table already exists: {}", name),
582            SchemaError::TableNotFound(name) => write!(f, "table not found: {}", name),
583            SchemaError::ColumnExists(name) => write!(f, "column already exists: {}", name),
584            SchemaError::ColumnNotFound(name) => write!(f, "column not found: {}", name),
585            SchemaError::IndexExists(name) => write!(f, "index already exists: {}", name),
586            SchemaError::IndexNotFound(name) => write!(f, "index not found: {}", name),
587            SchemaError::CannotDropPrimaryKey(name) => {
588                write!(f, "cannot drop primary key column: {}", name)
589            }
590            SchemaError::TableDef(e) => write!(f, "table definition error: {}", e),
591            SchemaError::TruncatedData => write!(f, "truncated data"),
592            SchemaError::InvalidMagic => write!(f, "invalid magic bytes"),
593            SchemaError::InvalidMigrationOp => write!(f, "invalid migration operation"),
594            SchemaError::VarintOverflow => write!(f, "varint overflow"),
595        }
596    }
597}
598
599impl std::error::Error for SchemaError {}
600
601impl From<TableDefError> for SchemaError {
602    fn from(e: TableDefError) -> Self {
603        SchemaError::TableDef(e)
604    }
605}
606
/// Whole seconds elapsed since the Unix epoch according to the system clock.
///
/// Returns 0 if the clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
614
/// Append `value` to `buf` as a base-128 varint: seven payload bits per byte,
/// least-significant group first, with the high bit set on every byte except
/// the last.
fn write_varint(buf: &mut Vec<u8>, mut value: u64) {
    // Emit continuation bytes while more than seven bits remain.
    while value >= 0x80 {
        buf.push(((value & 0x7F) as u8) | 0x80);
        value >>= 7;
    }
    // Final byte: high bit clear marks the end of the number.
    buf.push(value as u8);
}
629
630/// Read a variable-length integer
631fn read_varint(data: &[u8]) -> Result<(u64, usize), SchemaError> {
632    let mut result: u64 = 0;
633    let mut shift = 0;
634    let mut offset = 0;
635
636    loop {
637        if offset >= data.len() {
638            return Err(SchemaError::TruncatedData);
639        }
640        let byte = data[offset];
641        offset += 1;
642
643        if shift >= 64 {
644            return Err(SchemaError::VarintOverflow);
645        }
646
647        result |= ((byte & 0x7F) as u64) << shift;
648        shift += 7;
649
650        if byte & 0x80 == 0 {
651            break;
652        }
653    }
654
655    Ok((result, offset))
656}
657
658/// Write a length-prefixed string
659fn write_string(buf: &mut Vec<u8>, s: &str) {
660    let bytes = s.as_bytes();
661    write_varint(buf, bytes.len() as u64);
662    buf.extend_from_slice(bytes);
663}
664
665/// Read a length-prefixed string
666fn read_string(data: &[u8]) -> Result<(String, usize), SchemaError> {
667    let (len, varint_len) = read_varint(data)?;
668    let offset = varint_len;
669    if data.len() < offset + len as usize {
670        return Err(SchemaError::TruncatedData);
671    }
672    let s = String::from_utf8(data[offset..offset + len as usize].to_vec())
673        .map_err(|_| SchemaError::TruncatedData)?;
674    Ok((s, offset + len as usize))
675}
676
#[cfg(test)]
mod tests {
    use super::super::table::{ColumnDef, IndexDef, IndexType};
    use super::super::types::DataType;
    use super::*;

    /// `hosts` table with a single integer `id` column.
    fn hosts_with_id() -> TableDef {
        TableDef::new("hosts").add_column(ColumnDef::new("id", DataType::Integer))
    }

    /// Fresh registry that already contains `table`.
    fn registry_with(table: TableDef) -> SchemaRegistry {
        let mut registry = SchemaRegistry::new();
        registry.create_table(table).unwrap();
        registry
    }

    #[test]
    fn test_create_table() {
        let hosts = TableDef::new("hosts")
            .add_column(ColumnDef::new("id", DataType::UnsignedInteger).not_null())
            .add_column(ColumnDef::new("ip", DataType::IpAddr).not_null())
            .primary_key(vec!["id".to_string()]);

        let mut registry = SchemaRegistry::new();
        let outcome = registry.create_table(hosts);

        assert!(outcome.is_ok());
        assert!(registry.table_exists("hosts"));
        assert_eq!(registry.table_count(), 1);
    }

    #[test]
    fn test_duplicate_table() {
        let mut registry = SchemaRegistry::new();

        let first = registry.create_table(hosts_with_id());
        assert!(first.is_ok());

        // Same name a second time must be refused.
        let second = registry.create_table(hosts_with_id());
        assert!(matches!(second, Err(SchemaError::TableExists(_))));
    }

    #[test]
    fn test_drop_table() {
        let mut registry = registry_with(hosts_with_id());
        assert!(registry.table_exists("hosts"));

        let removed = registry.drop_table("hosts").unwrap();

        assert_eq!(removed.name, "hosts");
        assert!(!registry.table_exists("hosts"));
    }

    #[test]
    fn test_add_column() {
        let mut registry = registry_with(hosts_with_id());

        let hostname = ColumnDef::new("hostname", DataType::Text);
        assert!(registry.add_column("hosts", hostname).is_ok());

        let hosts = registry.get_table("hosts").unwrap();
        assert_eq!(hosts.columns.len(), 2);
        assert!(hosts.get_column("hostname").is_some());
    }

    #[test]
    fn test_drop_column() {
        let hosts = TableDef::new("hosts")
            .add_column(ColumnDef::new("id", DataType::Integer).not_null())
            .add_column(ColumnDef::new("hostname", DataType::Text))
            .primary_key(vec!["id".to_string()]);
        let mut registry = registry_with(hosts);

        // A column outside the primary key can go.
        let non_key = registry.drop_column("hosts", "hostname");
        assert!(non_key.is_ok());

        // A primary-key column cannot.
        let key = registry.drop_column("hosts", "id");
        assert!(matches!(key, Err(SchemaError::CannotDropPrimaryKey(_))));
    }

    #[test]
    fn test_create_index() {
        let hosts = TableDef::new("hosts")
            .add_column(ColumnDef::new("id", DataType::Integer))
            .add_column(ColumnDef::new("ip", DataType::IpAddr));
        let mut registry = registry_with(hosts);

        let by_ip = IndexDef::new("idx_ip", vec!["ip".to_string()]).unique();
        assert!(registry.create_index("hosts", by_ip).is_ok());

        let hosts = registry.get_table("hosts").unwrap();
        assert_eq!(hosts.indexes.len(), 1);
        assert!(hosts.indexes[0].unique);
    }

    #[test]
    fn test_rename_table() {
        let original =
            TableDef::new("old_name").add_column(ColumnDef::new("id", DataType::Integer));
        let mut registry = registry_with(original);

        assert!(registry.rename_table("old_name", "new_name").is_ok());

        assert!(!registry.table_exists("old_name"));
        assert!(registry.table_exists("new_name"));

        // The definition itself carries the new name too.
        let renamed = registry.get_table("new_name").unwrap();
        assert_eq!(renamed.name, "new_name");
    }

    #[test]
    fn test_migration_history() {
        let mut registry = registry_with(hosts_with_id());
        registry
            .add_column("hosts", ColumnDef::new("ip", DataType::IpAddr))
            .unwrap();
        registry
            .create_index("hosts", IndexDef::new("idx_ip", vec!["ip".to_string()]))
            .unwrap();

        let history = registry.migrations();
        assert_eq!(history.len(), 3);
        assert!(matches!(
            &history[0].operation,
            MigrationOp::CreateTable(_)
        ));
        assert!(matches!(
            &history[1].operation,
            MigrationOp::AddColumn { .. }
        ));
        assert!(matches!(
            &history[2].operation,
            MigrationOp::CreateIndex { .. }
        ));
    }

    #[test]
    fn test_registry_roundtrip() {
        let hosts = TableDef::new("hosts")
            .add_column(ColumnDef::new("id", DataType::UnsignedInteger).not_null())
            .add_column(ColumnDef::new("ip", DataType::IpAddr).not_null())
            .add_column(ColumnDef::new("embedding", DataType::Vector).with_vector_dim(128))
            .primary_key(vec!["id".to_string()])
            .add_index(IndexDef::new("idx_ip", vec!["ip".to_string()]).unique())
            .add_index(
                IndexDef::new("idx_embedding", vec!["embedding".to_string()])
                    .with_type(IndexType::IvfFlat),
            );

        let ports = TableDef::new("ports")
            .add_column(ColumnDef::new("host_id", DataType::UnsignedInteger))
            .add_column(ColumnDef::new("port", DataType::UnsignedInteger))
            .add_column(ColumnDef::new("status", DataType::Text));

        let mut registry = SchemaRegistry::new();
        registry.create_table(hosts).unwrap();
        registry.create_table(ports).unwrap();

        let encoded = registry.to_bytes();
        let decoded = SchemaRegistry::from_bytes(&encoded).unwrap();

        assert_eq!(registry.version(), decoded.version());
        assert_eq!(registry.table_count(), decoded.table_count());

        for name in registry.list_tables() {
            assert!(decoded.table_exists(name));
            let before = registry.get_table(name).unwrap();
            let after = decoded.get_table(name).unwrap();
            assert_eq!(before.columns.len(), after.columns.len());
            assert_eq!(before.indexes.len(), after.indexes.len());
        }
    }

    #[test]
    fn test_registry_display() {
        let hosts = TableDef::new("hosts")
            .add_column(ColumnDef::new("id", DataType::Integer))
            .add_column(ColumnDef::new("ip", DataType::IpAddr));
        let registry = registry_with(hosts);

        let rendered = format!("{}", registry);
        assert!(rendered.contains("Schema Registry"));
        assert!(rendered.contains("hosts"));
    }
}