vibesql_executor/
introspection.rs

1//! Database introspection command execution (SHOW, DESCRIBE)
2//!
3//! This module implements MySQL-style introspection commands that query
4//! database metadata and return result sets.
5
6use vibesql_ast::{DescribeStmt, ShowColumnsStmt, ShowCreateTableStmt, ShowDatabasesStmt, ShowIndexStmt, ShowTablesStmt};
7use vibesql_catalog::{IndexType, ReferentialAction, SortOrder};
8use vibesql_storage::{Database, Row};
9use vibesql_types::SqlValue;
10
11use crate::errors::ExecutorError;
12use crate::select::SelectResult;
13
/// Executor for database introspection commands
///
/// Borrows a [`Database`] for the lifetime `'a`; the `execute_*` methods read
/// metadata through its `catalog` and return the answer as a `SelectResult`.
pub struct IntrospectionExecutor<'a> {
    // Shared borrow of the database whose catalog is inspected.
    db: &'a Database,
}
18
19impl<'a> IntrospectionExecutor<'a> {
20    /// Create a new introspection executor
21    pub fn new(db: &'a Database) -> Self {
22        Self { db }
23    }
24
25    /// Execute SHOW TABLES statement
26    ///
27    /// Returns a result set with a single column "Tables_in_<database>" containing
28    /// the names of all tables in the specified (or current) schema.
29    pub fn execute_show_tables(&self, stmt: &ShowTablesStmt) -> Result<SelectResult, ExecutorError> {
30        // Get the schema to query (default to current schema)
31        let schema_name = stmt.database.as_deref().unwrap_or_else(|| {
32            self.db.catalog.get_current_schema()
33        });
34
35        // Get tables from the schema
36        let tables = if let Some(schema) = self.db.catalog.get_schema(schema_name) {
37            schema.list_tables()
38        } else {
39            // Try case-insensitive lookup
40            let upper_name = schema_name.to_uppercase();
41            self.db.catalog.list_schemas()
42                .into_iter()
43                .find(|s| s.to_uppercase() == upper_name)
44                .and_then(|actual_name| self.db.catalog.get_schema(&actual_name))
45                .map(|schema| schema.list_tables())
46                .unwrap_or_default()
47        };
48
49        // Filter by LIKE pattern if provided
50        let filtered_tables: Vec<String> = if let Some(ref pattern) = stmt.like_pattern {
51            tables.into_iter().filter(|t| like_match(pattern, t)).collect()
52        } else {
53            tables
54        };
55
56        // Build result set
57        let column_name = format!("Tables_in_{}", schema_name);
58        let rows: Vec<Row> = filtered_tables
59            .into_iter()
60            .map(|name| Row::new(vec![SqlValue::Varchar(name)]))
61            .collect();
62
63        Ok(SelectResult {
64            columns: vec![column_name],
65            rows,
66        })
67    }
68
69    /// Execute SHOW DATABASES statement
70    ///
71    /// Returns a result set with a single column "Database" containing
72    /// the names of all schemas (databases) in the catalog.
73    pub fn execute_show_databases(&self, stmt: &ShowDatabasesStmt) -> Result<SelectResult, ExecutorError> {
74        let schemas = self.db.catalog.list_schemas();
75
76        // Filter by LIKE pattern if provided
77        let filtered_schemas: Vec<String> = if let Some(ref pattern) = stmt.like_pattern {
78            schemas.into_iter().filter(|s| like_match(pattern, s)).collect()
79        } else {
80            schemas
81        };
82
83        let rows: Vec<Row> = filtered_schemas
84            .into_iter()
85            .map(|name| Row::new(vec![SqlValue::Varchar(name)]))
86            .collect();
87
88        Ok(SelectResult {
89            columns: vec!["Database".to_string()],
90            rows,
91        })
92    }
93
94    /// Execute SHOW COLUMNS statement
95    ///
96    /// Returns a result set with columns: Field, Type, Null, Key, Default, Extra
97    pub fn execute_show_columns(&self, stmt: &ShowColumnsStmt) -> Result<SelectResult, ExecutorError> {
98        // Get the table
99        let table_name = if let Some(ref db) = stmt.database {
100            format!("{}.{}", db, stmt.table_name)
101        } else {
102            stmt.table_name.clone()
103        };
104
105        let table = self.db.catalog.get_table(&table_name)
106            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
107
108        // Determine which columns are part of the primary key
109        let pk_columns: std::collections::HashSet<String> = table.primary_key
110            .as_ref()
111            .map(|pk| pk.iter().cloned().collect())
112            .unwrap_or_default();
113
114        // Determine which columns are part of unique constraints
115        let unique_columns: std::collections::HashSet<String> = table.unique_constraints
116            .iter()
117            .flatten()
118            .cloned()
119            .collect();
120
121        let mut rows: Vec<Row> = Vec::with_capacity(table.columns.len());
122
123        for col in &table.columns {
124            // Check if column matches LIKE pattern
125            if let Some(ref pattern) = stmt.like_pattern {
126                if !like_match(pattern, &col.name) {
127                    continue;
128                }
129            }
130
131            // Field: column name
132            let field = SqlValue::Varchar(col.name.clone());
133
134            // Type: data type as string
135            let type_str = format_data_type(&col.data_type);
136            let col_type = SqlValue::Varchar(type_str);
137
138            // Null: YES or NO
139            let nullable = SqlValue::Varchar(if col.nullable { "YES" } else { "NO" }.into());
140
141            // Key: PRI for primary key, UNI for unique, empty otherwise
142            let key = if pk_columns.contains(&col.name) {
143                SqlValue::Varchar("PRI".into())
144            } else if unique_columns.contains(&col.name) {
145                SqlValue::Varchar("UNI".into())
146            } else {
147                SqlValue::Varchar("".into())
148            };
149
150            // Default: default value or NULL
151            let default = col.default_value
152                .as_ref()
153                .map(|expr| SqlValue::Varchar(format!("{:?}", expr)))
154                .unwrap_or(SqlValue::Null);
155
156            // Extra: auto_increment, etc. (we don't have auto_increment info yet)
157            let extra = SqlValue::Varchar("".into());
158
159            if stmt.full {
160                // SHOW FULL COLUMNS includes: Collation, Privileges, Comment
161                rows.push(Row::new(vec![
162                    field,
163                    col_type,
164                    SqlValue::Null, // Collation
165                    nullable,
166                    key,
167                    default,
168                    extra,
169                    SqlValue::Varchar("select,insert,update,references".into()), // Privileges
170                    SqlValue::Varchar("".into()), // Comment
171                ]));
172            } else {
173                rows.push(Row::new(vec![field, col_type, nullable, key, default, extra]));
174            }
175        }
176
177        let columns = if stmt.full {
178            vec![
179                "Field".to_string(),
180                "Type".to_string(),
181                "Collation".to_string(),
182                "Null".to_string(),
183                "Key".to_string(),
184                "Default".to_string(),
185                "Extra".to_string(),
186                "Privileges".to_string(),
187                "Comment".to_string(),
188            ]
189        } else {
190            vec![
191                "Field".to_string(),
192                "Type".to_string(),
193                "Null".to_string(),
194                "Key".to_string(),
195                "Default".to_string(),
196                "Extra".to_string(),
197            ]
198        };
199
200        Ok(SelectResult { columns, rows })
201    }
202
203    /// Execute DESCRIBE statement (alias for SHOW COLUMNS)
204    pub fn execute_describe(&self, stmt: &DescribeStmt) -> Result<SelectResult, ExecutorError> {
205        // DESCRIBE is equivalent to SHOW COLUMNS FROM table
206        let show_columns = ShowColumnsStmt {
207            table_name: stmt.table_name.clone(),
208            database: None,
209            full: false,
210            like_pattern: stmt.column_pattern.clone(),
211            where_clause: None,
212        };
213        self.execute_show_columns(&show_columns)
214    }
215
216    /// Execute SHOW INDEX statement
217    ///
218    /// Returns a result set with index information in MySQL format.
219    pub fn execute_show_index(&self, stmt: &ShowIndexStmt) -> Result<SelectResult, ExecutorError> {
220        // Get the table name (with optional database qualifier)
221        let table_name = if let Some(ref db) = stmt.database {
222            format!("{}.{}", db, stmt.table_name)
223        } else {
224            stmt.table_name.clone()
225        };
226
227        // Verify table exists
228        let table = self.db.catalog.get_table(&table_name)
229            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
230
231        let mut rows: Vec<Row> = Vec::new();
232
233        // Add primary key as an index (if present)
234        if let Some(ref pk_cols) = table.primary_key {
235            for (seq, col_name) in pk_cols.iter().enumerate() {
236                rows.push(Row::new(vec![
237                    SqlValue::Varchar(stmt.table_name.clone()), // Table
238                    SqlValue::Integer(0), // Non_unique (0 = unique)
239                    SqlValue::Varchar("PRIMARY".into()), // Key_name
240                    SqlValue::Integer((seq + 1) as i64), // Seq_in_index
241                    SqlValue::Varchar(col_name.clone()), // Column_name
242                    SqlValue::Varchar("A".into()), // Collation (A = ascending)
243                    SqlValue::Null, // Cardinality
244                    SqlValue::Null, // Sub_part
245                    SqlValue::Null, // Packed
246                    SqlValue::Varchar("".into()), // Null
247                    SqlValue::Varchar("BTREE".into()), // Index_type
248                    SqlValue::Varchar("".into()), // Comment
249                    SqlValue::Varchar("".into()), // Index_comment
250                    SqlValue::Varchar("YES".into()), // Visible
251                ]));
252            }
253        }
254
255        // Add explicit indexes from catalog
256        let indexes = self.db.catalog.get_table_indexes(&stmt.table_name);
257        for index in indexes {
258            for (seq, col) in index.columns.iter().enumerate() {
259                let non_unique = if index.is_unique { 0 } else { 1 };
260                let collation = match col.order {
261                    SortOrder::Ascending => "A",
262                    SortOrder::Descending => "D",
263                };
264                let index_type = match index.index_type {
265                    IndexType::BTree => "BTREE",
266                    IndexType::Hash => "HASH",
267                    IndexType::RTree => "RTREE",
268                    IndexType::Fulltext => "FULLTEXT",
269                };
270
271                rows.push(Row::new(vec![
272                    SqlValue::Varchar(stmt.table_name.clone()), // Table
273                    SqlValue::Integer(non_unique), // Non_unique
274                    SqlValue::Varchar(index.name.clone()), // Key_name
275                    SqlValue::Integer((seq + 1) as i64), // Seq_in_index
276                    SqlValue::Varchar(col.column_name.clone()), // Column_name
277                    SqlValue::Varchar(collation.into()), // Collation
278                    SqlValue::Null, // Cardinality
279                    col.prefix_length.map(|l| SqlValue::Integer(l as i64)).unwrap_or(SqlValue::Null), // Sub_part
280                    SqlValue::Null, // Packed
281                    SqlValue::Varchar("".into()), // Null
282                    SqlValue::Varchar(index_type.into()), // Index_type
283                    SqlValue::Varchar("".into()), // Comment
284                    SqlValue::Varchar("".into()), // Index_comment
285                    SqlValue::Varchar("YES".into()), // Visible
286                ]));
287            }
288        }
289
290        Ok(SelectResult {
291            columns: vec![
292                "Table".to_string(),
293                "Non_unique".to_string(),
294                "Key_name".to_string(),
295                "Seq_in_index".to_string(),
296                "Column_name".to_string(),
297                "Collation".to_string(),
298                "Cardinality".to_string(),
299                "Sub_part".to_string(),
300                "Packed".to_string(),
301                "Null".to_string(),
302                "Index_type".to_string(),
303                "Comment".to_string(),
304                "Index_comment".to_string(),
305                "Visible".to_string(),
306            ],
307            rows,
308        })
309    }
310
311    /// Execute SHOW CREATE TABLE statement
312    ///
313    /// Returns a result set with columns: Table, Create Table
314    pub fn execute_show_create_table(&self, stmt: &ShowCreateTableStmt) -> Result<SelectResult, ExecutorError> {
315        let table = self.db.catalog.get_table(&stmt.table_name)
316            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
317
318        // Build the CREATE TABLE statement
319        let mut sql = format!("CREATE TABLE {} (\n", table.name);
320        let mut definitions: Vec<String> = Vec::new();
321
322        // Add column definitions
323        for col in &table.columns {
324            let mut col_def = format!("  {} {}", col.name, format_data_type(&col.data_type));
325
326            if !col.nullable {
327                col_def.push_str(" NOT NULL");
328            }
329
330            if let Some(ref default) = col.default_value {
331                col_def.push_str(&format!(" DEFAULT {:?}", default));
332            }
333
334            definitions.push(col_def);
335        }
336
337        // Add PRIMARY KEY constraint
338        if let Some(ref pk_cols) = table.primary_key {
339            definitions.push(format!("  PRIMARY KEY ({})", pk_cols.join(", ")));
340        }
341
342        // Add UNIQUE constraints
343        for unique_cols in &table.unique_constraints {
344            definitions.push(format!("  UNIQUE ({})", unique_cols.join(", ")));
345        }
346
347        // Add FOREIGN KEY constraints
348        for fk in &table.foreign_keys {
349            let mut fk_def = format!(
350                "  FOREIGN KEY ({}) REFERENCES {} ({})",
351                fk.column_names.join(", "),
352                fk.parent_table,
353                fk.parent_column_names.join(", ")
354            );
355
356            // Add ON DELETE action
357            fk_def.push_str(&format!(" ON DELETE {}", format_referential_action(&fk.on_delete)));
358
359            // Add ON UPDATE action
360            fk_def.push_str(&format!(" ON UPDATE {}", format_referential_action(&fk.on_update)));
361
362            definitions.push(fk_def);
363        }
364
365        // Add CHECK constraints
366        for (name, _expr) in &table.check_constraints {
367            definitions.push(format!("  CONSTRAINT {} CHECK (...)", name));
368        }
369
370        sql.push_str(&definitions.join(",\n"));
371        sql.push_str("\n)");
372
373        let rows = vec![Row::new(vec![
374            SqlValue::Varchar(table.name.clone()),
375            SqlValue::Varchar(sql),
376        ])];
377
378        Ok(SelectResult {
379            columns: vec!["Table".to_string(), "Create Table".to_string()],
380            rows,
381        })
382    }
383}
384
/// Match a SQL LIKE pattern against a string (case-insensitive)
///
/// Supports `%` (match zero or more characters), `_` (match exactly one
/// character) and `\` as an escape that makes the following pattern character
/// literal. Both inputs are lowercased before comparison.
fn like_match(pattern: &str, text: &str) -> bool {
    let pattern_chars: Vec<char> = pattern.to_lowercase().chars().collect();
    let text_chars: Vec<char> = text.to_lowercase().chars().collect();

    like_match_impl(&pattern_chars, &text_chars)
}

/// Iterative implementation of LIKE pattern matching
///
/// Walks pattern and text in lockstep. On a mismatch it backtracks to the most
/// recent `%` and lets that `%` absorb one more character. Only the last `%`
/// needs to be remembered, which bounds the work by
/// `pattern.len() * text.len()` and uses no recursion, so long inputs cannot
/// overflow the stack and multi-`%` patterns cannot blow up exponentially.
///
/// A trailing `\` with nothing after it is treated as a literal backslash.
fn like_match_impl(pattern: &[char], text: &[char]) -> bool {
    let mut p = 0; // next pattern position to match
    let mut t = 0; // next text position to match
    // (pattern index just after the last `%`, text index where that `%` stopped)
    let mut resume: Option<(usize, usize)> = None;

    loop {
        if p < pattern.len() {
            match pattern[p] {
                '%' => {
                    // Start by letting `%` match nothing; widen on backtrack.
                    p += 1;
                    resume = Some((p, t));
                    continue;
                }
                '_' if t < text.len() => {
                    p += 1;
                    t += 1;
                    continue;
                }
                '\\' if p + 1 < pattern.len() => {
                    // Escape sequence - the next pattern character is literal.
                    if t < text.len() && text[t] == pattern[p + 1] {
                        p += 2;
                        t += 1;
                        continue;
                    }
                }
                c if t < text.len() && text[t] == c => {
                    p += 1;
                    t += 1;
                    continue;
                }
                _ => {}
            }
        } else if t == text.len() {
            // Pattern and text exhausted together.
            return true;
        }

        // Mismatch: let the last `%` consume one more character, if it can.
        match resume {
            Some((after_percent, stopped_at)) if stopped_at < text.len() => {
                p = after_percent;
                t = stopped_at + 1;
                resume = Some((after_percent, t));
            }
            _ => return false,
        }
    }
}
428
429/// Format a DataType as a SQL string
430fn format_data_type(dt: &vibesql_types::DataType) -> String {
431    use vibesql_types::DataType;
432
433    match dt {
434        DataType::Integer => "INT".to_string(),
435        DataType::Smallint => "SMALLINT".to_string(),
436        DataType::Bigint => "BIGINT".to_string(),
437        DataType::Unsigned => "UNSIGNED BIGINT".to_string(),
438        DataType::Real => "REAL".to_string(),
439        DataType::Float { precision } => format!("FLOAT({})", precision),
440        DataType::DoublePrecision => "DOUBLE PRECISION".to_string(),
441        DataType::Numeric { precision, scale } => {
442            format!("NUMERIC({}, {})", precision, scale)
443        }
444        DataType::Decimal { precision, scale } => {
445            format!("DECIMAL({}, {})", precision, scale)
446        }
447        DataType::Varchar { max_length } => {
448            if let Some(len) = max_length {
449                format!("VARCHAR({})", len)
450            } else {
451                "VARCHAR".to_string()
452            }
453        }
454        DataType::Character { length } => {
455            format!("CHAR({})", length)
456        }
457        DataType::CharacterLargeObject => "CLOB".to_string(),
458        DataType::Name => "NAME".to_string(),
459        DataType::Boolean => "BOOLEAN".to_string(),
460        DataType::Date => "DATE".to_string(),
461        DataType::Time { with_timezone } => {
462            if *with_timezone {
463                "TIME WITH TIME ZONE".to_string()
464            } else {
465                "TIME".to_string()
466            }
467        }
468        DataType::Timestamp { with_timezone } => {
469            if *with_timezone {
470                "TIMESTAMP WITH TIME ZONE".to_string()
471            } else {
472                "TIMESTAMP".to_string()
473            }
474        }
475        DataType::Interval { start_field, end_field } => {
476            if let Some(end) = end_field {
477                format!("INTERVAL {:?} TO {:?}", start_field, end)
478            } else {
479                format!("INTERVAL {:?}", start_field)
480            }
481        }
482        DataType::BinaryLargeObject => "BLOB".to_string(),
483        DataType::Bit { length } => {
484            if let Some(len) = length {
485                format!("BIT({})", len)
486            } else {
487                "BIT".to_string()
488            }
489        }
490        DataType::UserDefined { type_name } => type_name.clone(),
491        DataType::Vector { dimensions } => format!("VECTOR({})", dimensions),
492        DataType::Null => "NULL".to_string(),
493    }
494}
495
496/// Format a referential action as a SQL string
497fn format_referential_action(action: &ReferentialAction) -> &'static str {
498    match action {
499        ReferentialAction::NoAction => "NO ACTION",
500        ReferentialAction::Restrict => "RESTRICT",
501        ReferentialAction::Cascade => "CASCADE",
502        ReferentialAction::SetNull => "SET NULL",
503        ReferentialAction::SetDefault => "SET DEFAULT",
504    }
505}
506
#[cfg(test)]
mod tests {
    use super::*;
    use vibesql_catalog::{ColumnSchema, TableSchema};
    use vibesql_types::DataType;

    /// Headers expected from SHOW COLUMNS / DESCRIBE without FULL.
    const BASIC_COLUMN_HEADERS: [&str; 6] = ["Field", "Type", "Null", "Key", "Default", "Extra"];

    /// Build a database holding a `users` and an `orders` table, each with an
    /// `id` primary key.
    fn create_test_db() -> Database {
        let mut db = Database::new();
        db.catalog.set_case_sensitive_identifiers(false);

        // users(id, name, email)
        let users = TableSchema::with_primary_key(
            "users".to_string(),
            vec![
                ColumnSchema::new("id".to_string(), DataType::Integer, false),
                ColumnSchema::new("name".to_string(), DataType::Varchar { max_length: Some(100) }, true),
                ColumnSchema::new("email".to_string(), DataType::Varchar { max_length: Some(255) }, false),
            ],
            vec!["id".to_string()],
        );
        db.create_table(users).unwrap();

        // orders(id, user_id, total)
        let orders = TableSchema::with_primary_key(
            "orders".to_string(),
            vec![
                ColumnSchema::new("id".to_string(), DataType::Integer, false),
                ColumnSchema::new("user_id".to_string(), DataType::Integer, false),
                ColumnSchema::new("total".to_string(), DataType::Decimal { precision: 10, scale: 2 }, true),
            ],
            vec!["id".to_string()],
        );
        db.create_table(orders).unwrap();

        db
    }

    /// Collect the VARCHAR values found at position `column` of every row;
    /// rows holding any other value kind at that position are skipped.
    fn varchar_values(result: &SelectResult, column: usize) -> Vec<&str> {
        result.rows.iter()
            .filter_map(|row| match &row.values[column] {
                SqlValue::Varchar(s) => Some(s.as_str()),
                _ => None,
            })
            .collect()
    }

    #[test]
    fn test_show_tables() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_show_tables(&ShowTablesStmt {
                database: None,
                like_pattern: None,
                where_clause: None,
            })
            .unwrap();

        assert_eq!(result.columns.len(), 1);
        assert!(result.columns[0].starts_with("Tables_in_"));

        // Both fixture tables are listed
        assert_eq!(result.rows.len(), 2);

        // Accept either casing of the stored names
        let table_names = varchar_values(&result, 0);
        assert!(table_names.contains(&"users") || table_names.contains(&"USERS"));
        assert!(table_names.contains(&"orders") || table_names.contains(&"ORDERS"));
    }

    #[test]
    fn test_show_tables_with_like() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_show_tables(&ShowTablesStmt {
                database: None,
                like_pattern: Some("user%".to_string()),
                where_clause: None,
            })
            .unwrap();

        // Only 'users' should match
        assert_eq!(result.rows.len(), 1);
    }

    #[test]
    fn test_show_databases() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_show_databases(&ShowDatabasesStmt {
                like_pattern: None,
                where_clause: None,
            })
            .unwrap();

        assert_eq!(result.columns, vec!["Database"]);

        // Should have at least the 'public' schema
        assert!(!result.rows.is_empty());
        assert!(varchar_values(&result, 0).contains(&"public"));
    }

    #[test]
    fn test_show_columns() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_show_columns(&ShowColumnsStmt {
                table_name: "users".to_string(),
                database: None,
                full: false,
                like_pattern: None,
                where_clause: None,
            })
            .unwrap();

        assert_eq!(result.columns, BASIC_COLUMN_HEADERS.to_vec());
        assert_eq!(result.rows.len(), 3); // id, name, email
    }

    #[test]
    fn test_show_columns_full() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_show_columns(&ShowColumnsStmt {
                table_name: "users".to_string(),
                database: None,
                full: true,
                like_pattern: None,
                where_clause: None,
            })
            .unwrap();

        assert_eq!(result.columns.len(), 9);
        assert!(result.columns.contains(&"Privileges".to_string()));
        assert!(result.columns.contains(&"Comment".to_string()));
    }

    #[test]
    fn test_describe() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_describe(&DescribeStmt {
                table_name: "users".to_string(),
                column_pattern: None,
            })
            .unwrap();

        assert_eq!(result.columns, BASIC_COLUMN_HEADERS.to_vec());
        assert_eq!(result.rows.len(), 3);
    }

    #[test]
    fn test_show_index() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_show_index(&ShowIndexStmt {
                table_name: "users".to_string(),
                database: None,
            })
            .unwrap();

        // Should have at least the PRIMARY key index
        assert!(!result.rows.is_empty());

        // Key_name is the third column of every row
        let has_primary = varchar_values(&result, 2).contains(&"PRIMARY");
        assert!(has_primary);
    }

    #[test]
    fn test_show_create_table() {
        let db = create_test_db();
        let executor = IntrospectionExecutor::new(&db);

        let result = executor
            .execute_show_create_table(&ShowCreateTableStmt {
                table_name: "users".to_string(),
            })
            .unwrap();

        assert_eq!(result.columns, vec!["Table", "Create Table"]);
        assert_eq!(result.rows.len(), 1);

        let sql = match &result.rows[0].values[1] {
            SqlValue::Varchar(sql) => sql,
            _ => panic!("Expected VARCHAR for Create Table"),
        };

        // The generated statement mentions every expected element
        for fragment in &["CREATE TABLE", "users", "id", "name", "email", "PRIMARY KEY"] {
            assert!(sql.contains(*fragment));
        }
    }

    #[test]
    fn test_like_match() {
        // (pattern, text, expected)
        let cases = [
            // Exact match
            ("test", "test", true),
            ("test", "TEST", true), // case-insensitive
            ("test", "testing", false),
            // % matches zero or more characters
            ("test%", "test", true),
            ("test%", "testing", true),
            ("%test", "test", true),
            ("%test", "mytest", true),
            ("%test%", "test", true),
            ("%test%", "mytesting", true),
            // _ matches exactly one character
            ("te_t", "test", true),
            ("te_t", "tet", false),
            ("te_t", "testt", false),
            // Escape sequences
            ("te\\_t", "te_t", true),
            ("te\\_t", "test", false),
        ];

        for (pattern, text, expected) in cases.iter() {
            assert_eq!(like_match(pattern, text), *expected);
        }
    }
}