1use std::collections::HashMap;
2use std::sync::{Arc, LazyLock};
3
4use crate::catalog::{
5 Catalog, CatalogSchema, CatalogTable, Column, DataType, Schema, SchemaRef,
6 DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
7};
8use crate::database::Database;
9use crate::error::{QuillSQLError, QuillSQLResult};
10use crate::storage::engine::TableHandle;
11use crate::storage::holt::HoltTableHandle;
12use crate::storage::tuple::Tuple;
13use crate::utils::scalar::ScalarValue;
14use crate::utils::table_ref::TableReference;
15
/// Name of the schema under which the catalog's metadata tables are registered.
pub static INFORMATION_SCHEMA_NAME: &str = "information_schema";
/// Name of the metadata table that lists schemas (layout: `SCHEMAS_SCHEMA`).
pub static INFORMATION_SCHEMA_SCHEMAS: &str = "schemas";
/// Name of the metadata table that lists tables (layout: `TABLES_SCHEMA`).
pub static INFORMATION_SCHEMA_TABLES: &str = "tables";
/// Name of the metadata table that lists table columns (layout: `COLUMNS_SCHEMA`).
pub static INFORMATION_SCHEMA_COLUMNS: &str = "columns";
/// Name of the metadata table that lists indexes (layout: `INDEXES_SCHEMA`).
pub static INFORMATION_SCHEMA_INDEXES: &str = "indexes";
21
22pub static SCHEMAS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
23 Arc::new(Schema::new(vec![
24 Column::new("catalog", DataType::Varchar(None), false),
25 Column::new("schema", DataType::Varchar(None), false),
26 ]))
27});
28
29pub static TABLES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
30 Arc::new(Schema::new(vec![
31 Column::new("table_catalog", DataType::Varchar(None), false),
32 Column::new("table_schema", DataType::Varchar(None), false),
33 Column::new("table_name", DataType::Varchar(None), false),
34 Column::new("first_page_id", DataType::UInt32, false),
35 ]))
36});
37
38pub static COLUMNS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
39 Arc::new(Schema::new(vec![
40 Column::new("table_catalog", DataType::Varchar(None), false),
41 Column::new("table_schema", DataType::Varchar(None), false),
42 Column::new("table_name", DataType::Varchar(None), false),
43 Column::new("column_name", DataType::Varchar(None), false),
44 Column::new("data_type", DataType::Varchar(None), false),
45 Column::new("nullable", DataType::Boolean, false),
46 Column::new("default", DataType::Varchar(None), false),
47 ]))
48});
49
50pub static INDEXES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
51 Arc::new(Schema::new(vec![
52 Column::new("table_catalog", DataType::Varchar(None), false),
53 Column::new("table_schema", DataType::Varchar(None), false),
54 Column::new("table_name", DataType::Varchar(None), false),
55 Column::new("index_name", DataType::Varchar(None), false),
56 Column::new("key_schema", DataType::Varchar(None), false),
57 Column::new("internal_max_size", DataType::UInt32, false),
58 Column::new("leaf_max_size", DataType::UInt32, false),
59 Column::new("header_page_id", DataType::UInt32, false),
60 ]))
61});
62
/// Populates the in-memory catalog of `db` from its persisted metadata.
///
/// The steps run in a fixed order because later steps read what earlier ones set up:
/// the `information_schema` tables must be registered before their rows can be scanned,
/// and tables must be loaded before the indexes that resolve their schemas.
///
/// # Errors
/// Returns the first error produced by any step; later steps are not attempted.
pub fn load_catalog_data(db: &mut Database) -> QuillSQLResult<()> {
    // Register information_schema and its four metadata tables first; every
    // `information_schema_rows` call below depends on it.
    load_information_schema(&mut db.catalog)?;
    // Schemas recorded in information_schema.schemas.
    load_schemas(db)?;
    // Make sure the default schema exists even if no row named it.
    create_default_schema_if_not_exists(&mut db.catalog)?;
    // Tables described by information_schema.tables / information_schema.columns.
    load_user_tables(db)?;
    // Tables that only exist as holt-store descriptors and were not loaded above.
    load_descriptor_tables(db)?;
    // Indexes described by information_schema.indexes (needs the tables loaded above).
    load_user_indexes(db)?;
    // Indexes that only exist as holt-store descriptors and were not loaded above.
    load_descriptor_indexes(db)?;
    // NOTE(review): presumably re-synchronises the information_schema contents with the
    // catalog state assembled above — confirm against `Catalog`.
    db.catalog.refresh_information_schema_projection()?;
    Ok(())
}
74
75fn create_default_schema_if_not_exists(catalog: &mut Catalog) -> QuillSQLResult<()> {
76 if !catalog.schemas.contains_key(DEFAULT_SCHEMA_NAME) {
77 catalog.create_schema(DEFAULT_SCHEMA_NAME)?;
78 }
79 Ok(())
80}
81
82fn load_information_schema(catalog: &mut Catalog) -> QuillSQLResult<()> {
83 let holt = catalog.holt_store.clone();
84 let mut information_schema = CatalogSchema::new(INFORMATION_SCHEMA_NAME);
85
86 for (table_name, schema) in [
87 (INFORMATION_SCHEMA_SCHEMAS, SCHEMAS_SCHEMA.clone()),
88 (INFORMATION_SCHEMA_TABLES, TABLES_SCHEMA.clone()),
89 (INFORMATION_SCHEMA_COLUMNS, COLUMNS_SCHEMA.clone()),
90 (INFORMATION_SCHEMA_INDEXES, INDEXES_SCHEMA.clone()),
91 ] {
92 let table_ref = TableReference::Full {
93 catalog: DEFAULT_CATALOG_NAME.to_string(),
94 schema: INFORMATION_SCHEMA_NAME.to_string(),
95 table: table_name.to_string(),
96 };
97 let table_id = match holt.table_descriptor(&table_ref)? {
98 Some(table_id) => table_id,
99 None => holt.create_table_descriptor(&table_ref, schema.as_ref())?,
100 };
101 information_schema.tables.insert(
102 table_name.to_string(),
103 CatalogTable::new(table_name, schema, table_id),
104 );
105 }
106
107 catalog.load_schema(INFORMATION_SCHEMA_NAME, information_schema);
108 Ok(())
109}
110
111fn load_schemas(db: &mut Database) -> QuillSQLResult<()> {
112 for tuple in information_schema_rows(db, INFORMATION_SCHEMA_SCHEMAS)? {
113 let ScalarValue::Varchar(Some(schema_name)) = tuple.value(1)? else {
114 return Err(QuillSQLError::Internal(
115 "invalid schema value in information_schema.schemas".to_string(),
116 ));
117 };
118 db.catalog
119 .load_schema(schema_name.clone(), CatalogSchema::new(schema_name));
120 }
121 Ok(())
122}
123
124fn load_user_tables(db: &mut Database) -> QuillSQLResult<()> {
125 let mut column_map: HashMap<(String, String, String), Vec<Column>> = HashMap::new();
126 for tuple in information_schema_rows(db, INFORMATION_SCHEMA_COLUMNS)? {
127 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
128 return Err(QuillSQLError::Internal(
129 "invalid catalog in information_schema.columns".to_string(),
130 ));
131 };
132 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
133 return Err(QuillSQLError::Internal(
134 "invalid schema in information_schema.columns".to_string(),
135 ));
136 };
137 let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
138 return Err(QuillSQLError::Internal(
139 "invalid table in information_schema.columns".to_string(),
140 ));
141 };
142 let ScalarValue::Varchar(Some(column_name)) = tuple.value(3)? else {
143 return Err(QuillSQLError::Internal(
144 "invalid column name in information_schema.columns".to_string(),
145 ));
146 };
147 let ScalarValue::Varchar(Some(data_type_str)) = tuple.value(4)? else {
148 return Err(QuillSQLError::Internal(
149 "invalid data type in information_schema.columns".to_string(),
150 ));
151 };
152 let ScalarValue::Boolean(Some(nullable)) = tuple.value(5)? else {
153 return Err(QuillSQLError::Internal(
154 "invalid nullable flag in information_schema.columns".to_string(),
155 ));
156 };
157 let ScalarValue::Varchar(Some(default)) = tuple.value(6)? else {
158 return Err(QuillSQLError::Internal(
159 "invalid default in information_schema.columns".to_string(),
160 ));
161 };
162
163 let data_type: DataType = data_type_str.as_str().try_into()?;
164 let default_value = ScalarValue::from_string(default, data_type)?;
165 column_map
166 .entry((catalog.clone(), schema.clone(), table.clone()))
167 .or_default()
168 .push(
169 Column::new(column_name.clone(), data_type, *nullable).with_default(default_value),
170 );
171 }
172
173 for tuple in information_schema_rows(db, INFORMATION_SCHEMA_TABLES)? {
174 let ScalarValue::Varchar(Some(catalog)) = tuple.value(0)? else {
175 return Err(QuillSQLError::Internal(
176 "invalid catalog in information_schema.tables".to_string(),
177 ));
178 };
179 let ScalarValue::Varchar(Some(schema)) = tuple.value(1)? else {
180 return Err(QuillSQLError::Internal(
181 "invalid schema in information_schema.tables".to_string(),
182 ));
183 };
184 let ScalarValue::Varchar(Some(table)) = tuple.value(2)? else {
185 return Err(QuillSQLError::Internal(
186 "invalid table name in information_schema.tables".to_string(),
187 ));
188 };
189 let table_ref = TableReference::Full {
190 catalog: catalog.to_string(),
191 schema: schema.to_string(),
192 table: table.to_string(),
193 };
194 let Some(table_id) = db.catalog.holt_store.table_descriptor(&table_ref)? else {
195 continue;
196 };
197 let columns = column_map
198 .remove(&(catalog.clone(), schema.clone(), table.clone()))
199 .unwrap_or_default();
200 db.catalog.load_table(
201 table_ref,
202 table.clone(),
203 Arc::new(Schema::new(columns)),
204 table_id,
205 )?;
206 }
207 Ok(())
208}
209
210fn load_user_indexes(db: &mut Database) -> QuillSQLResult<()> {
211 for tuple in information_schema_rows(db, INFORMATION_SCHEMA_INDEXES)? {
212 let ScalarValue::Varchar(Some(catalog_name)) = tuple.value(0)? else {
213 return Err(QuillSQLError::Internal(
214 "invalid catalog in information_schema.indexes".to_string(),
215 ));
216 };
217 let ScalarValue::Varchar(Some(table_schema_name)) = tuple.value(1)? else {
218 return Err(QuillSQLError::Internal(
219 "invalid schema in information_schema.indexes".to_string(),
220 ));
221 };
222 let ScalarValue::Varchar(Some(table_name)) = tuple.value(2)? else {
223 return Err(QuillSQLError::Internal(
224 "invalid table in information_schema.indexes".to_string(),
225 ));
226 };
227 let ScalarValue::Varchar(Some(index_name)) = tuple.value(3)? else {
228 return Err(QuillSQLError::Internal(
229 "invalid index name in information_schema.indexes".to_string(),
230 ));
231 };
232 let ScalarValue::Varchar(Some(key_schema_str)) = tuple.value(4)? else {
233 return Err(QuillSQLError::Internal(
234 "invalid key schema in information_schema.indexes".to_string(),
235 ));
236 };
237
238 let table_ref = TableReference::Full {
239 catalog: catalog_name.clone(),
240 schema: table_schema_name.clone(),
241 table: table_name.clone(),
242 };
243 let Some(index_id) = db
244 .catalog
245 .holt_store
246 .index_descriptor(&table_ref, index_name)?
247 else {
248 continue;
249 };
250 let table_schema = db.catalog.table_schema(&table_ref)?;
251 let key_schema = Arc::new(parse_key_schema_from_varchar(
252 key_schema_str.as_str(),
253 table_schema,
254 )?);
255 db.catalog
256 .load_index(table_ref, index_name, key_schema, index_id)?;
257 }
258 Ok(())
259}
260
261fn load_descriptor_tables(db: &mut Database) -> QuillSQLResult<()> {
262 for descriptor in db.catalog.holt_store.table_descriptors()? {
263 if descriptor.table_ref.schema() == Some(INFORMATION_SCHEMA_NAME) {
264 continue;
265 }
266 if db.catalog.try_table_schema(&descriptor.table_ref).is_some() {
267 continue;
268 }
269 let schema_name = descriptor
270 .table_ref
271 .schema()
272 .unwrap_or(DEFAULT_SCHEMA_NAME)
273 .to_string();
274 if !db.catalog.schemas.contains_key(&schema_name) {
275 db.catalog
276 .load_schema(schema_name.clone(), CatalogSchema::new(schema_name));
277 }
278 db.catalog.load_table(
279 descriptor.table_ref,
280 descriptor.table_name,
281 descriptor.schema,
282 descriptor.table_id,
283 )?;
284 }
285 Ok(())
286}
287
288fn load_descriptor_indexes(db: &mut Database) -> QuillSQLResult<()> {
289 for descriptor in db.catalog.holt_store.index_descriptors()? {
290 if descriptor.table_ref.schema() == Some(INFORMATION_SCHEMA_NAME) {
291 continue;
292 }
293 if db.catalog.try_table_schema(&descriptor.table_ref).is_none() {
294 continue;
295 }
296 if db
297 .catalog
298 .table_indexes(&descriptor.table_ref)
299 .map(|indexes| {
300 indexes
301 .iter()
302 .any(|index| index.name == descriptor.index_name)
303 })
304 .unwrap_or(false)
305 {
306 continue;
307 }
308 db.catalog.load_index(
309 descriptor.table_ref,
310 descriptor.index_name,
311 descriptor.key_schema,
312 descriptor.index_id,
313 )?;
314 }
315 Ok(())
316}
317
318fn information_schema_rows(db: &Database, table_name: &str) -> QuillSQLResult<Vec<Tuple>> {
319 let information_schema = db
320 .catalog
321 .schemas
322 .get(INFORMATION_SCHEMA_NAME)
323 .ok_or_else(|| QuillSQLError::Internal("information_schema not initialized".to_string()))?;
324 let table = information_schema.tables.get(table_name).ok_or_else(|| {
325 QuillSQLError::Internal(format!("information_schema.{table_name} missing"))
326 })?;
327 let table_ref = TableReference::Full {
328 catalog: DEFAULT_CATALOG_NAME.to_string(),
329 schema: INFORMATION_SCHEMA_NAME.to_string(),
330 table: table_name.to_string(),
331 };
332 let handle = HoltTableHandle::new(
333 table_ref,
334 table.schema.clone(),
335 table.table_id,
336 db.catalog.holt_store.clone(),
337 );
338 let mut stream = handle.full_scan()?;
339 let mut rows = Vec::new();
340 while let Some((_rid, meta, tuple)) = stream.next()? {
341 if !meta.is_deleted {
342 rows.push(tuple);
343 }
344 }
345 Ok(rows)
346}
347
348pub fn key_schema_to_varchar(key_schema: &Schema) -> String {
349 key_schema
350 .columns
351 .iter()
352 .map(|col| col.name.as_str())
353 .collect::<Vec<_>>()
354 .join(", ")
355}
356
357fn parse_key_schema_from_varchar(varchar: &str, table_schema: SchemaRef) -> QuillSQLResult<Schema> {
358 let column_names = varchar
359 .split(',')
360 .map(|name| name.trim())
361 .collect::<Vec<&str>>();
362 let indices = column_names
363 .into_iter()
364 .map(|name| table_schema.index_of(None, name))
365 .collect::<QuillSQLResult<Vec<usize>>>()?;
366 table_schema.project(&indices)
367}