quill_sql/catalog/
information.rs

1use crate::buffer::{AtomicPageId, PageId, INVALID_PAGE_ID};
2use crate::catalog::catalog::{CatalogSchema, CatalogTable};
3use crate::catalog::{
4    Catalog, Column, DataType, Schema, SchemaRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
5};
6use crate::database::Database;
7use crate::error::{QuillSQLError, QuillSQLResult};
8use crate::utils::scalar::ScalarValue;
9use crate::utils::table_ref::TableReference;
10
11use crate::storage::index::btree_index::BPlusTreeIndex;
12use crate::storage::table_heap::{TableHeap, TableIterator};
13use std::collections::HashMap;
14use std::sync::{Arc, LazyLock};
15
/// Name of the schema that groups the catalog's own metadata tables.
pub static INFORMATION_SCHEMA_NAME: &str = "information_schema";
/// Name of the metadata table whose rows are read back by `load_schemas`.
pub static INFORMATION_SCHEMA_SCHEMAS: &str = "schemas";
/// Name of the metadata table listing tables and their first page ids.
pub static INFORMATION_SCHEMA_TABLES: &str = "tables";
/// Name of the metadata table listing the columns of every table.
pub static INFORMATION_SCHEMA_COLUMNS: &str = "columns";
/// Name of the metadata table listing the persisted indexes.
pub static INFORMATION_SCHEMA_INDEXES: &str = "indexes";
21
22pub static SCHEMAS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
23    Arc::new(Schema::new(vec![
24        Column::new("catalog", DataType::Varchar(None), false),
25        Column::new("schema", DataType::Varchar(None), false),
26    ]))
27});
28
29pub static TABLES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
30    Arc::new(Schema::new(vec![
31        Column::new("table_catalog", DataType::Varchar(None), false),
32        Column::new("table_schema", DataType::Varchar(None), false),
33        Column::new("table_name", DataType::Varchar(None), false),
34        Column::new("first_page_id", DataType::UInt32, false),
35    ]))
36});
37
38pub static COLUMNS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
39    Arc::new(Schema::new(vec![
40        Column::new("table_catalog", DataType::Varchar(None), false),
41        Column::new("table_schema", DataType::Varchar(None), false),
42        Column::new("table_name", DataType::Varchar(None), false),
43        Column::new("column_name", DataType::Varchar(None), false),
44        Column::new("data_type", DataType::Varchar(None), false),
45        Column::new("nullable", DataType::Boolean, false),
46        Column::new("default", DataType::Varchar(None), false),
47    ]))
48});
49
50pub static INDEXES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
51    Arc::new(Schema::new(vec![
52        Column::new("table_catalog", DataType::Varchar(None), false),
53        Column::new("table_schema", DataType::Varchar(None), false),
54        Column::new("table_name", DataType::Varchar(None), false),
55        Column::new("index_name", DataType::Varchar(None), false),
56        Column::new("key_schema", DataType::Varchar(None), false),
57        Column::new("internal_max_size", DataType::UInt32, false),
58        Column::new("leaf_max_size", DataType::UInt32, false),
59        Column::new("header_page_id", DataType::UInt32, false),
60    ]))
61});
62
63pub fn load_catalog_data(db: &mut Database) -> QuillSQLResult<()> {
64    load_information_schema(&mut db.catalog)?;
65    load_schemas(db)?;
66    create_default_schema_if_not_exists(&mut db.catalog)?;
67    load_user_tables(db)?;
68    load_user_indexes(db)?;
69    Ok(())
70}
71
72fn create_default_schema_if_not_exists(catalog: &mut Catalog) -> QuillSQLResult<()> {
73    if !catalog.schemas.contains_key(DEFAULT_SCHEMA_NAME) {
74        catalog.create_schema(DEFAULT_SCHEMA_NAME)?;
75    }
76    Ok(())
77}
78
79fn load_information_schema(catalog: &mut Catalog) -> QuillSQLResult<()> {
80    let meta = catalog.disk_manager.meta.read().unwrap();
81    let information_schema_schemas_first_page_id = meta.information_schema_schemas_first_page_id;
82    let information_schema_tables_first_page_id = meta.information_schema_tables_first_page_id;
83    let information_schema_columns_first_page_id = meta.information_schema_columns_first_page_id;
84    let information_schema_indexes_first_page_id = meta.information_schema_indexes_first_page_id;
85    drop(meta);
86
87    // load last page id
88    let information_schema_schemas_last_page_id = load_table_last_page_id(
89        catalog,
90        information_schema_schemas_first_page_id,
91        SCHEMAS_SCHEMA.clone(),
92    )?;
93    let information_schema_tables_last_page_id = load_table_last_page_id(
94        catalog,
95        information_schema_tables_first_page_id,
96        TABLES_SCHEMA.clone(),
97    )?;
98    let information_schema_columns_last_page_id = load_table_last_page_id(
99        catalog,
100        information_schema_columns_first_page_id,
101        COLUMNS_SCHEMA.clone(),
102    )?;
103    let information_schema_indexes_last_page_id = load_table_last_page_id(
104        catalog,
105        information_schema_indexes_first_page_id,
106        INDEXES_SCHEMA.clone(),
107    )?;
108
109    let table_registry = catalog.table_registry();
110    let mut information_schema = CatalogSchema::new(INFORMATION_SCHEMA_NAME);
111
112    let schemas_table = TableHeap {
113        schema: SCHEMAS_SCHEMA.clone(),
114        buffer_pool: catalog.buffer_pool.clone(),
115        first_page_id: AtomicPageId::new(information_schema_schemas_first_page_id),
116        last_page_id: AtomicPageId::new(information_schema_schemas_last_page_id),
117    };
118    let schemas_heap = Arc::new(schemas_table);
119    information_schema.tables.insert(
120        INFORMATION_SCHEMA_SCHEMAS.to_string(),
121        CatalogTable::new(INFORMATION_SCHEMA_SCHEMAS, schemas_heap.clone()),
122    );
123    table_registry.register(
124        TableReference::Full {
125            catalog: DEFAULT_CATALOG_NAME.to_string(),
126            schema: INFORMATION_SCHEMA_NAME.to_string(),
127            table: INFORMATION_SCHEMA_SCHEMAS.to_string(),
128        },
129        schemas_heap,
130    );
131
132    let tables_table = TableHeap {
133        schema: TABLES_SCHEMA.clone(),
134        buffer_pool: catalog.buffer_pool.clone(),
135        first_page_id: AtomicPageId::new(information_schema_tables_first_page_id),
136        last_page_id: AtomicPageId::new(information_schema_tables_last_page_id),
137    };
138    let tables_heap = Arc::new(tables_table);
139    information_schema.tables.insert(
140        INFORMATION_SCHEMA_TABLES.to_string(),
141        CatalogTable::new(INFORMATION_SCHEMA_TABLES, tables_heap.clone()),
142    );
143    table_registry.register(
144        TableReference::Full {
145            catalog: DEFAULT_CATALOG_NAME.to_string(),
146            schema: INFORMATION_SCHEMA_NAME.to_string(),
147            table: INFORMATION_SCHEMA_TABLES.to_string(),
148        },
149        tables_heap,
150    );
151
152    let columns_table = TableHeap {
153        schema: COLUMNS_SCHEMA.clone(),
154        buffer_pool: catalog.buffer_pool.clone(),
155        first_page_id: AtomicPageId::new(information_schema_columns_first_page_id),
156        last_page_id: AtomicPageId::new(information_schema_columns_last_page_id),
157    };
158    let columns_heap = Arc::new(columns_table);
159    information_schema.tables.insert(
160        INFORMATION_SCHEMA_COLUMNS.to_string(),
161        CatalogTable::new(INFORMATION_SCHEMA_COLUMNS, columns_heap.clone()),
162    );
163    table_registry.register(
164        TableReference::Full {
165            catalog: DEFAULT_CATALOG_NAME.to_string(),
166            schema: INFORMATION_SCHEMA_NAME.to_string(),
167            table: INFORMATION_SCHEMA_COLUMNS.to_string(),
168        },
169        columns_heap,
170    );
171
172    let indexes_table = TableHeap {
173        schema: INDEXES_SCHEMA.clone(),
174        buffer_pool: catalog.buffer_pool.clone(),
175        first_page_id: AtomicPageId::new(information_schema_indexes_first_page_id),
176        last_page_id: AtomicPageId::new(information_schema_indexes_last_page_id),
177    };
178    let indexes_heap = Arc::new(indexes_table);
179    information_schema.tables.insert(
180        INFORMATION_SCHEMA_INDEXES.to_string(),
181        CatalogTable::new(INFORMATION_SCHEMA_INDEXES, indexes_heap.clone()),
182    );
183    table_registry.register(
184        TableReference::Full {
185            catalog: DEFAULT_CATALOG_NAME.to_string(),
186            schema: INFORMATION_SCHEMA_NAME.to_string(),
187            table: INFORMATION_SCHEMA_INDEXES.to_string(),
188        },
189        indexes_heap,
190    );
191
192    catalog.load_schema(INFORMATION_SCHEMA_NAME, information_schema);
193    Ok(())
194}
195
196fn load_schemas(db: &mut Database) -> QuillSQLResult<()> {
197    let schemas_table = {
198        let information_schema =
199            db.catalog
200                .schemas
201                .get(INFORMATION_SCHEMA_NAME)
202                .ok_or_else(|| {
203                    QuillSQLError::Internal("information_schema not initialized".to_string())
204                })?;
205        information_schema
206            .tables
207            .get(INFORMATION_SCHEMA_SCHEMAS)
208            .ok_or_else(|| {
209                QuillSQLError::Internal("information_schema.schemas missing".to_string())
210            })?
211            .table
212            .clone()
213    };
214
215    let mut iterator = TableIterator::new(schemas_table, ..);
216    while let Some((_rid, meta, tuple)) = iterator.next()? {
217        if meta.is_deleted {
218            continue;
219        }
220        let ScalarValue::Varchar(Some(_catalog)) = tuple.value(0)? else {
221            return Err(QuillSQLError::Internal(
222                "invalid catalog value in information_schema.schemas".to_string(),
223            ));
224        };
225        let ScalarValue::Varchar(Some(schema_name)) = tuple.value(1)? else {
226            return Err(QuillSQLError::Internal(
227                "invalid schema value in information_schema.schemas".to_string(),
228            ));
229        };
230        db.catalog
231            .load_schema(schema_name.clone(), CatalogSchema::new(schema_name));
232    }
233    Ok(())
234}
235
236fn load_user_tables(db: &mut Database) -> QuillSQLResult<()> {
237    let (columns_heap, tables_heap) = {
238        let information_schema =
239            db.catalog
240                .schemas
241                .get(INFORMATION_SCHEMA_NAME)
242                .ok_or_else(|| {
243                    QuillSQLError::Internal("information_schema not initialized".to_string())
244                })?;
245        let columns_heap = information_schema
246            .tables
247            .get(INFORMATION_SCHEMA_COLUMNS)
248            .ok_or_else(|| {
249                QuillSQLError::Internal("information_schema.columns missing".to_string())
250            })?
251            .table
252            .clone();
253        let tables_heap = information_schema
254            .tables
255            .get(INFORMATION_SCHEMA_TABLES)
256            .ok_or_else(|| {
257                QuillSQLError::Internal("information_schema.tables missing".to_string())
258            })?
259            .table
260            .clone();
261        (columns_heap, tables_heap)
262    };
263
264    let mut column_map: HashMap<(String, String, String), Vec<Column>> = HashMap::new();
265    let mut column_iter = TableIterator::new(columns_heap, ..);
266    while let Some((_rid, meta, tuple)) = column_iter.next()? {
267        if meta.is_deleted {
268            continue;
269        }
270        let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
271            return Err(QuillSQLError::Internal(
272                "invalid catalog in information_schema.columns".to_string(),
273            ));
274        };
275        let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
276            return Err(QuillSQLError::Internal(
277                "invalid schema in information_schema.columns".to_string(),
278            ));
279        };
280        let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
281            return Err(QuillSQLError::Internal(
282                "invalid table in information_schema.columns".to_string(),
283            ));
284        };
285        let ScalarValue::Varchar(Some(column_name)) = tuple.value(3)? else {
286            return Err(QuillSQLError::Internal(
287                "invalid column name in information_schema.columns".to_string(),
288            ));
289        };
290        let ScalarValue::Varchar(Some(data_type_str)) = tuple.value(4)? else {
291            return Err(QuillSQLError::Internal(
292                "invalid data type in information_schema.columns".to_string(),
293            ));
294        };
295        let ScalarValue::Boolean(Some(nullable)) = tuple.value(5)? else {
296            return Err(QuillSQLError::Internal(
297                "invalid nullable flag in information_schema.columns".to_string(),
298            ));
299        };
300        let ScalarValue::Varchar(Some(default)) = tuple.value(6)? else {
301            return Err(QuillSQLError::Internal(
302                "invalid default in information_schema.columns".to_string(),
303            ));
304        };
305
306        let data_type: DataType = data_type_str.as_str().try_into()?;
307        let default_value = ScalarValue::from_string(default, data_type)?;
308        column_map
309            .entry((catalog.clone(), schema.clone(), table.clone()))
310            .or_default()
311            .push(
312                Column::new(column_name.clone(), data_type, *nullable).with_default(default_value),
313            );
314    }
315
316    let mut table_iter = TableIterator::new(tables_heap, ..);
317    while let Some((_rid, meta, tuple)) = table_iter.next()? {
318        if meta.is_deleted {
319            continue;
320        }
321        let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
322            return Err(QuillSQLError::Internal(
323                "invalid catalog in information_schema.tables".to_string(),
324            ));
325        };
326        let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
327            return Err(QuillSQLError::Internal(
328                "invalid schema in information_schema.tables".to_string(),
329            ));
330        };
331        let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
332            return Err(QuillSQLError::Internal(
333                "invalid table name in information_schema.tables".to_string(),
334            ));
335        };
336        let ScalarValue::UInt32(Some(first_page_id)) = tuple.value(3)? else {
337            return Err(QuillSQLError::Internal(
338                "invalid first_page_id in information_schema.tables".to_string(),
339            ));
340        };
341
342        let columns = column_map
343            .remove(&(catalog.clone(), schema.clone(), table.clone()))
344            .unwrap_or_default();
345        let schema_ref = Arc::new(Schema::new(columns));
346
347        let last_page_id =
348            load_table_last_page_id(&mut db.catalog, *first_page_id, schema_ref.clone())?;
349        let table_heap = TableHeap {
350            schema: schema_ref.clone(),
351            buffer_pool: db.buffer_pool.clone(),
352            first_page_id: AtomicPageId::new(*first_page_id),
353            last_page_id: AtomicPageId::new(last_page_id),
354        };
355
356        db.catalog.load_table(
357            TableReference::Full {
358                catalog: catalog.to_string(),
359                schema: schema.to_string(),
360                table: table.to_string(),
361            },
362            CatalogTable::new(table, Arc::new(table_heap)),
363        )?;
364    }
365    Ok(())
366}
367
368fn load_user_indexes(db: &mut Database) -> QuillSQLResult<()> {
369    let indexes_heap = {
370        let information_schema =
371            db.catalog
372                .schemas
373                .get(INFORMATION_SCHEMA_NAME)
374                .ok_or_else(|| {
375                    QuillSQLError::Internal("information_schema not initialized".to_string())
376                })?;
377
378        information_schema
379            .tables
380            .get(INFORMATION_SCHEMA_INDEXES)
381            .ok_or_else(|| {
382                QuillSQLError::Internal("information_schema.indexes missing".to_string())
383            })?
384            .table
385            .clone()
386    };
387
388    let mut iterator = TableIterator::new(indexes_heap, ..);
389    while let Some((_rid, meta, tuple)) = iterator.next()? {
390        if meta.is_deleted {
391            continue;
392        }
393        let ScalarValue::Varchar(Some(catalog_name)) = tuple.value(0)? else {
394            return Err(QuillSQLError::Internal(
395                "invalid catalog in information_schema.indexes".to_string(),
396            ));
397        };
398        let ScalarValue::Varchar(Some(table_schema_name)) = tuple.value(1)? else {
399            return Err(QuillSQLError::Internal(
400                "invalid schema in information_schema.indexes".to_string(),
401            ));
402        };
403        let ScalarValue::Varchar(Some(table_name)) = tuple.value(2)? else {
404            return Err(QuillSQLError::Internal(
405                "invalid table in information_schema.indexes".to_string(),
406            ));
407        };
408        let ScalarValue::Varchar(Some(index_name)) = tuple.value(3)? else {
409            return Err(QuillSQLError::Internal(
410                "invalid index name in information_schema.indexes".to_string(),
411            ));
412        };
413        let ScalarValue::Varchar(Some(key_schema_str)) = tuple.value(4)? else {
414            return Err(QuillSQLError::Internal(
415                "invalid key schema in information_schema.indexes".to_string(),
416            ));
417        };
418        let ScalarValue::UInt32(Some(internal_max_size)) = tuple.value(5)? else {
419            return Err(QuillSQLError::Internal(
420                "invalid internal max size in information_schema.indexes".to_string(),
421            ));
422        };
423        let ScalarValue::UInt32(Some(leaf_max_size)) = tuple.value(6)? else {
424            return Err(QuillSQLError::Internal(
425                "invalid leaf max size in information_schema.indexes".to_string(),
426            ));
427        };
428        let ScalarValue::UInt32(Some(header_page_id)) = tuple.value(7)? else {
429            return Err(QuillSQLError::Internal(
430                "invalid header page id in information_schema.indexes".to_string(),
431            ));
432        };
433
434        let table_ref = TableReference::Full {
435            catalog: catalog_name.clone(),
436            schema: table_schema_name.clone(),
437            table: table_name.clone(),
438        };
439        let table_schema = db.catalog.table_heap(&table_ref)?.schema.clone();
440        let key_schema = Arc::new(parse_key_schema_from_varchar(
441            key_schema_str.as_str(),
442            table_schema,
443        )?);
444
445        let b_plus_tree_index = BPlusTreeIndex::open(
446            key_schema,
447            db.buffer_pool.clone(),
448            *internal_max_size,
449            *leaf_max_size,
450            *header_page_id,
451        );
452        db.catalog
453            .load_index(table_ref, index_name, Arc::new(b_plus_tree_index))?;
454    }
455    Ok(())
456}
457
458fn load_table_last_page_id(
459    catalog: &mut Catalog,
460    first_page_id: PageId,
461    schema: SchemaRef,
462) -> QuillSQLResult<PageId> {
463    let mut page_id = first_page_id;
464    loop {
465        let (_, table_page) = catalog
466            .buffer_pool
467            .fetch_table_page(page_id, schema.clone())?;
468
469        if table_page.header.next_page_id == INVALID_PAGE_ID {
470            return Ok(page_id);
471        } else {
472            page_id = table_page.header.next_page_id;
473        }
474    }
475}
476
477pub fn key_schema_to_varchar(key_schema: &Schema) -> String {
478    key_schema
479        .columns
480        .iter()
481        .map(|col| col.name.as_str())
482        .collect::<Vec<_>>()
483        .join(", ")
484}
485
486fn parse_key_schema_from_varchar(varchar: &str, table_schema: SchemaRef) -> QuillSQLResult<Schema> {
487    let column_names = varchar
488        .split(",")
489        .map(|name| name.trim())
490        .collect::<Vec<&str>>();
491    let indices = column_names
492        .into_iter()
493        .map(|name| table_schema.index_of(None, name))
494        .collect::<QuillSQLResult<Vec<usize>>>()?;
495    table_schema.project(&indices)
496}