1use crate::buffer::{AtomicPageId, PageId, INVALID_PAGE_ID};
2use crate::catalog::catalog::{CatalogSchema, CatalogTable};
3use crate::catalog::{
4 Catalog, Column, DataType, Schema, SchemaRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
5};
6use crate::database::Database;
7use crate::error::{QuillSQLError, QuillSQLResult};
8use crate::utils::scalar::ScalarValue;
9use crate::utils::table_ref::TableReference;
10
11use crate::storage::index::btree_index::BPlusTreeIndex;
12use crate::storage::table_heap::{TableHeap, TableIterator};
13use std::collections::HashMap;
14use std::sync::{Arc, LazyLock};
15
/// Name of the built-in schema under which the catalog's own metadata tables
/// are registered.
pub static INFORMATION_SCHEMA_NAME: &str = "information_schema";
/// Name of the metadata table listing schemas (row layout: `SCHEMAS_SCHEMA`).
pub static INFORMATION_SCHEMA_SCHEMAS: &str = "schemas";
/// Name of the metadata table listing tables (row layout: `TABLES_SCHEMA`).
pub static INFORMATION_SCHEMA_TABLES: &str = "tables";
/// Name of the metadata table listing columns (row layout: `COLUMNS_SCHEMA`).
pub static INFORMATION_SCHEMA_COLUMNS: &str = "columns";
/// Name of the metadata table listing indexes (row layout: `INDEXES_SCHEMA`).
pub static INFORMATION_SCHEMA_INDEXES: &str = "indexes";
21
22pub static SCHEMAS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
23 Arc::new(Schema::new(vec![
24 Column::new("catalog", DataType::Varchar(None), false),
25 Column::new("schema", DataType::Varchar(None), false),
26 ]))
27});
28
29pub static TABLES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
30 Arc::new(Schema::new(vec![
31 Column::new("table_catalog", DataType::Varchar(None), false),
32 Column::new("table_schema", DataType::Varchar(None), false),
33 Column::new("table_name", DataType::Varchar(None), false),
34 Column::new("first_page_id", DataType::UInt32, false),
35 ]))
36});
37
38pub static COLUMNS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
39 Arc::new(Schema::new(vec![
40 Column::new("table_catalog", DataType::Varchar(None), false),
41 Column::new("table_schema", DataType::Varchar(None), false),
42 Column::new("table_name", DataType::Varchar(None), false),
43 Column::new("column_name", DataType::Varchar(None), false),
44 Column::new("data_type", DataType::Varchar(None), false),
45 Column::new("nullable", DataType::Boolean, false),
46 Column::new("default", DataType::Varchar(None), false),
47 ]))
48});
49
50pub static INDEXES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
51 Arc::new(Schema::new(vec![
52 Column::new("table_catalog", DataType::Varchar(None), false),
53 Column::new("table_schema", DataType::Varchar(None), false),
54 Column::new("table_name", DataType::Varchar(None), false),
55 Column::new("index_name", DataType::Varchar(None), false),
56 Column::new("key_schema", DataType::Varchar(None), false),
57 Column::new("internal_max_size", DataType::UInt32, false),
58 Column::new("leaf_max_size", DataType::UInt32, false),
59 Column::new("header_page_id", DataType::UInt32, false),
60 ]))
61});
62
63pub fn load_catalog_data(db: &mut Database) -> QuillSQLResult<()> {
64 load_information_schema(&mut db.catalog)?;
65 load_schemas(db)?;
66 create_default_schema_if_not_exists(&mut db.catalog)?;
67 load_user_tables(db)?;
68 load_user_indexes(db)?;
69 Ok(())
70}
71
72fn create_default_schema_if_not_exists(catalog: &mut Catalog) -> QuillSQLResult<()> {
73 if !catalog.schemas.contains_key(DEFAULT_SCHEMA_NAME) {
74 catalog.create_schema(DEFAULT_SCHEMA_NAME)?;
75 }
76 Ok(())
77}
78
79fn load_information_schema(catalog: &mut Catalog) -> QuillSQLResult<()> {
80 let meta = catalog.disk_manager.meta.read().unwrap();
81 let information_schema_schemas_first_page_id = meta.information_schema_schemas_first_page_id;
82 let information_schema_tables_first_page_id = meta.information_schema_tables_first_page_id;
83 let information_schema_columns_first_page_id = meta.information_schema_columns_first_page_id;
84 let information_schema_indexes_first_page_id = meta.information_schema_indexes_first_page_id;
85 drop(meta);
86
87 let information_schema_schemas_last_page_id = load_table_last_page_id(
89 catalog,
90 information_schema_schemas_first_page_id,
91 SCHEMAS_SCHEMA.clone(),
92 )?;
93 let information_schema_tables_last_page_id = load_table_last_page_id(
94 catalog,
95 information_schema_tables_first_page_id,
96 TABLES_SCHEMA.clone(),
97 )?;
98 let information_schema_columns_last_page_id = load_table_last_page_id(
99 catalog,
100 information_schema_columns_first_page_id,
101 COLUMNS_SCHEMA.clone(),
102 )?;
103 let information_schema_indexes_last_page_id = load_table_last_page_id(
104 catalog,
105 information_schema_indexes_first_page_id,
106 INDEXES_SCHEMA.clone(),
107 )?;
108
109 let table_registry = catalog.table_registry();
110 let mut information_schema = CatalogSchema::new(INFORMATION_SCHEMA_NAME);
111
112 let schemas_table = TableHeap {
113 schema: SCHEMAS_SCHEMA.clone(),
114 buffer_pool: catalog.buffer_pool.clone(),
115 first_page_id: AtomicPageId::new(information_schema_schemas_first_page_id),
116 last_page_id: AtomicPageId::new(information_schema_schemas_last_page_id),
117 };
118 let schemas_heap = Arc::new(schemas_table);
119 information_schema.tables.insert(
120 INFORMATION_SCHEMA_SCHEMAS.to_string(),
121 CatalogTable::new(INFORMATION_SCHEMA_SCHEMAS, schemas_heap.clone()),
122 );
123 table_registry.register(
124 TableReference::Full {
125 catalog: DEFAULT_CATALOG_NAME.to_string(),
126 schema: INFORMATION_SCHEMA_NAME.to_string(),
127 table: INFORMATION_SCHEMA_SCHEMAS.to_string(),
128 },
129 schemas_heap,
130 );
131
132 let tables_table = TableHeap {
133 schema: TABLES_SCHEMA.clone(),
134 buffer_pool: catalog.buffer_pool.clone(),
135 first_page_id: AtomicPageId::new(information_schema_tables_first_page_id),
136 last_page_id: AtomicPageId::new(information_schema_tables_last_page_id),
137 };
138 let tables_heap = Arc::new(tables_table);
139 information_schema.tables.insert(
140 INFORMATION_SCHEMA_TABLES.to_string(),
141 CatalogTable::new(INFORMATION_SCHEMA_TABLES, tables_heap.clone()),
142 );
143 table_registry.register(
144 TableReference::Full {
145 catalog: DEFAULT_CATALOG_NAME.to_string(),
146 schema: INFORMATION_SCHEMA_NAME.to_string(),
147 table: INFORMATION_SCHEMA_TABLES.to_string(),
148 },
149 tables_heap,
150 );
151
152 let columns_table = TableHeap {
153 schema: COLUMNS_SCHEMA.clone(),
154 buffer_pool: catalog.buffer_pool.clone(),
155 first_page_id: AtomicPageId::new(information_schema_columns_first_page_id),
156 last_page_id: AtomicPageId::new(information_schema_columns_last_page_id),
157 };
158 let columns_heap = Arc::new(columns_table);
159 information_schema.tables.insert(
160 INFORMATION_SCHEMA_COLUMNS.to_string(),
161 CatalogTable::new(INFORMATION_SCHEMA_COLUMNS, columns_heap.clone()),
162 );
163 table_registry.register(
164 TableReference::Full {
165 catalog: DEFAULT_CATALOG_NAME.to_string(),
166 schema: INFORMATION_SCHEMA_NAME.to_string(),
167 table: INFORMATION_SCHEMA_COLUMNS.to_string(),
168 },
169 columns_heap,
170 );
171
172 let indexes_table = TableHeap {
173 schema: INDEXES_SCHEMA.clone(),
174 buffer_pool: catalog.buffer_pool.clone(),
175 first_page_id: AtomicPageId::new(information_schema_indexes_first_page_id),
176 last_page_id: AtomicPageId::new(information_schema_indexes_last_page_id),
177 };
178 let indexes_heap = Arc::new(indexes_table);
179 information_schema.tables.insert(
180 INFORMATION_SCHEMA_INDEXES.to_string(),
181 CatalogTable::new(INFORMATION_SCHEMA_INDEXES, indexes_heap.clone()),
182 );
183 table_registry.register(
184 TableReference::Full {
185 catalog: DEFAULT_CATALOG_NAME.to_string(),
186 schema: INFORMATION_SCHEMA_NAME.to_string(),
187 table: INFORMATION_SCHEMA_INDEXES.to_string(),
188 },
189 indexes_heap,
190 );
191
192 catalog.load_schema(INFORMATION_SCHEMA_NAME, information_schema);
193 Ok(())
194}
195
196fn load_schemas(db: &mut Database) -> QuillSQLResult<()> {
197 let schemas_table = {
198 let information_schema =
199 db.catalog
200 .schemas
201 .get(INFORMATION_SCHEMA_NAME)
202 .ok_or_else(|| {
203 QuillSQLError::Internal("information_schema not initialized".to_string())
204 })?;
205 information_schema
206 .tables
207 .get(INFORMATION_SCHEMA_SCHEMAS)
208 .ok_or_else(|| {
209 QuillSQLError::Internal("information_schema.schemas missing".to_string())
210 })?
211 .table
212 .clone()
213 };
214
215 let mut iterator = TableIterator::new(schemas_table, ..);
216 while let Some((_rid, meta, tuple)) = iterator.next()? {
217 if meta.is_deleted {
218 continue;
219 }
220 let ScalarValue::Varchar(Some(_catalog)) = tuple.value(0)? else {
221 return Err(QuillSQLError::Internal(
222 "invalid catalog value in information_schema.schemas".to_string(),
223 ));
224 };
225 let ScalarValue::Varchar(Some(schema_name)) = tuple.value(1)? else {
226 return Err(QuillSQLError::Internal(
227 "invalid schema value in information_schema.schemas".to_string(),
228 ));
229 };
230 db.catalog
231 .load_schema(schema_name.clone(), CatalogSchema::new(schema_name));
232 }
233 Ok(())
234}
235
236fn load_user_tables(db: &mut Database) -> QuillSQLResult<()> {
237 let (columns_heap, tables_heap) = {
238 let information_schema =
239 db.catalog
240 .schemas
241 .get(INFORMATION_SCHEMA_NAME)
242 .ok_or_else(|| {
243 QuillSQLError::Internal("information_schema not initialized".to_string())
244 })?;
245 let columns_heap = information_schema
246 .tables
247 .get(INFORMATION_SCHEMA_COLUMNS)
248 .ok_or_else(|| {
249 QuillSQLError::Internal("information_schema.columns missing".to_string())
250 })?
251 .table
252 .clone();
253 let tables_heap = information_schema
254 .tables
255 .get(INFORMATION_SCHEMA_TABLES)
256 .ok_or_else(|| {
257 QuillSQLError::Internal("information_schema.tables missing".to_string())
258 })?
259 .table
260 .clone();
261 (columns_heap, tables_heap)
262 };
263
264 let mut column_map: HashMap<(String, String, String), Vec<Column>> = HashMap::new();
265 let mut column_iter = TableIterator::new(columns_heap, ..);
266 while let Some((_rid, meta, tuple)) = column_iter.next()? {
267 if meta.is_deleted {
268 continue;
269 }
270 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
271 return Err(QuillSQLError::Internal(
272 "invalid catalog in information_schema.columns".to_string(),
273 ));
274 };
275 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
276 return Err(QuillSQLError::Internal(
277 "invalid schema in information_schema.columns".to_string(),
278 ));
279 };
280 let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
281 return Err(QuillSQLError::Internal(
282 "invalid table in information_schema.columns".to_string(),
283 ));
284 };
285 let ScalarValue::Varchar(Some(column_name)) = tuple.value(3)? else {
286 return Err(QuillSQLError::Internal(
287 "invalid column name in information_schema.columns".to_string(),
288 ));
289 };
290 let ScalarValue::Varchar(Some(data_type_str)) = tuple.value(4)? else {
291 return Err(QuillSQLError::Internal(
292 "invalid data type in information_schema.columns".to_string(),
293 ));
294 };
295 let ScalarValue::Boolean(Some(nullable)) = tuple.value(5)? else {
296 return Err(QuillSQLError::Internal(
297 "invalid nullable flag in information_schema.columns".to_string(),
298 ));
299 };
300 let ScalarValue::Varchar(Some(default)) = tuple.value(6)? else {
301 return Err(QuillSQLError::Internal(
302 "invalid default in information_schema.columns".to_string(),
303 ));
304 };
305
306 let data_type: DataType = data_type_str.as_str().try_into()?;
307 let default_value = ScalarValue::from_string(default, data_type)?;
308 column_map
309 .entry((catalog.clone(), schema.clone(), table.clone()))
310 .or_default()
311 .push(
312 Column::new(column_name.clone(), data_type, *nullable).with_default(default_value),
313 );
314 }
315
316 let mut table_iter = TableIterator::new(tables_heap, ..);
317 while let Some((_rid, meta, tuple)) = table_iter.next()? {
318 if meta.is_deleted {
319 continue;
320 }
321 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
322 return Err(QuillSQLError::Internal(
323 "invalid catalog in information_schema.tables".to_string(),
324 ));
325 };
326 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
327 return Err(QuillSQLError::Internal(
328 "invalid schema in information_schema.tables".to_string(),
329 ));
330 };
331 let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
332 return Err(QuillSQLError::Internal(
333 "invalid table name in information_schema.tables".to_string(),
334 ));
335 };
336 let ScalarValue::UInt32(Some(first_page_id)) = tuple.value(3)? else {
337 return Err(QuillSQLError::Internal(
338 "invalid first_page_id in information_schema.tables".to_string(),
339 ));
340 };
341
342 let columns = column_map
343 .remove(&(catalog.clone(), schema.clone(), table.clone()))
344 .unwrap_or_default();
345 let schema_ref = Arc::new(Schema::new(columns));
346
347 let last_page_id =
348 load_table_last_page_id(&mut db.catalog, *first_page_id, schema_ref.clone())?;
349 let table_heap = TableHeap {
350 schema: schema_ref.clone(),
351 buffer_pool: db.buffer_pool.clone(),
352 first_page_id: AtomicPageId::new(*first_page_id),
353 last_page_id: AtomicPageId::new(last_page_id),
354 };
355
356 db.catalog.load_table(
357 TableReference::Full {
358 catalog: catalog.to_string(),
359 schema: schema.to_string(),
360 table: table.to_string(),
361 },
362 CatalogTable::new(table, Arc::new(table_heap)),
363 )?;
364 }
365 Ok(())
366}
367
368fn load_user_indexes(db: &mut Database) -> QuillSQLResult<()> {
369 let indexes_heap = {
370 let information_schema =
371 db.catalog
372 .schemas
373 .get(INFORMATION_SCHEMA_NAME)
374 .ok_or_else(|| {
375 QuillSQLError::Internal("information_schema not initialized".to_string())
376 })?;
377
378 information_schema
379 .tables
380 .get(INFORMATION_SCHEMA_INDEXES)
381 .ok_or_else(|| {
382 QuillSQLError::Internal("information_schema.indexes missing".to_string())
383 })?
384 .table
385 .clone()
386 };
387
388 let mut iterator = TableIterator::new(indexes_heap, ..);
389 while let Some((_rid, meta, tuple)) = iterator.next()? {
390 if meta.is_deleted {
391 continue;
392 }
393 let ScalarValue::Varchar(Some(catalog_name)) = tuple.value(0)? else {
394 return Err(QuillSQLError::Internal(
395 "invalid catalog in information_schema.indexes".to_string(),
396 ));
397 };
398 let ScalarValue::Varchar(Some(table_schema_name)) = tuple.value(1)? else {
399 return Err(QuillSQLError::Internal(
400 "invalid schema in information_schema.indexes".to_string(),
401 ));
402 };
403 let ScalarValue::Varchar(Some(table_name)) = tuple.value(2)? else {
404 return Err(QuillSQLError::Internal(
405 "invalid table in information_schema.indexes".to_string(),
406 ));
407 };
408 let ScalarValue::Varchar(Some(index_name)) = tuple.value(3)? else {
409 return Err(QuillSQLError::Internal(
410 "invalid index name in information_schema.indexes".to_string(),
411 ));
412 };
413 let ScalarValue::Varchar(Some(key_schema_str)) = tuple.value(4)? else {
414 return Err(QuillSQLError::Internal(
415 "invalid key schema in information_schema.indexes".to_string(),
416 ));
417 };
418 let ScalarValue::UInt32(Some(internal_max_size)) = tuple.value(5)? else {
419 return Err(QuillSQLError::Internal(
420 "invalid internal max size in information_schema.indexes".to_string(),
421 ));
422 };
423 let ScalarValue::UInt32(Some(leaf_max_size)) = tuple.value(6)? else {
424 return Err(QuillSQLError::Internal(
425 "invalid leaf max size in information_schema.indexes".to_string(),
426 ));
427 };
428 let ScalarValue::UInt32(Some(header_page_id)) = tuple.value(7)? else {
429 return Err(QuillSQLError::Internal(
430 "invalid header page id in information_schema.indexes".to_string(),
431 ));
432 };
433
434 let table_ref = TableReference::Full {
435 catalog: catalog_name.clone(),
436 schema: table_schema_name.clone(),
437 table: table_name.clone(),
438 };
439 let table_schema = db.catalog.table_heap(&table_ref)?.schema.clone();
440 let key_schema = Arc::new(parse_key_schema_from_varchar(
441 key_schema_str.as_str(),
442 table_schema,
443 )?);
444
445 let b_plus_tree_index = BPlusTreeIndex::open(
446 key_schema,
447 db.buffer_pool.clone(),
448 *internal_max_size,
449 *leaf_max_size,
450 *header_page_id,
451 );
452 db.catalog
453 .load_index(table_ref, index_name, Arc::new(b_plus_tree_index))?;
454 }
455 Ok(())
456}
457
458fn load_table_last_page_id(
459 catalog: &mut Catalog,
460 first_page_id: PageId,
461 schema: SchemaRef,
462) -> QuillSQLResult<PageId> {
463 let mut page_id = first_page_id;
464 loop {
465 let (_, table_page) = catalog
466 .buffer_pool
467 .fetch_table_page(page_id, schema.clone())?;
468
469 if table_page.header.next_page_id == INVALID_PAGE_ID {
470 return Ok(page_id);
471 } else {
472 page_id = table_page.header.next_page_id;
473 }
474 }
475}
476
477pub fn key_schema_to_varchar(key_schema: &Schema) -> String {
478 key_schema
479 .columns
480 .iter()
481 .map(|col| col.name.as_str())
482 .collect::<Vec<_>>()
483 .join(", ")
484}
485
486fn parse_key_schema_from_varchar(varchar: &str, table_schema: SchemaRef) -> QuillSQLResult<Schema> {
487 let column_names = varchar
488 .split(",")
489 .map(|name| name.trim())
490 .collect::<Vec<&str>>();
491 let indices = column_names
492 .into_iter()
493 .map(|name| table_schema.index_of(None, name))
494 .collect::<QuillSQLResult<Vec<usize>>>()?;
495 table_schema.project(&indices)
496}