prax_query/async_optimize/
introspect.rs

1//! Concurrent database introspection utilities.
2//!
3//! This module provides high-performance introspection by fetching
4//! table metadata (columns, indexes, foreign keys) concurrently.
5//!
6//! # Performance
7//!
8//! For a database with 50 tables, concurrent introspection can reduce
9//! total time from ~5 seconds (sequential) to ~1.5 seconds (concurrent)
10//! - approximately a 60% improvement.
11//!
12//! # Example
13//!
14//! ```rust,ignore
15//! use prax_query::async_optimize::introspect::{
16//!     ConcurrentIntrospector, IntrospectionConfig,
17//! };
18//!
19//! let introspector = ConcurrentIntrospector::new(IntrospectionConfig::default());
20//!
21//! // Fetch metadata for all tables concurrently
22//! let results = introspector
23//!     .introspect_tables(table_names, |name| async move {
24//!         let columns = fetch_columns(&name).await?;
25//!         let indexes = fetch_indexes(&name).await?;
26//!         let foreign_keys = fetch_foreign_keys(&name).await?;
27//!         Ok(TableMetadata { name, columns, indexes, foreign_keys })
28//!     })
29//!     .await;
30//! ```
31
32use std::future::Future;
33use std::time::{Duration, Instant};
34
35use super::concurrent::{ConcurrencyConfig, ConcurrentExecutor, TaskResult};
36
37/// Configuration for concurrent introspection.
38#[derive(Debug, Clone)]
39pub struct IntrospectionConfig {
40    /// Maximum concurrent table introspections.
41    pub max_concurrency: usize,
42    /// Timeout per table.
43    pub table_timeout: Duration,
44    /// Whether to continue on individual table errors.
45    pub continue_on_error: bool,
46    /// Batch size for multi-table queries (0 = no batching).
47    pub batch_size: usize,
48}
49
50impl Default for IntrospectionConfig {
51    fn default() -> Self {
52        Self {
53            max_concurrency: 8,
54            table_timeout: Duration::from_secs(30),
55            continue_on_error: true,
56            batch_size: 0, // No batching by default
57        }
58    }
59}
60
61impl IntrospectionConfig {
62    /// Create a config optimized for large databases.
63    #[must_use]
64    pub fn for_large_database() -> Self {
65        Self {
66            max_concurrency: 16,
67            table_timeout: Duration::from_secs(60),
68            continue_on_error: true,
69            batch_size: 50,
70        }
71    }
72
73    /// Create a config optimized for small databases.
74    #[must_use]
75    pub fn for_small_database() -> Self {
76        Self {
77            max_concurrency: 4,
78            table_timeout: Duration::from_secs(15),
79            continue_on_error: true,
80            batch_size: 0,
81        }
82    }
83
84    /// Set maximum concurrency.
85    #[must_use]
86    pub fn with_max_concurrency(mut self, max: usize) -> Self {
87        self.max_concurrency = max.max(1);
88        self
89    }
90
91    /// Set table timeout.
92    #[must_use]
93    pub fn with_table_timeout(mut self, timeout: Duration) -> Self {
94        self.table_timeout = timeout;
95        self
96    }
97
98    /// Set batch size for multi-table queries.
99    #[must_use]
100    pub fn with_batch_size(mut self, size: usize) -> Self {
101        self.batch_size = size;
102        self
103    }
104}
105
106/// Metadata for a single table.
107#[derive(Debug, Clone)]
108pub struct TableMetadata {
109    /// Table name.
110    pub name: String,
111    /// Column information.
112    pub columns: Vec<ColumnMetadata>,
113    /// Index information.
114    pub indexes: Vec<IndexMetadata>,
115    /// Foreign key information.
116    pub foreign_keys: Vec<ForeignKeyMetadata>,
117    /// Primary key columns.
118    pub primary_key: Vec<String>,
119    /// Table comment.
120    pub comment: Option<String>,
121}
122
123impl TableMetadata {
124    /// Create empty metadata for a table.
125    pub fn new(name: impl Into<String>) -> Self {
126        Self {
127            name: name.into(),
128            columns: Vec::new(),
129            indexes: Vec::new(),
130            foreign_keys: Vec::new(),
131            primary_key: Vec::new(),
132            comment: None,
133        }
134    }
135}
136
137/// Column metadata.
138#[derive(Debug, Clone)]
139pub struct ColumnMetadata {
140    /// Column name.
141    pub name: String,
142    /// Database type.
143    pub db_type: String,
144    /// Whether nullable.
145    pub nullable: bool,
146    /// Default value.
147    pub default: Option<String>,
148    /// Whether auto-increment.
149    pub auto_increment: bool,
150    /// Whether part of primary key.
151    pub is_primary_key: bool,
152}
153
154/// Index metadata.
155#[derive(Debug, Clone)]
156pub struct IndexMetadata {
157    /// Index name.
158    pub name: String,
159    /// Indexed columns.
160    pub columns: Vec<String>,
161    /// Whether unique.
162    pub is_unique: bool,
163    /// Whether primary key index.
164    pub is_primary: bool,
165    /// Index type (btree, hash, etc).
166    pub index_type: Option<String>,
167}
168
169/// Foreign key metadata.
170#[derive(Debug, Clone)]
171pub struct ForeignKeyMetadata {
172    /// Constraint name.
173    pub name: String,
174    /// Columns in this table.
175    pub columns: Vec<String>,
176    /// Referenced table.
177    pub referenced_table: String,
178    /// Referenced columns.
179    pub referenced_columns: Vec<String>,
180    /// On delete action.
181    pub on_delete: String,
182    /// On update action.
183    pub on_update: String,
184}
185
186/// Result of concurrent introspection.
187#[derive(Debug)]
188pub struct IntrospectionResult {
189    /// Successfully introspected tables.
190    pub tables: Vec<TableMetadata>,
191    /// Tables that failed to introspect.
192    pub errors: Vec<IntrospectionError>,
193    /// Total introspection time.
194    pub duration: Duration,
195    /// Maximum concurrent operations observed.
196    pub max_concurrency: usize,
197}
198
199impl IntrospectionResult {
200    /// Check if all tables were introspected successfully.
201    pub fn is_complete(&self) -> bool {
202        self.errors.is_empty()
203    }
204
205    /// Get table metadata by name.
206    pub fn get_table(&self, name: &str) -> Option<&TableMetadata> {
207        self.tables.iter().find(|t| t.name == name)
208    }
209}
210
211/// Error during table introspection.
212#[derive(Debug, Clone)]
213pub struct IntrospectionError {
214    /// Table name.
215    pub table: String,
216    /// Error message.
217    pub message: String,
218    /// Whether this was a timeout.
219    pub is_timeout: bool,
220}
221
222impl std::fmt::Display for IntrospectionError {
223    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
224        if self.is_timeout {
225            write!(f, "Timeout introspecting table '{}': {}", self.table, self.message)
226        } else {
227            write!(f, "Error introspecting table '{}': {}", self.table, self.message)
228        }
229    }
230}
231
232/// Concurrent database introspector.
233pub struct ConcurrentIntrospector {
234    config: IntrospectionConfig,
235    executor: ConcurrentExecutor,
236}
237
238impl ConcurrentIntrospector {
239    /// Create a new concurrent introspector.
240    pub fn new(config: IntrospectionConfig) -> Self {
241        let executor_config = ConcurrencyConfig::default()
242            .with_max_concurrency(config.max_concurrency)
243            .with_timeout(config.table_timeout)
244            .with_continue_on_error(config.continue_on_error);
245
246        Self {
247            config,
248            executor: ConcurrentExecutor::new(executor_config),
249        }
250    }
251
252    /// Introspect tables concurrently using a custom operation.
253    ///
254    /// The `operation` function is called for each table name and should
255    /// return the table metadata.
256    pub async fn introspect_tables<F, Fut>(
257        &self,
258        table_names: Vec<String>,
259        operation: F,
260    ) -> IntrospectionResult
261    where
262        F: Fn(String) -> Fut + Clone + Send + 'static,
263        Fut: Future<Output = Result<TableMetadata, String>> + Send + 'static,
264    {
265        let start = Instant::now();
266
267        // Create tasks for each table
268        let tasks: Vec<_> = table_names
269            .into_iter()
270            .map(|name| {
271                let op = operation.clone();
272                move || op(name)
273            })
274            .collect();
275
276        let (results, stats) = self.executor.execute_all(tasks).await;
277
278        // Separate successes and failures
279        let mut tables = Vec::new();
280        let mut errors = Vec::new();
281
282        for result in results {
283            match result {
284                TaskResult::Success { value, .. } => {
285                    tables.push(value);
286                }
287                TaskResult::Error(e) => {
288                    errors.push(IntrospectionError {
289                        table: format!("task_{}", e.task_id),
290                        message: e.message,
291                        is_timeout: e.is_timeout,
292                    });
293                }
294            }
295        }
296
297        IntrospectionResult {
298            tables,
299            errors,
300            duration: start.elapsed(),
301            max_concurrency: stats.max_concurrent,
302        }
303    }
304
305    /// Introspect tables with associated names for error tracking.
306    pub async fn introspect_named<F, Fut>(
307        &self,
308        table_names: Vec<String>,
309        operation: F,
310    ) -> IntrospectionResult
311    where
312        F: Fn(String) -> Fut + Clone + Send + 'static,
313        Fut: Future<Output = Result<TableMetadata, String>> + Send + 'static,
314    {
315        let start = Instant::now();
316        let names_for_errors: Vec<_> = table_names.clone();
317
318        // Create tasks for each table
319        let tasks: Vec<_> = table_names
320            .into_iter()
321            .map(|name| {
322                let op = operation.clone();
323                move || op(name)
324            })
325            .collect();
326
327        let (results, stats) = self.executor.execute_all(tasks).await;
328
329        // Separate successes and failures
330        let mut tables = Vec::new();
331        let mut errors = Vec::new();
332
333        for (idx, result) in results.into_iter().enumerate() {
334            match result {
335                TaskResult::Success { value, .. } => {
336                    tables.push(value);
337                }
338                TaskResult::Error(e) => {
339                    let table_name = names_for_errors
340                        .get(idx)
341                        .cloned()
342                        .unwrap_or_else(|| format!("unknown_{}", idx));
343                    errors.push(IntrospectionError {
344                        table: table_name,
345                        message: e.message,
346                        is_timeout: e.is_timeout,
347                    });
348                }
349            }
350        }
351
352        IntrospectionResult {
353            tables,
354            errors,
355            duration: start.elapsed(),
356            max_concurrency: stats.max_concurrent,
357        }
358    }
359
360    /// Get the configuration.
361    pub fn config(&self) -> &IntrospectionConfig {
362        &self.config
363    }
364}
365
366/// Helper to batch introspection queries.
367///
368/// Some databases allow fetching metadata for multiple tables in a single query.
369/// This helper creates batches of table names for such queries.
370pub struct BatchIntrospector {
371    batch_size: usize,
372}
373
374impl BatchIntrospector {
375    /// Create a new batch introspector.
376    pub fn new(batch_size: usize) -> Self {
377        Self {
378            batch_size: batch_size.max(1),
379        }
380    }
381
382    /// Create batches of table names.
383    pub fn create_batches(&self, tables: Vec<String>) -> Vec<Vec<String>> {
384        tables.chunks(self.batch_size).map(|c| c.to_vec()).collect()
385    }
386
387    /// Execute batched introspection.
388    pub async fn introspect_batched<F, Fut>(
389        &self,
390        tables: Vec<String>,
391        max_concurrency: usize,
392        operation: F,
393    ) -> IntrospectionResult
394    where
395        F: Fn(Vec<String>) -> Fut + Clone + Send + 'static,
396        Fut: Future<Output = Result<Vec<TableMetadata>, String>> + Send + 'static,
397    {
398        let start = Instant::now();
399        let batches = self.create_batches(tables);
400
401        let config = IntrospectionConfig::default().with_max_concurrency(max_concurrency);
402        let executor = ConcurrentExecutor::new(
403            ConcurrencyConfig::default()
404                .with_max_concurrency(config.max_concurrency)
405                .with_continue_on_error(true),
406        );
407
408        let tasks: Vec<_> = batches
409            .into_iter()
410            .map(|batch| {
411                let op = operation.clone();
412                move || op(batch)
413            })
414            .collect();
415
416        let (results, stats) = executor.execute_all(tasks).await;
417
418        // Flatten results
419        let mut tables = Vec::new();
420        let mut errors = Vec::new();
421
422        for result in results {
423            match result {
424                TaskResult::Success { value, .. } => {
425                    tables.extend(value);
426                }
427                TaskResult::Error(e) => {
428                    errors.push(IntrospectionError {
429                        table: format!("batch_{}", e.task_id),
430                        message: e.message,
431                        is_timeout: e.is_timeout,
432                    });
433                }
434            }
435        }
436
437        IntrospectionResult {
438            tables,
439            errors,
440            duration: start.elapsed(),
441            max_concurrency: stats.max_concurrent,
442        }
443    }
444}
445
446/// Introspection phase for tracking progress.
447#[derive(Debug, Clone, Copy, PartialEq, Eq)]
448pub enum IntrospectionPhase {
449    /// Fetching table list.
450    Tables,
451    /// Fetching columns.
452    Columns,
453    /// Fetching primary keys.
454    PrimaryKeys,
455    /// Fetching foreign keys.
456    ForeignKeys,
457    /// Fetching indexes.
458    Indexes,
459    /// Fetching enums.
460    Enums,
461    /// Fetching views.
462    Views,
463    /// Complete.
464    Complete,
465}
466
467impl IntrospectionPhase {
468    /// Get the next phase.
469    pub fn next(self) -> Self {
470        match self {
471            Self::Tables => Self::Columns,
472            Self::Columns => Self::PrimaryKeys,
473            Self::PrimaryKeys => Self::ForeignKeys,
474            Self::ForeignKeys => Self::Indexes,
475            Self::Indexes => Self::Enums,
476            Self::Enums => Self::Views,
477            Self::Views => Self::Complete,
478            Self::Complete => Self::Complete,
479        }
480    }
481
482    /// Get human-readable name.
483    pub fn name(&self) -> &'static str {
484        match self {
485            Self::Tables => "tables",
486            Self::Columns => "columns",
487            Self::PrimaryKeys => "primary keys",
488            Self::ForeignKeys => "foreign keys",
489            Self::Indexes => "indexes",
490            Self::Enums => "enums",
491            Self::Views => "views",
492            Self::Complete => "complete",
493        }
494    }
495}
496
497/// Progress callback for introspection.
498pub type ProgressCallback = Box<dyn Fn(IntrospectionPhase, usize, usize) + Send + Sync>;
499
500/// Builder for creating concurrent introspection with progress reporting.
501pub struct IntrospectorBuilder {
502    config: IntrospectionConfig,
503    progress_callback: Option<ProgressCallback>,
504}
505
506impl IntrospectorBuilder {
507    /// Create a new builder.
508    pub fn new() -> Self {
509        Self {
510            config: IntrospectionConfig::default(),
511            progress_callback: None,
512        }
513    }
514
515    /// Set the configuration.
516    pub fn config(mut self, config: IntrospectionConfig) -> Self {
517        self.config = config;
518        self
519    }
520
521    /// Set progress callback.
522    pub fn on_progress<F>(mut self, callback: F) -> Self
523    where
524        F: Fn(IntrospectionPhase, usize, usize) + Send + Sync + 'static,
525    {
526        self.progress_callback = Some(Box::new(callback));
527        self
528    }
529
530    /// Build the introspector.
531    pub fn build(self) -> ConcurrentIntrospector {
532        ConcurrentIntrospector::new(self.config)
533    }
534}
535
536impl Default for IntrospectorBuilder {
537    fn default() -> Self {
538        Self::new()
539    }
540}
541
542/// SQL query templates for concurrent introspection.
543pub mod queries {
544    use crate::sql::DatabaseType;
545
546    /// Generate SQL to fetch all columns for multiple tables at once.
547    pub fn batch_columns_query(db_type: DatabaseType, tables: &[&str], schema: Option<&str>) -> String {
548        let schema_name = schema.unwrap_or("public");
549        let table_list = tables
550            .iter()
551            .map(|t| format!("'{}'", t))
552            .collect::<Vec<_>>()
553            .join(", ");
554
555        match db_type {
556            DatabaseType::PostgreSQL => {
557                format!(
558                    r#"
559                    SELECT
560                        c.table_name,
561                        c.column_name,
562                        c.data_type,
563                        c.udt_name,
564                        c.is_nullable = 'YES' as nullable,
565                        c.column_default,
566                        c.character_maximum_length,
567                        c.numeric_precision,
568                        c.numeric_scale,
569                        col_description(
570                            (c.table_schema || '.' || c.table_name)::regclass,
571                            c.ordinal_position
572                        ) as comment,
573                        CASE
574                            WHEN c.column_default LIKE 'nextval%' THEN true
575                            WHEN c.is_identity = 'YES' THEN true
576                            ELSE false
577                        END as auto_increment
578                    FROM information_schema.columns c
579                    WHERE c.table_schema = '{}'
580                    AND c.table_name IN ({})
581                    ORDER BY c.table_name, c.ordinal_position
582                    "#,
583                    schema_name, table_list
584                )
585            }
586            DatabaseType::MySQL => {
587                format!(
588                    r#"
589                    SELECT
590                        c.TABLE_NAME,
591                        c.COLUMN_NAME,
592                        c.DATA_TYPE,
593                        c.COLUMN_TYPE,
594                        c.IS_NULLABLE = 'YES' as nullable,
595                        c.COLUMN_DEFAULT,
596                        c.CHARACTER_MAXIMUM_LENGTH,
597                        c.NUMERIC_PRECISION,
598                        c.NUMERIC_SCALE,
599                        c.COLUMN_COMMENT,
600                        c.EXTRA LIKE '%auto_increment%' as auto_increment
601                    FROM information_schema.COLUMNS c
602                    WHERE c.TABLE_SCHEMA = DATABASE()
603                    AND c.TABLE_NAME IN ({})
604                    ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION
605                    "#,
606                    table_list
607                )
608            }
609            _ => String::new(),
610        }
611    }
612
613    /// Generate SQL to fetch all indexes for multiple tables at once.
614    pub fn batch_indexes_query(db_type: DatabaseType, tables: &[&str], schema: Option<&str>) -> String {
615        let schema_name = schema.unwrap_or("public");
616        let table_list = tables
617            .iter()
618            .map(|t| format!("'{}'", t))
619            .collect::<Vec<_>>()
620            .join(", ");
621
622        match db_type {
623            DatabaseType::PostgreSQL => {
624                format!(
625                    r#"
626                    SELECT
627                        t.relname as table_name,
628                        i.relname as index_name,
629                        a.attname as column_name,
630                        ix.indisunique as is_unique,
631                        ix.indisprimary as is_primary,
632                        am.amname as index_type
633                    FROM pg_class t
634                    JOIN pg_index ix ON t.oid = ix.indrelid
635                    JOIN pg_class i ON i.oid = ix.indexrelid
636                    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
637                    JOIN pg_am am ON i.relam = am.oid
638                    JOIN pg_namespace n ON n.oid = t.relnamespace
639                    WHERE n.nspname = '{}'
640                    AND t.relname IN ({})
641                    ORDER BY t.relname, i.relname, a.attnum
642                    "#,
643                    schema_name, table_list
644                )
645            }
646            DatabaseType::MySQL => {
647                format!(
648                    r#"
649                    SELECT
650                        s.TABLE_NAME,
651                        s.INDEX_NAME,
652                        s.COLUMN_NAME,
653                        s.NON_UNIQUE = 0 as is_unique,
654                        s.INDEX_NAME = 'PRIMARY' as is_primary,
655                        s.INDEX_TYPE
656                    FROM information_schema.STATISTICS s
657                    WHERE s.TABLE_SCHEMA = DATABASE()
658                    AND s.TABLE_NAME IN ({})
659                    ORDER BY s.TABLE_NAME, s.INDEX_NAME, s.SEQ_IN_INDEX
660                    "#,
661                    table_list
662                )
663            }
664            _ => String::new(),
665        }
666    }
667
668    /// Generate SQL to fetch all foreign keys for multiple tables at once.
669    pub fn batch_foreign_keys_query(db_type: DatabaseType, tables: &[&str], schema: Option<&str>) -> String {
670        let schema_name = schema.unwrap_or("public");
671        let table_list = tables
672            .iter()
673            .map(|t| format!("'{}'", t))
674            .collect::<Vec<_>>()
675            .join(", ");
676
677        match db_type {
678            DatabaseType::PostgreSQL => {
679                format!(
680                    r#"
681                    SELECT
682                        tc.table_name,
683                        tc.constraint_name,
684                        kcu.column_name,
685                        ccu.table_name AS foreign_table,
686                        ccu.table_schema AS foreign_schema,
687                        ccu.column_name AS foreign_column,
688                        rc.delete_rule,
689                        rc.update_rule
690                    FROM information_schema.table_constraints tc
691                    JOIN information_schema.key_column_usage kcu
692                        ON tc.constraint_name = kcu.constraint_name
693                        AND tc.table_schema = kcu.table_schema
694                    JOIN information_schema.constraint_column_usage ccu
695                        ON ccu.constraint_name = tc.constraint_name
696                        AND ccu.table_schema = tc.table_schema
697                    JOIN information_schema.referential_constraints rc
698                        ON tc.constraint_name = rc.constraint_name
699                        AND tc.table_schema = rc.constraint_schema
700                    WHERE tc.constraint_type = 'FOREIGN KEY'
701                    AND tc.table_schema = '{}'
702                    AND tc.table_name IN ({})
703                    ORDER BY tc.table_name, tc.constraint_name, kcu.ordinal_position
704                    "#,
705                    schema_name, table_list
706                )
707            }
708            DatabaseType::MySQL => {
709                format!(
710                    r#"
711                    SELECT
712                        kcu.TABLE_NAME,
713                        kcu.CONSTRAINT_NAME,
714                        kcu.COLUMN_NAME,
715                        kcu.REFERENCED_TABLE_NAME,
716                        kcu.REFERENCED_TABLE_SCHEMA,
717                        kcu.REFERENCED_COLUMN_NAME,
718                        rc.DELETE_RULE,
719                        rc.UPDATE_RULE
720                    FROM information_schema.KEY_COLUMN_USAGE kcu
721                    JOIN information_schema.REFERENTIAL_CONSTRAINTS rc
722                        ON kcu.CONSTRAINT_NAME = rc.CONSTRAINT_NAME
723                        AND kcu.TABLE_SCHEMA = rc.CONSTRAINT_SCHEMA
724                    WHERE kcu.TABLE_SCHEMA = DATABASE()
725                    AND kcu.REFERENCED_TABLE_NAME IS NOT NULL
726                    AND kcu.TABLE_NAME IN ({})
727                    ORDER BY kcu.TABLE_NAME, kcu.CONSTRAINT_NAME, kcu.ORDINAL_POSITION
728                    "#,
729                    table_list
730                )
731            }
732            _ => String::new(),
733        }
734    }
735}
736
737#[cfg(test)]
738mod tests {
739    use super::*;
740
741    #[tokio::test]
742    async fn test_concurrent_introspector() {
743        let config = IntrospectionConfig::default().with_max_concurrency(4);
744        let introspector = ConcurrentIntrospector::new(config);
745
746        let tables = vec!["users".to_string(), "posts".to_string(), "comments".to_string()];
747
748        let result = introspector
749            .introspect_tables(tables, |name| async move {
750                // Simulate introspection
751                tokio::time::sleep(Duration::from_millis(10)).await;
752                Ok(TableMetadata::new(name))
753            })
754            .await;
755
756        assert_eq!(result.tables.len(), 3);
757        assert!(result.errors.is_empty());
758    }
759
760    #[tokio::test]
761    async fn test_batch_introspector() {
762        let batch = BatchIntrospector::new(2);
763
764        let tables = vec![
765            "t1".to_string(),
766            "t2".to_string(),
767            "t3".to_string(),
768            "t4".to_string(),
769            "t5".to_string(),
770        ];
771
772        let batches = batch.create_batches(tables);
773        assert_eq!(batches.len(), 3);
774        assert_eq!(batches[0].len(), 2);
775        assert_eq!(batches[1].len(), 2);
776        assert_eq!(batches[2].len(), 1);
777    }
778
779    #[tokio::test]
780    async fn test_introspection_with_errors() {
781        let config = IntrospectionConfig::default().with_max_concurrency(2);
782        let introspector = ConcurrentIntrospector::new(config);
783
784        let tables = vec!["good1".to_string(), "bad".to_string(), "good2".to_string()];
785
786        let result = introspector
787            .introspect_named(tables, |name| async move {
788                if name == "bad" {
789                    Err("Table not found".to_string())
790                } else {
791                    Ok(TableMetadata::new(name))
792                }
793            })
794            .await;
795
796        assert_eq!(result.tables.len(), 2);
797        assert_eq!(result.errors.len(), 1);
798        assert_eq!(result.errors[0].table, "bad");
799    }
800
801    #[test]
802    fn test_introspection_phase_progression() {
803        let mut phase = IntrospectionPhase::Tables;
804
805        assert_eq!(phase.name(), "tables");
806
807        phase = phase.next();
808        assert_eq!(phase, IntrospectionPhase::Columns);
809
810        phase = phase.next();
811        assert_eq!(phase, IntrospectionPhase::PrimaryKeys);
812
813        // Progress to complete
814        while phase != IntrospectionPhase::Complete {
815            phase = phase.next();
816        }
817
818        // Should stay at complete
819        assert_eq!(phase.next(), IntrospectionPhase::Complete);
820    }
821
822    #[test]
823    fn test_batch_columns_query() {
824        let sql = queries::batch_columns_query(
825            crate::sql::DatabaseType::PostgreSQL,
826            &["users", "posts"],
827            Some("public"),
828        );
829
830        assert!(sql.contains("information_schema.columns"));
831        assert!(sql.contains("'users', 'posts'"));
832    }
833}
834