Skip to main content

quill_sql/catalog/
core.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use crate::catalog::{
5    key_schema_to_varchar, Schema, SchemaRef, TableStatistics, COLUMNS_SCHEMA, INDEXES_SCHEMA,
6    INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_INDEXES, INFORMATION_SCHEMA_NAME,
7    INFORMATION_SCHEMA_SCHEMAS, INFORMATION_SCHEMA_TABLES, SCHEMAS_SCHEMA, TABLES_SCHEMA,
8};
9use crate::error::{QuillSQLError, QuillSQLResult};
10use crate::storage::engine::TableHandle;
11use crate::storage::holt::{HoltStore, HoltTableHandle};
12use crate::storage::tuple::Tuple;
13use crate::utils::scalar::ScalarValue;
14use crate::utils::table_ref::TableReference;
15
/// Catalog name substituted when a table reference does not carry one.
pub static DEFAULT_CATALOG_NAME: &str = "quillsql";
/// Schema name substituted when a table reference does not carry one.
pub static DEFAULT_SCHEMA_NAME: &str = "public";
18
19#[derive(Debug)]
20pub struct Catalog {
21    pub schemas: HashMap<String, CatalogSchema>,
22    pub holt_store: Arc<HoltStore>,
23}
24
25#[derive(Debug)]
26pub struct CatalogSchema {
27    pub name: String,
28    pub tables: HashMap<String, CatalogTable>,
29}
30
31impl CatalogSchema {
32    pub fn new(name: impl Into<String>) -> Self {
33        Self {
34            name: name.into(),
35            tables: HashMap::new(),
36        }
37    }
38}
39
40#[derive(Debug)]
41pub struct CatalogTable {
42    pub name: String,
43    pub schema: SchemaRef,
44    pub table_id: u64,
45    pub indexes: HashMap<String, CatalogIndex>,
46    pub stats: Option<TableStatistics>,
47}
48
49impl CatalogTable {
50    pub fn new(name: impl Into<String>, schema: SchemaRef, table_id: u64) -> Self {
51        Self {
52            name: name.into(),
53            schema,
54            table_id,
55            indexes: HashMap::new(),
56            stats: None,
57        }
58    }
59}
60
61#[derive(Debug, Clone)]
62pub struct CatalogIndex {
63    pub name: String,
64    pub key_schema: SchemaRef,
65    pub index_id: u64,
66}
67
68impl CatalogIndex {
69    pub fn new(name: impl Into<String>, key_schema: SchemaRef, index_id: u64) -> Self {
70        Self {
71            name: name.into(),
72            key_schema,
73            index_id,
74        }
75    }
76}
77
78impl Catalog {
79    pub fn new(holt_store: Arc<HoltStore>) -> Self {
80        Self {
81            schemas: HashMap::new(),
82            holt_store,
83        }
84    }
85
86    pub fn create_schema(&mut self, schema_name: impl Into<String>) -> QuillSQLResult<()> {
87        let schema_name = schema_name.into();
88        if self.schemas.contains_key(&schema_name) {
89            return Err(QuillSQLError::Storage(
90                "Cannot create duplicated schema".to_string(),
91            ));
92        }
93        self.schemas
94            .insert(schema_name.clone(), CatalogSchema::new(schema_name.clone()));
95
96        let tuple = Tuple::new(
97            SCHEMAS_SCHEMA.clone(),
98            vec![
99                DEFAULT_CATALOG_NAME.to_string().into(),
100                schema_name.clone().into(),
101            ],
102        );
103        self.insert_system_tuple(INFORMATION_SCHEMA_SCHEMAS, &tuple)?;
104        Ok(())
105    }
106
107    pub fn create_table(
108        &mut self,
109        table_ref: TableReference,
110        schema: SchemaRef,
111    ) -> QuillSQLResult<u64> {
112        let catalog_name = table_ref
113            .catalog()
114            .unwrap_or(DEFAULT_CATALOG_NAME)
115            .to_string();
116        let catalog_schema_name = table_ref
117            .schema()
118            .unwrap_or(DEFAULT_SCHEMA_NAME)
119            .to_string();
120        let table_name = table_ref.table().to_string();
121
122        {
123            let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else {
124                return Err(QuillSQLError::Storage(format!(
125                    "catalog schema {} not created yet",
126                    catalog_schema_name
127                )));
128            };
129            if catalog_schema.tables.contains_key(table_ref.table()) {
130                return Err(QuillSQLError::Storage(
131                    "Cannot create duplicated table".to_string(),
132                ));
133            }
134        }
135
136        let table_id = self
137            .holt_store
138            .create_table_descriptor(&table_ref, schema.as_ref())?;
139        self.schemas
140            .get_mut(&catalog_schema_name)
141            .expect("schema checked above")
142            .tables
143            .insert(
144                table_name.clone(),
145                CatalogTable::new(table_name.clone(), schema.clone(), table_id),
146            );
147
148        self.insert_table_metadata(&catalog_name, &catalog_schema_name, &table_name, &schema)?;
149        Ok(table_id)
150    }
151
152    pub fn drop_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<bool> {
153        let catalog_name = table_ref
154            .catalog()
155            .unwrap_or(DEFAULT_CATALOG_NAME)
156            .to_string();
157        let schema_name = table_ref
158            .schema()
159            .unwrap_or(DEFAULT_SCHEMA_NAME)
160            .to_string();
161        let table_name = table_ref.table().to_string();
162
163        if schema_name == INFORMATION_SCHEMA_NAME {
164            return Err(QuillSQLError::Execution(
165                "dropping information_schema tables is not allowed".to_string(),
166            ));
167        }
168
169        let Some(catalog_table) = self
170            .schemas
171            .get(&schema_name)
172            .and_then(|schema| schema.tables.get(&table_name))
173        else {
174            return Ok(false);
175        };
176        let index_names = catalog_table.indexes.keys().cloned().collect::<Vec<_>>();
177
178        for index_name in &index_names {
179            self.holt_store
180                .drop_index_descriptor(table_ref, index_name)?;
181        }
182        self.holt_store.drop_table_descriptor(table_ref)?;
183
184        self.schemas
185            .get_mut(&schema_name)
186            .expect("table existence checked above")
187            .tables
188            .remove(&table_name);
189
190        for index_name in index_names {
191            self.remove_index_metadata(&catalog_name, &schema_name, &table_name, &index_name)?;
192        }
193        self.remove_table_metadata(&catalog_name, &schema_name, &table_name)?;
194        Ok(true)
195    }
196
197    pub fn table_schema(&self, table_ref: &TableReference) -> QuillSQLResult<SchemaRef> {
198        Ok(self.catalog_table(table_ref)?.schema.clone())
199    }
200
201    pub fn table_id(&self, table_ref: &TableReference) -> QuillSQLResult<u64> {
202        Ok(self.catalog_table(table_ref)?.table_id)
203    }
204
205    pub fn table_statistics(&self, table_ref: &TableReference) -> Option<&TableStatistics> {
206        let schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
207        self.schemas
208            .get(schema_name)
209            .and_then(|schema| schema.tables.get(table_ref.table()))
210            .and_then(|table| table.stats.as_ref())
211    }
212
213    pub fn analyze_table(&mut self, table_ref: &TableReference) -> QuillSQLResult<TableStatistics> {
214        let schema_name = table_ref
215            .schema()
216            .unwrap_or(DEFAULT_SCHEMA_NAME)
217            .to_string();
218        let table_name = table_ref.table().to_string();
219        let catalog_table = self.catalog_table(table_ref)?;
220        let schema = catalog_table.schema.clone();
221        let table = HoltTableHandle::new(
222            table_ref.clone(),
223            schema.clone(),
224            catalog_table.table_id,
225            self.holt_store.clone(),
226        );
227        let mut stats = TableStatistics::empty(schema.as_ref());
228        let mut stream = table.full_scan()?;
229        while let Some((_rid, meta, tuple)) = stream.next()? {
230            if !meta.is_deleted {
231                stats.record_tuple(&tuple);
232            }
233        }
234        self.schemas
235            .get_mut(&schema_name)
236            .and_then(|schema| schema.tables.get_mut(&table_name))
237            .expect("table existence checked above")
238            .stats = Some(stats.clone());
239        Ok(stats)
240    }
241
242    pub fn try_table_schema(&self, table_ref: &TableReference) -> Option<SchemaRef> {
243        let schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
244        self.schemas
245            .get(schema_name)
246            .and_then(|schema| schema.tables.get(table_ref.table()))
247            .map(|table| table.schema.clone())
248    }
249
250    pub fn table_indexes(&self, table_ref: &TableReference) -> QuillSQLResult<Vec<CatalogIndex>> {
251        Ok(self
252            .catalog_table(table_ref)?
253            .indexes
254            .values()
255            .cloned()
256            .collect())
257    }
258
259    pub fn create_index(
260        &mut self,
261        index_name: String,
262        table_ref: &TableReference,
263        key_schema: SchemaRef,
264    ) -> QuillSQLResult<u64> {
265        let catalog_name = table_ref
266            .catalog()
267            .unwrap_or(DEFAULT_CATALOG_NAME)
268            .to_string();
269        let schema_name = table_ref
270            .schema()
271            .unwrap_or(DEFAULT_SCHEMA_NAME)
272            .to_string();
273        let table_name = table_ref.table().to_string();
274
275        {
276            let table = self.catalog_table_mut(table_ref)?;
277            if table.indexes.contains_key(&index_name) {
278                return Err(QuillSQLError::Storage(
279                    "Cannot create duplicated index".to_string(),
280                ));
281            }
282        }
283
284        let index_id =
285            self.holt_store
286                .create_index_descriptor(table_ref, &index_name, key_schema.as_ref())?;
287        self.catalog_table_mut(table_ref)?.indexes.insert(
288            index_name.clone(),
289            CatalogIndex::new(index_name.clone(), key_schema.clone(), index_id),
290        );
291        self.insert_index_metadata(
292            &catalog_name,
293            &schema_name,
294            &table_name,
295            &index_name,
296            key_schema.as_ref(),
297        )?;
298        Ok(index_id)
299    }
300
301    pub fn drop_index(
302        &mut self,
303        table_ref: &TableReference,
304        index_name: &str,
305    ) -> QuillSQLResult<bool> {
306        let catalog_name = table_ref
307            .catalog()
308            .unwrap_or(DEFAULT_CATALOG_NAME)
309            .to_string();
310        let schema_name = table_ref
311            .schema()
312            .unwrap_or(DEFAULT_SCHEMA_NAME)
313            .to_string();
314        let table_name = table_ref.table().to_string();
315
316        if schema_name == INFORMATION_SCHEMA_NAME {
317            return Err(QuillSQLError::Execution(
318                "dropping indexes on information_schema tables is not allowed".to_string(),
319            ));
320        }
321
322        if !self
323            .catalog_table(table_ref)?
324            .indexes
325            .contains_key(index_name)
326        {
327            return Ok(false);
328        }
329        self.holt_store
330            .drop_index_descriptor(table_ref, index_name)?;
331        self.catalog_table_mut(table_ref)?
332            .indexes
333            .remove(index_name);
334        self.remove_index_metadata(&catalog_name, &schema_name, &table_name, index_name)?;
335        Ok(true)
336    }
337
338    pub fn find_index_owner(
339        &self,
340        catalog_hint: Option<&str>,
341        schema_hint: Option<&str>,
342        index_name: &str,
343    ) -> Option<TableReference> {
344        let catalog_name = catalog_hint.unwrap_or(DEFAULT_CATALOG_NAME);
345
346        if let Some(schema_name) = schema_hint {
347            return self.find_index_in_schema(catalog_name, schema_name, index_name);
348        }
349
350        for schema_name in self.schemas.keys() {
351            if schema_name == INFORMATION_SCHEMA_NAME {
352                continue;
353            }
354            if let Some(table_ref) =
355                self.find_index_in_schema(catalog_name, schema_name, index_name)
356            {
357                return Some(table_ref);
358            }
359        }
360        None
361    }
362
363    pub fn load_schema(&mut self, name: impl Into<String>, schema: CatalogSchema) {
364        self.schemas.insert(name.into(), schema);
365    }
366
367    pub fn load_table(
368        &mut self,
369        table_ref: TableReference,
370        table_name: impl Into<String>,
371        schema: SchemaRef,
372        table_id: u64,
373    ) -> QuillSQLResult<()> {
374        let schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
375        let catalog_schema = self.schemas.get_mut(schema_name).ok_or_else(|| {
376            QuillSQLError::Storage(format!("catalog schema {} not created yet", schema_name))
377        })?;
378        catalog_schema.tables.insert(
379            table_ref.table().to_string(),
380            CatalogTable::new(table_name, schema, table_id),
381        );
382        Ok(())
383    }
384
385    pub fn load_index(
386        &mut self,
387        table_ref: TableReference,
388        index_name: impl Into<String>,
389        key_schema: SchemaRef,
390        index_id: u64,
391    ) -> QuillSQLResult<()> {
392        let idx_name = index_name.into();
393        self.catalog_table_mut(&table_ref)?.indexes.insert(
394            idx_name.clone(),
395            CatalogIndex::new(idx_name, key_schema, index_id),
396        );
397        Ok(())
398    }
399
400    pub fn refresh_information_schema_projection(&self) -> QuillSQLResult<()> {
401        self.delete_matching_system_rows(INFORMATION_SCHEMA_SCHEMAS, |tuple| {
402            let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
403                return Ok(false);
404            };
405            Ok(schema != INFORMATION_SCHEMA_NAME)
406        })?;
407        for table_name in [
408            INFORMATION_SCHEMA_TABLES,
409            INFORMATION_SCHEMA_COLUMNS,
410            INFORMATION_SCHEMA_INDEXES,
411        ] {
412            self.delete_matching_system_rows(table_name, |tuple| {
413                let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
414                    return Ok(false);
415                };
416                Ok(schema != INFORMATION_SCHEMA_NAME)
417            })?;
418        }
419
420        for (schema_name, schema) in &self.schemas {
421            if schema_name == INFORMATION_SCHEMA_NAME {
422                continue;
423            }
424            let tuple = Tuple::new(
425                SCHEMAS_SCHEMA.clone(),
426                vec![
427                    DEFAULT_CATALOG_NAME.to_string().into(),
428                    schema_name.clone().into(),
429                ],
430            );
431            self.insert_system_tuple(INFORMATION_SCHEMA_SCHEMAS, &tuple)?;
432
433            for (table_name, table) in &schema.tables {
434                self.insert_table_metadata(
435                    DEFAULT_CATALOG_NAME,
436                    schema_name,
437                    table_name,
438                    table.schema.as_ref(),
439                )?;
440                for (index_name, index) in &table.indexes {
441                    self.insert_index_metadata(
442                        DEFAULT_CATALOG_NAME,
443                        schema_name,
444                        table_name,
445                        index_name,
446                        index.key_schema.as_ref(),
447                    )?;
448                }
449            }
450        }
451        Ok(())
452    }
453
454    fn catalog_table(&self, table_ref: &TableReference) -> QuillSQLResult<&CatalogTable> {
455        let schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
456        self.schemas
457            .get(schema_name)
458            .ok_or_else(|| {
459                QuillSQLError::Storage(format!("catalog schema {} not created yet", schema_name))
460            })?
461            .tables
462            .get(table_ref.table())
463            .ok_or_else(|| {
464                QuillSQLError::Storage(format!("table {} not created yet", table_ref.table()))
465            })
466    }
467
468    fn catalog_table_mut(
469        &mut self,
470        table_ref: &TableReference,
471    ) -> QuillSQLResult<&mut CatalogTable> {
472        let schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
473        self.schemas
474            .get_mut(schema_name)
475            .ok_or_else(|| {
476                QuillSQLError::Storage(format!("catalog schema {} not created yet", schema_name))
477            })?
478            .tables
479            .get_mut(table_ref.table())
480            .ok_or_else(|| {
481                QuillSQLError::Storage(format!("table {} not created yet", table_ref.table()))
482            })
483    }
484
485    fn information_schema_table(&self, table_name: &str) -> QuillSQLResult<&CatalogTable> {
486        self.schemas
487            .get(INFORMATION_SCHEMA_NAME)
488            .ok_or_else(|| {
489                QuillSQLError::Internal(
490                    "catalog schema information_schema not created yet".to_string(),
491                )
492            })?
493            .tables
494            .get(table_name)
495            .ok_or_else(|| {
496                QuillSQLError::Internal(format!(
497                    "table information_schema.{table_name} not created yet"
498                ))
499            })
500    }
501
502    fn insert_system_tuple(&self, table_name: &str, tuple: &Tuple) -> QuillSQLResult<()> {
503        let table_id = self.information_schema_table(table_name)?.table_id;
504        let _ = self.holt_store.insert_system_row(table_id, tuple)?;
505        Ok(())
506    }
507
508    fn insert_table_metadata(
509        &self,
510        catalog_name: &str,
511        schema_name: &str,
512        table_name: &str,
513        schema: &Schema,
514    ) -> QuillSQLResult<()> {
515        let tuple = Tuple::new(
516            TABLES_SCHEMA.clone(),
517            vec![
518                catalog_name.to_string().into(),
519                schema_name.to_string().into(),
520                table_name.to_string().into(),
521                0u32.into(),
522            ],
523        );
524        self.insert_system_tuple(INFORMATION_SCHEMA_TABLES, &tuple)?;
525
526        for col in &schema.columns {
527            let sql_type: sqlparser::ast::DataType = (&col.data_type).into();
528            let tuple = Tuple::new(
529                COLUMNS_SCHEMA.clone(),
530                vec![
531                    catalog_name.to_string().into(),
532                    schema_name.to_string().into(),
533                    table_name.to_string().into(),
534                    col.name.clone().into(),
535                    format!("{sql_type}").into(),
536                    col.nullable.into(),
537                    format!("{}", col.default).into(),
538                ],
539            );
540            self.insert_system_tuple(INFORMATION_SCHEMA_COLUMNS, &tuple)?;
541        }
542        Ok(())
543    }
544
545    fn insert_index_metadata(
546        &self,
547        catalog_name: &str,
548        schema_name: &str,
549        table_name: &str,
550        index_name: &str,
551        key_schema: &Schema,
552    ) -> QuillSQLResult<()> {
553        let tuple = Tuple::new(
554            INDEXES_SCHEMA.clone(),
555            vec![
556                catalog_name.to_string().into(),
557                schema_name.to_string().into(),
558                table_name.to_string().into(),
559                index_name.to_string().into(),
560                key_schema_to_varchar(key_schema).into(),
561                0u32.into(),
562                0u32.into(),
563                0u32.into(),
564            ],
565        );
566        self.insert_system_tuple(INFORMATION_SCHEMA_INDEXES, &tuple)
567    }
568
569    fn remove_table_metadata(
570        &self,
571        catalog_name: &str,
572        schema_name: &str,
573        table_name: &str,
574    ) -> QuillSQLResult<()> {
575        for table in [
576            INFORMATION_SCHEMA_TABLES,
577            INFORMATION_SCHEMA_COLUMNS,
578            INFORMATION_SCHEMA_INDEXES,
579        ] {
580            self.delete_matching_system_rows(table, |tuple| {
581                let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
582                    return Ok(false);
583                };
584                let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
585                    return Ok(false);
586                };
587                let ScalarValue::Varchar(Some(row_table)) = tuple.value(2)? else {
588                    return Ok(false);
589                };
590                Ok(catalog == catalog_name && schema == schema_name && row_table == table_name)
591            })?;
592        }
593        Ok(())
594    }
595
596    fn remove_index_metadata(
597        &self,
598        catalog_name: &str,
599        schema_name: &str,
600        table_name: &str,
601        index_name: &str,
602    ) -> QuillSQLResult<()> {
603        self.delete_matching_system_rows(INFORMATION_SCHEMA_INDEXES, |tuple| {
604            let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
605                return Ok(false);
606            };
607            let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
608                return Ok(false);
609            };
610            let ScalarValue::Varchar(Some(row_table)) = tuple.value(2)? else {
611                return Ok(false);
612            };
613            let ScalarValue::Varchar(Some(index)) = tuple.value(3)? else {
614                return Ok(false);
615            };
616            Ok(catalog == catalog_name
617                && schema == schema_name
618                && row_table == table_name
619                && index == index_name)
620        })
621    }
622
623    fn delete_matching_system_rows<F>(
624        &self,
625        table_name: &str,
626        mut predicate: F,
627    ) -> QuillSQLResult<()>
628    where
629        F: FnMut(&Tuple) -> QuillSQLResult<bool>,
630    {
631        let table = self.information_schema_table(table_name)?;
632        let table_id = table.table_id;
633        let table_ref = TableReference::Full {
634            catalog: DEFAULT_CATALOG_NAME.to_string(),
635            schema: INFORMATION_SCHEMA_NAME.to_string(),
636            table: table_name.to_string(),
637        };
638        let handle = HoltTableHandle::new(
639            table_ref,
640            table.schema.clone(),
641            table_id,
642            self.holt_store.clone(),
643        );
644        let mut stream = handle.full_scan()?;
645        let mut deleted = Vec::new();
646        while let Some((rid, meta, tuple)) = stream.next()? {
647            if !meta.is_deleted && predicate(&tuple)? {
648                deleted.push(rid);
649            }
650        }
651        for rid in deleted {
652            self.holt_store
653                .mark_system_row_deleted(table_id, table.schema.clone(), rid)?;
654        }
655        Ok(())
656    }
657
658    fn find_index_in_schema(
659        &self,
660        catalog_name: &str,
661        schema_name: &str,
662        index_name: &str,
663    ) -> Option<TableReference> {
664        let schema = self.schemas.get(schema_name)?;
665        for (table_name, table) in &schema.tables {
666            if table.indexes.contains_key(index_name) {
667                if catalog_name == DEFAULT_CATALOG_NAME {
668                    if schema_name == DEFAULT_SCHEMA_NAME {
669                        return Some(TableReference::Bare {
670                            table: table_name.clone(),
671                        });
672                    }
673                    return Some(TableReference::Partial {
674                        schema: schema_name.to_string(),
675                        table: table_name.clone(),
676                    });
677                }
678                return Some(TableReference::Full {
679                    catalog: catalog_name.to_string(),
680                    schema: schema_name.to_string(),
681                    table: table_name.clone(),
682                });
683            }
684        }
685        None
686    }
687}