quill_sql/catalog/
catalog.rs

1use std::collections::HashMap;
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4
5use super::registry::{global_index_registry, global_table_registry};
6use crate::catalog::{
7    key_schema_to_varchar, SchemaRef, COLUMNS_SCHEMA, INDEXES_SCHEMA, INFORMATION_SCHEMA_COLUMNS,
8    INFORMATION_SCHEMA_INDEXES, INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_SCHEMAS,
9    INFORMATION_SCHEMA_TABLES, SCHEMAS_SCHEMA, TABLES_SCHEMA,
10};
11use crate::storage::disk_manager::DiskManager;
12use crate::storage::page::{
13    BPLUS_INTERNAL_PAGE_MAX_SIZE, BPLUS_LEAF_PAGE_MAX_SIZE, EMPTY_TUPLE_META,
14};
15use crate::storage::table_heap::{TableHeap, TableIterator};
16use crate::storage::tuple::Tuple;
17use crate::transaction::{CommandId, TransactionId};
18use crate::utils::scalar::ScalarValue;
19use crate::utils::table_ref::TableReference;
20use crate::{
21    buffer::BufferManager,
22    error::{QuillSQLError, QuillSQLResult},
23    storage::index::btree_index::BPlusTreeIndex,
24};
25
/// Catalog name substituted when a `TableReference` does not name a catalog.
pub static DEFAULT_CATALOG_NAME: &str = "quillsql";
/// Schema name substituted when a `TableReference` does not name a schema.
pub static DEFAULT_SCHEMA_NAME: &str = "public";
28
/// In-memory catalog: every known schema plus the storage handles used when
/// new tables and indexes are created.
#[derive(Debug)]
pub struct Catalog {
    /// Schemas keyed by schema name.
    pub schemas: HashMap<String, CatalogSchema>,
    /// Buffer manager handed to each `TableHeap` / `BPlusTreeIndex` this catalog creates.
    pub buffer_pool: Arc<BufferManager>,
    /// Disk manager handle stored at construction; not read by the methods in this file.
    pub disk_manager: Arc<DiskManager>,
}
35
36#[derive(Debug)]
37pub struct CatalogSchema {
38    pub name: String,
39    pub tables: HashMap<String, CatalogTable>,
40}
41
42impl CatalogSchema {
43    pub fn new(name: impl Into<String>) -> Self {
44        Self {
45            name: name.into(),
46            tables: HashMap::new(),
47        }
48    }
49}
50
51#[derive(Debug)]
52pub struct CatalogTable {
53    pub name: String,
54    pub table: Arc<TableHeap>,
55    pub indexes: HashMap<String, Arc<BPlusTreeIndex>>,
56}
57
58impl CatalogTable {
59    pub fn new(name: impl Into<String>, table: Arc<TableHeap>) -> Self {
60        Self {
61            name: name.into(),
62            table,
63            indexes: HashMap::new(),
64        }
65    }
66}
67
/// Transaction id passed to `TableHeap::delete_tuple` when the catalog deletes
/// its own metadata rows.
const SYSTEM_TXN_ID: TransactionId = 0;
/// Command id passed alongside `SYSTEM_TXN_ID` for those deletions.
const SYSTEM_COMMAND_ID: CommandId = 0;
70
71impl Catalog {
72    pub fn new(buffer_pool: Arc<BufferManager>, disk_manager: Arc<DiskManager>) -> Self {
73        Self {
74            schemas: HashMap::new(),
75            buffer_pool,
76            disk_manager,
77        }
78    }
79
80    pub fn create_schema(&mut self, schema_name: impl Into<String>) -> QuillSQLResult<()> {
81        let schema_name = schema_name.into();
82        if self.schemas.contains_key(&schema_name) {
83            return Err(QuillSQLError::Storage(
84                "Cannot create duplicated schema".to_string(),
85            ));
86        }
87        self.schemas
88            .insert(schema_name.clone(), CatalogSchema::new(schema_name.clone()));
89
90        // update system table
91        let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
92            return Err(QuillSQLError::Internal(
93                "catalog schema information_schema not created yet".to_string(),
94            ));
95        };
96        let Some(schemas_table) = information_schema
97            .tables
98            .get_mut(INFORMATION_SCHEMA_SCHEMAS)
99        else {
100            return Err(QuillSQLError::Internal(
101                "table information_schema.schemas not created yet".to_string(),
102            ));
103        };
104
105        let tuple = Tuple::new(
106            SCHEMAS_SCHEMA.clone(),
107            vec![
108                DEFAULT_CATALOG_NAME.to_string().into(),
109                schema_name.clone().into(),
110            ],
111        );
112        schemas_table
113            .table
114            .insert_tuple(&EMPTY_TUPLE_META, &tuple)?;
115        Ok(())
116    }
117
118    pub fn create_table(
119        &mut self,
120        table_ref: TableReference,
121        schema: SchemaRef,
122    ) -> QuillSQLResult<Arc<TableHeap>> {
123        let catalog_name = table_ref
124            .catalog()
125            .unwrap_or(DEFAULT_CATALOG_NAME)
126            .to_string();
127        let catalog_schema_name = table_ref
128            .schema()
129            .unwrap_or(DEFAULT_SCHEMA_NAME)
130            .to_string();
131        let table_name = table_ref.table().to_string();
132
133        let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else {
134            return Err(QuillSQLError::Storage(format!(
135                "catalog schema {} not created yet",
136                catalog_schema_name
137            )));
138        };
139        if catalog_schema.tables.contains_key(table_ref.table()) {
140            return Err(QuillSQLError::Storage(
141                "Cannot create duplicated table".to_string(),
142            ));
143        }
144        let table_heap = Arc::new(TableHeap::try_new(
145            schema.clone(),
146            self.buffer_pool.clone(),
147        )?);
148        let catalog_table = CatalogTable {
149            name: table_name.clone(),
150            table: table_heap.clone(),
151            indexes: HashMap::new(),
152        };
153        catalog_schema
154            .tables
155            .insert(table_name.clone(), catalog_table);
156        global_table_registry().register(table_ref.clone(), table_heap.clone());
157
158        // update system table
159        let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
160            return Err(QuillSQLError::Internal(
161                "catalog schema information_schema not created yet".to_string(),
162            ));
163        };
164        let Some(tables_table) = information_schema.tables.get_mut(INFORMATION_SCHEMA_TABLES)
165        else {
166            return Err(QuillSQLError::Internal(
167                "table information_schema.tables not created yet".to_string(),
168            ));
169        };
170
171        let tuple = Tuple::new(
172            TABLES_SCHEMA.clone(),
173            vec![
174                catalog_name.clone().into(),
175                catalog_schema_name.clone().into(),
176                table_name.clone().into(),
177                (table_heap.first_page_id.load(Ordering::SeqCst)).into(),
178            ],
179        );
180        tables_table.table.insert_tuple(&EMPTY_TUPLE_META, &tuple)?;
181
182        let Some(columns_table) = information_schema
183            .tables
184            .get_mut(INFORMATION_SCHEMA_COLUMNS)
185        else {
186            return Err(QuillSQLError::Internal(
187                "table information_schema.columns not created yet".to_string(),
188            ));
189        };
190        for col in schema.columns.iter() {
191            let sql_type: sqlparser::ast::DataType = (&col.data_type).into();
192            let tuple = Tuple::new(
193                COLUMNS_SCHEMA.clone(),
194                vec![
195                    catalog_name.clone().into(),
196                    catalog_schema_name.clone().into(),
197                    table_name.clone().into(),
198                    col.name.clone().into(),
199                    format!("{sql_type}").into(),
200                    col.nullable.into(),
201                    format!("{}", col.default).into(),
202                ],
203            );
204            columns_table
205                .table
206                .insert_tuple(&EMPTY_TUPLE_META, &tuple)?;
207        }
208
209        Ok(table_heap)
210    }
211
212    pub fn drop_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<bool> {
213        let catalog_name = table_ref
214            .catalog()
215            .unwrap_or(DEFAULT_CATALOG_NAME)
216            .to_string();
217        let schema_name = table_ref
218            .schema()
219            .unwrap_or(DEFAULT_SCHEMA_NAME)
220            .to_string();
221        let table_name = table_ref.table().to_string();
222
223        if schema_name == INFORMATION_SCHEMA_NAME {
224            return Err(QuillSQLError::Execution(
225                "dropping information_schema tables is not allowed".to_string(),
226            ));
227        }
228
229        let Some(schema) = self.schemas.get_mut(&schema_name) else {
230            return Ok(false);
231        };
232
233        let Some(catalog_table) = schema.tables.remove(&table_name) else {
234            return Ok(false);
235        };
236
237        global_table_registry().unregister(table_ref);
238
239        for index_name in catalog_table.indexes.keys() {
240            self.unregister_index_variants(
241                &catalog_name,
242                &schema_name,
243                &table_name,
244                table_ref,
245                index_name,
246            );
247        }
248
249        self.remove_table_metadata(&catalog_name, &schema_name, &table_name)?;
250        Ok(true)
251    }
252
253    pub fn table_heap(&self, table_ref: &TableReference) -> QuillSQLResult<Arc<TableHeap>> {
254        let catalog_schema_name = table_ref
255            .schema()
256            .unwrap_or(DEFAULT_SCHEMA_NAME)
257            .to_string();
258        let table_name = table_ref.table().to_string();
259
260        let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else {
261            return Err(QuillSQLError::Storage(format!(
262                "catalog schema {} not created yet",
263                catalog_schema_name
264            )));
265        };
266        let Some(catalog_table) = catalog_schema.tables.get(&table_name) else {
267            return Err(QuillSQLError::Storage(format!(
268                "table {} not created yet",
269                table_name
270            )));
271        };
272        Ok(catalog_table.table.clone())
273    }
274
275    pub fn try_table_heap(&self, table_ref: &TableReference) -> Option<Arc<TableHeap>> {
276        let schema_name = table_ref
277            .schema()
278            .unwrap_or(DEFAULT_SCHEMA_NAME)
279            .to_string();
280        self.schemas
281            .get(&schema_name)
282            .and_then(|schema| schema.tables.get(table_ref.table()))
283            .map(|catalog_table| catalog_table.table.clone())
284    }
285
286    pub fn table_indexes(
287        &self,
288        table_ref: &TableReference,
289    ) -> QuillSQLResult<Vec<Arc<BPlusTreeIndex>>> {
290        let catalog_schema_name = table_ref
291            .schema()
292            .unwrap_or(DEFAULT_SCHEMA_NAME)
293            .to_string();
294        let table_name = table_ref.table().to_string();
295
296        let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else {
297            return Err(QuillSQLError::Storage(format!(
298                "catalog schema {} not created yet",
299                catalog_schema_name
300            )));
301        };
302        let Some(catalog_table) = catalog_schema.tables.get(&table_name) else {
303            return Err(QuillSQLError::Storage(format!(
304                "table {} not created yet",
305                table_name
306            )));
307        };
308        Ok(catalog_table.indexes.values().cloned().collect())
309    }
310
311    pub fn create_index(
312        &mut self,
313        index_name: String,
314        table_ref: &TableReference,
315        key_schema: SchemaRef,
316    ) -> QuillSQLResult<Arc<BPlusTreeIndex>> {
317        let catalog_name = table_ref
318            .catalog()
319            .unwrap_or(DEFAULT_CATALOG_NAME)
320            .to_string();
321        let catalog_schema_name = table_ref
322            .schema()
323            .unwrap_or(DEFAULT_SCHEMA_NAME)
324            .to_string();
325        let table_name = table_ref.table().to_string();
326
327        let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else {
328            return Err(QuillSQLError::Storage(format!(
329                "catalog schema {} not created yet",
330                catalog_schema_name
331            )));
332        };
333        let Some(catalog_table) = catalog_schema.tables.get_mut(&table_name) else {
334            return Err(QuillSQLError::Storage(format!(
335                "table {} not created yet",
336                table_name
337            )));
338        };
339        if catalog_table.indexes.contains_key(&index_name) {
340            return Err(QuillSQLError::Storage(
341                "Cannot create duplicated index".to_string(),
342            ));
343        }
344
345        let b_plus_tree_index = Arc::new(BPlusTreeIndex::new(
346            key_schema.clone(),
347            self.buffer_pool.clone(),
348            BPLUS_INTERNAL_PAGE_MAX_SIZE as u32,
349            BPLUS_LEAF_PAGE_MAX_SIZE as u32,
350        ));
351        catalog_table
352            .indexes
353            .insert(index_name.clone(), b_plus_tree_index.clone());
354        // register for background maintenance
355        let table_heap = catalog_table.table.clone();
356        global_index_registry().register(
357            table_ref.clone(),
358            index_name.clone(),
359            b_plus_tree_index.clone(),
360            table_heap,
361        );
362
363        // update system table
364        let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
365            return Err(QuillSQLError::Internal(
366                "catalog schema information_schema not created yet".to_string(),
367            ));
368        };
369        let Some(indexes_table) = information_schema
370            .tables
371            .get_mut(INFORMATION_SCHEMA_INDEXES)
372        else {
373            return Err(QuillSQLError::Internal(
374                "table information_schema.indexes not created yet".to_string(),
375            ));
376        };
377
378        let tuple = Tuple::new(
379            INDEXES_SCHEMA.clone(),
380            vec![
381                catalog_name.clone().into(),
382                catalog_schema_name.clone().into(),
383                table_name.clone().into(),
384                index_name.clone().into(),
385                key_schema_to_varchar(&b_plus_tree_index.key_schema).into(),
386                b_plus_tree_index.internal_max_size.into(),
387                b_plus_tree_index.leaf_max_size.into(),
388                b_plus_tree_index.header_page_id.into(),
389            ],
390        );
391        indexes_table
392            .table
393            .insert_tuple(&EMPTY_TUPLE_META, &tuple)?;
394
395        Ok(b_plus_tree_index)
396    }
397
398    pub fn drop_index(
399        &mut self,
400        table_ref: &TableReference,
401        index_name: &str,
402    ) -> QuillSQLResult<bool> {
403        let catalog_name = table_ref
404            .catalog()
405            .unwrap_or(DEFAULT_CATALOG_NAME)
406            .to_string();
407        let schema_name = table_ref
408            .schema()
409            .unwrap_or(DEFAULT_SCHEMA_NAME)
410            .to_string();
411        let table_name = table_ref.table().to_string();
412
413        if schema_name == INFORMATION_SCHEMA_NAME {
414            return Err(QuillSQLError::Execution(
415                "dropping indexes on information_schema tables is not allowed".to_string(),
416            ));
417        }
418
419        let Some(schema) = self.schemas.get_mut(&schema_name) else {
420            return Ok(false);
421        };
422        let Some(table) = schema.tables.get_mut(&table_name) else {
423            return Ok(false);
424        };
425
426        if table.indexes.remove(index_name).is_none() {
427            return Ok(false);
428        }
429
430        self.unregister_index_variants(
431            &catalog_name,
432            &schema_name,
433            &table_name,
434            table_ref,
435            index_name,
436        );
437
438        self.remove_index_metadata(&catalog_name, &schema_name, &table_name, index_name)?;
439
440        Ok(true)
441    }
442
443    pub fn find_index_owner(
444        &self,
445        catalog_hint: Option<&str>,
446        schema_hint: Option<&str>,
447        index_name: &str,
448    ) -> Option<TableReference> {
449        let catalog_name = catalog_hint.unwrap_or(DEFAULT_CATALOG_NAME);
450
451        if let Some(schema_name) = schema_hint {
452            return self.find_index_in_schema(catalog_name, schema_name, index_name);
453        }
454
455        for schema_name in self.schemas.keys() {
456            if schema_name == INFORMATION_SCHEMA_NAME {
457                continue;
458            }
459            if let Some(table_ref) =
460                self.find_index_in_schema(catalog_name, schema_name, index_name)
461            {
462                return Some(table_ref);
463            }
464        }
465        None
466    }
467
468    pub fn index(
469        &self,
470        table_ref: &TableReference,
471        index_name: &str,
472    ) -> QuillSQLResult<Option<Arc<BPlusTreeIndex>>> {
473        let catalog_schema_name = table_ref
474            .schema()
475            .unwrap_or(DEFAULT_SCHEMA_NAME)
476            .to_string();
477        let table_name = table_ref.table().to_string();
478
479        let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else {
480            return Err(QuillSQLError::Storage(format!(
481                "catalog schema {} not created yet",
482                catalog_schema_name
483            )));
484        };
485        let Some(catalog_table) = catalog_schema.tables.get(&table_name) else {
486            return Err(QuillSQLError::Storage(format!(
487                "table {} not created yet",
488                table_name
489            )));
490        };
491        Ok(catalog_table.indexes.get(index_name).cloned())
492    }
493
494    pub fn load_schema(&mut self, name: impl Into<String>, schema: CatalogSchema) {
495        self.schemas.insert(name.into(), schema);
496    }
497
498    pub fn load_table(
499        &mut self,
500        table_ref: TableReference,
501        table: CatalogTable,
502    ) -> QuillSQLResult<()> {
503        let catalog_schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
504        let table_name = table_ref.table().to_string();
505        let Some(catalog_schema) = self.schemas.get_mut(catalog_schema_name) else {
506            return Err(QuillSQLError::Storage(format!(
507                "catalog schema {} not created yet",
508                catalog_schema_name
509            )));
510        };
511        global_table_registry().register(table_ref.clone(), table.table.clone());
512        catalog_schema.tables.insert(table_name, table);
513        Ok(())
514    }
515
516    pub fn load_index(
517        &mut self,
518        table_ref: TableReference,
519        index_name: impl Into<String>,
520        index: Arc<BPlusTreeIndex>,
521    ) -> QuillSQLResult<()> {
522        let catalog_schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
523        let table_name = table_ref.table().to_string();
524        let Some(catalog_schema) = self.schemas.get_mut(catalog_schema_name) else {
525            return Err(QuillSQLError::Storage(format!(
526                "catalog schema {} not created yet",
527                catalog_schema_name
528            )));
529        };
530        let Some(catalog_table) = catalog_schema.tables.get_mut(&table_name) else {
531            return Err(QuillSQLError::Storage(format!(
532                "catalog table {} not created yet",
533                table_name
534            )));
535        };
536        let idx_name: String = index_name.into();
537        catalog_table.indexes.insert(idx_name.clone(), index);
538        // register as well
539        if let Some(idx) = catalog_table.indexes.get(&idx_name) {
540            let table_heap = catalog_table.table.clone();
541            global_index_registry().register(table_ref.clone(), idx_name, idx.clone(), table_heap);
542        }
543        Ok(())
544    }
545
546    fn find_index_in_schema(
547        &self,
548        catalog_name: &str,
549        schema_name: &str,
550        index_name: &str,
551    ) -> Option<TableReference> {
552        if schema_name == INFORMATION_SCHEMA_NAME {
553            return None;
554        }
555        let schema = self.schemas.get(schema_name)?;
556        for (table_name, table) in &schema.tables {
557            if table.indexes.contains_key(index_name) {
558                if catalog_name == DEFAULT_CATALOG_NAME {
559                    if schema_name == DEFAULT_SCHEMA_NAME {
560                        return Some(TableReference::Bare {
561                            table: table_name.clone(),
562                        });
563                    }
564                    return Some(TableReference::Partial {
565                        schema: schema_name.to_string(),
566                        table: table_name.clone(),
567                    });
568                }
569                return Some(TableReference::Full {
570                    catalog: catalog_name.to_string(),
571                    schema: schema_name.to_string(),
572                    table: table_name.clone(),
573                });
574            }
575        }
576        None
577    }
578
579    fn remove_table_metadata(
580        &mut self,
581        catalog_name: &str,
582        schema_name: &str,
583        table_name: &str,
584    ) -> QuillSQLResult<()> {
585        let (tables_heap, columns_heap, indexes_heap) = {
586            let information_schema =
587                self.schemas.get(INFORMATION_SCHEMA_NAME).ok_or_else(|| {
588                    QuillSQLError::Internal(
589                        "catalog schema information_schema not created yet".to_string(),
590                    )
591                })?;
592            let tables_heap = information_schema
593                .tables
594                .get(INFORMATION_SCHEMA_TABLES)
595                .ok_or_else(|| {
596                    QuillSQLError::Internal(
597                        "table information_schema.tables not created yet".to_string(),
598                    )
599                })?
600                .table
601                .clone();
602            let columns_heap = information_schema
603                .tables
604                .get(INFORMATION_SCHEMA_COLUMNS)
605                .ok_or_else(|| {
606                    QuillSQLError::Internal(
607                        "table information_schema.columns not created yet".to_string(),
608                    )
609                })?
610                .table
611                .clone();
612            let indexes_heap = information_schema
613                .tables
614                .get(INFORMATION_SCHEMA_INDEXES)
615                .ok_or_else(|| {
616                    QuillSQLError::Internal(
617                        "table information_schema.indexes not created yet".to_string(),
618                    )
619                })?
620                .table
621                .clone();
622            (tables_heap, columns_heap, indexes_heap)
623        };
624
625        Self::delete_matching_rows(tables_heap, |tuple| {
626            let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
627                return Ok(false);
628            };
629            let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
630                return Ok(false);
631            };
632            let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
633                return Ok(false);
634            };
635            Ok(catalog == catalog_name && schema == schema_name && table == table_name)
636        })?;
637
638        Self::delete_matching_rows(columns_heap, |tuple| {
639            let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
640                return Ok(false);
641            };
642            let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
643                return Ok(false);
644            };
645            let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
646                return Ok(false);
647            };
648            Ok(catalog == catalog_name && schema == schema_name && table == table_name)
649        })?;
650
651        Self::delete_matching_rows(indexes_heap, |tuple| {
652            let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
653                return Ok(false);
654            };
655            let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
656                return Ok(false);
657            };
658            let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
659                return Ok(false);
660            };
661            Ok(catalog == catalog_name && schema == schema_name && table == table_name)
662        })?;
663
664        Ok(())
665    }
666
667    fn remove_index_metadata(
668        &mut self,
669        catalog_name: &str,
670        schema_name: &str,
671        table_name: &str,
672        index_name: &str,
673    ) -> QuillSQLResult<()> {
674        let indexes_heap = {
675            let information_schema =
676                self.schemas.get(INFORMATION_SCHEMA_NAME).ok_or_else(|| {
677                    QuillSQLError::Internal(
678                        "catalog schema information_schema not created yet".to_string(),
679                    )
680                })?;
681            information_schema
682                .tables
683                .get(INFORMATION_SCHEMA_INDEXES)
684                .ok_or_else(|| {
685                    QuillSQLError::Internal(
686                        "table information_schema.indexes not created yet".to_string(),
687                    )
688                })?
689                .table
690                .clone()
691        };
692
693        Self::delete_matching_rows(indexes_heap, |tuple| {
694            let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
695                return Ok(false);
696            };
697            let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
698                return Ok(false);
699            };
700            let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
701                return Ok(false);
702            };
703            let ScalarValue::Varchar(Some(index)) = tuple.value(3)? else {
704                return Ok(false);
705            };
706            Ok(catalog == catalog_name
707                && schema == schema_name
708                && table == table_name
709                && index == index_name)
710        })?;
711
712        Ok(())
713    }
714
715    fn delete_matching_rows<F>(heap: Arc<TableHeap>, mut predicate: F) -> QuillSQLResult<()>
716    where
717        F: FnMut(&Tuple) -> QuillSQLResult<bool>,
718    {
719        let mut iterator = TableIterator::new(heap.clone(), ..);
720        while let Some((rid, _meta, tuple)) = iterator.next()? {
721            if predicate(&tuple)? {
722                heap.delete_tuple(rid, SYSTEM_TXN_ID, SYSTEM_COMMAND_ID)?;
723            }
724        }
725        Ok(())
726    }
727
728    fn unregister_index_variants(
729        &self,
730        catalog_name: &str,
731        schema_name: &str,
732        table_name: &str,
733        original_ref: &TableReference,
734        index_name: &str,
735    ) {
736        let registry = global_index_registry();
737        registry.unregister(original_ref, index_name);
738        registry.unregister(
739            &TableReference::Bare {
740                table: table_name.to_string(),
741            },
742            index_name,
743        );
744        registry.unregister(
745            &TableReference::Partial {
746                schema: schema_name.to_string(),
747                table: table_name.to_string(),
748            },
749            index_name,
750        );
751        registry.unregister(
752            &TableReference::Full {
753                catalog: catalog_name.to_string(),
754                schema: schema_name.to_string(),
755                table: table_name.to_string(),
756            },
757            index_name,
758        );
759    }
760}
761
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::utils::table_ref::TableReference;
    use crate::{
        catalog::{Column, DataType, Schema},
        database::Database,
    };

    /// Two tables created through the catalog keep their schemas and can be
    /// looked up again by reference.
    #[test]
    pub fn test_catalog_create_table() {
        let mut db = Database::new_temp().unwrap();

        let table_ref1 = TableReference::Bare {
            table: "test_table1".to_string(),
        };
        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, true),
            Column::new("b", DataType::Int16, true),
            Column::new("c", DataType::Int32, true),
        ]));
        let table_info = db
            .catalog
            .create_table(table_ref1.clone(), schema.clone())
            .unwrap();
        assert_eq!(table_info.schema, schema);

        let table_ref2 = TableReference::Bare {
            table: "test_table2".to_string(),
        };
        let schema = Arc::new(Schema::new(vec![
            Column::new("d", DataType::Int32, true),
            Column::new("e", DataType::Int16, true),
            Column::new("f", DataType::Int8, true),
        ]));
        let table_info = db
            .catalog
            .create_table(table_ref2.clone(), schema.clone())
            .unwrap();
        assert_eq!(table_info.schema, schema);

        let table_info = db.catalog.table_heap(&table_ref1).unwrap();
        assert_eq!(table_info.schema.column_count(), 3);

        let table_info = db.catalog.table_heap(&table_ref2).unwrap();
        assert_eq!(table_info.schema.column_count(), 3);
    }

    /// Indexes created on a table keep their key schemas and can be fetched
    /// back by name.
    #[test]
    pub fn test_catalog_create_index() {
        let mut db = Database::new_temp().unwrap();

        let table_ref = TableReference::Bare {
            table: "test_table1".to_string(),
        };
        let schema = Arc::new(Schema::new(vec![
            Column::new("a", DataType::Int8, true),
            Column::new("b", DataType::Int16, true),
            Column::new("c", DataType::Int32, true),
        ]));
        // Fail at the cause if the setup table cannot be created, instead of
        // surfacing later as a confusing "table not created yet" error.
        db.catalog
            .create_table(table_ref.clone(), schema.clone())
            .unwrap();

        let index_name1 = "test_index1".to_string();
        let key_schema1 = Arc::new(schema.project(&[0, 2]).unwrap());
        let index1 = db
            .catalog
            .create_index(index_name1.clone(), &table_ref, key_schema1.clone())
            .unwrap();
        assert_eq!(index1.key_schema, key_schema1);

        let index_name2 = "test_index2".to_string();
        let key_schema2 = Arc::new(schema.project(&[1]).unwrap());
        let index2 = db
            .catalog
            .create_index(index_name2.clone(), &table_ref, key_schema2.clone())
            .unwrap();
        assert_eq!(index2.key_schema, key_schema2);

        let index3 = db
            .catalog
            .index(&table_ref, index_name1.as_str())
            .unwrap()
            .unwrap();
        assert_eq!(index3.key_schema, key_schema1);
    }
}