1use crate::buffer::{AtomicPageId, PageId, INVALID_PAGE_ID};
2use crate::catalog::catalog::{CatalogSchema, CatalogTable};
3use crate::catalog::registry::global_table_registry;
4use crate::catalog::{
5 Catalog, Column, DataType, Schema, SchemaRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
6};
7use crate::database::Database;
8use crate::error::{QuillSQLError, QuillSQLResult};
9use crate::utils::scalar::ScalarValue;
10use crate::utils::table_ref::TableReference;
11
12use crate::storage::index::btree_index::BPlusTreeIndex;
13use crate::storage::table_heap::TableHeap;
14use std::sync::{Arc, LazyLock};
15
/// Name of the schema under which the catalog's own metadata tables are registered.
pub static INFORMATION_SCHEMA_NAME: &str = "information_schema";
/// Name of the metadata table that lists schemas.
pub static INFORMATION_SCHEMA_SCHEMAS: &str = "schemas";
/// Name of the metadata table that lists tables.
pub static INFORMATION_SCHEMA_TABLES: &str = "tables";
/// Name of the metadata table that lists table columns.
pub static INFORMATION_SCHEMA_COLUMNS: &str = "columns";
/// Name of the metadata table that lists indexes.
pub static INFORMATION_SCHEMA_INDEXES: &str = "indexes";
21
22pub static SCHEMAS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
23 Arc::new(Schema::new(vec![
24 Column::new("catalog", DataType::Varchar(None), false),
25 Column::new("schema", DataType::Varchar(None), false),
26 ]))
27});
28
29pub static TABLES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
30 Arc::new(Schema::new(vec![
31 Column::new("table_catalog", DataType::Varchar(None), false),
32 Column::new("table_schema", DataType::Varchar(None), false),
33 Column::new("table_name", DataType::Varchar(None), false),
34 Column::new("first_page_id", DataType::UInt32, false),
35 ]))
36});
37
38pub static COLUMNS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
39 Arc::new(Schema::new(vec![
40 Column::new("table_catalog", DataType::Varchar(None), false),
41 Column::new("table_schema", DataType::Varchar(None), false),
42 Column::new("table_name", DataType::Varchar(None), false),
43 Column::new("column_name", DataType::Varchar(None), false),
44 Column::new("data_type", DataType::Varchar(None), false),
45 Column::new("nullable", DataType::Boolean, false),
46 Column::new("default", DataType::Varchar(None), false),
47 ]))
48});
49
50pub static INDEXES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
51 Arc::new(Schema::new(vec![
52 Column::new("table_catalog", DataType::Varchar(None), false),
53 Column::new("table_schema", DataType::Varchar(None), false),
54 Column::new("table_name", DataType::Varchar(None), false),
55 Column::new("index_name", DataType::Varchar(None), false),
56 Column::new("key_schema", DataType::Varchar(None), false),
57 Column::new("internal_max_size", DataType::UInt32, false),
58 Column::new("leaf_max_size", DataType::UInt32, false),
59 Column::new("header_page_id", DataType::UInt32, false),
60 ]))
61});
62
63pub fn load_catalog_data(db: &mut Database) -> QuillSQLResult<()> {
64 load_information_schema(&mut db.catalog)?;
65 load_schemas(db)?;
66 create_default_schema_if_not_exists(&mut db.catalog)?;
67 load_user_tables(db)?;
68 load_user_indexes(db)?;
69 Ok(())
70}
71
72fn create_default_schema_if_not_exists(catalog: &mut Catalog) -> QuillSQLResult<()> {
73 if !catalog.schemas.contains_key(DEFAULT_SCHEMA_NAME) {
74 catalog.create_schema(DEFAULT_SCHEMA_NAME)?;
75 }
76 Ok(())
77}
78
79fn load_information_schema(catalog: &mut Catalog) -> QuillSQLResult<()> {
80 let meta = catalog.disk_manager.meta.read().unwrap();
81 let information_schema_schemas_first_page_id = meta.information_schema_schemas_first_page_id;
82 let information_schema_tables_first_page_id = meta.information_schema_tables_first_page_id;
83 let information_schema_columns_first_page_id = meta.information_schema_columns_first_page_id;
84 let information_schema_indexes_first_page_id = meta.information_schema_indexes_first_page_id;
85 drop(meta);
86
87 let information_schema_schemas_last_page_id = load_table_last_page_id(
89 catalog,
90 information_schema_schemas_first_page_id,
91 SCHEMAS_SCHEMA.clone(),
92 )?;
93 let information_schema_tables_last_page_id = load_table_last_page_id(
94 catalog,
95 information_schema_tables_first_page_id,
96 TABLES_SCHEMA.clone(),
97 )?;
98 let information_schema_columns_last_page_id = load_table_last_page_id(
99 catalog,
100 information_schema_columns_first_page_id,
101 COLUMNS_SCHEMA.clone(),
102 )?;
103 let information_schema_indexes_last_page_id = load_table_last_page_id(
104 catalog,
105 information_schema_indexes_first_page_id,
106 INDEXES_SCHEMA.clone(),
107 )?;
108
109 let mut information_schema = CatalogSchema::new(INFORMATION_SCHEMA_NAME);
110
111 let schemas_table = TableHeap {
112 schema: SCHEMAS_SCHEMA.clone(),
113 buffer_pool: catalog.buffer_pool.clone(),
114 first_page_id: AtomicPageId::new(information_schema_schemas_first_page_id),
115 last_page_id: AtomicPageId::new(information_schema_schemas_last_page_id),
116 };
117 let schemas_heap = Arc::new(schemas_table);
118 information_schema.tables.insert(
119 INFORMATION_SCHEMA_SCHEMAS.to_string(),
120 CatalogTable::new(INFORMATION_SCHEMA_SCHEMAS, schemas_heap.clone()),
121 );
122 global_table_registry().register(
123 TableReference::Full {
124 catalog: DEFAULT_CATALOG_NAME.to_string(),
125 schema: INFORMATION_SCHEMA_NAME.to_string(),
126 table: INFORMATION_SCHEMA_SCHEMAS.to_string(),
127 },
128 schemas_heap,
129 );
130
131 let tables_table = TableHeap {
132 schema: TABLES_SCHEMA.clone(),
133 buffer_pool: catalog.buffer_pool.clone(),
134 first_page_id: AtomicPageId::new(information_schema_tables_first_page_id),
135 last_page_id: AtomicPageId::new(information_schema_tables_last_page_id),
136 };
137 let tables_heap = Arc::new(tables_table);
138 information_schema.tables.insert(
139 INFORMATION_SCHEMA_TABLES.to_string(),
140 CatalogTable::new(INFORMATION_SCHEMA_TABLES, tables_heap.clone()),
141 );
142 global_table_registry().register(
143 TableReference::Full {
144 catalog: DEFAULT_CATALOG_NAME.to_string(),
145 schema: INFORMATION_SCHEMA_NAME.to_string(),
146 table: INFORMATION_SCHEMA_TABLES.to_string(),
147 },
148 tables_heap,
149 );
150
151 let columns_table = TableHeap {
152 schema: COLUMNS_SCHEMA.clone(),
153 buffer_pool: catalog.buffer_pool.clone(),
154 first_page_id: AtomicPageId::new(information_schema_columns_first_page_id),
155 last_page_id: AtomicPageId::new(information_schema_columns_last_page_id),
156 };
157 let columns_heap = Arc::new(columns_table);
158 information_schema.tables.insert(
159 INFORMATION_SCHEMA_COLUMNS.to_string(),
160 CatalogTable::new(INFORMATION_SCHEMA_COLUMNS, columns_heap.clone()),
161 );
162 global_table_registry().register(
163 TableReference::Full {
164 catalog: DEFAULT_CATALOG_NAME.to_string(),
165 schema: INFORMATION_SCHEMA_NAME.to_string(),
166 table: INFORMATION_SCHEMA_COLUMNS.to_string(),
167 },
168 columns_heap,
169 );
170
171 let indexes_table = TableHeap {
172 schema: INDEXES_SCHEMA.clone(),
173 buffer_pool: catalog.buffer_pool.clone(),
174 first_page_id: AtomicPageId::new(information_schema_indexes_first_page_id),
175 last_page_id: AtomicPageId::new(information_schema_indexes_last_page_id),
176 };
177 let indexes_heap = Arc::new(indexes_table);
178 information_schema.tables.insert(
179 INFORMATION_SCHEMA_INDEXES.to_string(),
180 CatalogTable::new(INFORMATION_SCHEMA_INDEXES, indexes_heap.clone()),
181 );
182 global_table_registry().register(
183 TableReference::Full {
184 catalog: DEFAULT_CATALOG_NAME.to_string(),
185 schema: INFORMATION_SCHEMA_NAME.to_string(),
186 table: INFORMATION_SCHEMA_INDEXES.to_string(),
187 },
188 indexes_heap,
189 );
190
191 catalog.load_schema(INFORMATION_SCHEMA_NAME, information_schema);
192 Ok(())
193}
194
195fn load_schemas(db: &mut Database) -> QuillSQLResult<()> {
196 let schema_tuples = db.run(&format!(
197 "select * from {}.{}",
198 INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_SCHEMAS
199 ))?;
200 for schema_tuple in schema_tuples.into_iter() {
201 let error = Err(QuillSQLError::Internal(format!(
202 "Failed to decode schema tuple: {:?}",
203 schema_tuple,
204 )));
205 let ScalarValue::Varchar(Some(_catalog)) = schema_tuple.value(0)? else {
206 return error;
207 };
208 let ScalarValue::Varchar(Some(schema_name)) = schema_tuple.value(1)? else {
209 return error;
210 };
211 db.catalog
212 .load_schema(schema_name, CatalogSchema::new(schema_name));
213 }
214 Ok(())
215}
216
217fn load_user_tables(db: &mut Database) -> QuillSQLResult<()> {
218 let table_tuples = db.run(&format!(
219 "select * from {}.{}",
220 INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_TABLES
221 ))?;
222 for table_tuple in table_tuples.into_iter() {
223 let error = Err(QuillSQLError::Internal(format!(
224 "Failed to decode table tuple: {:?}",
225 table_tuple
226 )));
227 let ScalarValue::Varchar(Some(catalog)) = table_tuple.value(0)? else {
228 return error;
229 };
230 let ScalarValue::Varchar(Some(table_schema)) = table_tuple.value(1)? else {
231 return error;
232 };
233 let ScalarValue::Varchar(Some(table_name)) = table_tuple.value(2)? else {
234 return error;
235 };
236 let ScalarValue::UInt32(Some(first_page_id)) = table_tuple.value(3)? else {
237 return error;
238 };
239
240 let column_tuples = db.run(&format!("select * from {}.{} where table_catalog = '{}' and table_schema = '{}' and table_name = '{}'",
241 INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_COLUMNS, catalog, table_schema, table_name))?;
242 let mut columns = vec![];
243 for column_tuple in column_tuples.into_iter() {
244 let error = Err(QuillSQLError::Internal(format!(
245 "Failed to decode column tuple: {:?}",
246 column_tuple
247 )));
248 let ScalarValue::Varchar(Some(column_name)) = column_tuple.value(3)? else {
249 return error;
250 };
251 let ScalarValue::Varchar(Some(data_type_str)) = column_tuple.value(4)? else {
252 return error;
253 };
254 let ScalarValue::Boolean(Some(nullable)) = column_tuple.value(5)? else {
255 return error;
256 };
257 let ScalarValue::Varchar(Some(default)) = column_tuple.value(6)? else {
258 return error;
259 };
260 let data_type: DataType = data_type_str.as_str().try_into()?;
261 let default = ScalarValue::from_string(default, data_type)?;
262 columns
263 .push(Column::new(column_name.clone(), data_type, *nullable).with_default(default));
264 }
265 let schema = Arc::new(Schema::new(columns));
266
267 let last_page_id =
269 load_table_last_page_id(&mut db.catalog, *first_page_id, schema.clone())?;
270 let table_heap = TableHeap {
271 schema: schema.clone(),
272 buffer_pool: db.buffer_pool.clone(),
273 first_page_id: AtomicPageId::new(*first_page_id),
274 last_page_id: AtomicPageId::new(last_page_id),
275 };
276 db.catalog.load_table(
277 TableReference::Full {
278 catalog: catalog.to_string(),
279 schema: table_schema.to_string(),
280 table: table_name.to_string(),
281 },
282 CatalogTable::new(table_name, Arc::new(table_heap)),
283 )?;
284 }
285 Ok(())
286}
287
288fn load_user_indexes(db: &mut Database) -> QuillSQLResult<()> {
289 let index_tuples = db.run(&format!(
290 "select * from {}.{}",
291 INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_INDEXES
292 ))?;
293 for index_tuple in index_tuples.into_iter() {
294 let error = Err(QuillSQLError::Internal(format!(
295 "Failed to decode index tuple: {:?}",
296 index_tuple
297 )));
298 let ScalarValue::Varchar(Some(catalog_name)) = index_tuple.value(0)? else {
299 return error;
300 };
301 let ScalarValue::Varchar(Some(table_schema_name)) = index_tuple.value(1)? else {
302 return error;
303 };
304 let ScalarValue::Varchar(Some(table_name)) = index_tuple.value(2)? else {
305 return error;
306 };
307 let ScalarValue::Varchar(Some(index_name)) = index_tuple.value(3)? else {
308 return error;
309 };
310 let ScalarValue::Varchar(Some(key_schema_str)) = index_tuple.value(4)? else {
311 return error;
312 };
313 let ScalarValue::UInt32(Some(internal_max_size)) = index_tuple.value(5)? else {
314 return error;
315 };
316 let ScalarValue::UInt32(Some(leaf_max_size)) = index_tuple.value(6)? else {
317 return error;
318 };
319 let ScalarValue::UInt32(Some(header_page_id)) = index_tuple.value(7)? else {
320 return error;
321 };
322
323 let table_ref = TableReference::Full {
324 catalog: catalog_name.clone(),
325 schema: table_schema_name.clone(),
326 table: table_name.clone(),
327 };
328 let table_schema = db.catalog.table_heap(&table_ref)?.schema.clone();
329 let key_schema = Arc::new(parse_key_schema_from_varchar(
330 key_schema_str.as_str(),
331 table_schema,
332 )?);
333
334 let b_plus_tree_index = BPlusTreeIndex::open(
335 key_schema,
336 db.buffer_pool.clone(),
337 *internal_max_size,
338 *leaf_max_size,
339 *header_page_id,
340 );
341 db.catalog
342 .load_index(table_ref, index_name, Arc::new(b_plus_tree_index))?;
343 }
344 Ok(())
345}
346
347fn load_table_last_page_id(
348 catalog: &mut Catalog,
349 first_page_id: PageId,
350 schema: SchemaRef,
351) -> QuillSQLResult<PageId> {
352 let mut page_id = first_page_id;
353 loop {
354 let (_, table_page) = catalog
355 .buffer_pool
356 .fetch_table_page(page_id, schema.clone())?;
357
358 if table_page.header.next_page_id == INVALID_PAGE_ID {
359 return Ok(page_id);
360 } else {
361 page_id = table_page.header.next_page_id;
362 }
363 }
364}
365
366pub fn key_schema_to_varchar(key_schema: &Schema) -> String {
367 key_schema
368 .columns
369 .iter()
370 .map(|col| col.name.as_str())
371 .collect::<Vec<_>>()
372 .join(", ")
373}
374
375fn parse_key_schema_from_varchar(varchar: &str, table_schema: SchemaRef) -> QuillSQLResult<Schema> {
376 let column_names = varchar
377 .split(",")
378 .map(|name| name.trim())
379 .collect::<Vec<&str>>();
380 let indices = column_names
381 .into_iter()
382 .map(|name| table_schema.index_of(None, name))
383 .collect::<QuillSQLResult<Vec<usize>>>()?;
384 table_schema.project(&indices)
385}