1use vibesql_ast::{
7 DescribeStmt, ShowColumnsStmt, ShowCreateTableStmt, ShowDatabasesStmt, ShowIndexStmt,
8 ShowTablesStmt,
9};
10use vibesql_catalog::{IndexType, ReferentialAction, SortOrder};
11use vibesql_storage::{Database, Row};
12use vibesql_types::SqlValue;
13
14use crate::errors::ExecutorError;
15use crate::select::SelectResult;
16
/// Executor for schema-introspection statements (the `SHOW ...` family and
/// `DESCRIBE`), answering them from the catalog of a borrowed [`Database`].
pub struct IntrospectionExecutor<'a> {
    /// Database whose catalog is read by the statement handlers.
    db: &'a Database,
}
21
22impl<'a> IntrospectionExecutor<'a> {
23 pub fn new(db: &'a Database) -> Self {
25 Self { db }
26 }
27
28 pub fn execute_show_tables(
33 &self,
34 stmt: &ShowTablesStmt,
35 ) -> Result<SelectResult, ExecutorError> {
36 let schema_name =
38 stmt.database.as_deref().unwrap_or_else(|| self.db.catalog.get_current_schema());
39
40 let tables = if let Some(schema) = self.db.catalog.get_schema(schema_name) {
42 schema.list_tables()
43 } else {
44 let upper_name = schema_name.to_uppercase();
46 self.db
47 .catalog
48 .list_schemas()
49 .into_iter()
50 .find(|s| s.to_uppercase() == upper_name)
51 .and_then(|actual_name| self.db.catalog.get_schema(&actual_name))
52 .map(|schema| schema.list_tables())
53 .unwrap_or_default()
54 };
55
56 let filtered_tables: Vec<String> = if let Some(ref pattern) = stmt.like_pattern {
58 tables.into_iter().filter(|t| like_match(pattern, t)).collect()
59 } else {
60 tables
61 };
62
63 let column_name = format!("Tables_in_{}", schema_name);
65 let rows: Vec<Row> = filtered_tables
66 .into_iter()
67 .map(|name| Row::new(vec![SqlValue::Varchar(name)]))
68 .collect();
69
70 Ok(SelectResult { columns: vec![column_name], rows })
71 }
72
73 pub fn execute_show_databases(
78 &self,
79 stmt: &ShowDatabasesStmt,
80 ) -> Result<SelectResult, ExecutorError> {
81 let schemas = self.db.catalog.list_schemas();
82
83 let filtered_schemas: Vec<String> = if let Some(ref pattern) = stmt.like_pattern {
85 schemas.into_iter().filter(|s| like_match(pattern, s)).collect()
86 } else {
87 schemas
88 };
89
90 let rows: Vec<Row> = filtered_schemas
91 .into_iter()
92 .map(|name| Row::new(vec![SqlValue::Varchar(name)]))
93 .collect();
94
95 Ok(SelectResult { columns: vec!["Database".to_string()], rows })
96 }
97
98 pub fn execute_show_columns(
102 &self,
103 stmt: &ShowColumnsStmt,
104 ) -> Result<SelectResult, ExecutorError> {
105 let table_name = if let Some(ref db) = stmt.database {
107 format!("{}.{}", db, stmt.table_name)
108 } else {
109 stmt.table_name.clone()
110 };
111
112 let table = self
113 .db
114 .catalog
115 .get_table(&table_name)
116 .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
117
118 let pk_columns: std::collections::HashSet<String> =
120 table.primary_key.as_ref().map(|pk| pk.iter().cloned().collect()).unwrap_or_default();
121
122 let unique_columns: std::collections::HashSet<String> =
124 table.unique_constraints.iter().flatten().cloned().collect();
125
126 let mut rows: Vec<Row> = Vec::with_capacity(table.columns.len());
127
128 for col in &table.columns {
129 if let Some(ref pattern) = stmt.like_pattern {
131 if !like_match(pattern, &col.name) {
132 continue;
133 }
134 }
135
136 let field = SqlValue::Varchar(col.name.clone());
138
139 let type_str = format_data_type(&col.data_type);
141 let col_type = SqlValue::Varchar(type_str);
142
143 let nullable = SqlValue::Varchar(if col.nullable { "YES" } else { "NO" }.into());
145
146 let key = if pk_columns.contains(&col.name) {
148 SqlValue::Varchar("PRI".into())
149 } else if unique_columns.contains(&col.name) {
150 SqlValue::Varchar("UNI".into())
151 } else {
152 SqlValue::Varchar("".into())
153 };
154
155 let default = col
157 .default_value
158 .as_ref()
159 .map(|expr| SqlValue::Varchar(format!("{:?}", expr)))
160 .unwrap_or(SqlValue::Null);
161
162 let extra = SqlValue::Varchar("".into());
164
165 if stmt.full {
166 rows.push(Row::new(vec![
168 field,
169 col_type,
170 SqlValue::Null, nullable,
172 key,
173 default,
174 extra,
175 SqlValue::Varchar("select,insert,update,references".into()), SqlValue::Varchar("".into()), ]));
178 } else {
179 rows.push(Row::new(vec![field, col_type, nullable, key, default, extra]));
180 }
181 }
182
183 let columns = if stmt.full {
184 vec![
185 "Field".to_string(),
186 "Type".to_string(),
187 "Collation".to_string(),
188 "Null".to_string(),
189 "Key".to_string(),
190 "Default".to_string(),
191 "Extra".to_string(),
192 "Privileges".to_string(),
193 "Comment".to_string(),
194 ]
195 } else {
196 vec![
197 "Field".to_string(),
198 "Type".to_string(),
199 "Null".to_string(),
200 "Key".to_string(),
201 "Default".to_string(),
202 "Extra".to_string(),
203 ]
204 };
205
206 Ok(SelectResult { columns, rows })
207 }
208
209 pub fn execute_describe(&self, stmt: &DescribeStmt) -> Result<SelectResult, ExecutorError> {
211 let show_columns = ShowColumnsStmt {
213 table_name: stmt.table_name.clone(),
214 database: None,
215 full: false,
216 like_pattern: stmt.column_pattern.clone(),
217 where_clause: None,
218 };
219 self.execute_show_columns(&show_columns)
220 }
221
222 pub fn execute_show_index(&self, stmt: &ShowIndexStmt) -> Result<SelectResult, ExecutorError> {
226 let table_name = if let Some(ref db) = stmt.database {
228 format!("{}.{}", db, stmt.table_name)
229 } else {
230 stmt.table_name.clone()
231 };
232
233 let table = self
235 .db
236 .catalog
237 .get_table(&table_name)
238 .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
239
240 let mut rows: Vec<Row> = Vec::new();
241
242 if let Some(ref pk_cols) = table.primary_key {
244 for (seq, col_name) in pk_cols.iter().enumerate() {
245 rows.push(Row::new(vec![
246 SqlValue::Varchar(stmt.table_name.clone()), SqlValue::Integer(0), SqlValue::Varchar("PRIMARY".into()), SqlValue::Integer((seq + 1) as i64), SqlValue::Varchar(col_name.clone()), SqlValue::Varchar("A".into()), SqlValue::Null, SqlValue::Null, SqlValue::Null, SqlValue::Varchar("".into()), SqlValue::Varchar("BTREE".into()), SqlValue::Varchar("".into()), SqlValue::Varchar("".into()), SqlValue::Varchar("YES".into()), ]));
261 }
262 }
263
264 let indexes = self.db.catalog.get_table_indexes(&stmt.table_name);
266 for index in indexes {
267 for (seq, col) in index.columns.iter().enumerate() {
268 let non_unique = if index.is_unique { 0 } else { 1 };
269 let collation = match col.order {
270 SortOrder::Ascending => "A",
271 SortOrder::Descending => "D",
272 };
273 let index_type = match index.index_type {
274 IndexType::BTree => "BTREE",
275 IndexType::Hash => "HASH",
276 IndexType::RTree => "RTREE",
277 IndexType::Fulltext => "FULLTEXT",
278 IndexType::IVFFlat { .. } => "IVFFLAT",
279 IndexType::Hnsw { .. } => "HNSW",
280 };
281
282 rows.push(Row::new(vec![
283 SqlValue::Varchar(stmt.table_name.clone()), SqlValue::Integer(non_unique), SqlValue::Varchar(index.name.clone()), SqlValue::Integer((seq + 1) as i64), SqlValue::Varchar(col.column_name.clone()), SqlValue::Varchar(collation.into()), SqlValue::Null, col.prefix_length
291 .map(|l| SqlValue::Integer(l as i64))
292 .unwrap_or(SqlValue::Null), SqlValue::Null, SqlValue::Varchar("".into()), SqlValue::Varchar(index_type.into()), SqlValue::Varchar("".into()), SqlValue::Varchar("".into()), SqlValue::Varchar("YES".into()), ]));
300 }
301 }
302
303 Ok(SelectResult {
304 columns: vec![
305 "Table".to_string(),
306 "Non_unique".to_string(),
307 "Key_name".to_string(),
308 "Seq_in_index".to_string(),
309 "Column_name".to_string(),
310 "Collation".to_string(),
311 "Cardinality".to_string(),
312 "Sub_part".to_string(),
313 "Packed".to_string(),
314 "Null".to_string(),
315 "Index_type".to_string(),
316 "Comment".to_string(),
317 "Index_comment".to_string(),
318 "Visible".to_string(),
319 ],
320 rows,
321 })
322 }
323
324 pub fn execute_show_create_table(
328 &self,
329 stmt: &ShowCreateTableStmt,
330 ) -> Result<SelectResult, ExecutorError> {
331 let table = self
332 .db
333 .catalog
334 .get_table(&stmt.table_name)
335 .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
336
337 let mut sql = format!("CREATE TABLE {} (\n", table.name);
339 let mut definitions: Vec<String> = Vec::new();
340
341 for col in &table.columns {
343 let mut col_def = format!(" {} {}", col.name, format_data_type(&col.data_type));
344
345 if !col.nullable {
346 col_def.push_str(" NOT NULL");
347 }
348
349 if let Some(ref default) = col.default_value {
350 col_def.push_str(&format!(" DEFAULT {:?}", default));
351 }
352
353 definitions.push(col_def);
354 }
355
356 if let Some(ref pk_cols) = table.primary_key {
358 definitions.push(format!(" PRIMARY KEY ({})", pk_cols.join(", ")));
359 }
360
361 for unique_cols in &table.unique_constraints {
363 definitions.push(format!(" UNIQUE ({})", unique_cols.join(", ")));
364 }
365
366 for fk in &table.foreign_keys {
368 let mut fk_def = format!(
369 " FOREIGN KEY ({}) REFERENCES {} ({})",
370 fk.column_names.join(", "),
371 fk.parent_table,
372 fk.parent_column_names.join(", ")
373 );
374
375 fk_def.push_str(&format!(" ON DELETE {}", format_referential_action(&fk.on_delete)));
377
378 fk_def.push_str(&format!(" ON UPDATE {}", format_referential_action(&fk.on_update)));
380
381 definitions.push(fk_def);
382 }
383
384 for (name, _expr) in &table.check_constraints {
386 definitions.push(format!(" CONSTRAINT {} CHECK (...)", name));
387 }
388
389 sql.push_str(&definitions.join(",\n"));
390 sql.push_str("\n)");
391
392 let rows =
393 vec![Row::new(vec![SqlValue::Varchar(table.name.clone()), SqlValue::Varchar(sql)])];
394
395 Ok(SelectResult { columns: vec!["Table".to_string(), "Create Table".to_string()], rows })
396 }
397}
398
/// Case-insensitive SQL `LIKE` match of `text` against `pattern`.
///
/// Pattern syntax:
/// - `%` matches any run of characters, including the empty run
/// - `_` matches exactly one character
/// - `\x` matches the character `x` literally, so `\%` and `\_` match a
///   literal `%` and `_`; a trailing lone `\` matches a literal backslash
///
/// Both inputs are lower-cased before they are compared.
fn like_match(pattern: &str, text: &str) -> bool {
    let pattern_chars: Vec<char> = pattern.to_lowercase().chars().collect();
    let text_chars: Vec<char> = text.to_lowercase().chars().collect();

    like_match_impl(&pattern_chars, &text_chars)
}

/// Matches `text` against the `LIKE` pattern `pattern`, character by character.
///
/// The matcher is iterative: it remembers the position just after the most
/// recent `%` and, on a mismatch, lets that `%` absorb one more character
/// and retries. Every non-`%` pattern token consumes exactly one text
/// character, so retrying from the latest `%` is sufficient. This keeps the
/// work bounded by `pattern.len() * text.len()` and uses no recursion, even
/// for patterns with many wildcards or for very long inputs.
fn like_match_impl(pattern: &[char], text: &[char]) -> bool {
    let mut p = 0;
    let mut t = 0;
    // (pattern index right after the latest `%`, text index where the
    // current attempt after that `%` started).
    let mut resume: Option<(usize, usize)> = None;

    while t < text.len() {
        if p < pattern.len() {
            match pattern[p] {
                '%' => {
                    resume = Some((p + 1, t));
                    p += 1;
                    continue;
                }
                '_' => {
                    p += 1;
                    t += 1;
                    continue;
                }
                // Escape: the next pattern character must match literally.
                '\\' if p + 1 < pattern.len() => {
                    if pattern[p + 1] == text[t] {
                        p += 2;
                        t += 1;
                        continue;
                    }
                }
                literal if literal == text[t] => {
                    p += 1;
                    t += 1;
                    continue;
                }
                _ => {}
            }
        }

        // Mismatch (or pattern exhausted with text left over): widen the
        // latest `%` by one character, or fail if there is none.
        match resume {
            Some((after_percent, attempt_start)) => {
                p = after_percent;
                t = attempt_start + 1;
                resume = Some((after_percent, t));
            }
            None => return false,
        }
    }

    // Text is exhausted; whatever remains of the pattern may only match the
    // empty string, i.e. consist solely of `%`.
    pattern[p..].iter().all(|&c| c == '%')
}
440
441fn format_data_type(dt: &vibesql_types::DataType) -> String {
443 use vibesql_types::DataType;
444
445 match dt {
446 DataType::Integer => "INT".to_string(),
447 DataType::Smallint => "SMALLINT".to_string(),
448 DataType::Bigint => "BIGINT".to_string(),
449 DataType::Unsigned => "UNSIGNED BIGINT".to_string(),
450 DataType::Real => "REAL".to_string(),
451 DataType::Float { precision } => format!("FLOAT({})", precision),
452 DataType::DoublePrecision => "DOUBLE PRECISION".to_string(),
453 DataType::Numeric { precision, scale } => {
454 format!("NUMERIC({}, {})", precision, scale)
455 }
456 DataType::Decimal { precision, scale } => {
457 format!("DECIMAL({}, {})", precision, scale)
458 }
459 DataType::Varchar { max_length } => {
460 if let Some(len) = max_length {
461 format!("VARCHAR({})", len)
462 } else {
463 "VARCHAR".to_string()
464 }
465 }
466 DataType::Character { length } => {
467 format!("CHAR({})", length)
468 }
469 DataType::CharacterLargeObject => "CLOB".to_string(),
470 DataType::Name => "NAME".to_string(),
471 DataType::Boolean => "BOOLEAN".to_string(),
472 DataType::Date => "DATE".to_string(),
473 DataType::Time { with_timezone } => {
474 if *with_timezone {
475 "TIME WITH TIME ZONE".to_string()
476 } else {
477 "TIME".to_string()
478 }
479 }
480 DataType::Timestamp { with_timezone } => {
481 if *with_timezone {
482 "TIMESTAMP WITH TIME ZONE".to_string()
483 } else {
484 "TIMESTAMP".to_string()
485 }
486 }
487 DataType::Interval { start_field, end_field } => {
488 if let Some(end) = end_field {
489 format!("INTERVAL {:?} TO {:?}", start_field, end)
490 } else {
491 format!("INTERVAL {:?}", start_field)
492 }
493 }
494 DataType::BinaryLargeObject => "BLOB".to_string(),
495 DataType::Bit { length } => {
496 if let Some(len) = length {
497 format!("BIT({})", len)
498 } else {
499 "BIT".to_string()
500 }
501 }
502 DataType::UserDefined { type_name } => type_name.clone(),
503 DataType::Vector { dimensions } => format!("VECTOR({})", dimensions),
504 DataType::Null => "NULL".to_string(),
505 }
506}
507
508fn format_referential_action(action: &ReferentialAction) -> &'static str {
510 match action {
511 ReferentialAction::NoAction => "NO ACTION",
512 ReferentialAction::Restrict => "RESTRICT",
513 ReferentialAction::Cascade => "CASCADE",
514 ReferentialAction::SetNull => "SET NULL",
515 ReferentialAction::SetDefault => "SET DEFAULT",
516 }
517}
518
#[cfg(test)]
mod tests {
    use super::*;
    use vibesql_catalog::{ColumnSchema, TableSchema};
    use vibesql_types::DataType;

    /// Builds a database with case-insensitive identifiers holding two
    /// tables, `users` and `orders`, each with `id` as its primary key.
    fn create_test_db() -> Database {
        let mut db = Database::new();
        db.catalog.set_case_sensitive_identifiers(false);

        let users = TableSchema::with_primary_key(
            "users".to_string(),
            vec![
                ColumnSchema::new("id".to_string(), DataType::Integer, false),
                ColumnSchema::new(
                    "name".to_string(),
                    DataType::Varchar { max_length: Some(100) },
                    true,
                ),
                ColumnSchema::new(
                    "email".to_string(),
                    DataType::Varchar { max_length: Some(255) },
                    false,
                ),
            ],
            vec!["id".to_string()],
        );
        db.create_table(users).unwrap();

        let orders = TableSchema::with_primary_key(
            "orders".to_string(),
            vec![
                ColumnSchema::new("id".to_string(), DataType::Integer, false),
                ColumnSchema::new("user_id".to_string(), DataType::Integer, false),
                ColumnSchema::new(
                    "total".to_string(),
                    DataType::Decimal { precision: 10, scale: 2 },
                    true,
                ),
            ],
            vec!["id".to_string()],
        );
        db.create_table(orders).unwrap();

        db
    }

    /// Collects the first value of every row that is a `Varchar`.
    fn first_column_strings(result: &SelectResult) -> Vec<&str> {
        result
            .rows
            .iter()
            .filter_map(|row| match &row.values[0] {
                SqlValue::Varchar(s) => Some(s.as_str()),
                _ => None,
            })
            .collect()
    }

    /// Builds a `SHOW COLUMNS FROM users` statement with no filter.
    fn show_users_columns(full: bool) -> ShowColumnsStmt {
        ShowColumnsStmt {
            table_name: "users".to_string(),
            database: None,
            full,
            like_pattern: None,
            where_clause: None,
        }
    }

    /// `SHOW TABLES` lists both tables under a `Tables_in_*` column.
    #[test]
    fn test_show_tables() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let stmt = ShowTablesStmt { database: None, like_pattern: None, where_clause: None };
        let result = executor.execute_show_tables(&stmt).unwrap();

        assert_eq!(result.columns.len(), 1);
        assert!(result.columns[0].starts_with("Tables_in_"));
        assert_eq!(result.rows.len(), 2);

        // Accept either the lower- or upper-case spelling of each name.
        let table_names = first_column_strings(&result);
        assert!(["users", "USERS"].iter().any(|name| table_names.contains(name)));
        assert!(["orders", "ORDERS"].iter().any(|name| table_names.contains(name)));
    }

    /// A `LIKE 'user%'` filter keeps only the `users` table.
    #[test]
    fn test_show_tables_with_like() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let stmt = ShowTablesStmt {
            database: None,
            like_pattern: Some("user%".to_string()),
            where_clause: None,
        };
        let result = executor.execute_show_tables(&stmt).unwrap();

        assert_eq!(result.rows.len(), 1);
    }

    /// `SHOW DATABASES` reports at least the `public` schema.
    #[test]
    fn test_show_databases() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let stmt = ShowDatabasesStmt { like_pattern: None, where_clause: None };
        let result = executor.execute_show_databases(&stmt).unwrap();

        assert_eq!(result.columns, vec!["Database"]);
        assert!(!result.rows.is_empty());

        let schema_names = first_column_strings(&result);
        assert!(schema_names.contains(&"public"));
    }

    /// Plain `SHOW COLUMNS` yields the six standard fields, one row per column.
    #[test]
    fn test_show_columns() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor.execute_show_columns(&show_users_columns(false)).unwrap();

        assert_eq!(result.columns, vec!["Field", "Type", "Null", "Key", "Default", "Extra"]);
        assert_eq!(result.rows.len(), 3);
    }

    /// `SHOW FULL COLUMNS` adds the extra fields, for nine in total.
    #[test]
    fn test_show_columns_full() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor.execute_show_columns(&show_users_columns(true)).unwrap();

        assert_eq!(result.columns.len(), 9);
        assert!(result.columns.contains(&"Privileges".to_string()));
        assert!(result.columns.contains(&"Comment".to_string()));
    }

    /// `DESCRIBE` produces the same shape as plain `SHOW COLUMNS`.
    #[test]
    fn test_describe() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let stmt = DescribeStmt { table_name: "users".to_string(), column_pattern: None };
        let result = executor.execute_describe(&stmt).unwrap();

        assert_eq!(result.columns, vec!["Field", "Type", "Null", "Key", "Default", "Extra"]);
        assert_eq!(result.rows.len(), 3);
    }

    /// `SHOW INDEX` reports the primary key under the name `PRIMARY`.
    #[test]
    fn test_show_index() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let stmt = ShowIndexStmt { table_name: "users".to_string(), database: None };
        let result = executor.execute_show_index(&stmt).unwrap();

        assert!(!result.rows.is_empty());

        // Key_name is the third column of every row.
        let has_primary = result
            .rows
            .iter()
            .any(|row| matches!(&row.values[2], SqlValue::Varchar(s) if s == "PRIMARY"));
        assert!(has_primary);
    }

    /// `SHOW CREATE TABLE` returns one row whose SQL mentions the table, its
    /// columns and the primary key.
    #[test]
    fn test_show_create_table() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let stmt = ShowCreateTableStmt { table_name: "users".to_string() };
        let result = executor.execute_show_create_table(&stmt).unwrap();

        assert_eq!(result.columns, vec!["Table", "Create Table"]);
        assert_eq!(result.rows.len(), 1);

        let sql = match &result.rows[0].values[1] {
            SqlValue::Varchar(sql) => sql,
            _ => panic!("Expected VARCHAR for Create Table"),
        };
        for fragment in ["CREATE TABLE", "users", "id", "name", "email", "PRIMARY KEY"] {
            assert!(sql.contains(fragment));
        }
    }

    /// Exercises exact, `%`, `_` and escaped matching in `like_match`.
    #[test]
    fn test_like_match() {
        let cases = [
            // Exact matching is case-insensitive and anchored at both ends.
            ("test", "test", true),
            ("test", "TEST", true),
            ("test", "testing", false),
            // `%` matches any run of characters, including none.
            ("test%", "test", true),
            ("test%", "testing", true),
            ("%test", "test", true),
            ("%test", "mytest", true),
            ("%test%", "test", true),
            ("%test%", "mytesting", true),
            // `_` matches exactly one character.
            ("te_t", "test", true),
            ("te_t", "tet", false),
            ("te_t", "testt", false),
            // An escaped `_` only matches a literal underscore.
            ("te\\_t", "te_t", true),
            ("te\\_t", "test", false),
        ];

        for (pattern, text, expected) in cases {
            assert_eq!(
                like_match(pattern, text),
                expected,
                "pattern {:?} against text {:?}",
                pattern,
                text
            );
        }
    }
}