1use vibesql_ast::{DescribeStmt, ShowColumnsStmt, ShowCreateTableStmt, ShowDatabasesStmt, ShowIndexStmt, ShowTablesStmt};
7use vibesql_catalog::{IndexType, ReferentialAction, SortOrder};
8use vibesql_storage::{Database, Row};
9use vibesql_types::SqlValue;
10
11use crate::errors::ExecutorError;
12use crate::select::SelectResult;
13
14pub struct IntrospectionExecutor<'a> {
16 db: &'a Database,
17}
18
19impl<'a> IntrospectionExecutor<'a> {
20 pub fn new(db: &'a Database) -> Self {
22 Self { db }
23 }
24
25 pub fn execute_show_tables(&self, stmt: &ShowTablesStmt) -> Result<SelectResult, ExecutorError> {
30 let schema_name = stmt.database.as_deref().unwrap_or_else(|| {
32 self.db.catalog.get_current_schema()
33 });
34
35 let tables = if let Some(schema) = self.db.catalog.get_schema(schema_name) {
37 schema.list_tables()
38 } else {
39 let upper_name = schema_name.to_uppercase();
41 self.db.catalog.list_schemas()
42 .into_iter()
43 .find(|s| s.to_uppercase() == upper_name)
44 .and_then(|actual_name| self.db.catalog.get_schema(&actual_name))
45 .map(|schema| schema.list_tables())
46 .unwrap_or_default()
47 };
48
49 let filtered_tables: Vec<String> = if let Some(ref pattern) = stmt.like_pattern {
51 tables.into_iter().filter(|t| like_match(pattern, t)).collect()
52 } else {
53 tables
54 };
55
56 let column_name = format!("Tables_in_{}", schema_name);
58 let rows: Vec<Row> = filtered_tables
59 .into_iter()
60 .map(|name| Row::new(vec![SqlValue::Varchar(name)]))
61 .collect();
62
63 Ok(SelectResult {
64 columns: vec![column_name],
65 rows,
66 })
67 }
68
69 pub fn execute_show_databases(&self, stmt: &ShowDatabasesStmt) -> Result<SelectResult, ExecutorError> {
74 let schemas = self.db.catalog.list_schemas();
75
76 let filtered_schemas: Vec<String> = if let Some(ref pattern) = stmt.like_pattern {
78 schemas.into_iter().filter(|s| like_match(pattern, s)).collect()
79 } else {
80 schemas
81 };
82
83 let rows: Vec<Row> = filtered_schemas
84 .into_iter()
85 .map(|name| Row::new(vec![SqlValue::Varchar(name)]))
86 .collect();
87
88 Ok(SelectResult {
89 columns: vec!["Database".to_string()],
90 rows,
91 })
92 }
93
94 pub fn execute_show_columns(&self, stmt: &ShowColumnsStmt) -> Result<SelectResult, ExecutorError> {
98 let table_name = if let Some(ref db) = stmt.database {
100 format!("{}.{}", db, stmt.table_name)
101 } else {
102 stmt.table_name.clone()
103 };
104
105 let table = self.db.catalog.get_table(&table_name)
106 .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
107
108 let pk_columns: std::collections::HashSet<String> = table.primary_key
110 .as_ref()
111 .map(|pk| pk.iter().cloned().collect())
112 .unwrap_or_default();
113
114 let unique_columns: std::collections::HashSet<String> = table.unique_constraints
116 .iter()
117 .flatten()
118 .cloned()
119 .collect();
120
121 let mut rows: Vec<Row> = Vec::with_capacity(table.columns.len());
122
123 for col in &table.columns {
124 if let Some(ref pattern) = stmt.like_pattern {
126 if !like_match(pattern, &col.name) {
127 continue;
128 }
129 }
130
131 let field = SqlValue::Varchar(col.name.clone());
133
134 let type_str = format_data_type(&col.data_type);
136 let col_type = SqlValue::Varchar(type_str);
137
138 let nullable = SqlValue::Varchar(if col.nullable { "YES" } else { "NO" }.into());
140
141 let key = if pk_columns.contains(&col.name) {
143 SqlValue::Varchar("PRI".into())
144 } else if unique_columns.contains(&col.name) {
145 SqlValue::Varchar("UNI".into())
146 } else {
147 SqlValue::Varchar("".into())
148 };
149
150 let default = col.default_value
152 .as_ref()
153 .map(|expr| SqlValue::Varchar(format!("{:?}", expr)))
154 .unwrap_or(SqlValue::Null);
155
156 let extra = SqlValue::Varchar("".into());
158
159 if stmt.full {
160 rows.push(Row::new(vec![
162 field,
163 col_type,
164 SqlValue::Null, nullable,
166 key,
167 default,
168 extra,
169 SqlValue::Varchar("select,insert,update,references".into()), SqlValue::Varchar("".into()), ]));
172 } else {
173 rows.push(Row::new(vec![field, col_type, nullable, key, default, extra]));
174 }
175 }
176
177 let columns = if stmt.full {
178 vec![
179 "Field".to_string(),
180 "Type".to_string(),
181 "Collation".to_string(),
182 "Null".to_string(),
183 "Key".to_string(),
184 "Default".to_string(),
185 "Extra".to_string(),
186 "Privileges".to_string(),
187 "Comment".to_string(),
188 ]
189 } else {
190 vec![
191 "Field".to_string(),
192 "Type".to_string(),
193 "Null".to_string(),
194 "Key".to_string(),
195 "Default".to_string(),
196 "Extra".to_string(),
197 ]
198 };
199
200 Ok(SelectResult { columns, rows })
201 }
202
203 pub fn execute_describe(&self, stmt: &DescribeStmt) -> Result<SelectResult, ExecutorError> {
205 let show_columns = ShowColumnsStmt {
207 table_name: stmt.table_name.clone(),
208 database: None,
209 full: false,
210 like_pattern: stmt.column_pattern.clone(),
211 where_clause: None,
212 };
213 self.execute_show_columns(&show_columns)
214 }
215
216 pub fn execute_show_index(&self, stmt: &ShowIndexStmt) -> Result<SelectResult, ExecutorError> {
220 let table_name = if let Some(ref db) = stmt.database {
222 format!("{}.{}", db, stmt.table_name)
223 } else {
224 stmt.table_name.clone()
225 };
226
227 let table = self.db.catalog.get_table(&table_name)
229 .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
230
231 let mut rows: Vec<Row> = Vec::new();
232
233 if let Some(ref pk_cols) = table.primary_key {
235 for (seq, col_name) in pk_cols.iter().enumerate() {
236 rows.push(Row::new(vec![
237 SqlValue::Varchar(stmt.table_name.clone()), SqlValue::Integer(0), SqlValue::Varchar("PRIMARY".into()), SqlValue::Integer((seq + 1) as i64), SqlValue::Varchar(col_name.clone()), SqlValue::Varchar("A".into()), SqlValue::Null, SqlValue::Null, SqlValue::Null, SqlValue::Varchar("".into()), SqlValue::Varchar("BTREE".into()), SqlValue::Varchar("".into()), SqlValue::Varchar("".into()), SqlValue::Varchar("YES".into()), ]));
252 }
253 }
254
255 let indexes = self.db.catalog.get_table_indexes(&stmt.table_name);
257 for index in indexes {
258 for (seq, col) in index.columns.iter().enumerate() {
259 let non_unique = if index.is_unique { 0 } else { 1 };
260 let collation = match col.order {
261 SortOrder::Ascending => "A",
262 SortOrder::Descending => "D",
263 };
264 let index_type = match index.index_type {
265 IndexType::BTree => "BTREE",
266 IndexType::Hash => "HASH",
267 IndexType::RTree => "RTREE",
268 IndexType::Fulltext => "FULLTEXT",
269 };
270
271 rows.push(Row::new(vec![
272 SqlValue::Varchar(stmt.table_name.clone()), SqlValue::Integer(non_unique), SqlValue::Varchar(index.name.clone()), SqlValue::Integer((seq + 1) as i64), SqlValue::Varchar(col.column_name.clone()), SqlValue::Varchar(collation.into()), SqlValue::Null, col.prefix_length.map(|l| SqlValue::Integer(l as i64)).unwrap_or(SqlValue::Null), SqlValue::Null, SqlValue::Varchar("".into()), SqlValue::Varchar(index_type.into()), SqlValue::Varchar("".into()), SqlValue::Varchar("".into()), SqlValue::Varchar("YES".into()), ]));
287 }
288 }
289
290 Ok(SelectResult {
291 columns: vec![
292 "Table".to_string(),
293 "Non_unique".to_string(),
294 "Key_name".to_string(),
295 "Seq_in_index".to_string(),
296 "Column_name".to_string(),
297 "Collation".to_string(),
298 "Cardinality".to_string(),
299 "Sub_part".to_string(),
300 "Packed".to_string(),
301 "Null".to_string(),
302 "Index_type".to_string(),
303 "Comment".to_string(),
304 "Index_comment".to_string(),
305 "Visible".to_string(),
306 ],
307 rows,
308 })
309 }
310
311 pub fn execute_show_create_table(&self, stmt: &ShowCreateTableStmt) -> Result<SelectResult, ExecutorError> {
315 let table = self.db.catalog.get_table(&stmt.table_name)
316 .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
317
318 let mut sql = format!("CREATE TABLE {} (\n", table.name);
320 let mut definitions: Vec<String> = Vec::new();
321
322 for col in &table.columns {
324 let mut col_def = format!(" {} {}", col.name, format_data_type(&col.data_type));
325
326 if !col.nullable {
327 col_def.push_str(" NOT NULL");
328 }
329
330 if let Some(ref default) = col.default_value {
331 col_def.push_str(&format!(" DEFAULT {:?}", default));
332 }
333
334 definitions.push(col_def);
335 }
336
337 if let Some(ref pk_cols) = table.primary_key {
339 definitions.push(format!(" PRIMARY KEY ({})", pk_cols.join(", ")));
340 }
341
342 for unique_cols in &table.unique_constraints {
344 definitions.push(format!(" UNIQUE ({})", unique_cols.join(", ")));
345 }
346
347 for fk in &table.foreign_keys {
349 let mut fk_def = format!(
350 " FOREIGN KEY ({}) REFERENCES {} ({})",
351 fk.column_names.join(", "),
352 fk.parent_table,
353 fk.parent_column_names.join(", ")
354 );
355
356 fk_def.push_str(&format!(" ON DELETE {}", format_referential_action(&fk.on_delete)));
358
359 fk_def.push_str(&format!(" ON UPDATE {}", format_referential_action(&fk.on_update)));
361
362 definitions.push(fk_def);
363 }
364
365 for (name, _expr) in &table.check_constraints {
367 definitions.push(format!(" CONSTRAINT {} CHECK (...)", name));
368 }
369
370 sql.push_str(&definitions.join(",\n"));
371 sql.push_str("\n)");
372
373 let rows = vec![Row::new(vec![
374 SqlValue::Varchar(table.name.clone()),
375 SqlValue::Varchar(sql),
376 ])];
377
378 Ok(SelectResult {
379 columns: vec!["Table".to_string(), "Create Table".to_string()],
380 rows,
381 })
382 }
383}
384
/// Case-insensitive SQL `LIKE` match of `text` against `pattern`.
///
/// Pattern syntax:
/// - `%` matches any run of characters, including the empty run;
/// - `_` matches exactly one character;
/// - `\x` matches the character `x` literally (so `\%` and `\_` match a
///   literal percent sign / underscore); a trailing lone `\` matches a
///   literal backslash;
/// - every other character matches itself, ignoring case.
///
/// Case is folded one character at a time so that the number of characters
/// never changes; this keeps `_` aligned with the characters of the
/// original text even for characters whose lowercase form is longer.
fn like_match(pattern: &str, text: &str) -> bool {
    let pattern_chars: Vec<char> = pattern.chars().map(fold_case).collect();
    let text_chars: Vec<char> = text.chars().map(fold_case).collect();

    like_match_impl(&pattern_chars, &text_chars)
}

/// Lowercases a single character when its lowercase form is itself a single
/// character; otherwise returns the character unchanged.
fn fold_case(c: char) -> char {
    let mut lower = c.to_lowercase();
    match (lower.next(), lower.next()) {
        (Some(single), None) => single,
        _ => c,
    }
}

/// Matches `text` against a `LIKE` `pattern`, comparing characters exactly.
///
/// Runs iteratively: on a mismatch it rewinds to the most recent `%` and lets
/// that wildcard absorb one more character, so long inputs neither recurse
/// nor backtrack exponentially.
fn like_match_impl(pattern: &[char], text: &[char]) -> bool {
    let mut p = 0; // next pattern position
    let mut t = 0; // next text position
    // After the latest `%`: (pattern position following it, text position
    // where that wildcard's current attempt started).
    let mut resume: Option<(usize, usize)> = None;

    while t < text.len() {
        // Number of pattern characters consumed when the token at `p`
        // matches `text[t]`, or `None` on a mismatch.
        let consumed = match pattern.get(p) {
            Some('%') => {
                // Try the empty run first; remember where to come back to.
                resume = Some((p + 1, t));
                p += 1;
                continue;
            }
            Some('_') => Some(1),
            Some('\\') if p + 1 < pattern.len() => {
                if pattern[p + 1] == text[t] {
                    Some(2)
                } else {
                    None
                }
            }
            Some(&literal) if literal == text[t] => Some(1),
            _ => None,
        };

        match (consumed, resume) {
            (Some(width), _) => {
                p += width;
                t += 1;
            }
            (None, Some((after_wildcard, started_at))) => {
                // Let the last `%` swallow one more character and retry.
                p = after_wildcard;
                t = started_at + 1;
                resume = Some((after_wildcard, started_at + 1));
            }
            (None, None) => return false,
        }
    }

    // Text is exhausted: only wildcards that can match nothing may remain.
    pattern[p..].iter().all(|&c| c == '%')
}
428
429fn format_data_type(dt: &vibesql_types::DataType) -> String {
431 use vibesql_types::DataType;
432
433 match dt {
434 DataType::Integer => "INT".to_string(),
435 DataType::Smallint => "SMALLINT".to_string(),
436 DataType::Bigint => "BIGINT".to_string(),
437 DataType::Unsigned => "UNSIGNED BIGINT".to_string(),
438 DataType::Real => "REAL".to_string(),
439 DataType::Float { precision } => format!("FLOAT({})", precision),
440 DataType::DoublePrecision => "DOUBLE PRECISION".to_string(),
441 DataType::Numeric { precision, scale } => {
442 format!("NUMERIC({}, {})", precision, scale)
443 }
444 DataType::Decimal { precision, scale } => {
445 format!("DECIMAL({}, {})", precision, scale)
446 }
447 DataType::Varchar { max_length } => {
448 if let Some(len) = max_length {
449 format!("VARCHAR({})", len)
450 } else {
451 "VARCHAR".to_string()
452 }
453 }
454 DataType::Character { length } => {
455 format!("CHAR({})", length)
456 }
457 DataType::CharacterLargeObject => "CLOB".to_string(),
458 DataType::Name => "NAME".to_string(),
459 DataType::Boolean => "BOOLEAN".to_string(),
460 DataType::Date => "DATE".to_string(),
461 DataType::Time { with_timezone } => {
462 if *with_timezone {
463 "TIME WITH TIME ZONE".to_string()
464 } else {
465 "TIME".to_string()
466 }
467 }
468 DataType::Timestamp { with_timezone } => {
469 if *with_timezone {
470 "TIMESTAMP WITH TIME ZONE".to_string()
471 } else {
472 "TIMESTAMP".to_string()
473 }
474 }
475 DataType::Interval { start_field, end_field } => {
476 if let Some(end) = end_field {
477 format!("INTERVAL {:?} TO {:?}", start_field, end)
478 } else {
479 format!("INTERVAL {:?}", start_field)
480 }
481 }
482 DataType::BinaryLargeObject => "BLOB".to_string(),
483 DataType::Bit { length } => {
484 if let Some(len) = length {
485 format!("BIT({})", len)
486 } else {
487 "BIT".to_string()
488 }
489 }
490 DataType::UserDefined { type_name } => type_name.clone(),
491 DataType::Vector { dimensions } => format!("VECTOR({})", dimensions),
492 DataType::Null => "NULL".to_string(),
493 }
494}
495
496fn format_referential_action(action: &ReferentialAction) -> &'static str {
498 match action {
499 ReferentialAction::NoAction => "NO ACTION",
500 ReferentialAction::Restrict => "RESTRICT",
501 ReferentialAction::Cascade => "CASCADE",
502 ReferentialAction::SetNull => "SET NULL",
503 ReferentialAction::SetDefault => "SET DEFAULT",
504 }
505}
506
#[cfg(test)]
mod tests {
    use super::*;
    use vibesql_catalog::{ColumnSchema, TableSchema};
    use vibesql_types::DataType;

    /// Header expected from the non-`FULL` column listing.
    const COLUMN_LISTING_HEADER: [&str; 6] = ["Field", "Type", "Null", "Key", "Default", "Extra"];

    /// Registers a table whose primary key is its `id` column.
    fn add_table_keyed_by_id(db: &mut Database, name: &str, columns: Vec<ColumnSchema>) {
        let schema = TableSchema::with_primary_key(name.to_string(), columns, vec!["id".to_string()]);
        db.create_table(schema).unwrap();
    }

    /// Builds a case-insensitive database holding the `users` and `orders`
    /// tables used by every test below.
    fn create_test_db() -> Database {
        let mut db = Database::new();
        db.catalog.set_case_sensitive_identifiers(false);

        add_table_keyed_by_id(
            &mut db,
            "users",
            vec![
                ColumnSchema::new("id".to_string(), DataType::Integer, false),
                ColumnSchema::new("name".to_string(), DataType::Varchar { max_length: Some(100) }, true),
                ColumnSchema::new("email".to_string(), DataType::Varchar { max_length: Some(255) }, false),
            ],
        );

        add_table_keyed_by_id(
            &mut db,
            "orders",
            vec![
                ColumnSchema::new("id".to_string(), DataType::Integer, false),
                ColumnSchema::new("user_id".to_string(), DataType::Integer, false),
                ColumnSchema::new("total".to_string(), DataType::Decimal { precision: 10, scale: 2 }, true),
            ],
        );

        db
    }

    /// Collects the `VARCHAR` values found in the given column, skipping rows
    /// that hold any other kind of value there.
    fn varchar_values(result: &SelectResult, column: usize) -> Vec<&str> {
        result
            .rows
            .iter()
            .filter_map(|row| match &row.values[column] {
                SqlValue::Varchar(text) => Some(text.as_str()),
                _ => None,
            })
            .collect()
    }

    /// A `SHOW COLUMNS FROM users` statement with the requested `FULL` flag.
    fn show_users_columns(full: bool) -> ShowColumnsStmt {
        ShowColumnsStmt {
            table_name: "users".to_string(),
            database: None,
            full,
            like_pattern: None,
            where_clause: None,
        }
    }

    #[test]
    fn test_show_tables() {
        let db = create_test_db();
        let stmt = ShowTablesStmt {
            database: None,
            like_pattern: None,
            where_clause: None,
        };

        let result = IntrospectionExecutor::new(&db)
            .execute_show_tables(&stmt)
            .unwrap();

        assert_eq!(result.columns.len(), 1);
        assert!(result.columns[0].starts_with("Tables_in_"));
        assert_eq!(result.rows.len(), 2);

        // Accept either spelling: identifiers are case-insensitive here.
        let table_names = varchar_values(&result, 0);
        assert!(table_names.contains(&"users") || table_names.contains(&"USERS"));
        assert!(table_names.contains(&"orders") || table_names.contains(&"ORDERS"));
    }

    #[test]
    fn test_show_tables_with_like() {
        let db = create_test_db();
        let stmt = ShowTablesStmt {
            database: None,
            like_pattern: Some("user%".to_string()),
            where_clause: None,
        };

        let result = IntrospectionExecutor::new(&db)
            .execute_show_tables(&stmt)
            .unwrap();

        assert_eq!(result.rows.len(), 1);
    }

    #[test]
    fn test_show_databases() {
        let db = create_test_db();
        let stmt = ShowDatabasesStmt {
            like_pattern: None,
            where_clause: None,
        };

        let result = IntrospectionExecutor::new(&db)
            .execute_show_databases(&stmt)
            .unwrap();

        assert_eq!(result.columns, vec!["Database"]);
        assert!(!result.rows.is_empty());
        assert!(varchar_values(&result, 0).contains(&"public"));
    }

    #[test]
    fn test_show_columns() {
        let db = create_test_db();

        let result = IntrospectionExecutor::new(&db)
            .execute_show_columns(&show_users_columns(false))
            .unwrap();

        assert_eq!(result.columns, COLUMN_LISTING_HEADER.to_vec());
        assert_eq!(result.rows.len(), 3);
    }

    #[test]
    fn test_show_columns_full() {
        let db = create_test_db();

        let result = IntrospectionExecutor::new(&db)
            .execute_show_columns(&show_users_columns(true))
            .unwrap();

        assert_eq!(result.columns.len(), 9);
        assert!(result.columns.contains(&"Privileges".to_string()));
        assert!(result.columns.contains(&"Comment".to_string()));
    }

    #[test]
    fn test_describe() {
        let db = create_test_db();
        let stmt = DescribeStmt {
            table_name: "users".to_string(),
            column_pattern: None,
        };

        let result = IntrospectionExecutor::new(&db)
            .execute_describe(&stmt)
            .unwrap();

        assert_eq!(result.columns, COLUMN_LISTING_HEADER.to_vec());
        assert_eq!(result.rows.len(), 3);
    }

    #[test]
    fn test_show_index() {
        let db = create_test_db();
        let stmt = ShowIndexStmt {
            table_name: "users".to_string(),
            database: None,
        };

        let result = IntrospectionExecutor::new(&db)
            .execute_show_index(&stmt)
            .unwrap();

        assert!(!result.rows.is_empty());

        // Column 2 is `Key_name`; the primary key must be listed there.
        let has_primary = varchar_values(&result, 2).contains(&"PRIMARY");
        assert!(has_primary);
    }

    #[test]
    fn test_show_create_table() {
        let db = create_test_db();
        let stmt = ShowCreateTableStmt {
            table_name: "users".to_string(),
        };

        let result = IntrospectionExecutor::new(&db)
            .execute_show_create_table(&stmt)
            .unwrap();

        assert_eq!(result.columns, vec!["Table", "Create Table"]);
        assert_eq!(result.rows.len(), 1);

        let sql = match &result.rows[0].values[1] {
            SqlValue::Varchar(sql) => sql,
            _ => panic!("Expected VARCHAR for Create Table"),
        };
        for fragment in &["CREATE TABLE", "users", "id", "name", "email", "PRIMARY KEY"] {
            assert!(sql.contains(*fragment));
        }
    }

    #[test]
    fn test_like_match() {
        // (pattern, text) pairs that must match.
        let matching = [
            ("test", "test"),
            ("test", "TEST"),
            ("test%", "test"),
            ("test%", "testing"),
            ("%test", "test"),
            ("%test", "mytest"),
            ("%test%", "test"),
            ("%test%", "mytesting"),
            ("te_t", "test"),
            ("te\\_t", "te_t"),
        ];
        // (pattern, text) pairs that must not match.
        let rejected = [
            ("test", "testing"),
            ("te_t", "tet"),
            ("te_t", "testt"),
            ("te\\_t", "test"),
        ];

        for &(pattern, text) in matching.iter() {
            assert!(like_match(pattern, text));
        }
        for &(pattern, text) in rejected.iter() {
            assert!(!like_match(pattern, text));
        }
    }
}