1use std::collections::HashMap;
2use std::sync::Arc;
3
4use crate::catalog::{
5 key_schema_to_varchar, Schema, SchemaRef, TableStatistics, COLUMNS_SCHEMA, INDEXES_SCHEMA,
6 INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_INDEXES, INFORMATION_SCHEMA_NAME,
7 INFORMATION_SCHEMA_SCHEMAS, INFORMATION_SCHEMA_TABLES, SCHEMAS_SCHEMA, TABLES_SCHEMA,
8};
9use crate::error::{QuillSQLError, QuillSQLResult};
10use crate::storage::engine::TableHandle;
11use crate::storage::holt::{HoltStore, HoltTableHandle};
12use crate::storage::tuple::Tuple;
13use crate::utils::scalar::ScalarValue;
14use crate::utils::table_ref::TableReference;
15
/// Catalog name assumed when a table reference does not specify one.
pub static DEFAULT_CATALOG_NAME: &str = "quillsql";
/// Schema name assumed when a table reference does not specify one.
pub static DEFAULT_SCHEMA_NAME: &str = "public";
18
/// In-memory registry of schemas, tables and indexes, paired with the
/// [`HoltStore`] handle used to create/drop descriptors and to read and write
/// the `information_schema` system rows.
#[derive(Debug)]
pub struct Catalog {
    /// Registered schemas, keyed by schema name.
    pub schemas: HashMap<String, CatalogSchema>,
    /// Shared handle to the backing store.
    pub holt_store: Arc<HoltStore>,
}
24
25#[derive(Debug)]
26pub struct CatalogSchema {
27 pub name: String,
28 pub tables: HashMap<String, CatalogTable>,
29}
30
31impl CatalogSchema {
32 pub fn new(name: impl Into<String>) -> Self {
33 Self {
34 name: name.into(),
35 tables: HashMap::new(),
36 }
37 }
38}
39
40#[derive(Debug)]
41pub struct CatalogTable {
42 pub name: String,
43 pub schema: SchemaRef,
44 pub table_id: u64,
45 pub indexes: HashMap<String, CatalogIndex>,
46 pub stats: Option<TableStatistics>,
47}
48
49impl CatalogTable {
50 pub fn new(name: impl Into<String>, schema: SchemaRef, table_id: u64) -> Self {
51 Self {
52 name: name.into(),
53 schema,
54 table_id,
55 indexes: HashMap::new(),
56 stats: None,
57 }
58 }
59}
60
61#[derive(Debug, Clone)]
62pub struct CatalogIndex {
63 pub name: String,
64 pub key_schema: SchemaRef,
65 pub index_id: u64,
66}
67
68impl CatalogIndex {
69 pub fn new(name: impl Into<String>, key_schema: SchemaRef, index_id: u64) -> Self {
70 Self {
71 name: name.into(),
72 key_schema,
73 index_id,
74 }
75 }
76}
77
78impl Catalog {
79 pub fn new(holt_store: Arc<HoltStore>) -> Self {
80 Self {
81 schemas: HashMap::new(),
82 holt_store,
83 }
84 }
85
86 pub fn create_schema(&mut self, schema_name: impl Into<String>) -> QuillSQLResult<()> {
87 let schema_name = schema_name.into();
88 if self.schemas.contains_key(&schema_name) {
89 return Err(QuillSQLError::Storage(
90 "Cannot create duplicated schema".to_string(),
91 ));
92 }
93 self.schemas
94 .insert(schema_name.clone(), CatalogSchema::new(schema_name.clone()));
95
96 let tuple = Tuple::new(
97 SCHEMAS_SCHEMA.clone(),
98 vec![
99 DEFAULT_CATALOG_NAME.to_string().into(),
100 schema_name.clone().into(),
101 ],
102 );
103 self.insert_system_tuple(INFORMATION_SCHEMA_SCHEMAS, &tuple)?;
104 Ok(())
105 }
106
107 pub fn create_table(
108 &mut self,
109 table_ref: TableReference,
110 schema: SchemaRef,
111 ) -> QuillSQLResult<u64> {
112 let catalog_name = table_ref
113 .catalog()
114 .unwrap_or(DEFAULT_CATALOG_NAME)
115 .to_string();
116 let catalog_schema_name = table_ref
117 .schema()
118 .unwrap_or(DEFAULT_SCHEMA_NAME)
119 .to_string();
120 let table_name = table_ref.table().to_string();
121
122 {
123 let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else {
124 return Err(QuillSQLError::Storage(format!(
125 "catalog schema {} not created yet",
126 catalog_schema_name
127 )));
128 };
129 if catalog_schema.tables.contains_key(table_ref.table()) {
130 return Err(QuillSQLError::Storage(
131 "Cannot create duplicated table".to_string(),
132 ));
133 }
134 }
135
136 let table_id = self
137 .holt_store
138 .create_table_descriptor(&table_ref, schema.as_ref())?;
139 self.schemas
140 .get_mut(&catalog_schema_name)
141 .expect("schema checked above")
142 .tables
143 .insert(
144 table_name.clone(),
145 CatalogTable::new(table_name.clone(), schema.clone(), table_id),
146 );
147
148 self.insert_table_metadata(&catalog_name, &catalog_schema_name, &table_name, &schema)?;
149 Ok(table_id)
150 }
151
152 pub fn drop_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<bool> {
153 let catalog_name = table_ref
154 .catalog()
155 .unwrap_or(DEFAULT_CATALOG_NAME)
156 .to_string();
157 let schema_name = table_ref
158 .schema()
159 .unwrap_or(DEFAULT_SCHEMA_NAME)
160 .to_string();
161 let table_name = table_ref.table().to_string();
162
163 if schema_name == INFORMATION_SCHEMA_NAME {
164 return Err(QuillSQLError::Execution(
165 "dropping information_schema tables is not allowed".to_string(),
166 ));
167 }
168
169 let Some(catalog_table) = self
170 .schemas
171 .get(&schema_name)
172 .and_then(|schema| schema.tables.get(&table_name))
173 else {
174 return Ok(false);
175 };
176 let index_names = catalog_table.indexes.keys().cloned().collect::<Vec<_>>();
177
178 for index_name in &index_names {
179 self.holt_store
180 .drop_index_descriptor(table_ref, index_name)?;
181 }
182 self.holt_store.drop_table_descriptor(table_ref)?;
183
184 self.schemas
185 .get_mut(&schema_name)
186 .expect("table existence checked above")
187 .tables
188 .remove(&table_name);
189
190 for index_name in index_names {
191 self.remove_index_metadata(&catalog_name, &schema_name, &table_name, &index_name)?;
192 }
193 self.remove_table_metadata(&catalog_name, &schema_name, &table_name)?;
194 Ok(true)
195 }
196
197 pub fn table_schema(&self, table_ref: &TableReference) -> QuillSQLResult<SchemaRef> {
198 Ok(self.catalog_table(table_ref)?.schema.clone())
199 }
200
201 pub fn table_id(&self, table_ref: &TableReference) -> QuillSQLResult<u64> {
202 Ok(self.catalog_table(table_ref)?.table_id)
203 }
204
205 pub fn table_statistics(&self, table_ref: &TableReference) -> Option<&TableStatistics> {
206 let schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
207 self.schemas
208 .get(schema_name)
209 .and_then(|schema| schema.tables.get(table_ref.table()))
210 .and_then(|table| table.stats.as_ref())
211 }
212
213 pub fn analyze_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<TableStatistics> {
214 let schema_name = table_ref
215 .schema()
216 .unwrap_or(DEFAULT_SCHEMA_NAME)
217 .to_string();
218 let table_name = table_ref.table().to_string();
219 let catalog_table = self.catalog_table(table_ref)?;
220 let schema = catalog_table.schema.clone();
221 let table = HoltTableHandle::new(
222 table_ref.clone(),
223 schema.clone(),
224 catalog_table.table_id,
225 self.holt_store.clone(),
226 );
227 let mut stats = TableStatistics::empty(schema.as_ref());
228 let mut stream = table.full_scan()?;
229 while let Some((_rid, meta, tuple)) = stream.next()? {
230 if !meta.is_deleted {
231 stats.record_tuple(&tuple);
232 }
233 }
234 self.schemas
235 .get_mut(&schema_name)
236 .and_then(|schema| schema.tables.get_mut(&table_name))
237 .expect("table existence checked above")
238 .stats = Some(stats.clone());
239 Ok(stats)
240 }
241
242 pub fn try_table_schema(&self, table_ref: &TableReference) -> Option<SchemaRef> {
243 let schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
244 self.schemas
245 .get(schema_name)
246 .and_then(|schema| schema.tables.get(table_ref.table()))
247 .map(|table| table.schema.clone())
248 }
249
250 pub fn table_indexes(&self, table_ref: &TableReference) -> QuillSQLResult<Vec<CatalogIndex>> {
251 Ok(self
252 .catalog_table(table_ref)?
253 .indexes
254 .values()
255 .cloned()
256 .collect())
257 }
258
259 pub fn create_index(
260 &mut self,
261 index_name: String,
262 table_ref: &TableReference,
263 key_schema: SchemaRef,
264 ) -> QuillSQLResult<u64> {
265 let catalog_name = table_ref
266 .catalog()
267 .unwrap_or(DEFAULT_CATALOG_NAME)
268 .to_string();
269 let schema_name = table_ref
270 .schema()
271 .unwrap_or(DEFAULT_SCHEMA_NAME)
272 .to_string();
273 let table_name = table_ref.table().to_string();
274
275 {
276 let table = self.catalog_table_mut(table_ref)?;
277 if table.indexes.contains_key(&index_name) {
278 return Err(QuillSQLError::Storage(
279 "Cannot create duplicated index".to_string(),
280 ));
281 }
282 }
283
284 let index_id =
285 self.holt_store
286 .create_index_descriptor(table_ref, &index_name, key_schema.as_ref())?;
287 self.catalog_table_mut(table_ref)?.indexes.insert(
288 index_name.clone(),
289 CatalogIndex::new(index_name.clone(), key_schema.clone(), index_id),
290 );
291 self.insert_index_metadata(
292 &catalog_name,
293 &schema_name,
294 &table_name,
295 &index_name,
296 key_schema.as_ref(),
297 )?;
298 Ok(index_id)
299 }
300
301 pub fn drop_index(
302 &mut self,
303 table_ref: &TableReference,
304 index_name: &str,
305 ) -> QuillSQLResult<bool> {
306 let catalog_name = table_ref
307 .catalog()
308 .unwrap_or(DEFAULT_CATALOG_NAME)
309 .to_string();
310 let schema_name = table_ref
311 .schema()
312 .unwrap_or(DEFAULT_SCHEMA_NAME)
313 .to_string();
314 let table_name = table_ref.table().to_string();
315
316 if schema_name == INFORMATION_SCHEMA_NAME {
317 return Err(QuillSQLError::Execution(
318 "dropping indexes on information_schema tables is not allowed".to_string(),
319 ));
320 }
321
322 if !self
323 .catalog_table(table_ref)?
324 .indexes
325 .contains_key(index_name)
326 {
327 return Ok(false);
328 }
329 self.holt_store
330 .drop_index_descriptor(table_ref, index_name)?;
331 self.catalog_table_mut(table_ref)?
332 .indexes
333 .remove(index_name);
334 self.remove_index_metadata(&catalog_name, &schema_name, &table_name, index_name)?;
335 Ok(true)
336 }
337
338 pub fn find_index_owner(
339 &self,
340 catalog_hint: Option<&str>,
341 schema_hint: Option<&str>,
342 index_name: &str,
343 ) -> Option<TableReference> {
344 let catalog_name = catalog_hint.unwrap_or(DEFAULT_CATALOG_NAME);
345
346 if let Some(schema_name) = schema_hint {
347 return self.find_index_in_schema(catalog_name, schema_name, index_name);
348 }
349
350 for schema_name in self.schemas.keys() {
351 if schema_name == INFORMATION_SCHEMA_NAME {
352 continue;
353 }
354 if let Some(table_ref) =
355 self.find_index_in_schema(catalog_name, schema_name, index_name)
356 {
357 return Some(table_ref);
358 }
359 }
360 None
361 }
362
363 pub fn load_schema(&mut self, name: impl Into<String>, schema: CatalogSchema) {
364 self.schemas.insert(name.into(), schema);
365 }
366
367 pub fn load_table(
368 &mut self,
369 table_ref: TableReference,
370 table_name: impl Into<String>,
371 schema: SchemaRef,
372 table_id: u64,
373 ) -> QuillSQLResult<()> {
374 let schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
375 let catalog_schema = self.schemas.get_mut(schema_name).ok_or_else(|| {
376 QuillSQLError::Storage(format!("catalog schema {} not created yet", schema_name))
377 })?;
378 catalog_schema.tables.insert(
379 table_ref.table().to_string(),
380 CatalogTable::new(table_name, schema, table_id),
381 );
382 Ok(())
383 }
384
385 pub fn load_index(
386 &mut self,
387 table_ref: TableReference,
388 index_name: impl Into<String>,
389 key_schema: SchemaRef,
390 index_id: u64,
391 ) -> QuillSQLResult<()> {
392 let idx_name = index_name.into();
393 self.catalog_table_mut(&table_ref)?.indexes.insert(
394 idx_name.clone(),
395 CatalogIndex::new(idx_name, key_schema, index_id),
396 );
397 Ok(())
398 }
399
400 pub fn refresh_information_schema_projection(&self) -> QuillSQLResult<()> {
401 self.delete_matching_system_rows(INFORMATION_SCHEMA_SCHEMAS, |tuple| {
402 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
403 return Ok(false);
404 };
405 Ok(schema != INFORMATION_SCHEMA_NAME)
406 })?;
407 for table_name in [
408 INFORMATION_SCHEMA_TABLES,
409 INFORMATION_SCHEMA_COLUMNS,
410 INFORMATION_SCHEMA_INDEXES,
411 ] {
412 self.delete_matching_system_rows(table_name, |tuple| {
413 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
414 return Ok(false);
415 };
416 Ok(schema != INFORMATION_SCHEMA_NAME)
417 })?;
418 }
419
420 for (schema_name, schema) in &self.schemas {
421 if schema_name == INFORMATION_SCHEMA_NAME {
422 continue;
423 }
424 let tuple = Tuple::new(
425 SCHEMAS_SCHEMA.clone(),
426 vec![
427 DEFAULT_CATALOG_NAME.to_string().into(),
428 schema_name.clone().into(),
429 ],
430 );
431 self.insert_system_tuple(INFORMATION_SCHEMA_SCHEMAS, &tuple)?;
432
433 for (table_name, table) in &schema.tables {
434 self.insert_table_metadata(
435 DEFAULT_CATALOG_NAME,
436 schema_name,
437 table_name,
438 table.schema.as_ref(),
439 )?;
440 for (index_name, index) in &table.indexes {
441 self.insert_index_metadata(
442 DEFAULT_CATALOG_NAME,
443 schema_name,
444 table_name,
445 index_name,
446 index.key_schema.as_ref(),
447 )?;
448 }
449 }
450 }
451 Ok(())
452 }
453
454 fn catalog_table(&self, table_ref: &TableReference) -> QuillSQLResult<&CatalogTable> {
455 let schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
456 self.schemas
457 .get(schema_name)
458 .ok_or_else(|| {
459 QuillSQLError::Storage(format!("catalog schema {} not created yet", schema_name))
460 })?
461 .tables
462 .get(table_ref.table())
463 .ok_or_else(|| {
464 QuillSQLError::Storage(format!("table {} not created yet", table_ref.table()))
465 })
466 }
467
468 fn catalog_table_mut(
469 &mut self,
470 table_ref: &TableReference,
471 ) -> QuillSQLResult<&mut CatalogTable> {
472 let schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
473 self.schemas
474 .get_mut(schema_name)
475 .ok_or_else(|| {
476 QuillSQLError::Storage(format!("catalog schema {} not created yet", schema_name))
477 })?
478 .tables
479 .get_mut(table_ref.table())
480 .ok_or_else(|| {
481 QuillSQLError::Storage(format!("table {} not created yet", table_ref.table()))
482 })
483 }
484
485 fn information_schema_table(&self, table_name: &str) -> QuillSQLResult<&CatalogTable> {
486 self.schemas
487 .get(INFORMATION_SCHEMA_NAME)
488 .ok_or_else(|| {
489 QuillSQLError::Internal(
490 "catalog schema information_schema not created yet".to_string(),
491 )
492 })?
493 .tables
494 .get(table_name)
495 .ok_or_else(|| {
496 QuillSQLError::Internal(format!(
497 "table information_schema.{table_name} not created yet"
498 ))
499 })
500 }
501
502 fn insert_system_tuple(&self, table_name: &str, tuple: &Tuple) -> QuillSQLResult<()> {
503 let table_id = self.information_schema_table(table_name)?.table_id;
504 let _ = self.holt_store.insert_system_row(table_id, tuple)?;
505 Ok(())
506 }
507
508 fn insert_table_metadata(
509 &self,
510 catalog_name: &str,
511 schema_name: &str,
512 table_name: &str,
513 schema: &Schema,
514 ) -> QuillSQLResult<()> {
515 let tuple = Tuple::new(
516 TABLES_SCHEMA.clone(),
517 vec![
518 catalog_name.to_string().into(),
519 schema_name.to_string().into(),
520 table_name.to_string().into(),
521 0u32.into(),
522 ],
523 );
524 self.insert_system_tuple(INFORMATION_SCHEMA_TABLES, &tuple)?;
525
526 for col in &schema.columns {
527 let sql_type: sqlparser::ast::DataType = (&col.data_type).into();
528 let tuple = Tuple::new(
529 COLUMNS_SCHEMA.clone(),
530 vec![
531 catalog_name.to_string().into(),
532 schema_name.to_string().into(),
533 table_name.to_string().into(),
534 col.name.clone().into(),
535 format!("{sql_type}").into(),
536 col.nullable.into(),
537 format!("{}", col.default).into(),
538 ],
539 );
540 self.insert_system_tuple(INFORMATION_SCHEMA_COLUMNS, &tuple)?;
541 }
542 Ok(())
543 }
544
545 fn insert_index_metadata(
546 &self,
547 catalog_name: &str,
548 schema_name: &str,
549 table_name: &str,
550 index_name: &str,
551 key_schema: &Schema,
552 ) -> QuillSQLResult<()> {
553 let tuple = Tuple::new(
554 INDEXES_SCHEMA.clone(),
555 vec![
556 catalog_name.to_string().into(),
557 schema_name.to_string().into(),
558 table_name.to_string().into(),
559 index_name.to_string().into(),
560 key_schema_to_varchar(key_schema).into(),
561 0u32.into(),
562 0u32.into(),
563 0u32.into(),
564 ],
565 );
566 self.insert_system_tuple(INFORMATION_SCHEMA_INDEXES, &tuple)
567 }
568
569 fn remove_table_metadata(
570 &self,
571 catalog_name: &str,
572 schema_name: &str,
573 table_name: &str,
574 ) -> QuillSQLResult<()> {
575 for table in [
576 INFORMATION_SCHEMA_TABLES,
577 INFORMATION_SCHEMA_COLUMNS,
578 INFORMATION_SCHEMA_INDEXES,
579 ] {
580 self.delete_matching_system_rows(table, |tuple| {
581 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
582 return Ok(false);
583 };
584 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
585 return Ok(false);
586 };
587 let ScalarValue::Varchar(Some(row_table)) = tuple.value(2)? else {
588 return Ok(false);
589 };
590 Ok(catalog == catalog_name && schema == schema_name && row_table == table_name)
591 })?;
592 }
593 Ok(())
594 }
595
596 fn remove_index_metadata(
597 &self,
598 catalog_name: &str,
599 schema_name: &str,
600 table_name: &str,
601 index_name: &str,
602 ) -> QuillSQLResult<()> {
603 self.delete_matching_system_rows(INFORMATION_SCHEMA_INDEXES, |tuple| {
604 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
605 return Ok(false);
606 };
607 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
608 return Ok(false);
609 };
610 let ScalarValue::Varchar(Some(row_table)) = tuple.value(2)? else {
611 return Ok(false);
612 };
613 let ScalarValue::Varchar(Some(index)) = tuple.value(3)? else {
614 return Ok(false);
615 };
616 Ok(catalog == catalog_name
617 && schema == schema_name
618 && row_table == table_name
619 && index == index_name)
620 })
621 }
622
623 fn delete_matching_system_rows<F>(
624 &self,
625 table_name: &str,
626 mut predicate: F,
627 ) -> QuillSQLResult<()>
628 where
629 F: FnMut(&Tuple) -> QuillSQLResult<bool>,
630 {
631 let table = self.information_schema_table(table_name)?;
632 let table_id = table.table_id;
633 let table_ref = TableReference::Full {
634 catalog: DEFAULT_CATALOG_NAME.to_string(),
635 schema: INFORMATION_SCHEMA_NAME.to_string(),
636 table: table_name.to_string(),
637 };
638 let handle = HoltTableHandle::new(
639 table_ref,
640 table.schema.clone(),
641 table_id,
642 self.holt_store.clone(),
643 );
644 let mut stream = handle.full_scan()?;
645 let mut deleted = Vec::new();
646 while let Some((rid, meta, tuple)) = stream.next()? {
647 if !meta.is_deleted && predicate(&tuple)? {
648 deleted.push(rid);
649 }
650 }
651 for rid in deleted {
652 self.holt_store
653 .mark_system_row_deleted(table_id, table.schema.clone(), rid)?;
654 }
655 Ok(())
656 }
657
658 fn find_index_in_schema(
659 &self,
660 catalog_name: &str,
661 schema_name: &str,
662 index_name: &str,
663 ) -> Option<TableReference> {
664 let schema = self.schemas.get(schema_name)?;
665 for (table_name, table) in &schema.tables {
666 if table.indexes.contains_key(index_name) {
667 if catalog_name == DEFAULT_CATALOG_NAME {
668 if schema_name == DEFAULT_SCHEMA_NAME {
669 return Some(TableReference::Bare {
670 table: table_name.clone(),
671 });
672 }
673 return Some(TableReference::Partial {
674 schema: schema_name.to_string(),
675 table: table_name.clone(),
676 });
677 }
678 return Some(TableReference::Full {
679 catalog: catalog_name.to_string(),
680 schema: schema_name.to_string(),
681 table: table_name.clone(),
682 });
683 }
684 }
685 None
686 }
687}