vibesql_executor/
introspection.rs

1//! Database introspection command execution (SHOW, DESCRIBE)
2//!
3//! This module implements MySQL-style introspection commands that query
4//! database metadata and return result sets.
5
6use vibesql_ast::{
7    DescribeStmt, ShowColumnsStmt, ShowCreateTableStmt, ShowDatabasesStmt, ShowIndexStmt,
8    ShowTablesStmt,
9};
10use vibesql_catalog::{IndexType, ReferentialAction, SortOrder};
11use vibesql_storage::{Database, Row};
12use vibesql_types::SqlValue;
13
14use crate::{errors::ExecutorError, select::SelectResult};
15
16/// Executor for database introspection commands
17pub struct IntrospectionExecutor<'a> {
18    db: &'a Database,
19}
20
21impl<'a> IntrospectionExecutor<'a> {
22    /// Create a new introspection executor
23    pub fn new(db: &'a Database) -> Self {
24        Self { db }
25    }
26
27    /// Execute SHOW TABLES statement
28    ///
29    /// Returns a result set with a single column "Tables_in_<database>" containing
30    /// the names of all tables in the specified (or current) schema.
31    pub fn execute_show_tables(
32        &self,
33        stmt: &ShowTablesStmt,
34    ) -> Result<SelectResult, ExecutorError> {
35        // Get the schema to query (default to current schema)
36        let schema_name =
37            stmt.database.as_deref().unwrap_or_else(|| self.db.catalog.get_current_schema());
38
39        // Get tables from the schema
40        let tables = if let Some(schema) = self.db.catalog.get_schema(schema_name) {
41            schema.list_tables()
42        } else {
43            // Try case-insensitive lookup
44            let upper_name = schema_name.to_uppercase();
45            self.db
46                .catalog
47                .list_schemas()
48                .into_iter()
49                .find(|s| s.to_uppercase() == upper_name)
50                .and_then(|actual_name| self.db.catalog.get_schema(&actual_name))
51                .map(|schema| schema.list_tables())
52                .unwrap_or_default()
53        };
54
55        // Filter by LIKE pattern if provided
56        let filtered_tables: Vec<String> = if let Some(ref pattern) = stmt.like_pattern {
57            tables.into_iter().filter(|t| like_match(pattern, t)).collect()
58        } else {
59            tables
60        };
61
62        // Build result set
63        let column_name = format!("Tables_in_{}", schema_name);
64        let rows: Vec<Row> = filtered_tables
65            .into_iter()
66            .map(|name| Row::new(vec![SqlValue::Varchar(arcstr::ArcStr::from(name))]))
67            .collect();
68
69        Ok(SelectResult { columns: vec![column_name], rows })
70    }
71
72    /// Execute SHOW DATABASES statement
73    ///
74    /// Returns a result set with a single column "Database" containing
75    /// the names of all schemas (databases) in the catalog.
76    pub fn execute_show_databases(
77        &self,
78        stmt: &ShowDatabasesStmt,
79    ) -> Result<SelectResult, ExecutorError> {
80        let schemas = self.db.catalog.list_schemas();
81
82        // Filter by LIKE pattern if provided
83        let filtered_schemas: Vec<String> = if let Some(ref pattern) = stmt.like_pattern {
84            schemas.into_iter().filter(|s| like_match(pattern, s)).collect()
85        } else {
86            schemas
87        };
88
89        let rows: Vec<Row> = filtered_schemas
90            .into_iter()
91            .map(|name| Row::new(vec![SqlValue::Varchar(arcstr::ArcStr::from(name))]))
92            .collect();
93
94        Ok(SelectResult { columns: vec!["Database".to_string()], rows })
95    }
96
97    /// Execute SHOW COLUMNS statement
98    ///
99    /// Returns a result set with columns: Field, Type, Null, Key, Default, Extra
100    pub fn execute_show_columns(
101        &self,
102        stmt: &ShowColumnsStmt,
103    ) -> Result<SelectResult, ExecutorError> {
104        // Get the table
105        let table_name = if let Some(ref db) = stmt.database {
106            format!("{}.{}", db, stmt.table_name)
107        } else {
108            stmt.table_name.clone()
109        };
110
111        let table = self
112            .db
113            .catalog
114            .get_table(&table_name)
115            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
116
117        // Determine which columns are part of the primary key
118        let pk_columns: std::collections::HashSet<String> =
119            table.primary_key.as_ref().map(|pk| pk.iter().cloned().collect()).unwrap_or_default();
120
121        // Determine which columns are part of unique constraints
122        let unique_columns: std::collections::HashSet<String> =
123            table.unique_constraints.iter().flatten().cloned().collect();
124
125        let mut rows: Vec<Row> = Vec::with_capacity(table.columns.len());
126
127        for col in &table.columns {
128            // Check if column matches LIKE pattern
129            if let Some(ref pattern) = stmt.like_pattern {
130                if !like_match(pattern, &col.name) {
131                    continue;
132                }
133            }
134
135            // Field: column name
136            let field = SqlValue::Varchar(arcstr::ArcStr::from(col.name.clone()));
137
138            // Type: data type as string
139            let type_str = format_data_type(&col.data_type);
140            let col_type = SqlValue::Varchar(arcstr::ArcStr::from(type_str));
141
142            // Null: YES or NO
143            let nullable = SqlValue::Varchar(if col.nullable { "YES" } else { "NO" }.into());
144
145            // Key: PRI for primary key, UNI for unique, empty otherwise
146            let key = if pk_columns.contains(&col.name) {
147                SqlValue::Varchar("PRI".into())
148            } else if unique_columns.contains(&col.name) {
149                SqlValue::Varchar("UNI".into())
150            } else {
151                SqlValue::Varchar("".into())
152            };
153
154            // Default: default value or NULL
155            let default = col
156                .default_value
157                .as_ref()
158                .map(|expr| SqlValue::Varchar(arcstr::ArcStr::from(format!("{:?}", expr))))
159                .unwrap_or(SqlValue::Null);
160
161            // Extra: auto_increment, etc. (we don't have auto_increment info yet)
162            let extra = SqlValue::Varchar("".into());
163
164            if stmt.full {
165                // SHOW FULL COLUMNS includes: Collation, Privileges, Comment
166                rows.push(Row::new(vec![
167                    field,
168                    col_type,
169                    SqlValue::Null, // Collation
170                    nullable,
171                    key,
172                    default,
173                    extra,
174                    SqlValue::Varchar("select,insert,update,references".into()), // Privileges
175                    SqlValue::Varchar("".into()),                                // Comment
176                ]));
177            } else {
178                rows.push(Row::new(vec![field, col_type, nullable, key, default, extra]));
179            }
180        }
181
182        let columns = if stmt.full {
183            vec![
184                "Field".to_string(),
185                "Type".to_string(),
186                "Collation".to_string(),
187                "Null".to_string(),
188                "Key".to_string(),
189                "Default".to_string(),
190                "Extra".to_string(),
191                "Privileges".to_string(),
192                "Comment".to_string(),
193            ]
194        } else {
195            vec![
196                "Field".to_string(),
197                "Type".to_string(),
198                "Null".to_string(),
199                "Key".to_string(),
200                "Default".to_string(),
201                "Extra".to_string(),
202            ]
203        };
204
205        Ok(SelectResult { columns, rows })
206    }
207
208    /// Execute DESCRIBE statement (alias for SHOW COLUMNS)
209    pub fn execute_describe(&self, stmt: &DescribeStmt) -> Result<SelectResult, ExecutorError> {
210        // DESCRIBE is equivalent to SHOW COLUMNS FROM table
211        let show_columns = ShowColumnsStmt {
212            table_name: stmt.table_name.clone(),
213            database: None,
214            full: false,
215            like_pattern: stmt.column_pattern.clone(),
216            where_clause: None,
217        };
218        self.execute_show_columns(&show_columns)
219    }
220
221    /// Execute SHOW INDEX statement
222    ///
223    /// Returns a result set with index information in MySQL format.
224    pub fn execute_show_index(&self, stmt: &ShowIndexStmt) -> Result<SelectResult, ExecutorError> {
225        // Get the table name (with optional database qualifier)
226        let table_name = if let Some(ref db) = stmt.database {
227            format!("{}.{}", db, stmt.table_name)
228        } else {
229            stmt.table_name.clone()
230        };
231
232        // Verify table exists
233        let table = self
234            .db
235            .catalog
236            .get_table(&table_name)
237            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
238
239        let mut rows: Vec<Row> = Vec::new();
240
241        // Add primary key as an index (if present)
242        if let Some(ref pk_cols) = table.primary_key {
243            for (seq, col_name) in pk_cols.iter().enumerate() {
244                rows.push(Row::new(vec![
245                    SqlValue::Varchar(arcstr::ArcStr::from(stmt.table_name.clone())), // Table
246                    SqlValue::Integer(0), // Non_unique (0 = unique)
247                    SqlValue::Varchar("PRIMARY".into()), // Key_name
248                    SqlValue::Integer((seq + 1) as i64), // Seq_in_index
249                    SqlValue::Varchar(arcstr::ArcStr::from(col_name.to_string())), // Column_name
250                    SqlValue::Varchar("A".into()), // Collation (A = ascending)
251                    SqlValue::Null,       // Cardinality
252                    SqlValue::Null,       // Sub_part
253                    SqlValue::Null,       // Packed
254                    SqlValue::Varchar("".into()), // Null
255                    SqlValue::Varchar("BTREE".into()), // Index_type
256                    SqlValue::Varchar("".into()), // Comment
257                    SqlValue::Varchar("".into()), // Index_comment
258                    SqlValue::Varchar("YES".into()), // Visible
259                ]));
260            }
261        }
262
263        // Add explicit indexes from catalog
264        let indexes = self.db.catalog.get_table_indexes(&stmt.table_name);
265        for index in indexes {
266            for (seq, col) in index.columns.iter().enumerate() {
267                let non_unique = if index.is_unique { 0 } else { 1 };
268                let collation = match col.order() {
269                    SortOrder::Ascending => "A",
270                    SortOrder::Descending => "D",
271                };
272                let index_type = match index.index_type {
273                    IndexType::BTree => "BTREE",
274                    IndexType::Hash => "HASH",
275                    IndexType::RTree => "RTREE",
276                    IndexType::Fulltext => "FULLTEXT",
277                    IndexType::IVFFlat { .. } => "IVFFLAT",
278                    IndexType::Hnsw { .. } => "HNSW",
279                };
280
281                // Get column name or expression representation
282                let column_name = col
283                    .column_name()
284                    .map(|s| s.to_string())
285                    .unwrap_or_else(|| "(expression)".to_string());
286
287                rows.push(Row::new(vec![
288                    SqlValue::Varchar(arcstr::ArcStr::from(stmt.table_name.clone())), // Table
289                    SqlValue::Integer(non_unique),                                    // Non_unique
290                    SqlValue::Varchar(arcstr::ArcStr::from(index.name.clone())),      // Key_name
291                    SqlValue::Integer((seq + 1) as i64),                              // Seq_in_index
292                    SqlValue::Varchar(arcstr::ArcStr::from(column_name)),             // Column_name
293                    SqlValue::Varchar(collation.into()),                              // Collation
294                    SqlValue::Null,                                                   // Cardinality
295                    col.prefix_length()
296                        .map(|l| SqlValue::Integer(l as i64))
297                        .unwrap_or(SqlValue::Null), // Sub_part
298                    SqlValue::Null,                                                   // Packed
299                    SqlValue::Varchar("".into()),                                     // Null
300                    SqlValue::Varchar(index_type.into()),                             // Index_type
301                    SqlValue::Varchar("".into()),                                     // Comment
302                    SqlValue::Varchar("".into()),                                     // Index_comment
303                    SqlValue::Varchar("YES".into()),                                  // Visible
304                ]));
305            }
306        }
307
308        Ok(SelectResult {
309            columns: vec![
310                "Table".to_string(),
311                "Non_unique".to_string(),
312                "Key_name".to_string(),
313                "Seq_in_index".to_string(),
314                "Column_name".to_string(),
315                "Collation".to_string(),
316                "Cardinality".to_string(),
317                "Sub_part".to_string(),
318                "Packed".to_string(),
319                "Null".to_string(),
320                "Index_type".to_string(),
321                "Comment".to_string(),
322                "Index_comment".to_string(),
323                "Visible".to_string(),
324            ],
325            rows,
326        })
327    }
328
329    /// Execute SHOW CREATE TABLE statement
330    ///
331    /// Returns a result set with columns: Table, Create Table
332    pub fn execute_show_create_table(
333        &self,
334        stmt: &ShowCreateTableStmt,
335    ) -> Result<SelectResult, ExecutorError> {
336        let table = self
337            .db
338            .catalog
339            .get_table(&stmt.table_name)
340            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
341
342        // Build the CREATE TABLE statement
343        let mut sql = format!("CREATE TABLE {} (\n", table.name);
344        let mut definitions: Vec<String> = Vec::new();
345
346        // Add column definitions
347        for col in &table.columns {
348            let mut col_def = format!("  {} {}", col.name, format_data_type(&col.data_type));
349
350            if !col.nullable {
351                col_def.push_str(" NOT NULL");
352            }
353
354            if let Some(ref default) = col.default_value {
355                col_def.push_str(&format!(" DEFAULT {:?}", default));
356            }
357
358            definitions.push(col_def);
359        }
360
361        // Add PRIMARY KEY constraint
362        if let Some(ref pk_cols) = table.primary_key {
363            definitions.push(format!("  PRIMARY KEY ({})", pk_cols.join(", ")));
364        }
365
366        // Add UNIQUE constraints
367        for unique_cols in &table.unique_constraints {
368            definitions.push(format!("  UNIQUE ({})", unique_cols.join(", ")));
369        }
370
371        // Add FOREIGN KEY constraints
372        for fk in &table.foreign_keys {
373            let mut fk_def = format!(
374                "  FOREIGN KEY ({}) REFERENCES {} ({})",
375                fk.column_names.join(", "),
376                fk.parent_table,
377                fk.parent_column_names.join(", ")
378            );
379
380            // Add ON DELETE action
381            fk_def.push_str(&format!(" ON DELETE {}", format_referential_action(&fk.on_delete)));
382
383            // Add ON UPDATE action
384            fk_def.push_str(&format!(" ON UPDATE {}", format_referential_action(&fk.on_update)));
385
386            definitions.push(fk_def);
387        }
388
389        // Add CHECK constraints
390        for (name, _expr) in &table.check_constraints {
391            definitions.push(format!("  CONSTRAINT {} CHECK (...)", name));
392        }
393
394        sql.push_str(&definitions.join(",\n"));
395        sql.push_str("\n)");
396
397        let rows = vec![Row::new(vec![
398            SqlValue::Varchar(arcstr::ArcStr::from(table.name.clone())),
399            SqlValue::Varchar(arcstr::ArcStr::from(sql)),
400        ])];
401
402        Ok(SelectResult { columns: vec!["Table".to_string(), "Create Table".to_string()], rows })
403    }
404}
405
406/// Match a SQL LIKE pattern against a string (case-insensitive)
407///
408/// Supports % (match zero or more characters) and _ (match exactly one character)
409fn like_match(pattern: &str, text: &str) -> bool {
410    let pattern = pattern.to_lowercase();
411    let text = text.to_lowercase();
412
413    let pattern_chars: Vec<char> = pattern.chars().collect();
414    let text_chars: Vec<char> = text.chars().collect();
415
416    like_match_impl(&pattern_chars, &text_chars)
417}
418
419/// Recursive implementation of LIKE pattern matching
420fn like_match_impl(pattern: &[char], text: &[char]) -> bool {
421    match (pattern.first(), text.first()) {
422        (None, None) => true,
423        (None, Some(_)) => false,
424        (Some('%'), _) => {
425            // % matches zero or more characters
426            // Try matching 0 characters (skip %) or matching 1 character
427            like_match_impl(&pattern[1..], text)
428                || (!text.is_empty() && like_match_impl(pattern, &text[1..]))
429        }
430        (Some('_'), Some(_)) => {
431            // _ matches exactly one character
432            like_match_impl(&pattern[1..], &text[1..])
433        }
434        (Some('_'), None) => false,
435        (Some('\\'), _) if pattern.len() > 1 => {
436            // Escape sequence - match the next character literally
437            if text.first() == Some(&pattern[1]) {
438                like_match_impl(&pattern[2..], &text[1..])
439            } else {
440                false
441            }
442        }
443        (Some(p), Some(t)) if *p == *t => like_match_impl(&pattern[1..], &text[1..]),
444        _ => false,
445    }
446}
447
448/// Format a DataType as a SQL string
449fn format_data_type(dt: &vibesql_types::DataType) -> String {
450    use vibesql_types::DataType;
451
452    match dt {
453        DataType::Integer => "INT".to_string(),
454        DataType::Smallint => "SMALLINT".to_string(),
455        DataType::Bigint => "BIGINT".to_string(),
456        DataType::Unsigned => "UNSIGNED BIGINT".to_string(),
457        DataType::Real => "REAL".to_string(),
458        DataType::Float { precision } => format!("FLOAT({})", precision),
459        DataType::DoublePrecision => "DOUBLE PRECISION".to_string(),
460        DataType::Numeric { precision, scale } => {
461            format!("NUMERIC({}, {})", precision, scale)
462        }
463        DataType::Decimal { precision, scale } => {
464            format!("DECIMAL({}, {})", precision, scale)
465        }
466        DataType::Varchar { max_length } => {
467            if let Some(len) = max_length {
468                format!("VARCHAR({})", len)
469            } else {
470                "VARCHAR".to_string()
471            }
472        }
473        DataType::Character { length } => {
474            format!("CHAR({})", length)
475        }
476        DataType::CharacterLargeObject => "CLOB".to_string(),
477        DataType::Name => "NAME".to_string(),
478        DataType::Boolean => "BOOLEAN".to_string(),
479        DataType::Date => "DATE".to_string(),
480        DataType::Time { with_timezone } => {
481            if *with_timezone {
482                "TIME WITH TIME ZONE".to_string()
483            } else {
484                "TIME".to_string()
485            }
486        }
487        DataType::Timestamp { with_timezone } => {
488            if *with_timezone {
489                "TIMESTAMP WITH TIME ZONE".to_string()
490            } else {
491                "TIMESTAMP".to_string()
492            }
493        }
494        DataType::Interval { start_field, end_field } => {
495            if let Some(end) = end_field {
496                format!("INTERVAL {:?} TO {:?}", start_field, end)
497            } else {
498                format!("INTERVAL {:?}", start_field)
499            }
500        }
501        DataType::BinaryLargeObject => "BLOB".to_string(),
502        DataType::Bit { length } => {
503            if let Some(len) = length {
504                format!("BIT({})", len)
505            } else {
506                "BIT".to_string()
507            }
508        }
509        DataType::UserDefined { type_name } => type_name.clone(),
510        DataType::Vector { dimensions } => format!("VECTOR({})", dimensions),
511        DataType::Null => "NULL".to_string(),
512    }
513}
514
515/// Format a referential action as a SQL string
516fn format_referential_action(action: &ReferentialAction) -> &'static str {
517    match action {
518        ReferentialAction::NoAction => "NO ACTION",
519        ReferentialAction::Restrict => "RESTRICT",
520        ReferentialAction::Cascade => "CASCADE",
521        ReferentialAction::SetNull => "SET NULL",
522        ReferentialAction::SetDefault => "SET DEFAULT",
523    }
524}
525
526#[cfg(test)]
527mod tests {
528    use vibesql_catalog::{ColumnSchema, TableSchema};
529    use vibesql_types::DataType;
530
531    use super::*;
532
533    fn create_test_db() -> Database {
534        let mut db = Database::new();
535        db.catalog.set_case_sensitive_identifiers(false);
536
537        // Create a users table
538        let columns = vec![
539            ColumnSchema::new("id".to_string(), DataType::Integer, false),
540            ColumnSchema::new(
541                "name".to_string(),
542                DataType::Varchar { max_length: Some(100) },
543                true,
544            ),
545            ColumnSchema::new(
546                "email".to_string(),
547                DataType::Varchar { max_length: Some(255) },
548                false,
549            ),
550        ];
551        let schema =
552            TableSchema::with_primary_key("users".to_string(), columns, vec!["id".to_string()]);
553        db.create_table(schema).unwrap();
554
555        // Create an orders table
556        let order_columns = vec![
557            ColumnSchema::new("id".to_string(), DataType::Integer, false),
558            ColumnSchema::new("user_id".to_string(), DataType::Integer, false),
559            ColumnSchema::new(
560                "total".to_string(),
561                DataType::Decimal { precision: 10, scale: 2 },
562                true,
563            ),
564        ];
565        let order_schema = TableSchema::with_primary_key(
566            "orders".to_string(),
567            order_columns,
568            vec!["id".to_string()],
569        );
570        db.create_table(order_schema).unwrap();
571
572        db
573    }
574
575    #[test]
576    fn test_show_tables() {
577        let db = create_test_db();
578        let executor = IntrospectionExecutor::new(&db);
579
580        let stmt = ShowTablesStmt { database: None, like_pattern: None, where_clause: None };
581
582        let result = executor.execute_show_tables(&stmt).unwrap();
583        assert_eq!(result.columns.len(), 1);
584        assert!(result.columns[0].starts_with("Tables_in_"));
585
586        // Should have 2 tables
587        assert_eq!(result.rows.len(), 2);
588
589        // Extract table names
590        let table_names: Vec<&str> = result
591            .rows
592            .iter()
593            .filter_map(|r| match &r.values[0] {
594                SqlValue::Varchar(s) => Some(s.as_ref()),
595                _ => None,
596            })
597            .collect();
598
599        assert!(table_names.contains(&"users") || table_names.contains(&"USERS"));
600        assert!(table_names.contains(&"orders") || table_names.contains(&"ORDERS"));
601    }
602
603    #[test]
604    fn test_show_tables_with_like() {
605        let db = create_test_db();
606        let executor = IntrospectionExecutor::new(&db);
607
608        let stmt = ShowTablesStmt {
609            database: None,
610            like_pattern: Some("user%".to_string()),
611            where_clause: None,
612        };
613
614        let result = executor.execute_show_tables(&stmt).unwrap();
615        // Only 'users' should match
616        assert_eq!(result.rows.len(), 1);
617    }
618
619    #[test]
620    fn test_show_databases() {
621        let db = create_test_db();
622        let executor = IntrospectionExecutor::new(&db);
623
624        let stmt = ShowDatabasesStmt { like_pattern: None, where_clause: None };
625
626        let result = executor.execute_show_databases(&stmt).unwrap();
627        assert_eq!(result.columns, vec!["Database"]);
628
629        // Should have at least the default schema
630        assert!(!result.rows.is_empty());
631
632        let schema_names: Vec<&str> = result
633            .rows
634            .iter()
635            .filter_map(|r| match &r.values[0] {
636                SqlValue::Varchar(s) => Some(s.as_ref()),
637                _ => None,
638            })
639            .collect();
640
641        assert!(schema_names.contains(&vibesql_catalog::DEFAULT_SCHEMA));
642    }
643
644    #[test]
645    fn test_show_columns() {
646        let db = create_test_db();
647        let executor = IntrospectionExecutor::new(&db);
648
649        let stmt = ShowColumnsStmt {
650            table_name: "users".to_string(),
651            database: None,
652            full: false,
653            like_pattern: None,
654            where_clause: None,
655        };
656
657        let result = executor.execute_show_columns(&stmt).unwrap();
658        assert_eq!(result.columns, vec!["Field", "Type", "Null", "Key", "Default", "Extra"]);
659        assert_eq!(result.rows.len(), 3); // id, name, email
660    }
661
662    #[test]
663    fn test_show_columns_full() {
664        let db = create_test_db();
665        let executor = IntrospectionExecutor::new(&db);
666
667        let stmt = ShowColumnsStmt {
668            table_name: "users".to_string(),
669            database: None,
670            full: true,
671            like_pattern: None,
672            where_clause: None,
673        };
674
675        let result = executor.execute_show_columns(&stmt).unwrap();
676        assert_eq!(result.columns.len(), 9);
677        assert!(result.columns.contains(&"Privileges".to_string()));
678        assert!(result.columns.contains(&"Comment".to_string()));
679    }
680
681    #[test]
682    fn test_describe() {
683        let db = create_test_db();
684        let executor = IntrospectionExecutor::new(&db);
685
686        let stmt = DescribeStmt { table_name: "users".to_string(), column_pattern: None };
687
688        let result = executor.execute_describe(&stmt).unwrap();
689        assert_eq!(result.columns, vec!["Field", "Type", "Null", "Key", "Default", "Extra"]);
690        assert_eq!(result.rows.len(), 3);
691    }
692
693    #[test]
694    fn test_show_index() {
695        let db = create_test_db();
696        let executor = IntrospectionExecutor::new(&db);
697
698        let stmt = ShowIndexStmt { table_name: "users".to_string(), database: None };
699
700        let result = executor.execute_show_index(&stmt).unwrap();
701
702        // Should have at least the PRIMARY key index
703        assert!(!result.rows.is_empty());
704
705        // Check that PRIMARY KEY index is present
706        let has_primary = result.rows.iter().any(|r| {
707            match &r.values[2] {
708                // Key_name column
709                SqlValue::Varchar(s) => s.as_str() == "PRIMARY",
710                _ => false,
711            }
712        });
713        assert!(has_primary);
714    }
715
716    #[test]
717    fn test_show_create_table() {
718        let db = create_test_db();
719        let executor = IntrospectionExecutor::new(&db);
720
721        let stmt = ShowCreateTableStmt { table_name: "users".to_string() };
722
723        let result = executor.execute_show_create_table(&stmt).unwrap();
724        assert_eq!(result.columns, vec!["Table", "Create Table"]);
725        assert_eq!(result.rows.len(), 1);
726
727        // Check the CREATE TABLE statement contains expected elements
728        if let SqlValue::Varchar(sql) = &result.rows[0].values[1] {
729            assert!(sql.contains("CREATE TABLE"));
730            assert!(sql.contains("users"));
731            assert!(sql.contains("id"));
732            assert!(sql.contains("name"));
733            assert!(sql.contains("email"));
734            assert!(sql.contains("PRIMARY KEY"));
735        } else {
736            panic!("Expected VARCHAR for Create Table");
737        }
738    }
739
740    #[test]
741    fn test_like_match() {
742        // Exact match
743        assert!(like_match("test", "test"));
744        assert!(like_match("test", "TEST")); // case-insensitive
745        assert!(!like_match("test", "testing"));
746
747        // % matches zero or more characters
748        assert!(like_match("test%", "test"));
749        assert!(like_match("test%", "testing"));
750        assert!(like_match("%test", "test"));
751        assert!(like_match("%test", "mytest"));
752        assert!(like_match("%test%", "test"));
753        assert!(like_match("%test%", "mytesting"));
754
755        // _ matches exactly one character
756        assert!(like_match("te_t", "test"));
757        assert!(!like_match("te_t", "tet"));
758        assert!(!like_match("te_t", "testt"));
759
760        // Escape sequences
761        assert!(like_match("te\\_t", "te_t"));
762        assert!(!like_match("te\\_t", "test"));
763    }
764}