1use vibesql_ast::{
7 DescribeStmt, ShowColumnsStmt, ShowCreateTableStmt, ShowDatabasesStmt, ShowIndexStmt,
8 ShowTablesStmt,
9};
10use vibesql_catalog::{IndexType, ReferentialAction, SortOrder};
11use vibesql_storage::{Database, Row};
12use vibesql_types::SqlValue;
13
14use crate::{errors::ExecutorError, select::SelectResult};
15
16pub struct IntrospectionExecutor<'a> {
18 db: &'a Database,
19}
20
21impl<'a> IntrospectionExecutor<'a> {
22 pub fn new(db: &'a Database) -> Self {
24 Self { db }
25 }
26
27 pub fn execute_show_tables(
32 &self,
33 stmt: &ShowTablesStmt,
34 ) -> Result<SelectResult, ExecutorError> {
35 let schema_name =
37 stmt.database.as_deref().unwrap_or_else(|| self.db.catalog.get_current_schema());
38
39 let tables = if let Some(schema) = self.db.catalog.get_schema(schema_name) {
41 schema.list_tables()
42 } else {
43 let upper_name = schema_name.to_uppercase();
45 self.db
46 .catalog
47 .list_schemas()
48 .into_iter()
49 .find(|s| s.to_uppercase() == upper_name)
50 .and_then(|actual_name| self.db.catalog.get_schema(&actual_name))
51 .map(|schema| schema.list_tables())
52 .unwrap_or_default()
53 };
54
55 let filtered_tables: Vec<String> = if let Some(ref pattern) = stmt.like_pattern {
57 tables.into_iter().filter(|t| like_match(pattern, t)).collect()
58 } else {
59 tables
60 };
61
62 let column_name = format!("Tables_in_{}", schema_name);
64 let rows: Vec<Row> = filtered_tables
65 .into_iter()
66 .map(|name| Row::new(vec![SqlValue::Varchar(arcstr::ArcStr::from(name))]))
67 .collect();
68
69 Ok(SelectResult { columns: vec![column_name], rows })
70 }
71
72 pub fn execute_show_databases(
77 &self,
78 stmt: &ShowDatabasesStmt,
79 ) -> Result<SelectResult, ExecutorError> {
80 let schemas = self.db.catalog.list_schemas();
81
82 let filtered_schemas: Vec<String> = if let Some(ref pattern) = stmt.like_pattern {
84 schemas.into_iter().filter(|s| like_match(pattern, s)).collect()
85 } else {
86 schemas
87 };
88
89 let rows: Vec<Row> = filtered_schemas
90 .into_iter()
91 .map(|name| Row::new(vec![SqlValue::Varchar(arcstr::ArcStr::from(name))]))
92 .collect();
93
94 Ok(SelectResult { columns: vec!["Database".to_string()], rows })
95 }
96
97 pub fn execute_show_columns(
101 &self,
102 stmt: &ShowColumnsStmt,
103 ) -> Result<SelectResult, ExecutorError> {
104 let table_name = if let Some(ref db) = stmt.database {
106 format!("{}.{}", db, stmt.table_name)
107 } else {
108 stmt.table_name.clone()
109 };
110
111 let table = self
112 .db
113 .catalog
114 .get_table(&table_name)
115 .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
116
117 let pk_columns: std::collections::HashSet<String> =
119 table.primary_key.as_ref().map(|pk| pk.iter().cloned().collect()).unwrap_or_default();
120
121 let unique_columns: std::collections::HashSet<String> =
123 table.unique_constraints.iter().flatten().cloned().collect();
124
125 let mut rows: Vec<Row> = Vec::with_capacity(table.columns.len());
126
127 for col in &table.columns {
128 if let Some(ref pattern) = stmt.like_pattern {
130 if !like_match(pattern, &col.name) {
131 continue;
132 }
133 }
134
135 let field = SqlValue::Varchar(arcstr::ArcStr::from(col.name.clone()));
137
138 let type_str = format_data_type(&col.data_type);
140 let col_type = SqlValue::Varchar(arcstr::ArcStr::from(type_str));
141
142 let nullable = SqlValue::Varchar(if col.nullable { "YES" } else { "NO" }.into());
144
145 let key = if pk_columns.contains(&col.name) {
147 SqlValue::Varchar("PRI".into())
148 } else if unique_columns.contains(&col.name) {
149 SqlValue::Varchar("UNI".into())
150 } else {
151 SqlValue::Varchar("".into())
152 };
153
154 let default = col
156 .default_value
157 .as_ref()
158 .map(|expr| SqlValue::Varchar(arcstr::ArcStr::from(format!("{:?}", expr))))
159 .unwrap_or(SqlValue::Null);
160
161 let extra = SqlValue::Varchar("".into());
163
164 if stmt.full {
165 rows.push(Row::new(vec![
167 field,
168 col_type,
169 SqlValue::Null, nullable,
171 key,
172 default,
173 extra,
174 SqlValue::Varchar("select,insert,update,references".into()), SqlValue::Varchar("".into()), ]));
177 } else {
178 rows.push(Row::new(vec![field, col_type, nullable, key, default, extra]));
179 }
180 }
181
182 let columns = if stmt.full {
183 vec![
184 "Field".to_string(),
185 "Type".to_string(),
186 "Collation".to_string(),
187 "Null".to_string(),
188 "Key".to_string(),
189 "Default".to_string(),
190 "Extra".to_string(),
191 "Privileges".to_string(),
192 "Comment".to_string(),
193 ]
194 } else {
195 vec![
196 "Field".to_string(),
197 "Type".to_string(),
198 "Null".to_string(),
199 "Key".to_string(),
200 "Default".to_string(),
201 "Extra".to_string(),
202 ]
203 };
204
205 Ok(SelectResult { columns, rows })
206 }
207
208 pub fn execute_describe(&self, stmt: &DescribeStmt) -> Result<SelectResult, ExecutorError> {
210 let show_columns = ShowColumnsStmt {
212 table_name: stmt.table_name.clone(),
213 database: None,
214 full: false,
215 like_pattern: stmt.column_pattern.clone(),
216 where_clause: None,
217 };
218 self.execute_show_columns(&show_columns)
219 }
220
221 pub fn execute_show_index(&self, stmt: &ShowIndexStmt) -> Result<SelectResult, ExecutorError> {
225 let table_name = if let Some(ref db) = stmt.database {
227 format!("{}.{}", db, stmt.table_name)
228 } else {
229 stmt.table_name.clone()
230 };
231
232 let table = self
234 .db
235 .catalog
236 .get_table(&table_name)
237 .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
238
239 let mut rows: Vec<Row> = Vec::new();
240
241 if let Some(ref pk_cols) = table.primary_key {
243 for (seq, col_name) in pk_cols.iter().enumerate() {
244 rows.push(Row::new(vec![
245 SqlValue::Varchar(arcstr::ArcStr::from(stmt.table_name.clone())), SqlValue::Integer(0), SqlValue::Varchar("PRIMARY".into()), SqlValue::Integer((seq + 1) as i64), SqlValue::Varchar(arcstr::ArcStr::from(col_name.to_string())), SqlValue::Varchar("A".into()), SqlValue::Null, SqlValue::Null, SqlValue::Null, SqlValue::Varchar("".into()), SqlValue::Varchar("BTREE".into()), SqlValue::Varchar("".into()), SqlValue::Varchar("".into()), SqlValue::Varchar("YES".into()), ]));
260 }
261 }
262
263 let indexes = self.db.catalog.get_table_indexes(&stmt.table_name);
265 for index in indexes {
266 for (seq, col) in index.columns.iter().enumerate() {
267 let non_unique = if index.is_unique { 0 } else { 1 };
268 let collation = match col.order() {
269 SortOrder::Ascending => "A",
270 SortOrder::Descending => "D",
271 };
272 let index_type = match index.index_type {
273 IndexType::BTree => "BTREE",
274 IndexType::Hash => "HASH",
275 IndexType::RTree => "RTREE",
276 IndexType::Fulltext => "FULLTEXT",
277 IndexType::IVFFlat { .. } => "IVFFLAT",
278 IndexType::Hnsw { .. } => "HNSW",
279 };
280
281 let column_name = col
283 .column_name()
284 .map(|s| s.to_string())
285 .unwrap_or_else(|| "(expression)".to_string());
286
287 rows.push(Row::new(vec![
288 SqlValue::Varchar(arcstr::ArcStr::from(stmt.table_name.clone())), SqlValue::Integer(non_unique), SqlValue::Varchar(arcstr::ArcStr::from(index.name.clone())), SqlValue::Integer((seq + 1) as i64), SqlValue::Varchar(arcstr::ArcStr::from(column_name)), SqlValue::Varchar(collation.into()), SqlValue::Null, col.prefix_length()
296 .map(|l| SqlValue::Integer(l as i64))
297 .unwrap_or(SqlValue::Null), SqlValue::Null, SqlValue::Varchar("".into()), SqlValue::Varchar(index_type.into()), SqlValue::Varchar("".into()), SqlValue::Varchar("".into()), SqlValue::Varchar("YES".into()), ]));
305 }
306 }
307
308 Ok(SelectResult {
309 columns: vec![
310 "Table".to_string(),
311 "Non_unique".to_string(),
312 "Key_name".to_string(),
313 "Seq_in_index".to_string(),
314 "Column_name".to_string(),
315 "Collation".to_string(),
316 "Cardinality".to_string(),
317 "Sub_part".to_string(),
318 "Packed".to_string(),
319 "Null".to_string(),
320 "Index_type".to_string(),
321 "Comment".to_string(),
322 "Index_comment".to_string(),
323 "Visible".to_string(),
324 ],
325 rows,
326 })
327 }
328
329 pub fn execute_show_create_table(
333 &self,
334 stmt: &ShowCreateTableStmt,
335 ) -> Result<SelectResult, ExecutorError> {
336 let table = self
337 .db
338 .catalog
339 .get_table(&stmt.table_name)
340 .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
341
342 let mut sql = format!("CREATE TABLE {} (\n", table.name);
344 let mut definitions: Vec<String> = Vec::new();
345
346 for col in &table.columns {
348 let mut col_def = format!(" {} {}", col.name, format_data_type(&col.data_type));
349
350 if !col.nullable {
351 col_def.push_str(" NOT NULL");
352 }
353
354 if let Some(ref default) = col.default_value {
355 col_def.push_str(&format!(" DEFAULT {:?}", default));
356 }
357
358 definitions.push(col_def);
359 }
360
361 if let Some(ref pk_cols) = table.primary_key {
363 definitions.push(format!(" PRIMARY KEY ({})", pk_cols.join(", ")));
364 }
365
366 for unique_cols in &table.unique_constraints {
368 definitions.push(format!(" UNIQUE ({})", unique_cols.join(", ")));
369 }
370
371 for fk in &table.foreign_keys {
373 let mut fk_def = format!(
374 " FOREIGN KEY ({}) REFERENCES {} ({})",
375 fk.column_names.join(", "),
376 fk.parent_table,
377 fk.parent_column_names.join(", ")
378 );
379
380 fk_def.push_str(&format!(" ON DELETE {}", format_referential_action(&fk.on_delete)));
382
383 fk_def.push_str(&format!(" ON UPDATE {}", format_referential_action(&fk.on_update)));
385
386 definitions.push(fk_def);
387 }
388
389 for (name, _expr) in &table.check_constraints {
391 definitions.push(format!(" CONSTRAINT {} CHECK (...)", name));
392 }
393
394 sql.push_str(&definitions.join(",\n"));
395 sql.push_str("\n)");
396
397 let rows = vec![Row::new(vec![
398 SqlValue::Varchar(arcstr::ArcStr::from(table.name.clone())),
399 SqlValue::Varchar(arcstr::ArcStr::from(sql)),
400 ])];
401
402 Ok(SelectResult { columns: vec!["Table".to_string(), "Create Table".to_string()], rows })
403 }
404}
405
/// Case-insensitive SQL `LIKE` match of `text` against `pattern`.
///
/// Pattern syntax:
/// * `%` matches any run of characters, including none;
/// * `_` matches exactly one character;
/// * `\` makes the next character literal (`\%`, `\_`, `\\`); a lone trailing
///   `\` matches a literal backslash.
///
/// Both inputs are lowercased with [`str::to_lowercase`] before comparison, and
/// the whole of `text` must be consumed for the match to succeed.
fn like_match(pattern: &str, text: &str) -> bool {
    let pattern: Vec<char> = pattern.to_lowercase().chars().collect();
    let text: Vec<char> = text.to_lowercase().chars().collect();

    like_match_impl(&pattern, &text)
}

/// Matches already-normalised `text` against a `LIKE` `pattern`.
///
/// The pattern is first tokenised (which resolves `\` escapes), then matched
/// iteratively. Only the most recent `%` is kept as a backtrack point, so the
/// worst case is O(pattern × text) with constant stack depth. The previous
/// recursive form retried both branches at every `%`, which is exponential for
/// patterns such as `%a%a%a…%b`.
fn like_match_impl(pattern: &[char], text: &[char]) -> bool {
    enum Token {
        /// `%`: zero or more characters.
        AnyRun,
        /// `_`: exactly one character.
        AnyOne,
        /// A character that must match exactly.
        Literal(char),
    }

    let mut tokens = Vec::with_capacity(pattern.len());
    let mut raw = pattern.iter().copied();
    while let Some(c) = raw.next() {
        tokens.push(match c {
            '%' => Token::AnyRun,
            '_' => Token::AnyOne,
            // An escape takes the next character literally; with nothing left
            // to escape, the backslash stands for itself.
            '\\' => Token::Literal(raw.next().unwrap_or('\\')),
            other => Token::Literal(other),
        });
    }

    let mut p = 0; // index into `tokens`
    let mut t = 0; // index into `text`
    // (token index just after the last `%`, text index that `%` currently ends at)
    let mut resume: Option<(usize, usize)> = None;

    while t < text.len() {
        match tokens.get(p) {
            Some(Token::AnyRun) => {
                p += 1;
                // Start by letting `%` match nothing; widen on mismatch.
                resume = Some((p, t));
            }
            Some(Token::AnyOne) => {
                p += 1;
                t += 1;
            }
            Some(Token::Literal(c)) if *c == text[t] => {
                p += 1;
                t += 1;
            }
            // Mismatch or pattern exhausted: let the last `%` absorb one more
            // character and retry from there, or fail if there is no `%`.
            _ => match resume {
                Some((after_run, run_end)) => {
                    p = after_run;
                    t = run_end + 1;
                    resume = Some((after_run, t));
                }
                None => return false,
            },
        }
    }

    // Text is exhausted: whatever pattern remains must be able to match nothing.
    tokens[p..].iter().all(|token| matches!(token, Token::AnyRun))
}
447
448fn format_data_type(dt: &vibesql_types::DataType) -> String {
450 use vibesql_types::DataType;
451
452 match dt {
453 DataType::Integer => "INT".to_string(),
454 DataType::Smallint => "SMALLINT".to_string(),
455 DataType::Bigint => "BIGINT".to_string(),
456 DataType::Unsigned => "UNSIGNED BIGINT".to_string(),
457 DataType::Real => "REAL".to_string(),
458 DataType::Float { precision } => format!("FLOAT({})", precision),
459 DataType::DoublePrecision => "DOUBLE PRECISION".to_string(),
460 DataType::Numeric { precision, scale } => {
461 format!("NUMERIC({}, {})", precision, scale)
462 }
463 DataType::Decimal { precision, scale } => {
464 format!("DECIMAL({}, {})", precision, scale)
465 }
466 DataType::Varchar { max_length } => {
467 if let Some(len) = max_length {
468 format!("VARCHAR({})", len)
469 } else {
470 "VARCHAR".to_string()
471 }
472 }
473 DataType::Character { length } => {
474 format!("CHAR({})", length)
475 }
476 DataType::CharacterLargeObject => "CLOB".to_string(),
477 DataType::Name => "NAME".to_string(),
478 DataType::Boolean => "BOOLEAN".to_string(),
479 DataType::Date => "DATE".to_string(),
480 DataType::Time { with_timezone } => {
481 if *with_timezone {
482 "TIME WITH TIME ZONE".to_string()
483 } else {
484 "TIME".to_string()
485 }
486 }
487 DataType::Timestamp { with_timezone } => {
488 if *with_timezone {
489 "TIMESTAMP WITH TIME ZONE".to_string()
490 } else {
491 "TIMESTAMP".to_string()
492 }
493 }
494 DataType::Interval { start_field, end_field } => {
495 if let Some(end) = end_field {
496 format!("INTERVAL {:?} TO {:?}", start_field, end)
497 } else {
498 format!("INTERVAL {:?}", start_field)
499 }
500 }
501 DataType::BinaryLargeObject => "BLOB".to_string(),
502 DataType::Bit { length } => {
503 if let Some(len) = length {
504 format!("BIT({})", len)
505 } else {
506 "BIT".to_string()
507 }
508 }
509 DataType::UserDefined { type_name } => type_name.clone(),
510 DataType::Vector { dimensions } => format!("VECTOR({})", dimensions),
511 DataType::Null => "NULL".to_string(),
512 }
513}
514
515fn format_referential_action(action: &ReferentialAction) -> &'static str {
517 match action {
518 ReferentialAction::NoAction => "NO ACTION",
519 ReferentialAction::Restrict => "RESTRICT",
520 ReferentialAction::Cascade => "CASCADE",
521 ReferentialAction::SetNull => "SET NULL",
522 ReferentialAction::SetDefault => "SET DEFAULT",
523 }
524}
525
#[cfg(test)]
mod tests {
    use vibesql_catalog::{ColumnSchema, TableSchema};
    use vibesql_types::DataType;

    use super::*;

    /// Column headers of `SHOW COLUMNS` / `DESCRIBE` without `FULL`.
    const BASIC_COLUMN_HEADERS: [&str; 6] = ["Field", "Type", "Null", "Key", "Default", "Extra"];

    /// Builds a database with case-insensitive identifiers and two tables,
    /// `users` (id, name, email) and `orders` (id, user_id, total), each with
    /// `id` as primary key.
    fn create_test_db() -> Database {
        let mut db = Database::new();
        db.catalog.set_case_sensitive_identifiers(false);

        let users = TableSchema::with_primary_key(
            String::from("users"),
            vec![
                ColumnSchema::new(String::from("id"), DataType::Integer, false),
                ColumnSchema::new(
                    String::from("name"),
                    DataType::Varchar { max_length: Some(100) },
                    true,
                ),
                ColumnSchema::new(
                    String::from("email"),
                    DataType::Varchar { max_length: Some(255) },
                    false,
                ),
            ],
            vec![String::from("id")],
        );
        db.create_table(users).unwrap();

        let orders = TableSchema::with_primary_key(
            String::from("orders"),
            vec![
                ColumnSchema::new(String::from("id"), DataType::Integer, false),
                ColumnSchema::new(String::from("user_id"), DataType::Integer, false),
                ColumnSchema::new(
                    String::from("total"),
                    DataType::Decimal { precision: 10, scale: 2 },
                    true,
                ),
            ],
            vec![String::from("id")],
        );
        db.create_table(orders).unwrap();

        db
    }

    /// Collects the first value of every row that is a `Varchar`.
    fn first_column_text(result: &SelectResult) -> Vec<&str> {
        result
            .rows
            .iter()
            .filter_map(|row| match &row.values[0] {
                SqlValue::Varchar(text) => Some(text.as_str()),
                _ => None,
            })
            .collect()
    }

    /// Builds a `SHOW COLUMNS FROM users` statement with the given `FULL` flag.
    fn show_users_columns(full: bool) -> ShowColumnsStmt {
        ShowColumnsStmt {
            table_name: String::from("users"),
            database: None,
            full,
            like_pattern: None,
            where_clause: None,
        }
    }

    #[test]
    fn test_show_tables() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_show_tables(&ShowTablesStmt {
                database: None,
                like_pattern: None,
                where_clause: None,
            })
            .unwrap();

        assert_eq!(result.columns.len(), 1);
        assert!(result.columns[0].starts_with("Tables_in_"));
        assert_eq!(result.rows.len(), 2);

        // Accept either stored casing of the table names.
        let table_names = first_column_text(&result);
        assert!(table_names.contains(&"users") || table_names.contains(&"USERS"));
        assert!(table_names.contains(&"orders") || table_names.contains(&"ORDERS"));
    }

    #[test]
    fn test_show_tables_with_like() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_show_tables(&ShowTablesStmt {
                database: None,
                like_pattern: Some(String::from("user%")),
                where_clause: None,
            })
            .unwrap();

        assert_eq!(result.rows.len(), 1);
    }

    #[test]
    fn test_show_databases() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_show_databases(&ShowDatabasesStmt { like_pattern: None, where_clause: None })
            .unwrap();

        assert_eq!(result.columns, vec!["Database"]);
        assert!(!result.rows.is_empty());

        let schema_names = first_column_text(&result);
        assert!(schema_names.contains(&vibesql_catalog::DEFAULT_SCHEMA));
    }

    #[test]
    fn test_show_columns() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor.execute_show_columns(&show_users_columns(false)).unwrap();

        assert_eq!(result.columns, BASIC_COLUMN_HEADERS);
        assert_eq!(result.rows.len(), 3);
    }

    #[test]
    fn test_show_columns_full() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor.execute_show_columns(&show_users_columns(true)).unwrap();

        assert_eq!(result.columns.len(), 9);
        assert!(result.columns.contains(&"Privileges".to_string()));
        assert!(result.columns.contains(&"Comment".to_string()));
    }

    #[test]
    fn test_describe() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_describe(&DescribeStmt {
                table_name: String::from("users"),
                column_pattern: None,
            })
            .unwrap();

        assert_eq!(result.columns, BASIC_COLUMN_HEADERS);
        assert_eq!(result.rows.len(), 3);
    }

    #[test]
    fn test_show_index() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_show_index(&ShowIndexStmt {
                table_name: String::from("users"),
                database: None,
            })
            .unwrap();

        assert!(!result.rows.is_empty());

        // Key_name is the third column.
        let has_primary = result
            .rows
            .iter()
            .any(|row| matches!(&row.values[2], SqlValue::Varchar(s) if s.as_str() == "PRIMARY"));
        assert!(has_primary);
    }

    #[test]
    fn test_show_create_table() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_show_create_table(&ShowCreateTableStmt { table_name: String::from("users") })
            .unwrap();

        assert_eq!(result.columns, vec!["Table", "Create Table"]);
        assert_eq!(result.rows.len(), 1);

        let sql = match &result.rows[0].values[1] {
            SqlValue::Varchar(sql) => sql,
            _ => panic!("Expected VARCHAR for Create Table"),
        };
        for fragment in &["CREATE TABLE", "users", "id", "name", "email", "PRIMARY KEY"] {
            assert!(sql.contains(*fragment));
        }
    }

    #[test]
    fn test_like_match() {
        // (pattern, text, expected)
        let cases: &[(&str, &str, bool)] = &[
            // Exact, case-insensitive matching.
            ("test", "test", true),
            ("test", "TEST", true),
            ("test", "testing", false),
            // `%` wildcard.
            ("test%", "test", true),
            ("test%", "testing", true),
            ("%test", "test", true),
            ("%test", "mytest", true),
            ("%test%", "test", true),
            ("%test%", "mytesting", true),
            // `_` wildcard.
            ("te_t", "test", true),
            ("te_t", "tet", false),
            ("te_t", "testt", false),
            // Escaped wildcard.
            ("te\\_t", "te_t", true),
            ("te\\_t", "test", false),
        ];

        for &(pattern, text, expected) in cases {
            assert_eq!(like_match(pattern, text), expected);
        }
    }
}