vibesql_executor/
introspection.rs

1//! Database introspection command execution (SHOW, DESCRIBE)
2//!
3//! This module implements MySQL-style introspection commands that query
4//! database metadata and return result sets.
5
6use vibesql_ast::{
7    DescribeStmt, ShowColumnsStmt, ShowCreateTableStmt, ShowDatabasesStmt, ShowIndexStmt,
8    ShowTablesStmt,
9};
10use vibesql_catalog::{IndexType, ReferentialAction, SortOrder};
11use vibesql_storage::{Database, Row};
12use vibesql_types::SqlValue;
13
14use crate::errors::ExecutorError;
15use crate::select::SelectResult;
16
17/// Executor for database introspection commands
18pub struct IntrospectionExecutor<'a> {
19    db: &'a Database,
20}
21
22impl<'a> IntrospectionExecutor<'a> {
23    /// Create a new introspection executor
24    pub fn new(db: &'a Database) -> Self {
25        Self { db }
26    }
27
28    /// Execute SHOW TABLES statement
29    ///
30    /// Returns a result set with a single column "Tables_in_<database>" containing
31    /// the names of all tables in the specified (or current) schema.
32    pub fn execute_show_tables(
33        &self,
34        stmt: &ShowTablesStmt,
35    ) -> Result<SelectResult, ExecutorError> {
36        // Get the schema to query (default to current schema)
37        let schema_name =
38            stmt.database.as_deref().unwrap_or_else(|| self.db.catalog.get_current_schema());
39
40        // Get tables from the schema
41        let tables = if let Some(schema) = self.db.catalog.get_schema(schema_name) {
42            schema.list_tables()
43        } else {
44            // Try case-insensitive lookup
45            let upper_name = schema_name.to_uppercase();
46            self.db
47                .catalog
48                .list_schemas()
49                .into_iter()
50                .find(|s| s.to_uppercase() == upper_name)
51                .and_then(|actual_name| self.db.catalog.get_schema(&actual_name))
52                .map(|schema| schema.list_tables())
53                .unwrap_or_default()
54        };
55
56        // Filter by LIKE pattern if provided
57        let filtered_tables: Vec<String> = if let Some(ref pattern) = stmt.like_pattern {
58            tables.into_iter().filter(|t| like_match(pattern, t)).collect()
59        } else {
60            tables
61        };
62
63        // Build result set
64        let column_name = format!("Tables_in_{}", schema_name);
65        let rows: Vec<Row> = filtered_tables
66            .into_iter()
67            .map(|name| Row::new(vec![SqlValue::Varchar(name)]))
68            .collect();
69
70        Ok(SelectResult { columns: vec![column_name], rows })
71    }
72
73    /// Execute SHOW DATABASES statement
74    ///
75    /// Returns a result set with a single column "Database" containing
76    /// the names of all schemas (databases) in the catalog.
77    pub fn execute_show_databases(
78        &self,
79        stmt: &ShowDatabasesStmt,
80    ) -> Result<SelectResult, ExecutorError> {
81        let schemas = self.db.catalog.list_schemas();
82
83        // Filter by LIKE pattern if provided
84        let filtered_schemas: Vec<String> = if let Some(ref pattern) = stmt.like_pattern {
85            schemas.into_iter().filter(|s| like_match(pattern, s)).collect()
86        } else {
87            schemas
88        };
89
90        let rows: Vec<Row> = filtered_schemas
91            .into_iter()
92            .map(|name| Row::new(vec![SqlValue::Varchar(name)]))
93            .collect();
94
95        Ok(SelectResult { columns: vec!["Database".to_string()], rows })
96    }
97
98    /// Execute SHOW COLUMNS statement
99    ///
100    /// Returns a result set with columns: Field, Type, Null, Key, Default, Extra
101    pub fn execute_show_columns(
102        &self,
103        stmt: &ShowColumnsStmt,
104    ) -> Result<SelectResult, ExecutorError> {
105        // Get the table
106        let table_name = if let Some(ref db) = stmt.database {
107            format!("{}.{}", db, stmt.table_name)
108        } else {
109            stmt.table_name.clone()
110        };
111
112        let table = self
113            .db
114            .catalog
115            .get_table(&table_name)
116            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
117
118        // Determine which columns are part of the primary key
119        let pk_columns: std::collections::HashSet<String> =
120            table.primary_key.as_ref().map(|pk| pk.iter().cloned().collect()).unwrap_or_default();
121
122        // Determine which columns are part of unique constraints
123        let unique_columns: std::collections::HashSet<String> =
124            table.unique_constraints.iter().flatten().cloned().collect();
125
126        let mut rows: Vec<Row> = Vec::with_capacity(table.columns.len());
127
128        for col in &table.columns {
129            // Check if column matches LIKE pattern
130            if let Some(ref pattern) = stmt.like_pattern {
131                if !like_match(pattern, &col.name) {
132                    continue;
133                }
134            }
135
136            // Field: column name
137            let field = SqlValue::Varchar(col.name.clone());
138
139            // Type: data type as string
140            let type_str = format_data_type(&col.data_type);
141            let col_type = SqlValue::Varchar(type_str);
142
143            // Null: YES or NO
144            let nullable = SqlValue::Varchar(if col.nullable { "YES" } else { "NO" }.into());
145
146            // Key: PRI for primary key, UNI for unique, empty otherwise
147            let key = if pk_columns.contains(&col.name) {
148                SqlValue::Varchar("PRI".into())
149            } else if unique_columns.contains(&col.name) {
150                SqlValue::Varchar("UNI".into())
151            } else {
152                SqlValue::Varchar("".into())
153            };
154
155            // Default: default value or NULL
156            let default = col
157                .default_value
158                .as_ref()
159                .map(|expr| SqlValue::Varchar(format!("{:?}", expr)))
160                .unwrap_or(SqlValue::Null);
161
162            // Extra: auto_increment, etc. (we don't have auto_increment info yet)
163            let extra = SqlValue::Varchar("".into());
164
165            if stmt.full {
166                // SHOW FULL COLUMNS includes: Collation, Privileges, Comment
167                rows.push(Row::new(vec![
168                    field,
169                    col_type,
170                    SqlValue::Null, // Collation
171                    nullable,
172                    key,
173                    default,
174                    extra,
175                    SqlValue::Varchar("select,insert,update,references".into()), // Privileges
176                    SqlValue::Varchar("".into()),                                // Comment
177                ]));
178            } else {
179                rows.push(Row::new(vec![field, col_type, nullable, key, default, extra]));
180            }
181        }
182
183        let columns = if stmt.full {
184            vec![
185                "Field".to_string(),
186                "Type".to_string(),
187                "Collation".to_string(),
188                "Null".to_string(),
189                "Key".to_string(),
190                "Default".to_string(),
191                "Extra".to_string(),
192                "Privileges".to_string(),
193                "Comment".to_string(),
194            ]
195        } else {
196            vec![
197                "Field".to_string(),
198                "Type".to_string(),
199                "Null".to_string(),
200                "Key".to_string(),
201                "Default".to_string(),
202                "Extra".to_string(),
203            ]
204        };
205
206        Ok(SelectResult { columns, rows })
207    }
208
209    /// Execute DESCRIBE statement (alias for SHOW COLUMNS)
210    pub fn execute_describe(&self, stmt: &DescribeStmt) -> Result<SelectResult, ExecutorError> {
211        // DESCRIBE is equivalent to SHOW COLUMNS FROM table
212        let show_columns = ShowColumnsStmt {
213            table_name: stmt.table_name.clone(),
214            database: None,
215            full: false,
216            like_pattern: stmt.column_pattern.clone(),
217            where_clause: None,
218        };
219        self.execute_show_columns(&show_columns)
220    }
221
222    /// Execute SHOW INDEX statement
223    ///
224    /// Returns a result set with index information in MySQL format.
225    pub fn execute_show_index(&self, stmt: &ShowIndexStmt) -> Result<SelectResult, ExecutorError> {
226        // Get the table name (with optional database qualifier)
227        let table_name = if let Some(ref db) = stmt.database {
228            format!("{}.{}", db, stmt.table_name)
229        } else {
230            stmt.table_name.clone()
231        };
232
233        // Verify table exists
234        let table = self
235            .db
236            .catalog
237            .get_table(&table_name)
238            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
239
240        let mut rows: Vec<Row> = Vec::new();
241
242        // Add primary key as an index (if present)
243        if let Some(ref pk_cols) = table.primary_key {
244            for (seq, col_name) in pk_cols.iter().enumerate() {
245                rows.push(Row::new(vec![
246                    SqlValue::Varchar(stmt.table_name.clone()), // Table
247                    SqlValue::Integer(0),                       // Non_unique (0 = unique)
248                    SqlValue::Varchar("PRIMARY".into()),        // Key_name
249                    SqlValue::Integer((seq + 1) as i64),        // Seq_in_index
250                    SqlValue::Varchar(col_name.clone()),        // Column_name
251                    SqlValue::Varchar("A".into()),              // Collation (A = ascending)
252                    SqlValue::Null,                             // Cardinality
253                    SqlValue::Null,                             // Sub_part
254                    SqlValue::Null,                             // Packed
255                    SqlValue::Varchar("".into()),               // Null
256                    SqlValue::Varchar("BTREE".into()),          // Index_type
257                    SqlValue::Varchar("".into()),               // Comment
258                    SqlValue::Varchar("".into()),               // Index_comment
259                    SqlValue::Varchar("YES".into()),            // Visible
260                ]));
261            }
262        }
263
264        // Add explicit indexes from catalog
265        let indexes = self.db.catalog.get_table_indexes(&stmt.table_name);
266        for index in indexes {
267            for (seq, col) in index.columns.iter().enumerate() {
268                let non_unique = if index.is_unique { 0 } else { 1 };
269                let collation = match col.order {
270                    SortOrder::Ascending => "A",
271                    SortOrder::Descending => "D",
272                };
273                let index_type = match index.index_type {
274                    IndexType::BTree => "BTREE",
275                    IndexType::Hash => "HASH",
276                    IndexType::RTree => "RTREE",
277                    IndexType::Fulltext => "FULLTEXT",
278                    IndexType::IVFFlat { .. } => "IVFFLAT",
279                    IndexType::Hnsw { .. } => "HNSW",
280                };
281
282                rows.push(Row::new(vec![
283                    SqlValue::Varchar(stmt.table_name.clone()), // Table
284                    SqlValue::Integer(non_unique),              // Non_unique
285                    SqlValue::Varchar(index.name.clone()),      // Key_name
286                    SqlValue::Integer((seq + 1) as i64),        // Seq_in_index
287                    SqlValue::Varchar(col.column_name.clone()), // Column_name
288                    SqlValue::Varchar(collation.into()),        // Collation
289                    SqlValue::Null,                             // Cardinality
290                    col.prefix_length
291                        .map(|l| SqlValue::Integer(l as i64))
292                        .unwrap_or(SqlValue::Null), // Sub_part
293                    SqlValue::Null,                             // Packed
294                    SqlValue::Varchar("".into()),               // Null
295                    SqlValue::Varchar(index_type.into()),       // Index_type
296                    SqlValue::Varchar("".into()),               // Comment
297                    SqlValue::Varchar("".into()),               // Index_comment
298                    SqlValue::Varchar("YES".into()),            // Visible
299                ]));
300            }
301        }
302
303        Ok(SelectResult {
304            columns: vec![
305                "Table".to_string(),
306                "Non_unique".to_string(),
307                "Key_name".to_string(),
308                "Seq_in_index".to_string(),
309                "Column_name".to_string(),
310                "Collation".to_string(),
311                "Cardinality".to_string(),
312                "Sub_part".to_string(),
313                "Packed".to_string(),
314                "Null".to_string(),
315                "Index_type".to_string(),
316                "Comment".to_string(),
317                "Index_comment".to_string(),
318                "Visible".to_string(),
319            ],
320            rows,
321        })
322    }
323
324    /// Execute SHOW CREATE TABLE statement
325    ///
326    /// Returns a result set with columns: Table, Create Table
327    pub fn execute_show_create_table(
328        &self,
329        stmt: &ShowCreateTableStmt,
330    ) -> Result<SelectResult, ExecutorError> {
331        let table = self
332            .db
333            .catalog
334            .get_table(&stmt.table_name)
335            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
336
337        // Build the CREATE TABLE statement
338        let mut sql = format!("CREATE TABLE {} (\n", table.name);
339        let mut definitions: Vec<String> = Vec::new();
340
341        // Add column definitions
342        for col in &table.columns {
343            let mut col_def = format!("  {} {}", col.name, format_data_type(&col.data_type));
344
345            if !col.nullable {
346                col_def.push_str(" NOT NULL");
347            }
348
349            if let Some(ref default) = col.default_value {
350                col_def.push_str(&format!(" DEFAULT {:?}", default));
351            }
352
353            definitions.push(col_def);
354        }
355
356        // Add PRIMARY KEY constraint
357        if let Some(ref pk_cols) = table.primary_key {
358            definitions.push(format!("  PRIMARY KEY ({})", pk_cols.join(", ")));
359        }
360
361        // Add UNIQUE constraints
362        for unique_cols in &table.unique_constraints {
363            definitions.push(format!("  UNIQUE ({})", unique_cols.join(", ")));
364        }
365
366        // Add FOREIGN KEY constraints
367        for fk in &table.foreign_keys {
368            let mut fk_def = format!(
369                "  FOREIGN KEY ({}) REFERENCES {} ({})",
370                fk.column_names.join(", "),
371                fk.parent_table,
372                fk.parent_column_names.join(", ")
373            );
374
375            // Add ON DELETE action
376            fk_def.push_str(&format!(" ON DELETE {}", format_referential_action(&fk.on_delete)));
377
378            // Add ON UPDATE action
379            fk_def.push_str(&format!(" ON UPDATE {}", format_referential_action(&fk.on_update)));
380
381            definitions.push(fk_def);
382        }
383
384        // Add CHECK constraints
385        for (name, _expr) in &table.check_constraints {
386            definitions.push(format!("  CONSTRAINT {} CHECK (...)", name));
387        }
388
389        sql.push_str(&definitions.join(",\n"));
390        sql.push_str("\n)");
391
392        let rows =
393            vec![Row::new(vec![SqlValue::Varchar(table.name.clone()), SqlValue::Varchar(sql)])];
394
395        Ok(SelectResult { columns: vec!["Table".to_string(), "Create Table".to_string()], rows })
396    }
397}
398
399/// Match a SQL LIKE pattern against a string (case-insensitive)
400///
401/// Supports % (match zero or more characters) and _ (match exactly one character)
402fn like_match(pattern: &str, text: &str) -> bool {
403    let pattern = pattern.to_lowercase();
404    let text = text.to_lowercase();
405
406    let pattern_chars: Vec<char> = pattern.chars().collect();
407    let text_chars: Vec<char> = text.chars().collect();
408
409    like_match_impl(&pattern_chars, &text_chars)
410}
411
412/// Recursive implementation of LIKE pattern matching
413fn like_match_impl(pattern: &[char], text: &[char]) -> bool {
414    match (pattern.first(), text.first()) {
415        (None, None) => true,
416        (None, Some(_)) => false,
417        (Some('%'), _) => {
418            // % matches zero or more characters
419            // Try matching 0 characters (skip %) or matching 1 character
420            like_match_impl(&pattern[1..], text)
421                || (!text.is_empty() && like_match_impl(pattern, &text[1..]))
422        }
423        (Some('_'), Some(_)) => {
424            // _ matches exactly one character
425            like_match_impl(&pattern[1..], &text[1..])
426        }
427        (Some('_'), None) => false,
428        (Some('\\'), _) if pattern.len() > 1 => {
429            // Escape sequence - match the next character literally
430            if text.first() == Some(&pattern[1]) {
431                like_match_impl(&pattern[2..], &text[1..])
432            } else {
433                false
434            }
435        }
436        (Some(p), Some(t)) if *p == *t => like_match_impl(&pattern[1..], &text[1..]),
437        _ => false,
438    }
439}
440
441/// Format a DataType as a SQL string
442fn format_data_type(dt: &vibesql_types::DataType) -> String {
443    use vibesql_types::DataType;
444
445    match dt {
446        DataType::Integer => "INT".to_string(),
447        DataType::Smallint => "SMALLINT".to_string(),
448        DataType::Bigint => "BIGINT".to_string(),
449        DataType::Unsigned => "UNSIGNED BIGINT".to_string(),
450        DataType::Real => "REAL".to_string(),
451        DataType::Float { precision } => format!("FLOAT({})", precision),
452        DataType::DoublePrecision => "DOUBLE PRECISION".to_string(),
453        DataType::Numeric { precision, scale } => {
454            format!("NUMERIC({}, {})", precision, scale)
455        }
456        DataType::Decimal { precision, scale } => {
457            format!("DECIMAL({}, {})", precision, scale)
458        }
459        DataType::Varchar { max_length } => {
460            if let Some(len) = max_length {
461                format!("VARCHAR({})", len)
462            } else {
463                "VARCHAR".to_string()
464            }
465        }
466        DataType::Character { length } => {
467            format!("CHAR({})", length)
468        }
469        DataType::CharacterLargeObject => "CLOB".to_string(),
470        DataType::Name => "NAME".to_string(),
471        DataType::Boolean => "BOOLEAN".to_string(),
472        DataType::Date => "DATE".to_string(),
473        DataType::Time { with_timezone } => {
474            if *with_timezone {
475                "TIME WITH TIME ZONE".to_string()
476            } else {
477                "TIME".to_string()
478            }
479        }
480        DataType::Timestamp { with_timezone } => {
481            if *with_timezone {
482                "TIMESTAMP WITH TIME ZONE".to_string()
483            } else {
484                "TIMESTAMP".to_string()
485            }
486        }
487        DataType::Interval { start_field, end_field } => {
488            if let Some(end) = end_field {
489                format!("INTERVAL {:?} TO {:?}", start_field, end)
490            } else {
491                format!("INTERVAL {:?}", start_field)
492            }
493        }
494        DataType::BinaryLargeObject => "BLOB".to_string(),
495        DataType::Bit { length } => {
496            if let Some(len) = length {
497                format!("BIT({})", len)
498            } else {
499                "BIT".to_string()
500            }
501        }
502        DataType::UserDefined { type_name } => type_name.clone(),
503        DataType::Vector { dimensions } => format!("VECTOR({})", dimensions),
504        DataType::Null => "NULL".to_string(),
505    }
506}
507
508/// Format a referential action as a SQL string
509fn format_referential_action(action: &ReferentialAction) -> &'static str {
510    match action {
511        ReferentialAction::NoAction => "NO ACTION",
512        ReferentialAction::Restrict => "RESTRICT",
513        ReferentialAction::Cascade => "CASCADE",
514        ReferentialAction::SetNull => "SET NULL",
515        ReferentialAction::SetDefault => "SET DEFAULT",
516    }
517}
518
519#[cfg(test)]
520mod tests {
521    use super::*;
522    use vibesql_catalog::{ColumnSchema, TableSchema};
523    use vibesql_types::DataType;
524
525    fn create_test_db() -> Database {
526        let mut db = Database::new();
527        db.catalog.set_case_sensitive_identifiers(false);
528
529        // Create a users table
530        let columns = vec![
531            ColumnSchema::new("id".to_string(), DataType::Integer, false),
532            ColumnSchema::new(
533                "name".to_string(),
534                DataType::Varchar { max_length: Some(100) },
535                true,
536            ),
537            ColumnSchema::new(
538                "email".to_string(),
539                DataType::Varchar { max_length: Some(255) },
540                false,
541            ),
542        ];
543        let schema =
544            TableSchema::with_primary_key("users".to_string(), columns, vec!["id".to_string()]);
545        db.create_table(schema).unwrap();
546
547        // Create an orders table
548        let order_columns = vec![
549            ColumnSchema::new("id".to_string(), DataType::Integer, false),
550            ColumnSchema::new("user_id".to_string(), DataType::Integer, false),
551            ColumnSchema::new(
552                "total".to_string(),
553                DataType::Decimal { precision: 10, scale: 2 },
554                true,
555            ),
556        ];
557        let order_schema = TableSchema::with_primary_key(
558            "orders".to_string(),
559            order_columns,
560            vec!["id".to_string()],
561        );
562        db.create_table(order_schema).unwrap();
563
564        db
565    }
566
567    #[test]
568    fn test_show_tables() {
569        let db = create_test_db();
570        let executor = IntrospectionExecutor::new(&db);
571
572        let stmt = ShowTablesStmt { database: None, like_pattern: None, where_clause: None };
573
574        let result = executor.execute_show_tables(&stmt).unwrap();
575        assert_eq!(result.columns.len(), 1);
576        assert!(result.columns[0].starts_with("Tables_in_"));
577
578        // Should have 2 tables
579        assert_eq!(result.rows.len(), 2);
580
581        // Extract table names
582        let table_names: Vec<&str> = result
583            .rows
584            .iter()
585            .filter_map(|r| match &r.values[0] {
586                SqlValue::Varchar(s) => Some(s.as_str()),
587                _ => None,
588            })
589            .collect();
590
591        assert!(table_names.contains(&"users") || table_names.contains(&"USERS"));
592        assert!(table_names.contains(&"orders") || table_names.contains(&"ORDERS"));
593    }
594
595    #[test]
596    fn test_show_tables_with_like() {
597        let db = create_test_db();
598        let executor = IntrospectionExecutor::new(&db);
599
600        let stmt = ShowTablesStmt {
601            database: None,
602            like_pattern: Some("user%".to_string()),
603            where_clause: None,
604        };
605
606        let result = executor.execute_show_tables(&stmt).unwrap();
607        // Only 'users' should match
608        assert_eq!(result.rows.len(), 1);
609    }
610
611    #[test]
612    fn test_show_databases() {
613        let db = create_test_db();
614        let executor = IntrospectionExecutor::new(&db);
615
616        let stmt = ShowDatabasesStmt { like_pattern: None, where_clause: None };
617
618        let result = executor.execute_show_databases(&stmt).unwrap();
619        assert_eq!(result.columns, vec!["Database"]);
620
621        // Should have at least the 'public' schema
622        assert!(!result.rows.is_empty());
623
624        let schema_names: Vec<&str> = result
625            .rows
626            .iter()
627            .filter_map(|r| match &r.values[0] {
628                SqlValue::Varchar(s) => Some(s.as_str()),
629                _ => None,
630            })
631            .collect();
632
633        assert!(schema_names.contains(&"public"));
634    }
635
636    #[test]
637    fn test_show_columns() {
638        let db = create_test_db();
639        let executor = IntrospectionExecutor::new(&db);
640
641        let stmt = ShowColumnsStmt {
642            table_name: "users".to_string(),
643            database: None,
644            full: false,
645            like_pattern: None,
646            where_clause: None,
647        };
648
649        let result = executor.execute_show_columns(&stmt).unwrap();
650        assert_eq!(result.columns, vec!["Field", "Type", "Null", "Key", "Default", "Extra"]);
651        assert_eq!(result.rows.len(), 3); // id, name, email
652    }
653
654    #[test]
655    fn test_show_columns_full() {
656        let db = create_test_db();
657        let executor = IntrospectionExecutor::new(&db);
658
659        let stmt = ShowColumnsStmt {
660            table_name: "users".to_string(),
661            database: None,
662            full: true,
663            like_pattern: None,
664            where_clause: None,
665        };
666
667        let result = executor.execute_show_columns(&stmt).unwrap();
668        assert_eq!(result.columns.len(), 9);
669        assert!(result.columns.contains(&"Privileges".to_string()));
670        assert!(result.columns.contains(&"Comment".to_string()));
671    }
672
673    #[test]
674    fn test_describe() {
675        let db = create_test_db();
676        let executor = IntrospectionExecutor::new(&db);
677
678        let stmt = DescribeStmt { table_name: "users".to_string(), column_pattern: None };
679
680        let result = executor.execute_describe(&stmt).unwrap();
681        assert_eq!(result.columns, vec!["Field", "Type", "Null", "Key", "Default", "Extra"]);
682        assert_eq!(result.rows.len(), 3);
683    }
684
685    #[test]
686    fn test_show_index() {
687        let db = create_test_db();
688        let executor = IntrospectionExecutor::new(&db);
689
690        let stmt = ShowIndexStmt { table_name: "users".to_string(), database: None };
691
692        let result = executor.execute_show_index(&stmt).unwrap();
693
694        // Should have at least the PRIMARY key index
695        assert!(!result.rows.is_empty());
696
697        // Check that PRIMARY KEY index is present
698        let has_primary = result.rows.iter().any(|r| {
699            match &r.values[2] {
700                // Key_name column
701                SqlValue::Varchar(s) => s == "PRIMARY",
702                _ => false,
703            }
704        });
705        assert!(has_primary);
706    }
707
708    #[test]
709    fn test_show_create_table() {
710        let db = create_test_db();
711        let executor = IntrospectionExecutor::new(&db);
712
713        let stmt = ShowCreateTableStmt { table_name: "users".to_string() };
714
715        let result = executor.execute_show_create_table(&stmt).unwrap();
716        assert_eq!(result.columns, vec!["Table", "Create Table"]);
717        assert_eq!(result.rows.len(), 1);
718
719        // Check the CREATE TABLE statement contains expected elements
720        if let SqlValue::Varchar(sql) = &result.rows[0].values[1] {
721            assert!(sql.contains("CREATE TABLE"));
722            assert!(sql.contains("users"));
723            assert!(sql.contains("id"));
724            assert!(sql.contains("name"));
725            assert!(sql.contains("email"));
726            assert!(sql.contains("PRIMARY KEY"));
727        } else {
728            panic!("Expected VARCHAR for Create Table");
729        }
730    }
731
732    #[test]
733    fn test_like_match() {
734        // Exact match
735        assert!(like_match("test", "test"));
736        assert!(like_match("test", "TEST")); // case-insensitive
737        assert!(!like_match("test", "testing"));
738
739        // % matches zero or more characters
740        assert!(like_match("test%", "test"));
741        assert!(like_match("test%", "testing"));
742        assert!(like_match("%test", "test"));
743        assert!(like_match("%test", "mytest"));
744        assert!(like_match("%test%", "test"));
745        assert!(like_match("%test%", "mytesting"));
746
747        // _ matches exactly one character
748        assert!(like_match("te_t", "test"));
749        assert!(!like_match("te_t", "tet"));
750        assert!(!like_match("te_t", "testt"));
751
752        // Escape sequences
753        assert!(like_match("te\\_t", "te_t"));
754        assert!(!like_match("te\\_t", "test"));
755    }
756}