Skip to main content

hematite/catalog/
table.rs

1//! Table definitions for database tables
2
3use super::column::Column;
4use super::ids::TableId;
5use super::types::Value;
6use crate::catalog::object::{NamedConstraint, NamedConstraintKind};
7use crate::HematiteError;
8use crate::Result;
9use std::collections::HashMap;
10
11#[derive(Debug, Clone)]
12pub struct Table {
13    pub id: TableId,
14    pub name: String,
15    pub columns: Vec<Column>,
16    pub column_indices: HashMap<String, usize>,
17    pub primary_key_columns: Vec<usize>,
18    pub secondary_indexes: Vec<SecondaryIndex>,
19    pub check_constraints: Vec<CheckConstraint>,
20    pub foreign_keys: Vec<ForeignKeyConstraint>,
21    pub root_page_id: u32,
22    pub primary_key_index_root_page_id: u32,
23}
24
/// A secondary index over one or more columns of a table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SecondaryIndex {
    /// Index name; for unique indexes it also serves as the constraint name.
    pub name: String,
    /// Positions of the indexed columns within the owning table's column list.
    pub column_indices: Vec<usize>,
    /// Root page id associated with this index.
    pub root_page_id: u32,
    /// Whether the index is flagged as unique.
    pub unique: bool,
}
32
/// A CHECK constraint kept as SQL text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CheckConstraint {
    /// Optional constraint name; unnamed constraints cannot be looked up or dropped by name.
    pub name: Option<String>,
    /// SQL text of the checked condition.
    pub expression_sql: String,
}
38
39#[derive(Debug, Clone, PartialEq, Eq)]
40pub struct ForeignKeyConstraint {
41    pub name: Option<String>,
42    pub column_indices: Vec<usize>,
43    pub referenced_table: String,
44    pub referenced_columns: Vec<String>,
45    pub on_delete: ForeignKeyAction,
46    pub on_update: ForeignKeyAction,
47}
48
/// Action stored for a foreign key's ON DELETE / ON UPDATE clause.
///
/// The variants presumably mirror the SQL actions RESTRICT, CASCADE and SET NULL;
/// how each one is enforced is decided outside this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForeignKeyAction {
    /// RESTRICT.
    Restrict,
    /// CASCADE.
    Cascade,
    /// SET NULL.
    SetNull,
}
55
56impl ForeignKeyConstraint {
57    fn validate_for_table(&self, table_name: &str, column_count: usize) -> Result<()> {
58        if self.column_indices.is_empty() {
59            return Err(HematiteError::StorageError(
60                "Foreign key must reference at least one local column".to_string(),
61            ));
62        }
63        for &column_index in &self.column_indices {
64            if column_index >= column_count {
65                return Err(HematiteError::StorageError(format!(
66                    "Foreign key references invalid column index {}",
67                    column_index
68                )));
69            }
70        }
71        if self.column_indices.len() != self.referenced_columns.len() {
72            return Err(HematiteError::StorageError(format!(
73                "Foreign key has {} local columns but {} referenced columns",
74                self.column_indices.len(),
75                self.referenced_columns.len()
76            )));
77        }
78        if self.referenced_table.is_empty() {
79            return Err(HematiteError::StorageError(format!(
80                "Foreign key on table '{}' must reference a table name",
81                table_name
82            )));
83        }
84        Ok(())
85    }
86
87    fn serialize(&self, buffer: &mut Vec<u8>) {
88        write_optional_string(buffer, self.name.as_deref());
89        buffer.extend_from_slice(&(self.column_indices.len() as u32).to_le_bytes());
90        for &column_index in &self.column_indices {
91            buffer.extend_from_slice(&(column_index as u32).to_le_bytes());
92        }
93        write_string(buffer, &self.referenced_table);
94        buffer.extend_from_slice(&(self.referenced_columns.len() as u32).to_le_bytes());
95        for referenced_column in &self.referenced_columns {
96            write_string(buffer, referenced_column);
97        }
98        buffer.push(foreign_key_action_to_byte(self.on_delete));
99        buffer.push(foreign_key_action_to_byte(self.on_update));
100    }
101
102    fn deserialize(buffer: &[u8], offset: &mut usize) -> Result<Self> {
103        let name = read_optional_string(buffer, offset, "foreign key name")?;
104        let local_column_count = read_len(buffer, offset, "foreign key local column count")?;
105        let mut column_indices = Vec::with_capacity(local_column_count);
106        for _ in 0..local_column_count {
107            column_indices.push(read_len(buffer, offset, "foreign key column index")?);
108        }
109        let table_len = read_len(buffer, offset, "foreign key table length")?;
110        let referenced_table = read_string(buffer, offset, table_len, "foreign key table")?;
111        let referenced_column_count =
112            read_len(buffer, offset, "foreign key referenced column count")?;
113        let mut referenced_columns = Vec::with_capacity(referenced_column_count);
114        for _ in 0..referenced_column_count {
115            let column_len = read_len(buffer, offset, "foreign key column length")?;
116            referenced_columns.push(read_string(
117                buffer,
118                offset,
119                column_len,
120                "foreign key column",
121            )?);
122        }
123        let on_delete = read_foreign_key_action(buffer, offset, "foreign key ON DELETE action")?;
124        let on_update = read_foreign_key_action(buffer, offset, "foreign key ON UPDATE action")?;
125        Ok(Self {
126            name,
127            column_indices,
128            referenced_table,
129            referenced_columns,
130            on_delete,
131            on_update,
132        })
133    }
134}
135
136impl Table {
137    pub fn new(
138        id: TableId,
139        name: String,
140        mut columns: Vec<Column>,
141        root_page_id: u32,
142    ) -> Result<Self> {
143        let mut column_indices = HashMap::new();
144        let mut primary_key_columns = Vec::new();
145
146        for column in &mut columns {
147            if column.primary_key {
148                column.nullable = false;
149            }
150        }
151
152        for (index, column) in columns.iter().enumerate() {
153            column_indices.insert(column.name.clone(), index);
154            if column.primary_key {
155                primary_key_columns.push(index);
156            }
157        }
158
159        // Validate that at least one column exists
160        if columns.is_empty() {
161            return Err(HematiteError::StorageError(
162                "Table must have at least one column".to_string(),
163            ));
164        }
165
166        // Validate primary key
167        if primary_key_columns.is_empty() {
168            return Err(HematiteError::StorageError(
169                "Table must have at least one primary key column".to_string(),
170            ));
171        }
172
173        Ok(Self {
174            id,
175            name,
176            columns,
177            column_indices,
178            primary_key_columns,
179            secondary_indexes: Vec::new(),
180            check_constraints: Vec::new(),
181            foreign_keys: Vec::new(),
182            root_page_id,
183            primary_key_index_root_page_id: 0,
184        })
185    }
186
187    pub fn get_column_by_name(&self, name: &str) -> Option<&Column> {
188        self.column_indices
189            .get(name)
190            .map(|&index| &self.columns[index])
191    }
192
193    pub fn get_column_index(&self, name: &str) -> Option<usize> {
194        self.column_indices.get(name).copied()
195    }
196
197    pub fn column_count(&self) -> usize {
198        self.columns.len()
199    }
200
201    pub fn primary_key_count(&self) -> usize {
202        self.primary_key_columns.len()
203    }
204
205    pub fn validate_row(&self, values: &[Value]) -> Result<()> {
206        if values.len() != self.columns.len() {
207            return Err(HematiteError::StorageError(format!(
208                "Expected {} values, got {}",
209                self.columns.len(),
210                values.len()
211            )));
212        }
213
214        for (column, value) in self.columns.iter().zip(values.iter()) {
215            if !column.validate_value(value) {
216                return Err(HematiteError::StorageError(format!(
217                    "Invalid value for column '{}': {:?}",
218                    column.name, value
219                )));
220            }
221        }
222
223        Ok(())
224    }
225
226    pub fn get_primary_key_values(&self, values: &[Value]) -> Result<Vec<Value>> {
227        self.primary_key_columns
228            .iter()
229            .map(|&index| {
230                if index < values.len() {
231                    Ok(values[index].clone())
232                } else {
233                    Err(HematiteError::StorageError(
234                        "Primary key value not found".to_string(),
235                    ))
236                }
237            })
238            .collect()
239    }
240
241    pub fn row_size(&self) -> usize {
242        self.columns.iter().map(|col| col.size()).sum()
243    }
244
245    pub fn get_secondary_index(&self, name: &str) -> Option<&SecondaryIndex> {
246        self.secondary_indexes
247            .iter()
248            .find(|index| index.name == name)
249    }
250
251    pub fn add_secondary_index(&mut self, index: SecondaryIndex) -> Result<()> {
252        if self.get_secondary_index(&index.name).is_some() {
253            return Err(HematiteError::StorageError(format!(
254                "Secondary index '{}' already exists on table '{}'",
255                index.name, self.name
256            )));
257        }
258
259        if index.column_indices.is_empty() {
260            return Err(HematiteError::StorageError(
261                "Secondary index must reference at least one column".to_string(),
262            ));
263        }
264
265        for &column_index in &index.column_indices {
266            if column_index >= self.columns.len() {
267                return Err(HematiteError::StorageError(format!(
268                    "Secondary index '{}' references invalid column index {}",
269                    index.name, column_index
270                )));
271            }
272        }
273
274        self.secondary_indexes.push(index);
275        Ok(())
276    }
277
278    pub fn list_named_constraints(&self) -> Vec<NamedConstraint> {
279        let mut constraints = Vec::new();
280        constraints.extend(self.check_constraints.iter().filter_map(|constraint| {
281            constraint.name.as_ref().map(|name| NamedConstraint {
282                table_name: self.name.clone(),
283                name: name.clone(),
284                kind: NamedConstraintKind::Check,
285            })
286        }));
287        constraints.extend(self.foreign_keys.iter().filter_map(|constraint| {
288            constraint.name.as_ref().map(|name| NamedConstraint {
289                table_name: self.name.clone(),
290                name: name.clone(),
291                kind: NamedConstraintKind::ForeignKey,
292            })
293        }));
294        constraints.extend(
295            self.secondary_indexes
296                .iter()
297                .filter(|index| index.unique)
298                .map(|index| NamedConstraint {
299                    table_name: self.name.clone(),
300                    name: index.name.clone(),
301                    kind: NamedConstraintKind::Unique,
302                }),
303        );
304        constraints
305    }
306
307    pub fn named_constraint(&self, name: &str) -> Option<NamedConstraint> {
308        self.list_named_constraints()
309            .into_iter()
310            .find(|constraint| constraint.name == name)
311    }
312
313    pub fn add_column(&mut self, column: Column) -> Result<()> {
314        if self.column_indices.contains_key(&column.name) {
315            return Err(HematiteError::StorageError(format!(
316                "Column '{}' already exists in table '{}'",
317                column.name, self.name
318            )));
319        }
320        if column.primary_key {
321            return Err(HematiteError::StorageError(
322                "Cannot add a primary-key column to an existing table".to_string(),
323            ));
324        }
325
326        let index = self.columns.len();
327        self.column_indices.insert(column.name.clone(), index);
328        self.columns.push(column);
329        Ok(())
330    }
331
332    pub fn rename_column(&mut self, old_name: &str, new_name: String) -> Result<()> {
333        if self.column_indices.contains_key(&new_name) {
334            return Err(HematiteError::StorageError(format!(
335                "Column '{}' already exists in table '{}'",
336                new_name, self.name
337            )));
338        }
339
340        let index = self.get_column_index(old_name).ok_or_else(|| {
341            HematiteError::StorageError(format!(
342                "Column '{}' does not exist in table '{}'",
343                old_name, self.name
344            ))
345        })?;
346
347        self.columns[index].name = new_name.clone();
348        self.column_indices.remove(old_name);
349        self.column_indices.insert(new_name.clone(), index);
350        self.rewrite_check_constraints(old_name, &new_name)?;
351        Ok(())
352    }
353
354    pub fn drop_column(&mut self, name: &str) -> Result<usize> {
355        let index = self.get_column_index(name).ok_or_else(|| {
356            HematiteError::StorageError(format!(
357                "Column '{}' does not exist in table '{}'",
358                name, self.name
359            ))
360        })?;
361
362        if self.columns.len() == 1 {
363            return Err(HematiteError::StorageError(
364                "Cannot drop the last column from a table".to_string(),
365            ));
366        }
367        if self.primary_key_columns.contains(&index) {
368            return Err(HematiteError::StorageError(format!(
369                "Cannot drop primary-key column '{}'",
370                name
371            )));
372        }
373        if self
374            .secondary_indexes
375            .iter()
376            .any(|secondary_index| secondary_index.column_indices.contains(&index))
377        {
378            return Err(HematiteError::StorageError(format!(
379                "Cannot drop column '{}' because it is used by an index",
380                name
381            )));
382        }
383        if self
384            .foreign_keys
385            .iter()
386            .any(|foreign_key| foreign_key.column_indices.contains(&index))
387        {
388            return Err(HematiteError::StorageError(format!(
389                "Cannot drop column '{}' because it is used by a foreign key",
390                name
391            )));
392        }
393
394        self.columns.remove(index);
395        self.rebuild_column_indices();
396        self.primary_key_columns = self
397            .primary_key_columns
398            .iter()
399            .filter_map(|&primary_key_index| {
400                if primary_key_index == index {
401                    None
402                } else if primary_key_index > index {
403                    Some(primary_key_index - 1)
404                } else {
405                    Some(primary_key_index)
406                }
407            })
408            .collect();
409        for foreign_key in &mut self.foreign_keys {
410            for column_index in &mut foreign_key.column_indices {
411                if *column_index > index {
412                    *column_index -= 1;
413                }
414            }
415        }
416
417        Ok(index)
418    }
419
420    pub fn set_column_default(&mut self, name: &str, default_value: Option<Value>) -> Result<()> {
421        let index = self.get_column_index(name).ok_or_else(|| {
422            HematiteError::StorageError(format!(
423                "Column '{}' does not exist in table '{}'",
424                name, self.name
425            ))
426        })?;
427        self.columns[index].default_value = default_value;
428        Ok(())
429    }
430
431    pub fn set_column_nullable(&mut self, name: &str, nullable: bool) -> Result<()> {
432        let index = self.get_column_index(name).ok_or_else(|| {
433            HematiteError::StorageError(format!(
434                "Column '{}' does not exist in table '{}'",
435                name, self.name
436            ))
437        })?;
438        if self.columns[index].primary_key && nullable {
439            return Err(HematiteError::StorageError(format!(
440                "Primary-key column '{}' cannot be nullable",
441                name
442            )));
443        }
444        if self.columns[index].auto_increment && nullable {
445            return Err(HematiteError::StorageError(format!(
446                "AUTO_INCREMENT column '{}' cannot be nullable",
447                name
448            )));
449        }
450        self.columns[index].nullable = nullable;
451        Ok(())
452    }
453
454    pub fn rewrite_inbound_referenced_column(
455        &mut self,
456        referenced_table: &str,
457        old_name: &str,
458        new_name: &str,
459    ) {
460        for foreign_key in &mut self.foreign_keys {
461            if foreign_key.referenced_table == referenced_table {
462                for referenced_column in &mut foreign_key.referenced_columns {
463                    if referenced_column == old_name {
464                        *referenced_column = new_name.to_string();
465                    }
466                }
467            }
468        }
469    }
470
471    fn rewrite_check_constraints(&mut self, old_name: &str, new_name: &str) -> Result<()> {
472        for constraint in &mut self.check_constraints {
473            let mut condition =
474                crate::parser::parser::parse_condition_fragment(&constraint.expression_sql)?;
475            condition.rename_column_references(old_name, new_name, Some(&self.name));
476            constraint.expression_sql = condition.to_sql();
477        }
478        Ok(())
479    }
480
481    fn rebuild_column_indices(&mut self) {
482        self.column_indices = self
483            .columns
484            .iter()
485            .enumerate()
486            .map(|(index, column)| (column.name.clone(), index))
487            .collect();
488    }
489
490    pub fn add_check_constraint(&mut self, constraint: CheckConstraint) -> Result<()> {
491        if let Some(name) = &constraint.name {
492            if self.named_constraint(name).is_some() {
493                return Err(HematiteError::StorageError(format!(
494                    "Constraint '{}' already exists on table '{}'",
495                    name, self.name
496                )));
497            }
498        }
499        self.check_constraints.push(constraint);
500        Ok(())
501    }
502
503    pub fn add_foreign_key(&mut self, constraint: ForeignKeyConstraint) -> Result<()> {
504        constraint.validate_for_table(&self.name, self.columns.len())?;
505        if let Some(name) = &constraint.name {
506            if self.named_constraint(name).is_some() {
507                return Err(HematiteError::StorageError(format!(
508                    "Constraint '{}' already exists on table '{}'",
509                    name, self.name
510                )));
511            }
512        }
513        self.foreign_keys.push(constraint);
514        Ok(())
515    }
516
517    pub fn drop_secondary_index(&mut self, name: &str) -> Result<SecondaryIndex> {
518        let index = self
519            .secondary_indexes
520            .iter()
521            .position(|index| index.name == name)
522            .ok_or_else(|| {
523                HematiteError::StorageError(format!(
524                    "Secondary index '{}' does not exist on table '{}'",
525                    name, self.name
526                ))
527            })?;
528
529        Ok(self.secondary_indexes.remove(index))
530    }
531
532    pub fn drop_named_constraint(&mut self, name: &str) -> Result<NamedConstraintKind> {
533        if let Some(index) = self
534            .check_constraints
535            .iter()
536            .position(|constraint| constraint.name.as_deref() == Some(name))
537        {
538            self.check_constraints.remove(index);
539            return Ok(NamedConstraintKind::Check);
540        }
541
542        if let Some(index) = self
543            .foreign_keys
544            .iter()
545            .position(|constraint| constraint.name.as_deref() == Some(name))
546        {
547            self.foreign_keys.remove(index);
548            return Ok(NamedConstraintKind::ForeignKey);
549        }
550
551        if let Some(index) = self
552            .secondary_indexes
553            .iter()
554            .position(|constraint| constraint.unique && constraint.name == name)
555        {
556            self.secondary_indexes.remove(index);
557            return Ok(NamedConstraintKind::Unique);
558        }
559
560        Err(HematiteError::StorageError(format!(
561            "Constraint '{}' does not exist on table '{}'",
562            name, self.name
563        )))
564    }
565
566    pub fn serialize(&self, buffer: &mut Vec<u8>) -> Result<()> {
567        // Table ID (4 bytes)
568        buffer.extend_from_slice(&self.id.as_u32().to_le_bytes());
569
570        // Name length (4 bytes) + name
571        let name_bytes = self.name.as_bytes();
572        buffer.extend_from_slice(&(name_bytes.len() as u32).to_le_bytes());
573        buffer.extend_from_slice(name_bytes);
574
575        // Root page ID (4 bytes)
576        buffer.extend_from_slice(&self.root_page_id.to_le_bytes());
577        buffer.extend_from_slice(&self.primary_key_index_root_page_id.to_le_bytes());
578
579        // Column count (4 bytes)
580        buffer.extend_from_slice(&(self.columns.len() as u32).to_le_bytes());
581
582        // Columns
583        for column in &self.columns {
584            column.serialize(buffer)?;
585        }
586
587        // Primary key column count (4 bytes)
588        buffer.extend_from_slice(&(self.primary_key_columns.len() as u32).to_le_bytes());
589
590        // Primary key column indices
591        for &index in &self.primary_key_columns {
592            buffer.extend_from_slice(&(index as u32).to_le_bytes());
593        }
594
595        // Secondary indexes
596        buffer.extend_from_slice(&(self.secondary_indexes.len() as u32).to_le_bytes());
597        for index in &self.secondary_indexes {
598            let name_bytes = index.name.as_bytes();
599            buffer.extend_from_slice(&(name_bytes.len() as u32).to_le_bytes());
600            buffer.extend_from_slice(name_bytes);
601            buffer.extend_from_slice(&index.root_page_id.to_le_bytes());
602            buffer.push(index.unique as u8);
603            buffer.extend_from_slice(&(index.column_indices.len() as u32).to_le_bytes());
604            for &column_index in &index.column_indices {
605                buffer.extend_from_slice(&(column_index as u32).to_le_bytes());
606            }
607        }
608
609        buffer.extend_from_slice(&(self.check_constraints.len() as u32).to_le_bytes());
610        for constraint in &self.check_constraints {
611            match &constraint.name {
612                Some(name) => {
613                    buffer.push(1);
614                    let bytes = name.as_bytes();
615                    buffer.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
616                    buffer.extend_from_slice(bytes);
617                }
618                None => buffer.push(0),
619            }
620            let expression_bytes = constraint.expression_sql.as_bytes();
621            buffer.extend_from_slice(&(expression_bytes.len() as u32).to_le_bytes());
622            buffer.extend_from_slice(expression_bytes);
623        }
624
625        buffer.extend_from_slice(&(self.foreign_keys.len() as u32).to_le_bytes());
626        for constraint in &self.foreign_keys {
627            constraint.serialize(buffer);
628        }
629
630        Ok(())
631    }
632
633    pub fn deserialize(buffer: &[u8], offset: &mut usize) -> Result<Self> {
634        if *offset + 16 > buffer.len() {
635            return Err(HematiteError::CorruptedData(
636                "Invalid table header".to_string(),
637            ));
638        }
639
640        // Table ID
641        let id = TableId::new(u32::from_le_bytes([
642            buffer[*offset],
643            buffer[*offset + 1],
644            buffer[*offset + 2],
645            buffer[*offset + 3],
646        ]));
647        *offset += 4;
648
649        // Name
650        let name_len = u32::from_le_bytes([
651            buffer[*offset],
652            buffer[*offset + 1],
653            buffer[*offset + 2],
654            buffer[*offset + 3],
655        ]) as usize;
656        *offset += 4;
657
658        if *offset + name_len > buffer.len() {
659            return Err(HematiteError::CorruptedData(
660                "Invalid table name".to_string(),
661            ));
662        }
663        let name = String::from_utf8(buffer[*offset..*offset + name_len].to_vec())
664            .map_err(|_| HematiteError::CorruptedData("Invalid UTF-8 in table name".to_string()))?;
665        *offset += name_len;
666
667        // Root page ID
668        let root_page_id = u32::from_le_bytes([
669            buffer[*offset],
670            buffer[*offset + 1],
671            buffer[*offset + 2],
672            buffer[*offset + 3],
673        ]);
674        *offset += 4;
675
676        let primary_key_index_root_page_id = u32::from_le_bytes([
677            buffer[*offset],
678            buffer[*offset + 1],
679            buffer[*offset + 2],
680            buffer[*offset + 3],
681        ]);
682        *offset += 4;
683
684        // Column count
685        let column_count = u32::from_le_bytes([
686            buffer[*offset],
687            buffer[*offset + 1],
688            buffer[*offset + 2],
689            buffer[*offset + 3],
690        ]) as usize;
691        *offset += 4;
692
693        // Columns
694        let mut columns = Vec::with_capacity(column_count);
695        for _ in 0..column_count {
696            columns.push(Column::deserialize(buffer, offset)?);
697        }
698
699        // Primary key column count
700        if *offset + 4 > buffer.len() {
701            return Err(HematiteError::CorruptedData(
702                "Invalid primary key count".to_string(),
703            ));
704        }
705        let pk_count = u32::from_le_bytes([
706            buffer[*offset],
707            buffer[*offset + 1],
708            buffer[*offset + 2],
709            buffer[*offset + 3],
710        ]) as usize;
711        *offset += 4;
712
713        // Primary key column indices
714        let mut primary_key_columns = Vec::with_capacity(pk_count);
715        for _ in 0..pk_count {
716            if *offset + 4 > buffer.len() {
717                return Err(HematiteError::CorruptedData(
718                    "Invalid primary key index".to_string(),
719                ));
720            }
721            let index = u32::from_le_bytes([
722                buffer[*offset],
723                buffer[*offset + 1],
724                buffer[*offset + 2],
725                buffer[*offset + 3],
726            ]) as usize;
727            *offset += 4;
728            primary_key_columns.push(index);
729        }
730
731        let mut table = Self::new(id, name, columns, root_page_id)?;
732        table.primary_key_index_root_page_id = primary_key_index_root_page_id;
733
734        if *offset == buffer.len() {
735            return Ok(table);
736        }
737
738        if *offset + 4 > buffer.len() {
739            return Err(HematiteError::CorruptedData(
740                "Invalid secondary index count".to_string(),
741            ));
742        }
743        let secondary_index_count = u32::from_le_bytes([
744            buffer[*offset],
745            buffer[*offset + 1],
746            buffer[*offset + 2],
747            buffer[*offset + 3],
748        ]) as usize;
749        *offset += 4;
750
751        for _ in 0..secondary_index_count {
752            if *offset + 4 > buffer.len() {
753                return Err(HematiteError::CorruptedData(
754                    "Invalid secondary index name length".to_string(),
755                ));
756            }
757            let name_len = u32::from_le_bytes([
758                buffer[*offset],
759                buffer[*offset + 1],
760                buffer[*offset + 2],
761                buffer[*offset + 3],
762            ]) as usize;
763            *offset += 4;
764
765            if *offset + name_len > buffer.len() {
766                return Err(HematiteError::CorruptedData(
767                    "Invalid secondary index name".to_string(),
768                ));
769            }
770            let name =
771                String::from_utf8(buffer[*offset..*offset + name_len].to_vec()).map_err(|_| {
772                    HematiteError::CorruptedData(
773                        "Invalid UTF-8 in secondary index name".to_string(),
774                    )
775                })?;
776            *offset += name_len;
777
778            if *offset + 9 > buffer.len() {
779                return Err(HematiteError::CorruptedData(
780                    "Invalid secondary index metadata".to_string(),
781                ));
782            }
783            let index_root_page_id = u32::from_le_bytes([
784                buffer[*offset],
785                buffer[*offset + 1],
786                buffer[*offset + 2],
787                buffer[*offset + 3],
788            ]);
789            *offset += 4;
790
791            let unique = match buffer[*offset] {
792                0 => false,
793                1 => true,
794                _ => {
795                    return Err(HematiteError::CorruptedData(
796                        "Invalid secondary index uniqueness flag".to_string(),
797                    ))
798                }
799            };
800            *offset += 1;
801
802            let column_count = u32::from_le_bytes([
803                buffer[*offset],
804                buffer[*offset + 1],
805                buffer[*offset + 2],
806                buffer[*offset + 3],
807            ]) as usize;
808            *offset += 4;
809
810            let mut column_indices = Vec::with_capacity(column_count);
811            for _ in 0..column_count {
812                if *offset + 4 > buffer.len() {
813                    return Err(HematiteError::CorruptedData(
814                        "Invalid secondary index column index".to_string(),
815                    ));
816                }
817                let column_index = u32::from_le_bytes([
818                    buffer[*offset],
819                    buffer[*offset + 1],
820                    buffer[*offset + 2],
821                    buffer[*offset + 3],
822                ]) as usize;
823                *offset += 4;
824                column_indices.push(column_index);
825            }
826
827            table.add_secondary_index(SecondaryIndex {
828                name,
829                column_indices,
830                root_page_id: index_root_page_id,
831                unique,
832            })?;
833        }
834
835        if *offset == buffer.len() {
836            return Ok(table);
837        }
838
839        if *offset + 4 > buffer.len() {
840            return Err(HematiteError::CorruptedData(
841                "Invalid check constraint count".to_string(),
842            ));
843        }
844        let check_count = u32::from_le_bytes([
845            buffer[*offset],
846            buffer[*offset + 1],
847            buffer[*offset + 2],
848            buffer[*offset + 3],
849        ]) as usize;
850        *offset += 4;
851
852        for _ in 0..check_count {
853            if *offset >= buffer.len() {
854                return Err(HematiteError::CorruptedData(
855                    "Invalid check constraint metadata".to_string(),
856                ));
857            }
858            let name = if buffer[*offset] == 1 {
859                *offset += 1;
860                let len = read_len(buffer, offset, "check constraint name length")?;
861                let value = read_string(buffer, offset, len, "check constraint name")?;
862                Some(value)
863            } else {
864                *offset += 1;
865                None
866            };
867            let len = read_len(buffer, offset, "check constraint expression length")?;
868            let expression_sql = read_string(buffer, offset, len, "check constraint expression")?;
869            table.add_check_constraint(CheckConstraint {
870                name,
871                expression_sql,
872            })?;
873        }
874
875        if *offset == buffer.len() {
876            return Ok(table);
877        }
878
879        let foreign_key_count = read_len(buffer, offset, "foreign key count")?;
880        for _ in 0..foreign_key_count {
881            table.add_foreign_key(ForeignKeyConstraint::deserialize(buffer, offset)?)?;
882        }
883
884        Ok(table)
885    }
886
887    /// Convert table to bytes for storage in schema B-tree
888    pub fn to_bytes(&self) -> Result<Vec<u8>> {
889        let mut buffer = Vec::new();
890        self.serialize(&mut buffer)?;
891        Ok(buffer)
892    }
893
894    /// Create table from bytes stored in schema B-tree
895    pub fn from_bytes(bytes: &[u8]) -> Result<Self> {
896        let mut offset = 0;
897        Self::deserialize(bytes, &mut offset)
898    }
899}
900
901fn read_len(buffer: &[u8], offset: &mut usize, field: &str) -> Result<usize> {
902    if *offset + 4 > buffer.len() {
903        return Err(HematiteError::CorruptedData(format!("Invalid {}", field)));
904    }
905    let value = u32::from_le_bytes([
906        buffer[*offset],
907        buffer[*offset + 1],
908        buffer[*offset + 2],
909        buffer[*offset + 3],
910    ]) as usize;
911    *offset += 4;
912    Ok(value)
913}
914
/// Append `value` to `buffer` as a length-prefixed string.
///
/// The layout is the UTF-8 byte length as a little-endian `u32`, followed by
/// the bytes themselves.
///
/// NOTE(review): the length is narrowed with `as u32`, so a string longer
/// than `u32::MAX` bytes would get a truncated prefix without any error.
fn write_string(buffer: &mut Vec<u8>, value: &str) {
    let length_prefix = (value.len() as u32).to_le_bytes();
    buffer.extend_from_slice(&length_prefix);
    buffer.extend_from_slice(value.as_bytes());
}
920
921fn write_optional_string(buffer: &mut Vec<u8>, value: Option<&str>) {
922    match value {
923        Some(value) => {
924            buffer.push(1);
925            write_string(buffer, value);
926        }
927        None => buffer.push(0),
928    }
929}
930
931fn read_string(buffer: &[u8], offset: &mut usize, len: usize, field: &str) -> Result<String> {
932    if *offset + len > buffer.len() {
933        return Err(HematiteError::CorruptedData(format!("Invalid {}", field)));
934    }
935    let value = String::from_utf8(buffer[*offset..*offset + len].to_vec())
936        .map_err(|_| HematiteError::CorruptedData(format!("Invalid UTF-8 in {}", field)))?;
937    *offset += len;
938    Ok(value)
939}
940
941fn read_optional_string(buffer: &[u8], offset: &mut usize, field: &str) -> Result<Option<String>> {
942    if *offset >= buffer.len() {
943        return Err(HematiteError::CorruptedData(format!("Invalid {}", field)));
944    }
945    let present = buffer[*offset];
946    *offset += 1;
947    match present {
948        0 => Ok(None),
949        1 => {
950            let len = read_len(buffer, offset, &format!("{} length", field))?;
951            read_string(buffer, offset, len, field).map(Some)
952        }
953        _ => Err(HematiteError::CorruptedData(format!(
954            "Invalid {} marker",
955            field
956        ))),
957    }
958}
959
960fn foreign_key_action_to_byte(action: ForeignKeyAction) -> u8 {
961    match action {
962        ForeignKeyAction::Restrict => 0,
963        ForeignKeyAction::Cascade => 1,
964        ForeignKeyAction::SetNull => 2,
965    }
966}
967
968fn read_foreign_key_action(
969    buffer: &[u8],
970    offset: &mut usize,
971    field: &str,
972) -> Result<ForeignKeyAction> {
973    if *offset >= buffer.len() {
974        return Err(HematiteError::CorruptedData(format!("Invalid {}", field)));
975    }
976    let action = match buffer[*offset] {
977        0 => ForeignKeyAction::Restrict,
978        1 => ForeignKeyAction::Cascade,
979        2 => ForeignKeyAction::SetNull,
980        _ => {
981            return Err(HematiteError::CorruptedData(format!(
982                "Invalid {} value",
983                field
984            )))
985        }
986    };
987    *offset += 1;
988    Ok(action)
989}