1use std::collections::HashMap;
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4
5use super::registry::TableRegistry;
6use crate::catalog::{
7 key_schema_to_varchar, SchemaRef, TableStatistics, COLUMNS_SCHEMA, INDEXES_SCHEMA,
8 INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_INDEXES, INFORMATION_SCHEMA_NAME,
9 INFORMATION_SCHEMA_SCHEMAS, INFORMATION_SCHEMA_TABLES, SCHEMAS_SCHEMA, TABLES_SCHEMA,
10};
11use crate::storage::disk_manager::DiskManager;
12use crate::storage::heap::MvccHeap;
13use crate::storage::page::{BPLUS_INTERNAL_PAGE_MAX_SIZE, BPLUS_LEAF_PAGE_MAX_SIZE};
14use crate::storage::table_heap::{TableHeap, TableIterator};
15use crate::storage::tuple::Tuple;
16use crate::transaction::{CommandId, TransactionId};
17use crate::utils::scalar::ScalarValue;
18use crate::utils::table_ref::TableReference;
19use crate::{
20 buffer::BufferManager,
21 error::{QuillSQLError, QuillSQLResult},
22 storage::index::btree_index::BPlusTreeIndex,
23};
24
/// Catalog name substituted when a table reference carries no catalog component.
pub static DEFAULT_CATALOG_NAME: &str = "quillsql";
/// Schema name substituted when a table reference carries no schema component.
pub static DEFAULT_SCHEMA_NAME: &str = "public";
27
/// In-memory view of the database catalog.
///
/// Holds every known schema by name, plus shared handles to the buffer
/// manager, the disk manager and the table registry.
#[derive(Debug)]
pub struct Catalog {
    /// Schemas keyed by schema name.
    pub schemas: HashMap<String, CatalogSchema>,
    /// Buffer manager handed to the table heaps and indexes created here.
    pub buffer_pool: Arc<BufferManager>,
    /// Shared handle to the disk manager.
    pub disk_manager: Arc<DiskManager>,
    /// Registry updated whenever a table is created, loaded or dropped.
    table_registry: Arc<TableRegistry>,
}
35
/// A named schema and the tables it contains.
#[derive(Debug)]
pub struct CatalogSchema {
    /// Schema name.
    pub name: String,
    /// Tables of this schema keyed by table name.
    pub tables: HashMap<String, CatalogTable>,
}
41
42impl CatalogSchema {
43 pub fn new(name: impl Into<String>) -> Self {
44 Self {
45 name: name.into(),
46 tables: HashMap::new(),
47 }
48 }
49}
50
/// Catalog entry for one table: its heap, its indexes and optional statistics.
#[derive(Debug)]
pub struct CatalogTable {
    /// Table name.
    pub name: String,
    /// Shared handle to the heap that stores the table's rows.
    pub table: Arc<TableHeap>,
    /// Indexes defined on this table keyed by index name.
    pub indexes: HashMap<String, Arc<BPlusTreeIndex>>,
    /// Statistics from the most recent `Catalog::analyze_table` call, if any.
    pub stats: Option<TableStatistics>,
}
58
59impl CatalogTable {
60 pub fn new(name: impl Into<String>, table: Arc<TableHeap>) -> Self {
61 Self {
62 name: name.into(),
63 table,
64 indexes: HashMap::new(),
65 stats: None,
66 }
67 }
68}
69
/// Transaction id passed to the MVCC heap for catalog metadata inserts and deletes.
const SYSTEM_TXN_ID: TransactionId = 0;
/// Command id passed alongside `SYSTEM_TXN_ID` for those same metadata writes.
const SYSTEM_COMMAND_ID: CommandId = 0;
72
73impl Catalog {
74 pub fn new(
75 buffer_pool: Arc<BufferManager>,
76 disk_manager: Arc<DiskManager>,
77 table_registry: Arc<TableRegistry>,
78 ) -> Self {
79 Self {
80 schemas: HashMap::new(),
81 buffer_pool,
82 disk_manager,
83 table_registry,
84 }
85 }
86
87 pub fn table_registry(&self) -> Arc<TableRegistry> {
88 self.table_registry.clone()
89 }
90
91 pub fn create_schema(&mut self, schema_name: impl Into<String>) -> QuillSQLResult<()> {
92 let schema_name = schema_name.into();
93 if self.schemas.contains_key(&schema_name) {
94 return Err(QuillSQLError::Storage(
95 "Cannot create duplicated schema".to_string(),
96 ));
97 }
98 self.schemas
99 .insert(schema_name.clone(), CatalogSchema::new(schema_name.clone()));
100
101 let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
103 return Err(QuillSQLError::Internal(
104 "catalog schema information_schema not created yet".to_string(),
105 ));
106 };
107 let Some(schemas_table) = information_schema
108 .tables
109 .get_mut(INFORMATION_SCHEMA_SCHEMAS)
110 else {
111 return Err(QuillSQLError::Internal(
112 "table information_schema.schemas not created yet".to_string(),
113 ));
114 };
115
116 let tuple = Tuple::new(
117 SCHEMAS_SCHEMA.clone(),
118 vec![
119 DEFAULT_CATALOG_NAME.to_string().into(),
120 schema_name.clone().into(),
121 ],
122 );
123 let _ = MvccHeap::new(schemas_table.table.clone()).insert(
124 &tuple,
125 SYSTEM_TXN_ID,
126 SYSTEM_COMMAND_ID,
127 )?;
128 Ok(())
129 }
130
131 pub fn create_table(
132 &mut self,
133 table_ref: TableReference,
134 schema: SchemaRef,
135 ) -> QuillSQLResult<Arc<TableHeap>> {
136 let catalog_name = table_ref
137 .catalog()
138 .unwrap_or(DEFAULT_CATALOG_NAME)
139 .to_string();
140 let catalog_schema_name = table_ref
141 .schema()
142 .unwrap_or(DEFAULT_SCHEMA_NAME)
143 .to_string();
144 let table_name = table_ref.table().to_string();
145
146 let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else {
147 return Err(QuillSQLError::Storage(format!(
148 "catalog schema {} not created yet",
149 catalog_schema_name
150 )));
151 };
152 if catalog_schema.tables.contains_key(table_ref.table()) {
153 return Err(QuillSQLError::Storage(
154 "Cannot create duplicated table".to_string(),
155 ));
156 }
157 let table_heap = Arc::new(TableHeap::try_new(
158 schema.clone(),
159 self.buffer_pool.clone(),
160 )?);
161 let catalog_table = CatalogTable {
162 name: table_name.clone(),
163 table: table_heap.clone(),
164 indexes: HashMap::new(),
165 stats: None,
166 };
167 catalog_schema
168 .tables
169 .insert(table_name.clone(), catalog_table);
170 self.table_registry
171 .register(table_ref.clone(), table_heap.clone());
172
173 let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
175 return Err(QuillSQLError::Internal(
176 "catalog schema information_schema not created yet".to_string(),
177 ));
178 };
179 let Some(tables_table) = information_schema.tables.get_mut(INFORMATION_SCHEMA_TABLES)
180 else {
181 return Err(QuillSQLError::Internal(
182 "table information_schema.tables not created yet".to_string(),
183 ));
184 };
185
186 let tuple = Tuple::new(
187 TABLES_SCHEMA.clone(),
188 vec![
189 catalog_name.clone().into(),
190 catalog_schema_name.clone().into(),
191 table_name.clone().into(),
192 (table_heap.first_page_id.load(Ordering::SeqCst)).into(),
193 ],
194 );
195 let _ = MvccHeap::new(tables_table.table.clone()).insert(
196 &tuple,
197 SYSTEM_TXN_ID,
198 SYSTEM_COMMAND_ID,
199 )?;
200
201 let Some(columns_table) = information_schema
202 .tables
203 .get_mut(INFORMATION_SCHEMA_COLUMNS)
204 else {
205 return Err(QuillSQLError::Internal(
206 "table information_schema.columns not created yet".to_string(),
207 ));
208 };
209 let columns_mvcc = MvccHeap::new(columns_table.table.clone());
210 for col in schema.columns.iter() {
211 let sql_type: sqlparser::ast::DataType = (&col.data_type).into();
212 let tuple = Tuple::new(
213 COLUMNS_SCHEMA.clone(),
214 vec![
215 catalog_name.clone().into(),
216 catalog_schema_name.clone().into(),
217 table_name.clone().into(),
218 col.name.clone().into(),
219 format!("{sql_type}").into(),
220 col.nullable.into(),
221 format!("{}", col.default).into(),
222 ],
223 );
224 let _ = columns_mvcc.insert(&tuple, SYSTEM_TXN_ID, SYSTEM_COMMAND_ID)?;
225 }
226
227 Ok(table_heap)
228 }
229
230 pub fn drop_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<bool> {
231 let catalog_name = table_ref
232 .catalog()
233 .unwrap_or(DEFAULT_CATALOG_NAME)
234 .to_string();
235 let schema_name = table_ref
236 .schema()
237 .unwrap_or(DEFAULT_SCHEMA_NAME)
238 .to_string();
239 let table_name = table_ref.table().to_string();
240
241 if schema_name == INFORMATION_SCHEMA_NAME {
242 return Err(QuillSQLError::Execution(
243 "dropping information_schema tables is not allowed".to_string(),
244 ));
245 }
246
247 let Some(schema) = self.schemas.get_mut(&schema_name) else {
248 return Ok(false);
249 };
250
251 let Some(catalog_table) = schema.tables.remove(&table_name) else {
252 return Ok(false);
253 };
254
255 self.table_registry.unregister(table_ref);
256
257 for index_name in catalog_table.indexes.keys() {
258 self.remove_index_metadata(&catalog_name, &schema_name, &table_name, index_name)?;
259 }
260
261 self.remove_table_metadata(&catalog_name, &schema_name, &table_name)?;
262 Ok(true)
263 }
264
265 pub fn table_heap(&self, table_ref: &TableReference) -> QuillSQLResult<Arc<TableHeap>> {
266 let catalog_schema_name = table_ref
267 .schema()
268 .unwrap_or(DEFAULT_SCHEMA_NAME)
269 .to_string();
270 let table_name = table_ref.table().to_string();
271
272 let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else {
273 return Err(QuillSQLError::Storage(format!(
274 "catalog schema {} not created yet",
275 catalog_schema_name
276 )));
277 };
278 let Some(catalog_table) = catalog_schema.tables.get(&table_name) else {
279 return Err(QuillSQLError::Storage(format!(
280 "table {} not created yet",
281 table_name
282 )));
283 };
284 Ok(catalog_table.table.clone())
285 }
286
287 pub fn table_statistics(&self, table_ref: &TableReference) -> Option<&TableStatistics> {
288 let catalog_schema_name = table_ref
289 .schema()
290 .unwrap_or(DEFAULT_SCHEMA_NAME)
291 .to_string();
292 self.schemas
293 .get(&catalog_schema_name)
294 .and_then(|schema| schema.tables.get(table_ref.table()))
295 .and_then(|table| table.stats.as_ref())
296 }
297
298 pub fn analyze_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<TableStatistics> {
299 let catalog_schema_name = table_ref
300 .schema()
301 .unwrap_or(DEFAULT_SCHEMA_NAME)
302 .to_string();
303 let table_name = table_ref.table().to_string();
304
305 let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else {
306 return Err(QuillSQLError::Storage(format!(
307 "catalog schema {} not created yet",
308 catalog_schema_name
309 )));
310 };
311 let Some(catalog_table) = catalog_schema.tables.get_mut(&table_name) else {
312 return Err(QuillSQLError::Storage(format!(
313 "table {} not created yet",
314 table_name
315 )));
316 };
317
318 let stats = TableStatistics::analyze(catalog_table.table.clone())?;
319 catalog_table.stats = Some(stats.clone());
320 Ok(stats)
321 }
322
323 pub fn try_table_heap(&self, table_ref: &TableReference) -> Option<Arc<TableHeap>> {
324 let schema_name = table_ref
325 .schema()
326 .unwrap_or(DEFAULT_SCHEMA_NAME)
327 .to_string();
328 self.schemas
329 .get(&schema_name)
330 .and_then(|schema| schema.tables.get(table_ref.table()))
331 .map(|catalog_table| catalog_table.table.clone())
332 }
333
334 pub fn table_indexes(
335 &self,
336 table_ref: &TableReference,
337 ) -> QuillSQLResult<Vec<(String, Arc<BPlusTreeIndex>)>> {
338 let catalog_schema_name = table_ref
339 .schema()
340 .unwrap_or(DEFAULT_SCHEMA_NAME)
341 .to_string();
342 let table_name = table_ref.table().to_string();
343
344 let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else {
345 return Err(QuillSQLError::Storage(format!(
346 "catalog schema {} not created yet",
347 catalog_schema_name
348 )));
349 };
350 let Some(catalog_table) = catalog_schema.tables.get(&table_name) else {
351 return Err(QuillSQLError::Storage(format!(
352 "table {} not created yet",
353 table_name
354 )));
355 };
356 Ok(catalog_table
357 .indexes
358 .iter()
359 .map(|(name, index)| (name.clone(), index.clone()))
360 .collect())
361 }
362
363 pub fn create_index(
364 &mut self,
365 index_name: String,
366 table_ref: &TableReference,
367 key_schema: SchemaRef,
368 ) -> QuillSQLResult<Arc<BPlusTreeIndex>> {
369 let catalog_name = table_ref
370 .catalog()
371 .unwrap_or(DEFAULT_CATALOG_NAME)
372 .to_string();
373 let catalog_schema_name = table_ref
374 .schema()
375 .unwrap_or(DEFAULT_SCHEMA_NAME)
376 .to_string();
377 let table_name = table_ref.table().to_string();
378
379 let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else {
380 return Err(QuillSQLError::Storage(format!(
381 "catalog schema {} not created yet",
382 catalog_schema_name
383 )));
384 };
385 let Some(catalog_table) = catalog_schema.tables.get_mut(&table_name) else {
386 return Err(QuillSQLError::Storage(format!(
387 "table {} not created yet",
388 table_name
389 )));
390 };
391 if catalog_table.indexes.contains_key(&index_name) {
392 return Err(QuillSQLError::Storage(
393 "Cannot create duplicated index".to_string(),
394 ));
395 }
396
397 let b_plus_tree_index = Arc::new(BPlusTreeIndex::new(
398 key_schema.clone(),
399 self.buffer_pool.clone(),
400 BPLUS_INTERNAL_PAGE_MAX_SIZE as u32,
401 BPLUS_LEAF_PAGE_MAX_SIZE as u32,
402 ));
403 catalog_table
404 .indexes
405 .insert(index_name.clone(), b_plus_tree_index.clone());
406
407 let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
409 return Err(QuillSQLError::Internal(
410 "catalog schema information_schema not created yet".to_string(),
411 ));
412 };
413 let Some(indexes_table) = information_schema
414 .tables
415 .get_mut(INFORMATION_SCHEMA_INDEXES)
416 else {
417 return Err(QuillSQLError::Internal(
418 "table information_schema.indexes not created yet".to_string(),
419 ));
420 };
421
422 let tuple = Tuple::new(
423 INDEXES_SCHEMA.clone(),
424 vec![
425 catalog_name.clone().into(),
426 catalog_schema_name.clone().into(),
427 table_name.clone().into(),
428 index_name.clone().into(),
429 key_schema_to_varchar(&b_plus_tree_index.key_schema).into(),
430 b_plus_tree_index.internal_max_size.into(),
431 b_plus_tree_index.leaf_max_size.into(),
432 b_plus_tree_index.header_page_id.into(),
433 ],
434 );
435 let _ = MvccHeap::new(indexes_table.table.clone()).insert(
436 &tuple,
437 SYSTEM_TXN_ID,
438 SYSTEM_COMMAND_ID,
439 )?;
440
441 Ok(b_plus_tree_index)
442 }
443
444 pub fn drop_index(
445 &mut self,
446 table_ref: &TableReference,
447 index_name: &str,
448 ) -> QuillSQLResult<bool> {
449 let catalog_name = table_ref
450 .catalog()
451 .unwrap_or(DEFAULT_CATALOG_NAME)
452 .to_string();
453 let schema_name = table_ref
454 .schema()
455 .unwrap_or(DEFAULT_SCHEMA_NAME)
456 .to_string();
457 let table_name = table_ref.table().to_string();
458
459 if schema_name == INFORMATION_SCHEMA_NAME {
460 return Err(QuillSQLError::Execution(
461 "dropping indexes on information_schema tables is not allowed".to_string(),
462 ));
463 }
464
465 let Some(schema) = self.schemas.get_mut(&schema_name) else {
466 return Ok(false);
467 };
468 let Some(table) = schema.tables.get_mut(&table_name) else {
469 return Ok(false);
470 };
471
472 if table.indexes.remove(index_name).is_none() {
473 return Ok(false);
474 }
475
476 self.remove_index_metadata(&catalog_name, &schema_name, &table_name, index_name)?;
477
478 Ok(true)
479 }
480
481 pub fn find_index_owner(
482 &self,
483 catalog_hint: Option<&str>,
484 schema_hint: Option<&str>,
485 index_name: &str,
486 ) -> Option<TableReference> {
487 let catalog_name = catalog_hint.unwrap_or(DEFAULT_CATALOG_NAME);
488
489 if let Some(schema_name) = schema_hint {
490 return self.find_index_in_schema(catalog_name, schema_name, index_name);
491 }
492
493 for schema_name in self.schemas.keys() {
494 if schema_name == INFORMATION_SCHEMA_NAME {
495 continue;
496 }
497 if let Some(table_ref) =
498 self.find_index_in_schema(catalog_name, schema_name, index_name)
499 {
500 return Some(table_ref);
501 }
502 }
503 None
504 }
505
506 pub fn index(
507 &self,
508 table_ref: &TableReference,
509 index_name: &str,
510 ) -> QuillSQLResult<Option<Arc<BPlusTreeIndex>>> {
511 let catalog_schema_name = table_ref
512 .schema()
513 .unwrap_or(DEFAULT_SCHEMA_NAME)
514 .to_string();
515 let table_name = table_ref.table().to_string();
516
517 let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else {
518 return Err(QuillSQLError::Storage(format!(
519 "catalog schema {} not created yet",
520 catalog_schema_name
521 )));
522 };
523 let Some(catalog_table) = catalog_schema.tables.get(&table_name) else {
524 return Err(QuillSQLError::Storage(format!(
525 "table {} not created yet",
526 table_name
527 )));
528 };
529 Ok(catalog_table.indexes.get(index_name).cloned())
530 }
531
532 pub fn load_schema(&mut self, name: impl Into<String>, schema: CatalogSchema) {
533 self.schemas.insert(name.into(), schema);
534 }
535
536 pub fn load_table(
537 &mut self,
538 table_ref: TableReference,
539 table: CatalogTable,
540 ) -> QuillSQLResult<()> {
541 let catalog_schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
542 let table_name = table_ref.table().to_string();
543 let Some(catalog_schema) = self.schemas.get_mut(catalog_schema_name) else {
544 return Err(QuillSQLError::Storage(format!(
545 "catalog schema {} not created yet",
546 catalog_schema_name
547 )));
548 };
549 self.table_registry
550 .register(table_ref.clone(), table.table.clone());
551 catalog_schema.tables.insert(table_name, table);
552 Ok(())
553 }
554
555 pub fn load_index(
556 &mut self,
557 table_ref: TableReference,
558 index_name: impl Into<String>,
559 index: Arc<BPlusTreeIndex>,
560 ) -> QuillSQLResult<()> {
561 let catalog_schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
562 let table_name = table_ref.table().to_string();
563 let Some(catalog_schema) = self.schemas.get_mut(catalog_schema_name) else {
564 return Err(QuillSQLError::Storage(format!(
565 "catalog schema {} not created yet",
566 catalog_schema_name
567 )));
568 };
569 let Some(catalog_table) = catalog_schema.tables.get_mut(&table_name) else {
570 return Err(QuillSQLError::Storage(format!(
571 "catalog table {} not created yet",
572 table_name
573 )));
574 };
575 let idx_name: String = index_name.into();
576 catalog_table.indexes.insert(idx_name.clone(), index);
577 Ok(())
578 }
579
580 fn find_index_in_schema(
581 &self,
582 catalog_name: &str,
583 schema_name: &str,
584 index_name: &str,
585 ) -> Option<TableReference> {
586 if schema_name == INFORMATION_SCHEMA_NAME {
587 return None;
588 }
589 let schema = self.schemas.get(schema_name)?;
590 for (table_name, table) in &schema.tables {
591 if table.indexes.contains_key(index_name) {
592 if catalog_name == DEFAULT_CATALOG_NAME {
593 if schema_name == DEFAULT_SCHEMA_NAME {
594 return Some(TableReference::Bare {
595 table: table_name.clone(),
596 });
597 }
598 return Some(TableReference::Partial {
599 schema: schema_name.to_string(),
600 table: table_name.clone(),
601 });
602 }
603 return Some(TableReference::Full {
604 catalog: catalog_name.to_string(),
605 schema: schema_name.to_string(),
606 table: table_name.clone(),
607 });
608 }
609 }
610 None
611 }
612
613 fn remove_table_metadata(
614 &mut self,
615 catalog_name: &str,
616 schema_name: &str,
617 table_name: &str,
618 ) -> QuillSQLResult<()> {
619 let (tables_heap, columns_heap, indexes_heap) = {
620 let information_schema =
621 self.schemas.get(INFORMATION_SCHEMA_NAME).ok_or_else(|| {
622 QuillSQLError::Internal(
623 "catalog schema information_schema not created yet".to_string(),
624 )
625 })?;
626 let tables_heap = information_schema
627 .tables
628 .get(INFORMATION_SCHEMA_TABLES)
629 .ok_or_else(|| {
630 QuillSQLError::Internal(
631 "table information_schema.tables not created yet".to_string(),
632 )
633 })?
634 .table
635 .clone();
636 let columns_heap = information_schema
637 .tables
638 .get(INFORMATION_SCHEMA_COLUMNS)
639 .ok_or_else(|| {
640 QuillSQLError::Internal(
641 "table information_schema.columns not created yet".to_string(),
642 )
643 })?
644 .table
645 .clone();
646 let indexes_heap = information_schema
647 .tables
648 .get(INFORMATION_SCHEMA_INDEXES)
649 .ok_or_else(|| {
650 QuillSQLError::Internal(
651 "table information_schema.indexes not created yet".to_string(),
652 )
653 })?
654 .table
655 .clone();
656 (tables_heap, columns_heap, indexes_heap)
657 };
658
659 Self::delete_matching_rows(tables_heap, |tuple| {
660 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
661 return Ok(false);
662 };
663 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
664 return Ok(false);
665 };
666 let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
667 return Ok(false);
668 };
669 Ok(catalog == catalog_name && schema == schema_name && table == table_name)
670 })?;
671
672 Self::delete_matching_rows(columns_heap, |tuple| {
673 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
674 return Ok(false);
675 };
676 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
677 return Ok(false);
678 };
679 let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
680 return Ok(false);
681 };
682 Ok(catalog == catalog_name && schema == schema_name && table == table_name)
683 })?;
684
685 Self::delete_matching_rows(indexes_heap, |tuple| {
686 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
687 return Ok(false);
688 };
689 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
690 return Ok(false);
691 };
692 let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
693 return Ok(false);
694 };
695 Ok(catalog == catalog_name && schema == schema_name && table == table_name)
696 })?;
697
698 Ok(())
699 }
700
701 fn remove_index_metadata(
702 &mut self,
703 catalog_name: &str,
704 schema_name: &str,
705 table_name: &str,
706 index_name: &str,
707 ) -> QuillSQLResult<()> {
708 let indexes_heap = {
709 let information_schema =
710 self.schemas.get(INFORMATION_SCHEMA_NAME).ok_or_else(|| {
711 QuillSQLError::Internal(
712 "catalog schema information_schema not created yet".to_string(),
713 )
714 })?;
715 information_schema
716 .tables
717 .get(INFORMATION_SCHEMA_INDEXES)
718 .ok_or_else(|| {
719 QuillSQLError::Internal(
720 "table information_schema.indexes not created yet".to_string(),
721 )
722 })?
723 .table
724 .clone()
725 };
726
727 Self::delete_matching_rows(indexes_heap, |tuple| {
728 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
729 return Ok(false);
730 };
731 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
732 return Ok(false);
733 };
734 let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
735 return Ok(false);
736 };
737 let ScalarValue::Varchar(Some(index)) = tuple.value(3)? else {
738 return Ok(false);
739 };
740 Ok(catalog == catalog_name
741 && schema == schema_name
742 && table == table_name
743 && index == index_name)
744 })?;
745
746 Ok(())
747 }
748
749 fn delete_matching_rows<F>(heap: Arc<TableHeap>, mut predicate: F) -> QuillSQLResult<()>
750 where
751 F: FnMut(&Tuple) -> QuillSQLResult<bool>,
752 {
753 let mvcc = MvccHeap::new(heap.clone());
754 let mut iterator = TableIterator::new(heap.clone(), ..);
755 while let Some((rid, _meta, tuple)) = iterator.next()? {
756 if predicate(&tuple)? {
757 let _ = mvcc.mark_deleted(rid, SYSTEM_TXN_ID, SYSTEM_COMMAND_ID)?;
758 }
759 }
760 Ok(())
761 }
762}
763
764#[cfg(test)]
765mod tests {
766 use std::sync::Arc;
767
768 use crate::utils::table_ref::TableReference;
769 use crate::{
770 catalog::{Column, DataType, Schema},
771 database::Database,
772 };
773
/// Two tables created through the catalog keep their schemas and can be looked
/// up again by reference.
#[test]
pub fn test_catalog_create_table() {
    let mut db = Database::new_temp().unwrap();

    // First table: columns a, b, c.
    let first_ref = TableReference::Bare {
        table: "test_table1".to_string(),
    };
    let first_schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, true),
        Column::new("b", DataType::Int16, true),
        Column::new("c", DataType::Int32, true),
    ]));
    let first_heap = db
        .catalog
        .create_table(first_ref.clone(), first_schema.clone())
        .unwrap();
    assert_eq!(first_heap.schema, first_schema);

    // Second table: columns d, e, f.
    let second_ref = TableReference::Bare {
        table: "test_table2".to_string(),
    };
    let second_schema = Arc::new(Schema::new(vec![
        Column::new("d", DataType::Int32, true),
        Column::new("e", DataType::Int16, true),
        Column::new("f", DataType::Int8, true),
    ]));
    let second_heap = db
        .catalog
        .create_table(second_ref.clone(), second_schema.clone())
        .unwrap();
    assert_eq!(second_heap.schema, second_schema);

    // Both tables must be retrievable afterwards.
    let looked_up = db.catalog.table_heap(&first_ref).unwrap();
    assert_eq!(looked_up.schema.column_count(), 3);

    let looked_up = db.catalog.table_heap(&second_ref).unwrap();
    assert_eq!(looked_up.schema.column_count(), 3);
}
812
/// Creating two indexes on a table returns handles with the requested key
/// schemas, and a created index can be fetched again by name.
#[test]
pub fn test_catalog_create_index() {
    let mut db = Database::new_temp().unwrap();

    let table_ref = TableReference::Bare {
        table: "test_table1".to_string(),
    };
    let schema = Arc::new(Schema::new(vec![
        Column::new("a", DataType::Int8, true),
        Column::new("b", DataType::Int16, true),
        Column::new("c", DataType::Int32, true),
    ]));
    // Fail right here if the table cannot be created, rather than letting the
    // problem surface later as a misleading error from `create_index`.
    db.catalog
        .create_table(table_ref.clone(), schema.clone())
        .unwrap();

    let index_name1 = "test_index1".to_string();
    let key_schema1 = Arc::new(schema.project(&[0, 2]).unwrap());
    let index1 = db
        .catalog
        .create_index(index_name1.clone(), &table_ref, key_schema1.clone())
        .unwrap();
    assert_eq!(index1.key_schema, key_schema1);

    let index_name2 = "test_index2".to_string();
    let key_schema2 = Arc::new(schema.project(&[1]).unwrap());
    let index2 = db
        .catalog
        .create_index(index_name2.clone(), &table_ref, key_schema2.clone())
        .unwrap();
    assert_eq!(index2.key_schema, key_schema2);

    let index3 = db
        .catalog
        .index(&table_ref, index_name1.as_str())
        .unwrap()
        .unwrap();
    assert_eq!(index3.key_schema, key_schema1);
}
850
/// `analyze_table` reports row and null counts and stores the statistics so
/// that `table_statistics` can return them afterwards.
#[test]
fn analyze_table_collects_basic_stats() {
    let mut db = Database::new_temp().unwrap();
    db.run("create table t_stats (a int, b int)").unwrap();
    db.run("insert into t_stats values (1, 10), (null, 20), (2, null)")
        .unwrap();

    let target = TableReference::Bare {
        table: "t_stats".to_string(),
    };

    // Freshly computed statistics: three rows, column `a` has one NULL.
    let fresh = db.catalog.analyze_table(&target).unwrap();
    assert_eq!(fresh.row_count, 3);
    let a_stats = fresh.column_stats.get("a").unwrap();
    assert_eq!(a_stats.null_count, 1);
    assert_eq!(a_stats.non_null_count, 2);

    // The same statistics must now be stored on the catalog entry.
    let persisted = db.catalog.table_statistics(&target).unwrap();
    assert_eq!(persisted.row_count, 3);
}
869}