quill_sql/catalog/
catalog.rs

1use std::collections::HashMap;
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4
5use super::registry::TableRegistry;
6use crate::catalog::{
7    key_schema_to_varchar, SchemaRef, TableStatistics, COLUMNS_SCHEMA, INDEXES_SCHEMA,
8    INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_INDEXES, INFORMATION_SCHEMA_NAME,
9    INFORMATION_SCHEMA_SCHEMAS, INFORMATION_SCHEMA_TABLES, SCHEMAS_SCHEMA, TABLES_SCHEMA,
10};
11use crate::storage::disk_manager::DiskManager;
12use crate::storage::heap::MvccHeap;
13use crate::storage::page::{BPLUS_INTERNAL_PAGE_MAX_SIZE, BPLUS_LEAF_PAGE_MAX_SIZE};
14use crate::storage::table_heap::{TableHeap, TableIterator};
15use crate::storage::tuple::Tuple;
16use crate::transaction::{CommandId, TransactionId};
17use crate::utils::scalar::ScalarValue;
18use crate::utils::table_ref::TableReference;
19use crate::{
20    buffer::BufferManager,
21    error::{QuillSQLError, QuillSQLResult},
22    storage::index::btree_index::BPlusTreeIndex,
23};
24
/// Catalog name used when a table reference does not name a catalog.
pub static DEFAULT_CATALOG_NAME: &str = "quillsql";
/// Schema name used when a table reference does not name a schema.
pub static DEFAULT_SCHEMA_NAME: &str = "public";
27
/// In-memory catalog: the known schemas plus the storage handles used when
/// creating table heaps and indexes.
#[derive(Debug)]
pub struct Catalog {
    /// Schemas keyed by schema name.
    pub schemas: HashMap<String, CatalogSchema>,
    /// Buffer manager handed to newly created table heaps and B+ tree indexes.
    pub buffer_pool: Arc<BufferManager>,
    /// Disk manager handle; stored by `new`, not read by the methods in this impl.
    pub disk_manager: Arc<DiskManager>,
    /// Registry updated whenever a table is created, loaded or dropped.
    table_registry: Arc<TableRegistry>,
}
35
36#[derive(Debug)]
37pub struct CatalogSchema {
38    pub name: String,
39    pub tables: HashMap<String, CatalogTable>,
40}
41
42impl CatalogSchema {
43    pub fn new(name: impl Into<String>) -> Self {
44        Self {
45            name: name.into(),
46            tables: HashMap::new(),
47        }
48    }
49}
50
51#[derive(Debug)]
52pub struct CatalogTable {
53    pub name: String,
54    pub table: Arc<TableHeap>,
55    pub indexes: HashMap<String, Arc<BPlusTreeIndex>>,
56    pub stats: Option<TableStatistics>,
57}
58
59impl CatalogTable {
60    pub fn new(name: impl Into<String>, table: Arc<TableHeap>) -> Self {
61        Self {
62            name: name.into(),
63            table,
64            indexes: HashMap::new(),
65            stats: None,
66        }
67    }
68}
69
/// Transaction id passed to `MvccHeap` for every catalog system-table write in this file.
const SYSTEM_TXN_ID: TransactionId = 0;
/// Command id passed to `MvccHeap` for every catalog system-table write in this file.
const SYSTEM_COMMAND_ID: CommandId = 0;
72
73impl Catalog {
74    pub fn new(
75        buffer_pool: Arc<BufferManager>,
76        disk_manager: Arc<DiskManager>,
77        table_registry: Arc<TableRegistry>,
78    ) -> Self {
79        Self {
80            schemas: HashMap::new(),
81            buffer_pool,
82            disk_manager,
83            table_registry,
84        }
85    }
86
87    pub fn table_registry(&self) -> Arc<TableRegistry> {
88        self.table_registry.clone()
89    }
90
91    pub fn create_schema(&mut self, schema_name: impl Into<String>) -> QuillSQLResult<()> {
92        let schema_name = schema_name.into();
93        if self.schemas.contains_key(&schema_name) {
94            return Err(QuillSQLError::Storage(
95                "Cannot create duplicated schema".to_string(),
96            ));
97        }
98        self.schemas
99            .insert(schema_name.clone(), CatalogSchema::new(schema_name.clone()));
100
101        // update system table
102        let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
103            return Err(QuillSQLError::Internal(
104                "catalog schema information_schema not created yet".to_string(),
105            ));
106        };
107        let Some(schemas_table) = information_schema
108            .tables
109            .get_mut(INFORMATION_SCHEMA_SCHEMAS)
110        else {
111            return Err(QuillSQLError::Internal(
112                "table information_schema.schemas not created yet".to_string(),
113            ));
114        };
115
116        let tuple = Tuple::new(
117            SCHEMAS_SCHEMA.clone(),
118            vec![
119                DEFAULT_CATALOG_NAME.to_string().into(),
120                schema_name.clone().into(),
121            ],
122        );
123        let _ = MvccHeap::new(schemas_table.table.clone()).insert(
124            &tuple,
125            SYSTEM_TXN_ID,
126            SYSTEM_COMMAND_ID,
127        )?;
128        Ok(())
129    }
130
131    pub fn create_table(
132        &mut self,
133        table_ref: TableReference,
134        schema: SchemaRef,
135    ) -> QuillSQLResult<Arc<TableHeap>> {
136        let catalog_name = table_ref
137            .catalog()
138            .unwrap_or(DEFAULT_CATALOG_NAME)
139            .to_string();
140        let catalog_schema_name = table_ref
141            .schema()
142            .unwrap_or(DEFAULT_SCHEMA_NAME)
143            .to_string();
144        let table_name = table_ref.table().to_string();
145
146        let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else {
147            return Err(QuillSQLError::Storage(format!(
148                "catalog schema {} not created yet",
149                catalog_schema_name
150            )));
151        };
152        if catalog_schema.tables.contains_key(table_ref.table()) {
153            return Err(QuillSQLError::Storage(
154                "Cannot create duplicated table".to_string(),
155            ));
156        }
157        let table_heap = Arc::new(TableHeap::try_new(
158            schema.clone(),
159            self.buffer_pool.clone(),
160        )?);
161        let catalog_table = CatalogTable {
162            name: table_name.clone(),
163            table: table_heap.clone(),
164            indexes: HashMap::new(),
165            stats: None,
166        };
167        catalog_schema
168            .tables
169            .insert(table_name.clone(), catalog_table);
170        self.table_registry
171            .register(table_ref.clone(), table_heap.clone());
172
173        // update system table
174        let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
175            return Err(QuillSQLError::Internal(
176                "catalog schema information_schema not created yet".to_string(),
177            ));
178        };
179        let Some(tables_table) = information_schema.tables.get_mut(INFORMATION_SCHEMA_TABLES)
180        else {
181            return Err(QuillSQLError::Internal(
182                "table information_schema.tables not created yet".to_string(),
183            ));
184        };
185
186        let tuple = Tuple::new(
187            TABLES_SCHEMA.clone(),
188            vec![
189                catalog_name.clone().into(),
190                catalog_schema_name.clone().into(),
191                table_name.clone().into(),
192                (table_heap.first_page_id.load(Ordering::SeqCst)).into(),
193            ],
194        );
195        let _ = MvccHeap::new(tables_table.table.clone()).insert(
196            &tuple,
197            SYSTEM_TXN_ID,
198            SYSTEM_COMMAND_ID,
199        )?;
200
201        let Some(columns_table) = information_schema
202            .tables
203            .get_mut(INFORMATION_SCHEMA_COLUMNS)
204        else {
205            return Err(QuillSQLError::Internal(
206                "table information_schema.columns not created yet".to_string(),
207            ));
208        };
209        let columns_mvcc = MvccHeap::new(columns_table.table.clone());
210        for col in schema.columns.iter() {
211            let sql_type: sqlparser::ast::DataType = (&col.data_type).into();
212            let tuple = Tuple::new(
213                COLUMNS_SCHEMA.clone(),
214                vec![
215                    catalog_name.clone().into(),
216                    catalog_schema_name.clone().into(),
217                    table_name.clone().into(),
218                    col.name.clone().into(),
219                    format!("{sql_type}").into(),
220                    col.nullable.into(),
221                    format!("{}", col.default).into(),
222                ],
223            );
224            let _ = columns_mvcc.insert(&tuple, SYSTEM_TXN_ID, SYSTEM_COMMAND_ID)?;
225        }
226
227        Ok(table_heap)
228    }
229
230    pub fn drop_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<bool> {
231        let catalog_name = table_ref
232            .catalog()
233            .unwrap_or(DEFAULT_CATALOG_NAME)
234            .to_string();
235        let schema_name = table_ref
236            .schema()
237            .unwrap_or(DEFAULT_SCHEMA_NAME)
238            .to_string();
239        let table_name = table_ref.table().to_string();
240
241        if schema_name == INFORMATION_SCHEMA_NAME {
242            return Err(QuillSQLError::Execution(
243                "dropping information_schema tables is not allowed".to_string(),
244            ));
245        }
246
247        let Some(schema) = self.schemas.get_mut(&schema_name) else {
248            return Ok(false);
249        };
250
251        let Some(catalog_table) = schema.tables.remove(&table_name) else {
252            return Ok(false);
253        };
254
255        self.table_registry.unregister(table_ref);
256
257        for index_name in catalog_table.indexes.keys() {
258            self.remove_index_metadata(&catalog_name, &schema_name, &table_name, index_name)?;
259        }
260
261        self.remove_table_metadata(&catalog_name, &schema_name, &table_name)?;
262        Ok(true)
263    }
264
265    pub fn table_heap(&self, table_ref: &TableReference) -> QuillSQLResult<Arc<TableHeap>> {
266        let catalog_schema_name = table_ref
267            .schema()
268            .unwrap_or(DEFAULT_SCHEMA_NAME)
269            .to_string();
270        let table_name = table_ref.table().to_string();
271
272        let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else {
273            return Err(QuillSQLError::Storage(format!(
274                "catalog schema {} not created yet",
275                catalog_schema_name
276            )));
277        };
278        let Some(catalog_table) = catalog_schema.tables.get(&table_name) else {
279            return Err(QuillSQLError::Storage(format!(
280                "table {} not created yet",
281                table_name
282            )));
283        };
284        Ok(catalog_table.table.clone())
285    }
286
287    pub fn table_statistics(&self, table_ref: &TableReference) -> Option<&TableStatistics> {
288        let catalog_schema_name = table_ref
289            .schema()
290            .unwrap_or(DEFAULT_SCHEMA_NAME)
291            .to_string();
292        self.schemas
293            .get(&catalog_schema_name)
294            .and_then(|schema| schema.tables.get(table_ref.table()))
295            .and_then(|table| table.stats.as_ref())
296    }
297
298    pub fn analyze_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<TableStatistics> {
299        let catalog_schema_name = table_ref
300            .schema()
301            .unwrap_or(DEFAULT_SCHEMA_NAME)
302            .to_string();
303        let table_name = table_ref.table().to_string();
304
305        let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else {
306            return Err(QuillSQLError::Storage(format!(
307                "catalog schema {} not created yet",
308                catalog_schema_name
309            )));
310        };
311        let Some(catalog_table) = catalog_schema.tables.get_mut(&table_name) else {
312            return Err(QuillSQLError::Storage(format!(
313                "table {} not created yet",
314                table_name
315            )));
316        };
317
318        let stats = TableStatistics::analyze(catalog_table.table.clone())?;
319        catalog_table.stats = Some(stats.clone());
320        Ok(stats)
321    }
322
323    pub fn try_table_heap(&self, table_ref: &TableReference) -> Option<Arc<TableHeap>> {
324        let schema_name = table_ref
325            .schema()
326            .unwrap_or(DEFAULT_SCHEMA_NAME)
327            .to_string();
328        self.schemas
329            .get(&schema_name)
330            .and_then(|schema| schema.tables.get(table_ref.table()))
331            .map(|catalog_table| catalog_table.table.clone())
332    }
333
334    pub fn table_indexes(
335        &self,
336        table_ref: &TableReference,
337    ) -> QuillSQLResult<Vec<(String, Arc<BPlusTreeIndex>)>> {
338        let catalog_schema_name = table_ref
339            .schema()
340            .unwrap_or(DEFAULT_SCHEMA_NAME)
341            .to_string();
342        let table_name = table_ref.table().to_string();
343
344        let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else {
345            return Err(QuillSQLError::Storage(format!(
346                "catalog schema {} not created yet",
347                catalog_schema_name
348            )));
349        };
350        let Some(catalog_table) = catalog_schema.tables.get(&table_name) else {
351            return Err(QuillSQLError::Storage(format!(
352                "table {} not created yet",
353                table_name
354            )));
355        };
356        Ok(catalog_table
357            .indexes
358            .iter()
359            .map(|(name, index)| (name.clone(), index.clone()))
360            .collect())
361    }
362
363    pub fn create_index(
364        &mut self,
365        index_name: String,
366        table_ref: &TableReference,
367        key_schema: SchemaRef,
368    ) -> QuillSQLResult<Arc<BPlusTreeIndex>> {
369        let catalog_name = table_ref
370            .catalog()
371            .unwrap_or(DEFAULT_CATALOG_NAME)
372            .to_string();
373        let catalog_schema_name = table_ref
374            .schema()
375            .unwrap_or(DEFAULT_SCHEMA_NAME)
376            .to_string();
377        let table_name = table_ref.table().to_string();
378
379        let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else {
380            return Err(QuillSQLError::Storage(format!(
381                "catalog schema {} not created yet",
382                catalog_schema_name
383            )));
384        };
385        let Some(catalog_table) = catalog_schema.tables.get_mut(&table_name) else {
386            return Err(QuillSQLError::Storage(format!(
387                "table {} not created yet",
388                table_name
389            )));
390        };
391        if catalog_table.indexes.contains_key(&index_name) {
392            return Err(QuillSQLError::Storage(
393                "Cannot create duplicated index".to_string(),
394            ));
395        }
396
397        let b_plus_tree_index = Arc::new(BPlusTreeIndex::new(
398            key_schema.clone(),
399            self.buffer_pool.clone(),
400            BPLUS_INTERNAL_PAGE_MAX_SIZE as u32,
401            BPLUS_LEAF_PAGE_MAX_SIZE as u32,
402        ));
403        catalog_table
404            .indexes
405            .insert(index_name.clone(), b_plus_tree_index.clone());
406
407        // update system table
408        let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
409            return Err(QuillSQLError::Internal(
410                "catalog schema information_schema not created yet".to_string(),
411            ));
412        };
413        let Some(indexes_table) = information_schema
414            .tables
415            .get_mut(INFORMATION_SCHEMA_INDEXES)
416        else {
417            return Err(QuillSQLError::Internal(
418                "table information_schema.indexes not created yet".to_string(),
419            ));
420        };
421
422        let tuple = Tuple::new(
423            INDEXES_SCHEMA.clone(),
424            vec![
425                catalog_name.clone().into(),
426                catalog_schema_name.clone().into(),
427                table_name.clone().into(),
428                index_name.clone().into(),
429                key_schema_to_varchar(&b_plus_tree_index.key_schema).into(),
430                b_plus_tree_index.internal_max_size.into(),
431                b_plus_tree_index.leaf_max_size.into(),
432                b_plus_tree_index.header_page_id.into(),
433            ],
434        );
435        let _ = MvccHeap::new(indexes_table.table.clone()).insert(
436            &tuple,
437            SYSTEM_TXN_ID,
438            SYSTEM_COMMAND_ID,
439        )?;
440
441        Ok(b_plus_tree_index)
442    }
443
444    pub fn drop_index(
445        &mut self,
446        table_ref: &TableReference,
447        index_name: &str,
448    ) -> QuillSQLResult<bool> {
449        let catalog_name = table_ref
450            .catalog()
451            .unwrap_or(DEFAULT_CATALOG_NAME)
452            .to_string();
453        let schema_name = table_ref
454            .schema()
455            .unwrap_or(DEFAULT_SCHEMA_NAME)
456            .to_string();
457        let table_name = table_ref.table().to_string();
458
459        if schema_name == INFORMATION_SCHEMA_NAME {
460            return Err(QuillSQLError::Execution(
461                "dropping indexes on information_schema tables is not allowed".to_string(),
462            ));
463        }
464
465        let Some(schema) = self.schemas.get_mut(&schema_name) else {
466            return Ok(false);
467        };
468        let Some(table) = schema.tables.get_mut(&table_name) else {
469            return Ok(false);
470        };
471
472        if table.indexes.remove(index_name).is_none() {
473            return Ok(false);
474        }
475
476        self.remove_index_metadata(&catalog_name, &schema_name, &table_name, index_name)?;
477
478        Ok(true)
479    }
480
481    pub fn find_index_owner(
482        &self,
483        catalog_hint: Option<&str>,
484        schema_hint: Option<&str>,
485        index_name: &str,
486    ) -> Option<TableReference> {
487        let catalog_name = catalog_hint.unwrap_or(DEFAULT_CATALOG_NAME);
488
489        if let Some(schema_name) = schema_hint {
490            return self.find_index_in_schema(catalog_name, schema_name, index_name);
491        }
492
493        for schema_name in self.schemas.keys() {
494            if schema_name == INFORMATION_SCHEMA_NAME {
495                continue;
496            }
497            if let Some(table_ref) =
498                self.find_index_in_schema(catalog_name, schema_name, index_name)
499            {
500                return Some(table_ref);
501            }
502        }
503        None
504    }
505
506    pub fn index(
507        &self,
508        table_ref: &TableReference,
509        index_name: &str,
510    ) -> QuillSQLResult<Option<Arc<BPlusTreeIndex>>> {
511        let catalog_schema_name = table_ref
512            .schema()
513            .unwrap_or(DEFAULT_SCHEMA_NAME)
514            .to_string();
515        let table_name = table_ref.table().to_string();
516
517        let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else {
518            return Err(QuillSQLError::Storage(format!(
519                "catalog schema {} not created yet",
520                catalog_schema_name
521            )));
522        };
523        let Some(catalog_table) = catalog_schema.tables.get(&table_name) else {
524            return Err(QuillSQLError::Storage(format!(
525                "table {} not created yet",
526                table_name
527            )));
528        };
529        Ok(catalog_table.indexes.get(index_name).cloned())
530    }
531
532    pub fn load_schema(&mut self, name: impl Into<String>, schema: CatalogSchema) {
533        self.schemas.insert(name.into(), schema);
534    }
535
536    pub fn load_table(
537        &mut self,
538        table_ref: TableReference,
539        table: CatalogTable,
540    ) -> QuillSQLResult<()> {
541        let catalog_schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
542        let table_name = table_ref.table().to_string();
543        let Some(catalog_schema) = self.schemas.get_mut(catalog_schema_name) else {
544            return Err(QuillSQLError::Storage(format!(
545                "catalog schema {} not created yet",
546                catalog_schema_name
547            )));
548        };
549        self.table_registry
550            .register(table_ref.clone(), table.table.clone());
551        catalog_schema.tables.insert(table_name, table);
552        Ok(())
553    }
554
555    pub fn load_index(
556        &mut self,
557        table_ref: TableReference,
558        index_name: impl Into<String>,
559        index: Arc<BPlusTreeIndex>,
560    ) -> QuillSQLResult<()> {
561        let catalog_schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
562        let table_name = table_ref.table().to_string();
563        let Some(catalog_schema) = self.schemas.get_mut(catalog_schema_name) else {
564            return Err(QuillSQLError::Storage(format!(
565                "catalog schema {} not created yet",
566                catalog_schema_name
567            )));
568        };
569        let Some(catalog_table) = catalog_schema.tables.get_mut(&table_name) else {
570            return Err(QuillSQLError::Storage(format!(
571                "catalog table {} not created yet",
572                table_name
573            )));
574        };
575        let idx_name: String = index_name.into();
576        catalog_table.indexes.insert(idx_name.clone(), index);
577        Ok(())
578    }
579
580    fn find_index_in_schema(
581        &self,
582        catalog_name: &str,
583        schema_name: &str,
584        index_name: &str,
585    ) -> Option<TableReference> {
586        if schema_name == INFORMATION_SCHEMA_NAME {
587            return None;
588        }
589        let schema = self.schemas.get(schema_name)?;
590        for (table_name, table) in &schema.tables {
591            if table.indexes.contains_key(index_name) {
592                if catalog_name == DEFAULT_CATALOG_NAME {
593                    if schema_name == DEFAULT_SCHEMA_NAME {
594                        return Some(TableReference::Bare {
595                            table: table_name.clone(),
596                        });
597                    }
598                    return Some(TableReference::Partial {
599                        schema: schema_name.to_string(),
600                        table: table_name.clone(),
601                    });
602                }
603                return Some(TableReference::Full {
604                    catalog: catalog_name.to_string(),
605                    schema: schema_name.to_string(),
606                    table: table_name.clone(),
607                });
608            }
609        }
610        None
611    }
612
613    fn remove_table_metadata(
614        &mut self,
615        catalog_name: &str,
616        schema_name: &str,
617        table_name: &str,
618    ) -> QuillSQLResult<()> {
619        let (tables_heap, columns_heap, indexes_heap) = {
620            let information_schema =
621                self.schemas.get(INFORMATION_SCHEMA_NAME).ok_or_else(|| {
622                    QuillSQLError::Internal(
623                        "catalog schema information_schema not created yet".to_string(),
624                    )
625                })?;
626            let tables_heap = information_schema
627                .tables
628                .get(INFORMATION_SCHEMA_TABLES)
629                .ok_or_else(|| {
630                    QuillSQLError::Internal(
631                        "table information_schema.tables not created yet".to_string(),
632                    )
633                })?
634                .table
635                .clone();
636            let columns_heap = information_schema
637                .tables
638                .get(INFORMATION_SCHEMA_COLUMNS)
639                .ok_or_else(|| {
640                    QuillSQLError::Internal(
641                        "table information_schema.columns not created yet".to_string(),
642                    )
643                })?
644                .table
645                .clone();
646            let indexes_heap = information_schema
647                .tables
648                .get(INFORMATION_SCHEMA_INDEXES)
649                .ok_or_else(|| {
650                    QuillSQLError::Internal(
651                        "table information_schema.indexes not created yet".to_string(),
652                    )
653                })?
654                .table
655                .clone();
656            (tables_heap, columns_heap, indexes_heap)
657        };
658
659        Self::delete_matching_rows(tables_heap, |tuple| {
660            let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
661                return Ok(false);
662            };
663            let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
664                return Ok(false);
665            };
666            let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
667                return Ok(false);
668            };
669            Ok(catalog == catalog_name && schema == schema_name && table == table_name)
670        })?;
671
672        Self::delete_matching_rows(columns_heap, |tuple| {
673            let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
674                return Ok(false);
675            };
676            let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
677                return Ok(false);
678            };
679            let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
680                return Ok(false);
681            };
682            Ok(catalog == catalog_name && schema == schema_name && table == table_name)
683        })?;
684
685        Self::delete_matching_rows(indexes_heap, |tuple| {
686            let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
687                return Ok(false);
688            };
689            let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
690                return Ok(false);
691            };
692            let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
693                return Ok(false);
694            };
695            Ok(catalog == catalog_name && schema == schema_name && table == table_name)
696        })?;
697
698        Ok(())
699    }
700
701    fn remove_index_metadata(
702        &mut self,
703        catalog_name: &str,
704        schema_name: &str,
705        table_name: &str,
706        index_name: &str,
707    ) -> QuillSQLResult<()> {
708        let indexes_heap = {
709            let information_schema =
710                self.schemas.get(INFORMATION_SCHEMA_NAME).ok_or_else(|| {
711                    QuillSQLError::Internal(
712                        "catalog schema information_schema not created yet".to_string(),
713                    )
714                })?;
715            information_schema
716                .tables
717                .get(INFORMATION_SCHEMA_INDEXES)
718                .ok_or_else(|| {
719                    QuillSQLError::Internal(
720                        "table information_schema.indexes not created yet".to_string(),
721                    )
722                })?
723                .table
724                .clone()
725        };
726
727        Self::delete_matching_rows(indexes_heap, |tuple| {
728            let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
729                return Ok(false);
730            };
731            let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
732                return Ok(false);
733            };
734            let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
735                return Ok(false);
736            };
737            let ScalarValue::Varchar(Some(index)) = tuple.value(3)? else {
738                return Ok(false);
739            };
740            Ok(catalog == catalog_name
741                && schema == schema_name
742                && table == table_name
743                && index == index_name)
744        })?;
745
746        Ok(())
747    }
748
749    fn delete_matching_rows<F>(heap: Arc<TableHeap>, mut predicate: F) -> QuillSQLResult<()>
750    where
751        F: FnMut(&Tuple) -> QuillSQLResult<bool>,
752    {
753        let mvcc = MvccHeap::new(heap.clone());
754        let mut iterator = TableIterator::new(heap.clone(), ..);
755        while let Some((rid, _meta, tuple)) = iterator.next()? {
756            if predicate(&tuple)? {
757                let _ = mvcc.mark_deleted(rid, SYSTEM_TXN_ID, SYSTEM_COMMAND_ID)?;
758            }
759        }
760        Ok(())
761    }
762}
763
764#[cfg(test)]
765mod tests {
766    use std::sync::Arc;
767
768    use crate::utils::table_ref::TableReference;
769    use crate::{
770        catalog::{Column, DataType, Schema},
771        database::Database,
772    };
773
774    #[test]
775    pub fn test_catalog_create_table() {
776        let mut db = Database::new_temp().unwrap();
777
778        let table_ref1 = TableReference::Bare {
779            table: "test_table1".to_string(),
780        };
781        let schema = Arc::new(Schema::new(vec![
782            Column::new("a", DataType::Int8, true),
783            Column::new("b", DataType::Int16, true),
784            Column::new("c", DataType::Int32, true),
785        ]));
786        let table_info = db
787            .catalog
788            .create_table(table_ref1.clone(), schema.clone())
789            .unwrap();
790        assert_eq!(table_info.schema, schema);
791
792        let table_ref2 = TableReference::Bare {
793            table: "test_table2".to_string(),
794        };
795        let schema = Arc::new(Schema::new(vec![
796            Column::new("d", DataType::Int32, true),
797            Column::new("e", DataType::Int16, true),
798            Column::new("f", DataType::Int8, true),
799        ]));
800        let table_info = db
801            .catalog
802            .create_table(table_ref2.clone(), schema.clone())
803            .unwrap();
804        assert_eq!(table_info.schema, schema);
805
806        let table_info = db.catalog.table_heap(&table_ref1).unwrap();
807        assert_eq!(table_info.schema.column_count(), 3);
808
809        let table_info = db.catalog.table_heap(&table_ref2).unwrap();
810        assert_eq!(table_info.schema.column_count(), 3);
811    }
812
813    #[test]
814    pub fn test_catalog_create_index() {
815        let mut db = Database::new_temp().unwrap();
816
817        let table_ref = TableReference::Bare {
818            table: "test_table1".to_string(),
819        };
820        let schema = Arc::new(Schema::new(vec![
821            Column::new("a", DataType::Int8, true),
822            Column::new("b", DataType::Int16, true),
823            Column::new("c", DataType::Int32, true),
824        ]));
825        let _ = db.catalog.create_table(table_ref.clone(), schema.clone());
826
827        let index_name1 = "test_index1".to_string();
828        let key_schema1 = Arc::new(schema.project(&[0, 2]).unwrap());
829        let index1 = db
830            .catalog
831            .create_index(index_name1.clone(), &table_ref, key_schema1.clone())
832            .unwrap();
833        assert_eq!(index1.key_schema, key_schema1);
834
835        let index_name2 = "test_index2".to_string();
836        let key_schema2 = Arc::new(schema.project(&[1]).unwrap());
837        let index2 = db
838            .catalog
839            .create_index(index_name2.clone(), &table_ref, key_schema2.clone())
840            .unwrap();
841        assert_eq!(index2.key_schema, key_schema2);
842
843        let index3 = db
844            .catalog
845            .index(&table_ref, index_name1.as_str())
846            .unwrap()
847            .unwrap();
848        assert_eq!(index3.key_schema, key_schema1);
849    }
850
851    #[test]
852    fn analyze_table_collects_basic_stats() {
853        let mut db = Database::new_temp().unwrap();
854        db.run("create table t_stats (a int, b int)").unwrap();
855        db.run("insert into t_stats values (1, 10), (null, 20), (2, null)")
856            .unwrap();
857
858        let table_ref = TableReference::Bare {
859            table: "t_stats".to_string(),
860        };
861        let stats = db.catalog.analyze_table(&table_ref).unwrap();
862        assert_eq!(stats.row_count, 3);
863        let col_a = stats.column_stats.get("a").unwrap();
864        assert_eq!(col_a.null_count, 1);
865        assert_eq!(col_a.non_null_count, 2);
866        let stored = db.catalog.table_statistics(&table_ref).unwrap();
867        assert_eq!(stored.row_count, 3);
868    }
869}