quill_sql/catalog/
information.rs

1use crate::buffer::{AtomicPageId, PageId, INVALID_PAGE_ID};
2use crate::catalog::catalog::{CatalogSchema, CatalogTable};
3use crate::catalog::registry::global_table_registry;
4use crate::catalog::{
5    Catalog, Column, DataType, Schema, SchemaRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
6};
7use crate::database::Database;
8use crate::error::{QuillSQLError, QuillSQLResult};
9use crate::utils::scalar::ScalarValue;
10use crate::utils::table_ref::TableReference;
11
12use crate::storage::index::btree_index::BPlusTreeIndex;
13use crate::storage::table_heap::TableHeap;
14use std::sync::{Arc, LazyLock};
15
/// Name of the schema that holds the catalog's own metadata tables.
pub static INFORMATION_SCHEMA_NAME: &str = "information_schema";
/// Table name (inside the information schema) listing the known schemas.
pub static INFORMATION_SCHEMA_SCHEMAS: &str = "schemas";
/// Table name (inside the information schema) listing the known tables.
pub static INFORMATION_SCHEMA_TABLES: &str = "tables";
/// Table name (inside the information schema) listing the columns of every table.
pub static INFORMATION_SCHEMA_COLUMNS: &str = "columns";
/// Table name (inside the information schema) listing the known indexes.
pub static INFORMATION_SCHEMA_INDEXES: &str = "indexes";
21
22pub static SCHEMAS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
23    Arc::new(Schema::new(vec![
24        Column::new("catalog", DataType::Varchar(None), false),
25        Column::new("schema", DataType::Varchar(None), false),
26    ]))
27});
28
29pub static TABLES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
30    Arc::new(Schema::new(vec![
31        Column::new("table_catalog", DataType::Varchar(None), false),
32        Column::new("table_schema", DataType::Varchar(None), false),
33        Column::new("table_name", DataType::Varchar(None), false),
34        Column::new("first_page_id", DataType::UInt32, false),
35    ]))
36});
37
38pub static COLUMNS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
39    Arc::new(Schema::new(vec![
40        Column::new("table_catalog", DataType::Varchar(None), false),
41        Column::new("table_schema", DataType::Varchar(None), false),
42        Column::new("table_name", DataType::Varchar(None), false),
43        Column::new("column_name", DataType::Varchar(None), false),
44        Column::new("data_type", DataType::Varchar(None), false),
45        Column::new("nullable", DataType::Boolean, false),
46        Column::new("default", DataType::Varchar(None), false),
47    ]))
48});
49
50pub static INDEXES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
51    Arc::new(Schema::new(vec![
52        Column::new("table_catalog", DataType::Varchar(None), false),
53        Column::new("table_schema", DataType::Varchar(None), false),
54        Column::new("table_name", DataType::Varchar(None), false),
55        Column::new("index_name", DataType::Varchar(None), false),
56        Column::new("key_schema", DataType::Varchar(None), false),
57        Column::new("internal_max_size", DataType::UInt32, false),
58        Column::new("leaf_max_size", DataType::UInt32, false),
59        Column::new("header_page_id", DataType::UInt32, false),
60    ]))
61});
62
63pub fn load_catalog_data(db: &mut Database) -> QuillSQLResult<()> {
64    load_information_schema(&mut db.catalog)?;
65    load_schemas(db)?;
66    create_default_schema_if_not_exists(&mut db.catalog)?;
67    load_user_tables(db)?;
68    load_user_indexes(db)?;
69    Ok(())
70}
71
72fn create_default_schema_if_not_exists(catalog: &mut Catalog) -> QuillSQLResult<()> {
73    if !catalog.schemas.contains_key(DEFAULT_SCHEMA_NAME) {
74        catalog.create_schema(DEFAULT_SCHEMA_NAME)?;
75    }
76    Ok(())
77}
78
79fn load_information_schema(catalog: &mut Catalog) -> QuillSQLResult<()> {
80    let meta = catalog.disk_manager.meta.read().unwrap();
81    let information_schema_schemas_first_page_id = meta.information_schema_schemas_first_page_id;
82    let information_schema_tables_first_page_id = meta.information_schema_tables_first_page_id;
83    let information_schema_columns_first_page_id = meta.information_schema_columns_first_page_id;
84    let information_schema_indexes_first_page_id = meta.information_schema_indexes_first_page_id;
85    drop(meta);
86
87    // load last page id
88    let information_schema_schemas_last_page_id = load_table_last_page_id(
89        catalog,
90        information_schema_schemas_first_page_id,
91        SCHEMAS_SCHEMA.clone(),
92    )?;
93    let information_schema_tables_last_page_id = load_table_last_page_id(
94        catalog,
95        information_schema_tables_first_page_id,
96        TABLES_SCHEMA.clone(),
97    )?;
98    let information_schema_columns_last_page_id = load_table_last_page_id(
99        catalog,
100        information_schema_columns_first_page_id,
101        COLUMNS_SCHEMA.clone(),
102    )?;
103    let information_schema_indexes_last_page_id = load_table_last_page_id(
104        catalog,
105        information_schema_indexes_first_page_id,
106        INDEXES_SCHEMA.clone(),
107    )?;
108
109    let mut information_schema = CatalogSchema::new(INFORMATION_SCHEMA_NAME);
110
111    let schemas_table = TableHeap {
112        schema: SCHEMAS_SCHEMA.clone(),
113        buffer_pool: catalog.buffer_pool.clone(),
114        first_page_id: AtomicPageId::new(information_schema_schemas_first_page_id),
115        last_page_id: AtomicPageId::new(information_schema_schemas_last_page_id),
116    };
117    let schemas_heap = Arc::new(schemas_table);
118    information_schema.tables.insert(
119        INFORMATION_SCHEMA_SCHEMAS.to_string(),
120        CatalogTable::new(INFORMATION_SCHEMA_SCHEMAS, schemas_heap.clone()),
121    );
122    global_table_registry().register(
123        TableReference::Full {
124            catalog: DEFAULT_CATALOG_NAME.to_string(),
125            schema: INFORMATION_SCHEMA_NAME.to_string(),
126            table: INFORMATION_SCHEMA_SCHEMAS.to_string(),
127        },
128        schemas_heap,
129    );
130
131    let tables_table = TableHeap {
132        schema: TABLES_SCHEMA.clone(),
133        buffer_pool: catalog.buffer_pool.clone(),
134        first_page_id: AtomicPageId::new(information_schema_tables_first_page_id),
135        last_page_id: AtomicPageId::new(information_schema_tables_last_page_id),
136    };
137    let tables_heap = Arc::new(tables_table);
138    information_schema.tables.insert(
139        INFORMATION_SCHEMA_TABLES.to_string(),
140        CatalogTable::new(INFORMATION_SCHEMA_TABLES, tables_heap.clone()),
141    );
142    global_table_registry().register(
143        TableReference::Full {
144            catalog: DEFAULT_CATALOG_NAME.to_string(),
145            schema: INFORMATION_SCHEMA_NAME.to_string(),
146            table: INFORMATION_SCHEMA_TABLES.to_string(),
147        },
148        tables_heap,
149    );
150
151    let columns_table = TableHeap {
152        schema: COLUMNS_SCHEMA.clone(),
153        buffer_pool: catalog.buffer_pool.clone(),
154        first_page_id: AtomicPageId::new(information_schema_columns_first_page_id),
155        last_page_id: AtomicPageId::new(information_schema_columns_last_page_id),
156    };
157    let columns_heap = Arc::new(columns_table);
158    information_schema.tables.insert(
159        INFORMATION_SCHEMA_COLUMNS.to_string(),
160        CatalogTable::new(INFORMATION_SCHEMA_COLUMNS, columns_heap.clone()),
161    );
162    global_table_registry().register(
163        TableReference::Full {
164            catalog: DEFAULT_CATALOG_NAME.to_string(),
165            schema: INFORMATION_SCHEMA_NAME.to_string(),
166            table: INFORMATION_SCHEMA_COLUMNS.to_string(),
167        },
168        columns_heap,
169    );
170
171    let indexes_table = TableHeap {
172        schema: INDEXES_SCHEMA.clone(),
173        buffer_pool: catalog.buffer_pool.clone(),
174        first_page_id: AtomicPageId::new(information_schema_indexes_first_page_id),
175        last_page_id: AtomicPageId::new(information_schema_indexes_last_page_id),
176    };
177    let indexes_heap = Arc::new(indexes_table);
178    information_schema.tables.insert(
179        INFORMATION_SCHEMA_INDEXES.to_string(),
180        CatalogTable::new(INFORMATION_SCHEMA_INDEXES, indexes_heap.clone()),
181    );
182    global_table_registry().register(
183        TableReference::Full {
184            catalog: DEFAULT_CATALOG_NAME.to_string(),
185            schema: INFORMATION_SCHEMA_NAME.to_string(),
186            table: INFORMATION_SCHEMA_INDEXES.to_string(),
187        },
188        indexes_heap,
189    );
190
191    catalog.load_schema(INFORMATION_SCHEMA_NAME, information_schema);
192    Ok(())
193}
194
195fn load_schemas(db: &mut Database) -> QuillSQLResult<()> {
196    let schema_tuples = db.run(&format!(
197        "select * from {}.{}",
198        INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_SCHEMAS
199    ))?;
200    for schema_tuple in schema_tuples.into_iter() {
201        let error = Err(QuillSQLError::Internal(format!(
202            "Failed to decode schema tuple: {:?}",
203            schema_tuple,
204        )));
205        let ScalarValue::Varchar(Some(_catalog)) = schema_tuple.value(0)? else {
206            return error;
207        };
208        let ScalarValue::Varchar(Some(schema_name)) = schema_tuple.value(1)? else {
209            return error;
210        };
211        db.catalog
212            .load_schema(schema_name, CatalogSchema::new(schema_name));
213    }
214    Ok(())
215}
216
217fn load_user_tables(db: &mut Database) -> QuillSQLResult<()> {
218    let table_tuples = db.run(&format!(
219        "select * from {}.{}",
220        INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_TABLES
221    ))?;
222    for table_tuple in table_tuples.into_iter() {
223        let error = Err(QuillSQLError::Internal(format!(
224            "Failed to decode table tuple: {:?}",
225            table_tuple
226        )));
227        let ScalarValue::Varchar(Some(catalog)) = table_tuple.value(0)? else {
228            return error;
229        };
230        let ScalarValue::Varchar(Some(table_schema)) = table_tuple.value(1)? else {
231            return error;
232        };
233        let ScalarValue::Varchar(Some(table_name)) = table_tuple.value(2)? else {
234            return error;
235        };
236        let ScalarValue::UInt32(Some(first_page_id)) = table_tuple.value(3)? else {
237            return error;
238        };
239
240        let column_tuples = db.run(&format!("select * from {}.{} where table_catalog = '{}' and table_schema = '{}' and table_name = '{}'",
241                                            INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_COLUMNS, catalog, table_schema, table_name))?;
242        let mut columns = vec![];
243        for column_tuple in column_tuples.into_iter() {
244            let error = Err(QuillSQLError::Internal(format!(
245                "Failed to decode column tuple: {:?}",
246                column_tuple
247            )));
248            let ScalarValue::Varchar(Some(column_name)) = column_tuple.value(3)? else {
249                return error;
250            };
251            let ScalarValue::Varchar(Some(data_type_str)) = column_tuple.value(4)? else {
252                return error;
253            };
254            let ScalarValue::Boolean(Some(nullable)) = column_tuple.value(5)? else {
255                return error;
256            };
257            let ScalarValue::Varchar(Some(default)) = column_tuple.value(6)? else {
258                return error;
259            };
260            let data_type: DataType = data_type_str.as_str().try_into()?;
261            let default = ScalarValue::from_string(default, data_type)?;
262            columns
263                .push(Column::new(column_name.clone(), data_type, *nullable).with_default(default));
264        }
265        let schema = Arc::new(Schema::new(columns));
266
267        // load last page id
268        let last_page_id =
269            load_table_last_page_id(&mut db.catalog, *first_page_id, schema.clone())?;
270        let table_heap = TableHeap {
271            schema: schema.clone(),
272            buffer_pool: db.buffer_pool.clone(),
273            first_page_id: AtomicPageId::new(*first_page_id),
274            last_page_id: AtomicPageId::new(last_page_id),
275        };
276        db.catalog.load_table(
277            TableReference::Full {
278                catalog: catalog.to_string(),
279                schema: table_schema.to_string(),
280                table: table_name.to_string(),
281            },
282            CatalogTable::new(table_name, Arc::new(table_heap)),
283        )?;
284    }
285    Ok(())
286}
287
288fn load_user_indexes(db: &mut Database) -> QuillSQLResult<()> {
289    let index_tuples = db.run(&format!(
290        "select * from {}.{}",
291        INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_INDEXES
292    ))?;
293    for index_tuple in index_tuples.into_iter() {
294        let error = Err(QuillSQLError::Internal(format!(
295            "Failed to decode index tuple: {:?}",
296            index_tuple
297        )));
298        let ScalarValue::Varchar(Some(catalog_name)) = index_tuple.value(0)? else {
299            return error;
300        };
301        let ScalarValue::Varchar(Some(table_schema_name)) = index_tuple.value(1)? else {
302            return error;
303        };
304        let ScalarValue::Varchar(Some(table_name)) = index_tuple.value(2)? else {
305            return error;
306        };
307        let ScalarValue::Varchar(Some(index_name)) = index_tuple.value(3)? else {
308            return error;
309        };
310        let ScalarValue::Varchar(Some(key_schema_str)) = index_tuple.value(4)? else {
311            return error;
312        };
313        let ScalarValue::UInt32(Some(internal_max_size)) = index_tuple.value(5)? else {
314            return error;
315        };
316        let ScalarValue::UInt32(Some(leaf_max_size)) = index_tuple.value(6)? else {
317            return error;
318        };
319        let ScalarValue::UInt32(Some(header_page_id)) = index_tuple.value(7)? else {
320            return error;
321        };
322
323        let table_ref = TableReference::Full {
324            catalog: catalog_name.clone(),
325            schema: table_schema_name.clone(),
326            table: table_name.clone(),
327        };
328        let table_schema = db.catalog.table_heap(&table_ref)?.schema.clone();
329        let key_schema = Arc::new(parse_key_schema_from_varchar(
330            key_schema_str.as_str(),
331            table_schema,
332        )?);
333
334        let b_plus_tree_index = BPlusTreeIndex::open(
335            key_schema,
336            db.buffer_pool.clone(),
337            *internal_max_size,
338            *leaf_max_size,
339            *header_page_id,
340        );
341        db.catalog
342            .load_index(table_ref, index_name, Arc::new(b_plus_tree_index))?;
343    }
344    Ok(())
345}
346
347fn load_table_last_page_id(
348    catalog: &mut Catalog,
349    first_page_id: PageId,
350    schema: SchemaRef,
351) -> QuillSQLResult<PageId> {
352    let mut page_id = first_page_id;
353    loop {
354        let (_, table_page) = catalog
355            .buffer_pool
356            .fetch_table_page(page_id, schema.clone())?;
357
358        if table_page.header.next_page_id == INVALID_PAGE_ID {
359            return Ok(page_id);
360        } else {
361            page_id = table_page.header.next_page_id;
362        }
363    }
364}
365
366pub fn key_schema_to_varchar(key_schema: &Schema) -> String {
367    key_schema
368        .columns
369        .iter()
370        .map(|col| col.name.as_str())
371        .collect::<Vec<_>>()
372        .join(", ")
373}
374
375fn parse_key_schema_from_varchar(varchar: &str, table_schema: SchemaRef) -> QuillSQLResult<Schema> {
376    let column_names = varchar
377        .split(",")
378        .map(|name| name.trim())
379        .collect::<Vec<&str>>();
380    let indices = column_names
381        .into_iter()
382        .map(|name| table_schema.index_of(None, name))
383        .collect::<QuillSQLResult<Vec<usize>>>()?;
384    table_schema.project(&indices)
385}