Skip to main content

quill_sql/catalog/
information.rs

1use std::collections::HashMap;
2use std::sync::{Arc, LazyLock};
3
4use crate::catalog::{
5    Catalog, CatalogSchema, CatalogTable, Column, DataType, Schema, SchemaRef,
6    DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
7};
8use crate::database::Database;
9use crate::error::{QuillSQLError, QuillSQLResult};
10use crate::storage::engine::TableHandle;
11use crate::storage::holt::HoltTableHandle;
12use crate::storage::tuple::Tuple;
13use crate::utils::scalar::ScalarValue;
14use crate::utils::table_ref::TableReference;
15
/// Name of the built-in schema whose tables store the catalog's own metadata.
pub static INFORMATION_SCHEMA_NAME: &str = "information_schema";
/// Table inside `information_schema` with one row per schema.
pub static INFORMATION_SCHEMA_SCHEMAS: &str = "schemas";
/// Table inside `information_schema` with one row per table.
pub static INFORMATION_SCHEMA_TABLES: &str = "tables";
/// Table inside `information_schema` with one row per column.
pub static INFORMATION_SCHEMA_COLUMNS: &str = "columns";
/// Table inside `information_schema` with one row per index.
pub static INFORMATION_SCHEMA_INDEXES: &str = "indexes";
21
22pub static SCHEMAS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
23    Arc::new(Schema::new(vec![
24        Column::new("catalog", DataType::Varchar(None), false),
25        Column::new("schema", DataType::Varchar(None), false),
26    ]))
27});
28
29pub static TABLES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
30    Arc::new(Schema::new(vec![
31        Column::new("table_catalog", DataType::Varchar(None), false),
32        Column::new("table_schema", DataType::Varchar(None), false),
33        Column::new("table_name", DataType::Varchar(None), false),
34        Column::new("first_page_id", DataType::UInt32, false),
35    ]))
36});
37
38pub static COLUMNS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
39    Arc::new(Schema::new(vec![
40        Column::new("table_catalog", DataType::Varchar(None), false),
41        Column::new("table_schema", DataType::Varchar(None), false),
42        Column::new("table_name", DataType::Varchar(None), false),
43        Column::new("column_name", DataType::Varchar(None), false),
44        Column::new("data_type", DataType::Varchar(None), false),
45        Column::new("nullable", DataType::Boolean, false),
46        Column::new("default", DataType::Varchar(None), false),
47    ]))
48});
49
50pub static INDEXES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
51    Arc::new(Schema::new(vec![
52        Column::new("table_catalog", DataType::Varchar(None), false),
53        Column::new("table_schema", DataType::Varchar(None), false),
54        Column::new("table_name", DataType::Varchar(None), false),
55        Column::new("index_name", DataType::Varchar(None), false),
56        Column::new("key_schema", DataType::Varchar(None), false),
57        Column::new("internal_max_size", DataType::UInt32, false),
58        Column::new("leaf_max_size", DataType::UInt32, false),
59        Column::new("header_page_id", DataType::UInt32, false),
60    ]))
61});
62
63pub fn load_catalog_data(db: &mut Database) -> QuillSQLResult<()> {
64    load_information_schema(&mut db.catalog)?;
65    load_schemas(db)?;
66    create_default_schema_if_not_exists(&mut db.catalog)?;
67    load_user_tables(db)?;
68    load_descriptor_tables(db)?;
69    load_user_indexes(db)?;
70    load_descriptor_indexes(db)?;
71    db.catalog.refresh_information_schema_projection()?;
72    Ok(())
73}
74
75fn create_default_schema_if_not_exists(catalog: &mut Catalog) -> QuillSQLResult<()> {
76    if !catalog.schemas.contains_key(DEFAULT_SCHEMA_NAME) {
77        catalog.create_schema(DEFAULT_SCHEMA_NAME)?;
78    }
79    Ok(())
80}
81
82fn load_information_schema(catalog: &mut Catalog) -> QuillSQLResult<()> {
83    let holt = catalog.holt_store.clone();
84    let mut information_schema = CatalogSchema::new(INFORMATION_SCHEMA_NAME);
85
86    for (table_name, schema) in [
87        (INFORMATION_SCHEMA_SCHEMAS, SCHEMAS_SCHEMA.clone()),
88        (INFORMATION_SCHEMA_TABLES, TABLES_SCHEMA.clone()),
89        (INFORMATION_SCHEMA_COLUMNS, COLUMNS_SCHEMA.clone()),
90        (INFORMATION_SCHEMA_INDEXES, INDEXES_SCHEMA.clone()),
91    ] {
92        let table_ref = TableReference::Full {
93            catalog: DEFAULT_CATALOG_NAME.to_string(),
94            schema: INFORMATION_SCHEMA_NAME.to_string(),
95            table: table_name.to_string(),
96        };
97        let table_id = match holt.table_descriptor(&table_ref)? {
98            Some(table_id) => table_id,
99            None => holt.create_table_descriptor(&table_ref, schema.as_ref())?,
100        };
101        information_schema.tables.insert(
102            table_name.to_string(),
103            CatalogTable::new(table_name, schema, table_id),
104        );
105    }
106
107    catalog.load_schema(INFORMATION_SCHEMA_NAME, information_schema);
108    Ok(())
109}
110
111fn load_schemas(db: &mut Database) -> QuillSQLResult<()> {
112    for tuple in information_schema_rows(db, INFORMATION_SCHEMA_SCHEMAS)? {
113        let ScalarValue::Varchar(Some(schema_name)) = tuple.value(1)? else {
114            return Err(QuillSQLError::Internal(
115                "invalid schema value in information_schema.schemas".to_string(),
116            ));
117        };
118        db.catalog
119            .load_schema(schema_name.clone(), CatalogSchema::new(schema_name));
120    }
121    Ok(())
122}
123
124fn load_user_tables(db: &mut Database) -> QuillSQLResult<()> {
125    let mut column_map: HashMap<(String, String, String), Vec<Column>> = HashMap::new();
126    for tuple in information_schema_rows(db, INFORMATION_SCHEMA_COLUMNS)? {
127        let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
128            return Err(QuillSQLError::Internal(
129                "invalid catalog in information_schema.columns".to_string(),
130            ));
131        };
132        let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
133            return Err(QuillSQLError::Internal(
134                "invalid schema in information_schema.columns".to_string(),
135            ));
136        };
137        let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
138            return Err(QuillSQLError::Internal(
139                "invalid table in information_schema.columns".to_string(),
140            ));
141        };
142        let ScalarValue::Varchar(Some(column_name)) = tuple.value(3)? else {
143            return Err(QuillSQLError::Internal(
144                "invalid column name in information_schema.columns".to_string(),
145            ));
146        };
147        let ScalarValue::Varchar(Some(data_type_str)) = tuple.value(4)? else {
148            return Err(QuillSQLError::Internal(
149                "invalid data type in information_schema.columns".to_string(),
150            ));
151        };
152        let ScalarValue::Boolean(Some(nullable)) = tuple.value(5)? else {
153            return Err(QuillSQLError::Internal(
154                "invalid nullable flag in information_schema.columns".to_string(),
155            ));
156        };
157        let ScalarValue::Varchar(Some(default)) = tuple.value(6)? else {
158            return Err(QuillSQLError::Internal(
159                "invalid default in information_schema.columns".to_string(),
160            ));
161        };
162
163        let data_type: DataType = data_type_str.as_str().try_into()?;
164        let default_value = ScalarValue::from_string(default, data_type)?;
165        column_map
166            .entry((catalog.clone(), schema.clone(), table.clone()))
167            .or_default()
168            .push(
169                Column::new(column_name.clone(), data_type, *nullable).with_default(default_value),
170            );
171    }
172
173    for tuple in information_schema_rows(db, INFORMATION_SCHEMA_TABLES)? {
174        let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
175            return Err(QuillSQLError::Internal(
176                "invalid catalog in information_schema.tables".to_string(),
177            ));
178        };
179        let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
180            return Err(QuillSQLError::Internal(
181                "invalid schema in information_schema.tables".to_string(),
182            ));
183        };
184        let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
185            return Err(QuillSQLError::Internal(
186                "invalid table name in information_schema.tables".to_string(),
187            ));
188        };
189        let table_ref = TableReference::Full {
190            catalog: catalog.to_string(),
191            schema: schema.to_string(),
192            table: table.to_string(),
193        };
194        let Some(table_id) = db.catalog.holt_store.table_descriptor(&table_ref)? else {
195            continue;
196        };
197        let columns = column_map
198            .remove(&(catalog.clone(), schema.clone(), table.clone()))
199            .unwrap_or_default();
200        db.catalog.load_table(
201            table_ref,
202            table.clone(),
203            Arc::new(Schema::new(columns)),
204            table_id,
205        )?;
206    }
207    Ok(())
208}
209
210fn load_user_indexes(db: &mut Database) -> QuillSQLResult<()> {
211    for tuple in information_schema_rows(db, INFORMATION_SCHEMA_INDEXES)? {
212        let ScalarValue::Varchar(Some(catalog_name)) = tuple.value(0)? else {
213            return Err(QuillSQLError::Internal(
214                "invalid catalog in information_schema.indexes".to_string(),
215            ));
216        };
217        let ScalarValue::Varchar(Some(table_schema_name)) = tuple.value(1)? else {
218            return Err(QuillSQLError::Internal(
219                "invalid schema in information_schema.indexes".to_string(),
220            ));
221        };
222        let ScalarValue::Varchar(Some(table_name)) = tuple.value(2)? else {
223            return Err(QuillSQLError::Internal(
224                "invalid table in information_schema.indexes".to_string(),
225            ));
226        };
227        let ScalarValue::Varchar(Some(index_name)) = tuple.value(3)? else {
228            return Err(QuillSQLError::Internal(
229                "invalid index name in information_schema.indexes".to_string(),
230            ));
231        };
232        let ScalarValue::Varchar(Some(key_schema_str)) = tuple.value(4)? else {
233            return Err(QuillSQLError::Internal(
234                "invalid key schema in information_schema.indexes".to_string(),
235            ));
236        };
237
238        let table_ref = TableReference::Full {
239            catalog: catalog_name.clone(),
240            schema: table_schema_name.clone(),
241            table: table_name.clone(),
242        };
243        let Some(index_id) = db
244            .catalog
245            .holt_store
246            .index_descriptor(&table_ref, index_name)?
247        else {
248            continue;
249        };
250        let table_schema = db.catalog.table_schema(&table_ref)?;
251        let key_schema = Arc::new(parse_key_schema_from_varchar(
252            key_schema_str.as_str(),
253            table_schema,
254        )?);
255        db.catalog
256            .load_index(table_ref, index_name, key_schema, index_id)?;
257    }
258    Ok(())
259}
260
261fn load_descriptor_tables(db: &mut Database) -> QuillSQLResult<()> {
262    for descriptor in db.catalog.holt_store.table_descriptors()? {
263        if descriptor.table_ref.schema() == Some(INFORMATION_SCHEMA_NAME) {
264            continue;
265        }
266        if db.catalog.try_table_schema(&descriptor.table_ref).is_some() {
267            continue;
268        }
269        let schema_name = descriptor
270            .table_ref
271            .schema()
272            .unwrap_or(DEFAULT_SCHEMA_NAME)
273            .to_string();
274        if !db.catalog.schemas.contains_key(&schema_name) {
275            db.catalog
276                .load_schema(schema_name.clone(), CatalogSchema::new(schema_name));
277        }
278        db.catalog.load_table(
279            descriptor.table_ref,
280            descriptor.table_name,
281            descriptor.schema,
282            descriptor.table_id,
283        )?;
284    }
285    Ok(())
286}
287
288fn load_descriptor_indexes(db: &mut Database) -> QuillSQLResult<()> {
289    for descriptor in db.catalog.holt_store.index_descriptors()? {
290        if descriptor.table_ref.schema() == Some(INFORMATION_SCHEMA_NAME) {
291            continue;
292        }
293        if db.catalog.try_table_schema(&descriptor.table_ref).is_none() {
294            continue;
295        }
296        if db
297            .catalog
298            .table_indexes(&descriptor.table_ref)
299            .map(|indexes| {
300                indexes
301                    .iter()
302                    .any(|index| index.name == descriptor.index_name)
303            })
304            .unwrap_or(false)
305        {
306            continue;
307        }
308        db.catalog.load_index(
309            descriptor.table_ref,
310            descriptor.index_name,
311            descriptor.key_schema,
312            descriptor.index_id,
313        )?;
314    }
315    Ok(())
316}
317
318fn information_schema_rows(db: &Database, table_name: &str) -> QuillSQLResult<Vec<Tuple>> {
319    let information_schema = db
320        .catalog
321        .schemas
322        .get(INFORMATION_SCHEMA_NAME)
323        .ok_or_else(|| QuillSQLError::Internal("information_schema not initialized".to_string()))?;
324    let table = information_schema.tables.get(table_name).ok_or_else(|| {
325        QuillSQLError::Internal(format!("information_schema.{table_name} missing"))
326    })?;
327    let table_ref = TableReference::Full {
328        catalog: DEFAULT_CATALOG_NAME.to_string(),
329        schema: INFORMATION_SCHEMA_NAME.to_string(),
330        table: table_name.to_string(),
331    };
332    let handle = HoltTableHandle::new(
333        table_ref,
334        table.schema.clone(),
335        table.table_id,
336        db.catalog.holt_store.clone(),
337    );
338    let mut stream = handle.full_scan()?;
339    let mut rows = Vec::new();
340    while let Some((_rid, meta, tuple)) = stream.next()? {
341        if !meta.is_deleted {
342            rows.push(tuple);
343        }
344    }
345    Ok(rows)
346}
347
348pub fn key_schema_to_varchar(key_schema: &Schema) -> String {
349    key_schema
350        .columns
351        .iter()
352        .map(|col| col.name.as_str())
353        .collect::<Vec<_>>()
354        .join(", ")
355}
356
357fn parse_key_schema_from_varchar(varchar: &str, table_schema: SchemaRef) -> QuillSQLResult<Schema> {
358    let column_names = varchar
359        .split(',')
360        .map(|name| name.trim())
361        .collect::<Vec<&str>>();
362    let indices = column_names
363        .into_iter()
364        .map(|name| table_schema.index_of(None, name))
365        .collect::<QuillSQLResult<Vec<usize>>>()?;
366    table_schema.project(&indices)
367}