1use std::collections::HashMap;
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4
5use super::registry::{global_index_registry, global_table_registry};
6use crate::catalog::{
7 key_schema_to_varchar, SchemaRef, COLUMNS_SCHEMA, INDEXES_SCHEMA, INFORMATION_SCHEMA_COLUMNS,
8 INFORMATION_SCHEMA_INDEXES, INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_SCHEMAS,
9 INFORMATION_SCHEMA_TABLES, SCHEMAS_SCHEMA, TABLES_SCHEMA,
10};
11use crate::storage::disk_manager::DiskManager;
12use crate::storage::page::{
13 BPLUS_INTERNAL_PAGE_MAX_SIZE, BPLUS_LEAF_PAGE_MAX_SIZE, EMPTY_TUPLE_META,
14};
15use crate::storage::table_heap::{TableHeap, TableIterator};
16use crate::storage::tuple::Tuple;
17use crate::transaction::{CommandId, TransactionId};
18use crate::utils::scalar::ScalarValue;
19use crate::utils::table_ref::TableReference;
20use crate::{
21 buffer::BufferManager,
22 error::{QuillSQLError, QuillSQLResult},
23 storage::index::btree_index::BPlusTreeIndex,
24};
25
26pub static DEFAULT_CATALOG_NAME: &str = "quillsql";
27pub static DEFAULT_SCHEMA_NAME: &str = "public";
28
29#[derive(Debug)]
30pub struct Catalog {
31 pub schemas: HashMap<String, CatalogSchema>,
32 pub buffer_pool: Arc<BufferManager>,
33 pub disk_manager: Arc<DiskManager>,
34}
35
36#[derive(Debug)]
37pub struct CatalogSchema {
38 pub name: String,
39 pub tables: HashMap<String, CatalogTable>,
40}
41
42impl CatalogSchema {
43 pub fn new(name: impl Into<String>) -> Self {
44 Self {
45 name: name.into(),
46 tables: HashMap::new(),
47 }
48 }
49}
50
51#[derive(Debug)]
52pub struct CatalogTable {
53 pub name: String,
54 pub table: Arc<TableHeap>,
55 pub indexes: HashMap<String, Arc<BPlusTreeIndex>>,
56}
57
58impl CatalogTable {
59 pub fn new(name: impl Into<String>, table: Arc<TableHeap>) -> Self {
60 Self {
61 name: name.into(),
62 table,
63 indexes: HashMap::new(),
64 }
65 }
66}
67
68const SYSTEM_TXN_ID: TransactionId = 0;
69const SYSTEM_COMMAND_ID: CommandId = 0;
70
71impl Catalog {
72 pub fn new(buffer_pool: Arc<BufferManager>, disk_manager: Arc<DiskManager>) -> Self {
73 Self {
74 schemas: HashMap::new(),
75 buffer_pool,
76 disk_manager,
77 }
78 }
79
80 pub fn create_schema(&mut self, schema_name: impl Into<String>) -> QuillSQLResult<()> {
81 let schema_name = schema_name.into();
82 if self.schemas.contains_key(&schema_name) {
83 return Err(QuillSQLError::Storage(
84 "Cannot create duplicated schema".to_string(),
85 ));
86 }
87 self.schemas
88 .insert(schema_name.clone(), CatalogSchema::new(schema_name.clone()));
89
90 let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
92 return Err(QuillSQLError::Internal(
93 "catalog schema information_schema not created yet".to_string(),
94 ));
95 };
96 let Some(schemas_table) = information_schema
97 .tables
98 .get_mut(INFORMATION_SCHEMA_SCHEMAS)
99 else {
100 return Err(QuillSQLError::Internal(
101 "table information_schema.schemas not created yet".to_string(),
102 ));
103 };
104
105 let tuple = Tuple::new(
106 SCHEMAS_SCHEMA.clone(),
107 vec![
108 DEFAULT_CATALOG_NAME.to_string().into(),
109 schema_name.clone().into(),
110 ],
111 );
112 schemas_table
113 .table
114 .insert_tuple(&EMPTY_TUPLE_META, &tuple)?;
115 Ok(())
116 }
117
118 pub fn create_table(
119 &mut self,
120 table_ref: TableReference,
121 schema: SchemaRef,
122 ) -> QuillSQLResult<Arc<TableHeap>> {
123 let catalog_name = table_ref
124 .catalog()
125 .unwrap_or(DEFAULT_CATALOG_NAME)
126 .to_string();
127 let catalog_schema_name = table_ref
128 .schema()
129 .unwrap_or(DEFAULT_SCHEMA_NAME)
130 .to_string();
131 let table_name = table_ref.table().to_string();
132
133 let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else {
134 return Err(QuillSQLError::Storage(format!(
135 "catalog schema {} not created yet",
136 catalog_schema_name
137 )));
138 };
139 if catalog_schema.tables.contains_key(table_ref.table()) {
140 return Err(QuillSQLError::Storage(
141 "Cannot create duplicated table".to_string(),
142 ));
143 }
144 let table_heap = Arc::new(TableHeap::try_new(
145 schema.clone(),
146 self.buffer_pool.clone(),
147 )?);
148 let catalog_table = CatalogTable {
149 name: table_name.clone(),
150 table: table_heap.clone(),
151 indexes: HashMap::new(),
152 };
153 catalog_schema
154 .tables
155 .insert(table_name.clone(), catalog_table);
156 global_table_registry().register(table_ref.clone(), table_heap.clone());
157
158 let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
160 return Err(QuillSQLError::Internal(
161 "catalog schema information_schema not created yet".to_string(),
162 ));
163 };
164 let Some(tables_table) = information_schema.tables.get_mut(INFORMATION_SCHEMA_TABLES)
165 else {
166 return Err(QuillSQLError::Internal(
167 "table information_schema.tables not created yet".to_string(),
168 ));
169 };
170
171 let tuple = Tuple::new(
172 TABLES_SCHEMA.clone(),
173 vec![
174 catalog_name.clone().into(),
175 catalog_schema_name.clone().into(),
176 table_name.clone().into(),
177 (table_heap.first_page_id.load(Ordering::SeqCst)).into(),
178 ],
179 );
180 tables_table.table.insert_tuple(&EMPTY_TUPLE_META, &tuple)?;
181
182 let Some(columns_table) = information_schema
183 .tables
184 .get_mut(INFORMATION_SCHEMA_COLUMNS)
185 else {
186 return Err(QuillSQLError::Internal(
187 "table information_schema.columns not created yet".to_string(),
188 ));
189 };
190 for col in schema.columns.iter() {
191 let sql_type: sqlparser::ast::DataType = (&col.data_type).into();
192 let tuple = Tuple::new(
193 COLUMNS_SCHEMA.clone(),
194 vec![
195 catalog_name.clone().into(),
196 catalog_schema_name.clone().into(),
197 table_name.clone().into(),
198 col.name.clone().into(),
199 format!("{sql_type}").into(),
200 col.nullable.into(),
201 format!("{}", col.default).into(),
202 ],
203 );
204 columns_table
205 .table
206 .insert_tuple(&EMPTY_TUPLE_META, &tuple)?;
207 }
208
209 Ok(table_heap)
210 }
211
212 pub fn drop_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<bool> {
213 let catalog_name = table_ref
214 .catalog()
215 .unwrap_or(DEFAULT_CATALOG_NAME)
216 .to_string();
217 let schema_name = table_ref
218 .schema()
219 .unwrap_or(DEFAULT_SCHEMA_NAME)
220 .to_string();
221 let table_name = table_ref.table().to_string();
222
223 if schema_name == INFORMATION_SCHEMA_NAME {
224 return Err(QuillSQLError::Execution(
225 "dropping information_schema tables is not allowed".to_string(),
226 ));
227 }
228
229 let Some(schema) = self.schemas.get_mut(&schema_name) else {
230 return Ok(false);
231 };
232
233 let Some(catalog_table) = schema.tables.remove(&table_name) else {
234 return Ok(false);
235 };
236
237 global_table_registry().unregister(table_ref);
238
239 for index_name in catalog_table.indexes.keys() {
240 self.unregister_index_variants(
241 &catalog_name,
242 &schema_name,
243 &table_name,
244 table_ref,
245 index_name,
246 );
247 }
248
249 self.remove_table_metadata(&catalog_name, &schema_name, &table_name)?;
250 Ok(true)
251 }
252
253 pub fn table_heap(&self, table_ref: &TableReference) -> QuillSQLResult<Arc<TableHeap>> {
254 let catalog_schema_name = table_ref
255 .schema()
256 .unwrap_or(DEFAULT_SCHEMA_NAME)
257 .to_string();
258 let table_name = table_ref.table().to_string();
259
260 let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else {
261 return Err(QuillSQLError::Storage(format!(
262 "catalog schema {} not created yet",
263 catalog_schema_name
264 )));
265 };
266 let Some(catalog_table) = catalog_schema.tables.get(&table_name) else {
267 return Err(QuillSQLError::Storage(format!(
268 "table {} not created yet",
269 table_name
270 )));
271 };
272 Ok(catalog_table.table.clone())
273 }
274
275 pub fn try_table_heap(&self, table_ref: &TableReference) -> Option<Arc<TableHeap>> {
276 let schema_name = table_ref
277 .schema()
278 .unwrap_or(DEFAULT_SCHEMA_NAME)
279 .to_string();
280 self.schemas
281 .get(&schema_name)
282 .and_then(|schema| schema.tables.get(table_ref.table()))
283 .map(|catalog_table| catalog_table.table.clone())
284 }
285
286 pub fn table_indexes(
287 &self,
288 table_ref: &TableReference,
289 ) -> QuillSQLResult<Vec<Arc<BPlusTreeIndex>>> {
290 let catalog_schema_name = table_ref
291 .schema()
292 .unwrap_or(DEFAULT_SCHEMA_NAME)
293 .to_string();
294 let table_name = table_ref.table().to_string();
295
296 let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else {
297 return Err(QuillSQLError::Storage(format!(
298 "catalog schema {} not created yet",
299 catalog_schema_name
300 )));
301 };
302 let Some(catalog_table) = catalog_schema.tables.get(&table_name) else {
303 return Err(QuillSQLError::Storage(format!(
304 "table {} not created yet",
305 table_name
306 )));
307 };
308 Ok(catalog_table.indexes.values().cloned().collect())
309 }
310
311 pub fn create_index(
312 &mut self,
313 index_name: String,
314 table_ref: &TableReference,
315 key_schema: SchemaRef,
316 ) -> QuillSQLResult<Arc<BPlusTreeIndex>> {
317 let catalog_name = table_ref
318 .catalog()
319 .unwrap_or(DEFAULT_CATALOG_NAME)
320 .to_string();
321 let catalog_schema_name = table_ref
322 .schema()
323 .unwrap_or(DEFAULT_SCHEMA_NAME)
324 .to_string();
325 let table_name = table_ref.table().to_string();
326
327 let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else {
328 return Err(QuillSQLError::Storage(format!(
329 "catalog schema {} not created yet",
330 catalog_schema_name
331 )));
332 };
333 let Some(catalog_table) = catalog_schema.tables.get_mut(&table_name) else {
334 return Err(QuillSQLError::Storage(format!(
335 "table {} not created yet",
336 table_name
337 )));
338 };
339 if catalog_table.indexes.contains_key(&index_name) {
340 return Err(QuillSQLError::Storage(
341 "Cannot create duplicated index".to_string(),
342 ));
343 }
344
345 let b_plus_tree_index = Arc::new(BPlusTreeIndex::new(
346 key_schema.clone(),
347 self.buffer_pool.clone(),
348 BPLUS_INTERNAL_PAGE_MAX_SIZE as u32,
349 BPLUS_LEAF_PAGE_MAX_SIZE as u32,
350 ));
351 catalog_table
352 .indexes
353 .insert(index_name.clone(), b_plus_tree_index.clone());
354 let table_heap = catalog_table.table.clone();
356 global_index_registry().register(
357 table_ref.clone(),
358 index_name.clone(),
359 b_plus_tree_index.clone(),
360 table_heap,
361 );
362
363 let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
365 return Err(QuillSQLError::Internal(
366 "catalog schema information_schema not created yet".to_string(),
367 ));
368 };
369 let Some(indexes_table) = information_schema
370 .tables
371 .get_mut(INFORMATION_SCHEMA_INDEXES)
372 else {
373 return Err(QuillSQLError::Internal(
374 "table information_schema.indexes not created yet".to_string(),
375 ));
376 };
377
378 let tuple = Tuple::new(
379 INDEXES_SCHEMA.clone(),
380 vec![
381 catalog_name.clone().into(),
382 catalog_schema_name.clone().into(),
383 table_name.clone().into(),
384 index_name.clone().into(),
385 key_schema_to_varchar(&b_plus_tree_index.key_schema).into(),
386 b_plus_tree_index.internal_max_size.into(),
387 b_plus_tree_index.leaf_max_size.into(),
388 b_plus_tree_index.header_page_id.into(),
389 ],
390 );
391 indexes_table
392 .table
393 .insert_tuple(&EMPTY_TUPLE_META, &tuple)?;
394
395 Ok(b_plus_tree_index)
396 }
397
398 pub fn drop_index(
399 &mut self,
400 table_ref: &TableReference,
401 index_name: &str,
402 ) -> QuillSQLResult<bool> {
403 let catalog_name = table_ref
404 .catalog()
405 .unwrap_or(DEFAULT_CATALOG_NAME)
406 .to_string();
407 let schema_name = table_ref
408 .schema()
409 .unwrap_or(DEFAULT_SCHEMA_NAME)
410 .to_string();
411 let table_name = table_ref.table().to_string();
412
413 if schema_name == INFORMATION_SCHEMA_NAME {
414 return Err(QuillSQLError::Execution(
415 "dropping indexes on information_schema tables is not allowed".to_string(),
416 ));
417 }
418
419 let Some(schema) = self.schemas.get_mut(&schema_name) else {
420 return Ok(false);
421 };
422 let Some(table) = schema.tables.get_mut(&table_name) else {
423 return Ok(false);
424 };
425
426 if table.indexes.remove(index_name).is_none() {
427 return Ok(false);
428 }
429
430 self.unregister_index_variants(
431 &catalog_name,
432 &schema_name,
433 &table_name,
434 table_ref,
435 index_name,
436 );
437
438 self.remove_index_metadata(&catalog_name, &schema_name, &table_name, index_name)?;
439
440 Ok(true)
441 }
442
443 pub fn find_index_owner(
444 &self,
445 catalog_hint: Option<&str>,
446 schema_hint: Option<&str>,
447 index_name: &str,
448 ) -> Option<TableReference> {
449 let catalog_name = catalog_hint.unwrap_or(DEFAULT_CATALOG_NAME);
450
451 if let Some(schema_name) = schema_hint {
452 return self.find_index_in_schema(catalog_name, schema_name, index_name);
453 }
454
455 for schema_name in self.schemas.keys() {
456 if schema_name == INFORMATION_SCHEMA_NAME {
457 continue;
458 }
459 if let Some(table_ref) =
460 self.find_index_in_schema(catalog_name, schema_name, index_name)
461 {
462 return Some(table_ref);
463 }
464 }
465 None
466 }
467
468 pub fn index(
469 &self,
470 table_ref: &TableReference,
471 index_name: &str,
472 ) -> QuillSQLResult<Option<Arc<BPlusTreeIndex>>> {
473 let catalog_schema_name = table_ref
474 .schema()
475 .unwrap_or(DEFAULT_SCHEMA_NAME)
476 .to_string();
477 let table_name = table_ref.table().to_string();
478
479 let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else {
480 return Err(QuillSQLError::Storage(format!(
481 "catalog schema {} not created yet",
482 catalog_schema_name
483 )));
484 };
485 let Some(catalog_table) = catalog_schema.tables.get(&table_name) else {
486 return Err(QuillSQLError::Storage(format!(
487 "table {} not created yet",
488 table_name
489 )));
490 };
491 Ok(catalog_table.indexes.get(index_name).cloned())
492 }
493
494 pub fn load_schema(&mut self, name: impl Into<String>, schema: CatalogSchema) {
495 self.schemas.insert(name.into(), schema);
496 }
497
498 pub fn load_table(
499 &mut self,
500 table_ref: TableReference,
501 table: CatalogTable,
502 ) -> QuillSQLResult<()> {
503 let catalog_schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
504 let table_name = table_ref.table().to_string();
505 let Some(catalog_schema) = self.schemas.get_mut(catalog_schema_name) else {
506 return Err(QuillSQLError::Storage(format!(
507 "catalog schema {} not created yet",
508 catalog_schema_name
509 )));
510 };
511 global_table_registry().register(table_ref.clone(), table.table.clone());
512 catalog_schema.tables.insert(table_name, table);
513 Ok(())
514 }
515
516 pub fn load_index(
517 &mut self,
518 table_ref: TableReference,
519 index_name: impl Into<String>,
520 index: Arc<BPlusTreeIndex>,
521 ) -> QuillSQLResult<()> {
522 let catalog_schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
523 let table_name = table_ref.table().to_string();
524 let Some(catalog_schema) = self.schemas.get_mut(catalog_schema_name) else {
525 return Err(QuillSQLError::Storage(format!(
526 "catalog schema {} not created yet",
527 catalog_schema_name
528 )));
529 };
530 let Some(catalog_table) = catalog_schema.tables.get_mut(&table_name) else {
531 return Err(QuillSQLError::Storage(format!(
532 "catalog table {} not created yet",
533 table_name
534 )));
535 };
536 let idx_name: String = index_name.into();
537 catalog_table.indexes.insert(idx_name.clone(), index);
538 if let Some(idx) = catalog_table.indexes.get(&idx_name) {
540 let table_heap = catalog_table.table.clone();
541 global_index_registry().register(table_ref.clone(), idx_name, idx.clone(), table_heap);
542 }
543 Ok(())
544 }
545
546 fn find_index_in_schema(
547 &self,
548 catalog_name: &str,
549 schema_name: &str,
550 index_name: &str,
551 ) -> Option<TableReference> {
552 if schema_name == INFORMATION_SCHEMA_NAME {
553 return None;
554 }
555 let schema = self.schemas.get(schema_name)?;
556 for (table_name, table) in &schema.tables {
557 if table.indexes.contains_key(index_name) {
558 if catalog_name == DEFAULT_CATALOG_NAME {
559 if schema_name == DEFAULT_SCHEMA_NAME {
560 return Some(TableReference::Bare {
561 table: table_name.clone(),
562 });
563 }
564 return Some(TableReference::Partial {
565 schema: schema_name.to_string(),
566 table: table_name.clone(),
567 });
568 }
569 return Some(TableReference::Full {
570 catalog: catalog_name.to_string(),
571 schema: schema_name.to_string(),
572 table: table_name.clone(),
573 });
574 }
575 }
576 None
577 }
578
579 fn remove_table_metadata(
580 &mut self,
581 catalog_name: &str,
582 schema_name: &str,
583 table_name: &str,
584 ) -> QuillSQLResult<()> {
585 let (tables_heap, columns_heap, indexes_heap) = {
586 let information_schema =
587 self.schemas.get(INFORMATION_SCHEMA_NAME).ok_or_else(|| {
588 QuillSQLError::Internal(
589 "catalog schema information_schema not created yet".to_string(),
590 )
591 })?;
592 let tables_heap = information_schema
593 .tables
594 .get(INFORMATION_SCHEMA_TABLES)
595 .ok_or_else(|| {
596 QuillSQLError::Internal(
597 "table information_schema.tables not created yet".to_string(),
598 )
599 })?
600 .table
601 .clone();
602 let columns_heap = information_schema
603 .tables
604 .get(INFORMATION_SCHEMA_COLUMNS)
605 .ok_or_else(|| {
606 QuillSQLError::Internal(
607 "table information_schema.columns not created yet".to_string(),
608 )
609 })?
610 .table
611 .clone();
612 let indexes_heap = information_schema
613 .tables
614 .get(INFORMATION_SCHEMA_INDEXES)
615 .ok_or_else(|| {
616 QuillSQLError::Internal(
617 "table information_schema.indexes not created yet".to_string(),
618 )
619 })?
620 .table
621 .clone();
622 (tables_heap, columns_heap, indexes_heap)
623 };
624
625 Self::delete_matching_rows(tables_heap, |tuple| {
626 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
627 return Ok(false);
628 };
629 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
630 return Ok(false);
631 };
632 let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
633 return Ok(false);
634 };
635 Ok(catalog == catalog_name && schema == schema_name && table == table_name)
636 })?;
637
638 Self::delete_matching_rows(columns_heap, |tuple| {
639 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
640 return Ok(false);
641 };
642 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
643 return Ok(false);
644 };
645 let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
646 return Ok(false);
647 };
648 Ok(catalog == catalog_name && schema == schema_name && table == table_name)
649 })?;
650
651 Self::delete_matching_rows(indexes_heap, |tuple| {
652 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
653 return Ok(false);
654 };
655 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
656 return Ok(false);
657 };
658 let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
659 return Ok(false);
660 };
661 Ok(catalog == catalog_name && schema == schema_name && table == table_name)
662 })?;
663
664 Ok(())
665 }
666
667 fn remove_index_metadata(
668 &mut self,
669 catalog_name: &str,
670 schema_name: &str,
671 table_name: &str,
672 index_name: &str,
673 ) -> QuillSQLResult<()> {
674 let indexes_heap = {
675 let information_schema =
676 self.schemas.get(INFORMATION_SCHEMA_NAME).ok_or_else(|| {
677 QuillSQLError::Internal(
678 "catalog schema information_schema not created yet".to_string(),
679 )
680 })?;
681 information_schema
682 .tables
683 .get(INFORMATION_SCHEMA_INDEXES)
684 .ok_or_else(|| {
685 QuillSQLError::Internal(
686 "table information_schema.indexes not created yet".to_string(),
687 )
688 })?
689 .table
690 .clone()
691 };
692
693 Self::delete_matching_rows(indexes_heap, |tuple| {
694 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
695 return Ok(false);
696 };
697 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
698 return Ok(false);
699 };
700 let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
701 return Ok(false);
702 };
703 let ScalarValue::Varchar(Some(index)) = tuple.value(3)? else {
704 return Ok(false);
705 };
706 Ok(catalog == catalog_name
707 && schema == schema_name
708 && table == table_name
709 && index == index_name)
710 })?;
711
712 Ok(())
713 }
714
715 fn delete_matching_rows<F>(heap: Arc<TableHeap>, mut predicate: F) -> QuillSQLResult<()>
716 where
717 F: FnMut(&Tuple) -> QuillSQLResult<bool>,
718 {
719 let mut iterator = TableIterator::new(heap.clone(), ..);
720 while let Some((rid, _meta, tuple)) = iterator.next()? {
721 if predicate(&tuple)? {
722 heap.delete_tuple(rid, SYSTEM_TXN_ID, SYSTEM_COMMAND_ID)?;
723 }
724 }
725 Ok(())
726 }
727
728 fn unregister_index_variants(
729 &self,
730 catalog_name: &str,
731 schema_name: &str,
732 table_name: &str,
733 original_ref: &TableReference,
734 index_name: &str,
735 ) {
736 let registry = global_index_registry();
737 registry.unregister(original_ref, index_name);
738 registry.unregister(
739 &TableReference::Bare {
740 table: table_name.to_string(),
741 },
742 index_name,
743 );
744 registry.unregister(
745 &TableReference::Partial {
746 schema: schema_name.to_string(),
747 table: table_name.to_string(),
748 },
749 index_name,
750 );
751 registry.unregister(
752 &TableReference::Full {
753 catalog: catalog_name.to_string(),
754 schema: schema_name.to_string(),
755 table: table_name.to_string(),
756 },
757 index_name,
758 );
759 }
760}
761
762#[cfg(test)]
763mod tests {
764 use std::sync::Arc;
765
766 use crate::utils::table_ref::TableReference;
767 use crate::{
768 catalog::{Column, DataType, Schema},
769 database::Database,
770 };
771
772 #[test]
773 pub fn test_catalog_create_table() {
774 let mut db = Database::new_temp().unwrap();
775
776 let table_ref1 = TableReference::Bare {
777 table: "test_table1".to_string(),
778 };
779 let schema = Arc::new(Schema::new(vec![
780 Column::new("a", DataType::Int8, true),
781 Column::new("b", DataType::Int16, true),
782 Column::new("c", DataType::Int32, true),
783 ]));
784 let table_info = db
785 .catalog
786 .create_table(table_ref1.clone(), schema.clone())
787 .unwrap();
788 assert_eq!(table_info.schema, schema);
789
790 let table_ref2 = TableReference::Bare {
791 table: "test_table2".to_string(),
792 };
793 let schema = Arc::new(Schema::new(vec![
794 Column::new("d", DataType::Int32, true),
795 Column::new("e", DataType::Int16, true),
796 Column::new("f", DataType::Int8, true),
797 ]));
798 let table_info = db
799 .catalog
800 .create_table(table_ref2.clone(), schema.clone())
801 .unwrap();
802 assert_eq!(table_info.schema, schema);
803
804 let table_info = db.catalog.table_heap(&table_ref1).unwrap();
805 assert_eq!(table_info.schema.column_count(), 3);
806
807 let table_info = db.catalog.table_heap(&table_ref2).unwrap();
808 assert_eq!(table_info.schema.column_count(), 3);
809 }
810
811 #[test]
812 pub fn test_catalog_create_index() {
813 let mut db = Database::new_temp().unwrap();
814
815 let table_ref = TableReference::Bare {
816 table: "test_table1".to_string(),
817 };
818 let schema = Arc::new(Schema::new(vec![
819 Column::new("a", DataType::Int8, true),
820 Column::new("b", DataType::Int16, true),
821 Column::new("c", DataType::Int32, true),
822 ]));
823 let _ = db.catalog.create_table(table_ref.clone(), schema.clone());
824
825 let index_name1 = "test_index1".to_string();
826 let key_schema1 = Arc::new(schema.project(&[0, 2]).unwrap());
827 let index1 = db
828 .catalog
829 .create_index(index_name1.clone(), &table_ref, key_schema1.clone())
830 .unwrap();
831 assert_eq!(index1.key_schema, key_schema1);
832
833 let index_name2 = "test_index2".to_string();
834 let key_schema2 = Arc::new(schema.project(&[1]).unwrap());
835 let index2 = db
836 .catalog
837 .create_index(index_name2.clone(), &table_ref, key_schema2.clone())
838 .unwrap();
839 assert_eq!(index2.key_schema, key_schema2);
840
841 let index3 = db
842 .catalog
843 .index(&table_ref, index_name1.as_str())
844 .unwrap()
845 .unwrap();
846 assert_eq!(index3.key_schema, key_schema1);
847 }
848}