Skip to main content

prax_query/async_optimize/
introspect.rs

//! Concurrent database introspection utilities.
//!
//! This module provides high-performance introspection by fetching
//! table metadata (columns, indexes, foreign keys) concurrently.
//!
//! # Performance
//!
//! For a database with 50 tables, concurrent introspection can reduce
//! total time from ~5 seconds (sequential) to ~1.5 seconds (concurrent)
//! - approximately a 70% reduction.
//!
//! # Example
//!
//! ```rust,ignore
//! use prax_query::async_optimize::introspect::{
//!     ConcurrentIntrospector, IntrospectionConfig, TableMetadata,
//! };
//!
//! let introspector = ConcurrentIntrospector::new(IntrospectionConfig::default());
//!
//! // Fetch metadata for all tables concurrently
//! let results = introspector
//!     .introspect_tables(table_names, |name| async move {
//!         let columns = fetch_columns(&name).await?;
//!         let indexes = fetch_indexes(&name).await?;
//!         let foreign_keys = fetch_foreign_keys(&name).await?;
//!         Ok(TableMetadata { columns, indexes, foreign_keys, ..TableMetadata::new(name) })
//!     })
//!     .await;
//! ```
31
32use std::future::Future;
33use std::time::{Duration, Instant};
34
35use super::concurrent::{ConcurrencyConfig, ConcurrentExecutor, TaskResult};
36
/// Tuning knobs for concurrent table introspection.
#[derive(Debug, Clone)]
pub struct IntrospectionConfig {
    /// Upper bound on the number of tables introspected at the same time.
    pub max_concurrency: usize,
    /// Time limit applied to each individual table.
    pub table_timeout: Duration,
    /// Whether to keep going when an individual table fails.
    pub continue_on_error: bool,
    /// Number of tables per multi-table query (`0` means no batching).
    pub batch_size: usize,
}

impl Default for IntrospectionConfig {
    /// Defaults: 8 concurrent tables, 30 second per-table timeout,
    /// continue on error, batching disabled.
    fn default() -> Self {
        let max_concurrency = 8;
        let table_timeout = Duration::from_secs(30);
        Self {
            max_concurrency,
            table_timeout,
            continue_on_error: true,
            batch_size: 0,
        }
    }
}

impl IntrospectionConfig {
    /// Preset for large databases: 16 concurrent tables, 60 second timeout,
    /// batches of 50 tables.
    #[must_use]
    pub fn for_large_database() -> Self {
        Self {
            max_concurrency: 16,
            table_timeout: Duration::from_secs(60),
            batch_size: 50,
            // `continue_on_error` stays `true`, as in the default.
            ..Self::default()
        }
    }

    /// Preset for small databases: 4 concurrent tables, 15 second timeout,
    /// no batching.
    #[must_use]
    pub fn for_small_database() -> Self {
        Self {
            max_concurrency: 4,
            table_timeout: Duration::from_secs(15),
            // `continue_on_error: true` and `batch_size: 0` come from the default.
            ..Self::default()
        }
    }

    /// Return the config with `max_concurrency` replaced; `0` is raised to `1`.
    #[must_use]
    pub fn with_max_concurrency(self, max: usize) -> Self {
        Self {
            max_concurrency: max.max(1),
            ..self
        }
    }

    /// Return the config with the per-table timeout replaced.
    #[must_use]
    pub fn with_table_timeout(self, timeout: Duration) -> Self {
        Self {
            table_timeout: timeout,
            ..self
        }
    }

    /// Return the config with the multi-table batch size replaced.
    #[must_use]
    pub fn with_batch_size(self, size: usize) -> Self {
        Self {
            batch_size: size,
            ..self
        }
    }
}
105
106/// Metadata for a single table.
107#[derive(Debug, Clone)]
108pub struct TableMetadata {
109    /// Table name.
110    pub name: String,
111    /// Column information.
112    pub columns: Vec<ColumnMetadata>,
113    /// Index information.
114    pub indexes: Vec<IndexMetadata>,
115    /// Foreign key information.
116    pub foreign_keys: Vec<ForeignKeyMetadata>,
117    /// Primary key columns.
118    pub primary_key: Vec<String>,
119    /// Table comment.
120    pub comment: Option<String>,
121}
122
123impl TableMetadata {
124    /// Create empty metadata for a table.
125    pub fn new(name: impl Into<String>) -> Self {
126        Self {
127            name: name.into(),
128            columns: Vec::new(),
129            indexes: Vec::new(),
130            foreign_keys: Vec::new(),
131            primary_key: Vec::new(),
132            comment: None,
133        }
134    }
135}
136
/// Description of a single table column.
#[derive(Debug, Clone)]
pub struct ColumnMetadata {
    /// Name of the column.
    pub name: String,
    /// Type of the column as reported by the database.
    pub db_type: String,
    /// `true` when the column accepts NULL.
    pub nullable: bool,
    /// Default value expression, if one is defined.
    pub default: Option<String>,
    /// `true` when the column is auto-incrementing.
    pub auto_increment: bool,
    /// `true` when the column belongs to the primary key.
    pub is_primary_key: bool,
}
153
/// Description of a single index.
#[derive(Debug, Clone)]
pub struct IndexMetadata {
    /// Name of the index.
    pub name: String,
    /// Names of the columns the index covers.
    pub columns: Vec<String>,
    /// `true` when the index enforces uniqueness.
    pub is_unique: bool,
    /// `true` when this is the primary key index.
    pub is_primary: bool,
    /// Index method (btree, hash, etc), when known.
    pub index_type: Option<String>,
}
168
/// Description of a single foreign key constraint.
#[derive(Debug, Clone)]
pub struct ForeignKeyMetadata {
    /// Name of the constraint.
    pub name: String,
    /// Constrained columns in the owning table.
    pub columns: Vec<String>,
    /// Table the constraint points at.
    pub referenced_table: String,
    /// Columns in the referenced table.
    pub referenced_columns: Vec<String>,
    /// `ON DELETE` action, stored as text.
    pub on_delete: String,
    /// `ON UPDATE` action, stored as text.
    pub on_update: String,
}
185
186/// Result of concurrent introspection.
187#[derive(Debug)]
188pub struct IntrospectionResult {
189    /// Successfully introspected tables.
190    pub tables: Vec<TableMetadata>,
191    /// Tables that failed to introspect.
192    pub errors: Vec<IntrospectionError>,
193    /// Total introspection time.
194    pub duration: Duration,
195    /// Maximum concurrent operations observed.
196    pub max_concurrency: usize,
197}
198
199impl IntrospectionResult {
200    /// Check if all tables were introspected successfully.
201    pub fn is_complete(&self) -> bool {
202        self.errors.is_empty()
203    }
204
205    /// Get table metadata by name.
206    pub fn get_table(&self, name: &str) -> Option<&TableMetadata> {
207        self.tables.iter().find(|t| t.name == name)
208    }
209}
210
/// Failure record for a single introspection task.
///
/// Implements [`std::error::Error`], so it can be boxed as
/// `Box<dyn std::error::Error>` or propagated with `?` next to other errors.
#[derive(Debug, Clone)]
pub struct IntrospectionError {
    /// Table name, or a task/batch label when the caller does not track names.
    pub table: String,
    /// Error message.
    pub message: String,
    /// Whether this was a timeout.
    pub is_timeout: bool,
}

impl std::fmt::Display for IntrospectionError {
    /// Formats as `<Timeout|Error> introspecting table '<table>': <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the leading word differs between the two kinds of failure.
        let kind = if self.is_timeout { "Timeout" } else { "Error" };
        write!(
            f,
            "{} introspecting table '{}': {}",
            kind, self.table, self.message
        )
    }
}

impl std::error::Error for IntrospectionError {}
239
240/// Concurrent database introspector.
241pub struct ConcurrentIntrospector {
242    config: IntrospectionConfig,
243    executor: ConcurrentExecutor,
244}
245
246impl ConcurrentIntrospector {
247    /// Create a new concurrent introspector.
248    pub fn new(config: IntrospectionConfig) -> Self {
249        let executor_config = ConcurrencyConfig::default()
250            .with_max_concurrency(config.max_concurrency)
251            .with_timeout(config.table_timeout)
252            .with_continue_on_error(config.continue_on_error);
253
254        Self {
255            config,
256            executor: ConcurrentExecutor::new(executor_config),
257        }
258    }
259
260    /// Introspect tables concurrently using a custom operation.
261    ///
262    /// The `operation` function is called for each table name and should
263    /// return the table metadata.
264    pub async fn introspect_tables<F, Fut>(
265        &self,
266        table_names: Vec<String>,
267        operation: F,
268    ) -> IntrospectionResult
269    where
270        F: Fn(String) -> Fut + Clone + Send + 'static,
271        Fut: Future<Output = Result<TableMetadata, String>> + Send + 'static,
272    {
273        let start = Instant::now();
274
275        // Create tasks for each table
276        let tasks: Vec<_> = table_names
277            .into_iter()
278            .map(|name| {
279                let op = operation.clone();
280                move || op(name)
281            })
282            .collect();
283
284        let (results, stats) = self.executor.execute_all(tasks).await;
285
286        // Separate successes and failures
287        let mut tables = Vec::new();
288        let mut errors = Vec::new();
289
290        for result in results {
291            match result {
292                TaskResult::Success { value, .. } => {
293                    tables.push(value);
294                }
295                TaskResult::Error(e) => {
296                    errors.push(IntrospectionError {
297                        table: format!("task_{}", e.task_id),
298                        message: e.message,
299                        is_timeout: e.is_timeout,
300                    });
301                }
302            }
303        }
304
305        IntrospectionResult {
306            tables,
307            errors,
308            duration: start.elapsed(),
309            max_concurrency: stats.max_concurrent,
310        }
311    }
312
313    /// Introspect tables with associated names for error tracking.
314    pub async fn introspect_named<F, Fut>(
315        &self,
316        table_names: Vec<String>,
317        operation: F,
318    ) -> IntrospectionResult
319    where
320        F: Fn(String) -> Fut + Clone + Send + 'static,
321        Fut: Future<Output = Result<TableMetadata, String>> + Send + 'static,
322    {
323        let start = Instant::now();
324        let names_for_errors: Vec<_> = table_names.clone();
325
326        // Create tasks for each table
327        let tasks: Vec<_> = table_names
328            .into_iter()
329            .map(|name| {
330                let op = operation.clone();
331                move || op(name)
332            })
333            .collect();
334
335        let (results, stats) = self.executor.execute_all(tasks).await;
336
337        // Separate successes and failures
338        let mut tables = Vec::new();
339        let mut errors = Vec::new();
340
341        for (idx, result) in results.into_iter().enumerate() {
342            match result {
343                TaskResult::Success { value, .. } => {
344                    tables.push(value);
345                }
346                TaskResult::Error(e) => {
347                    let table_name = names_for_errors
348                        .get(idx)
349                        .cloned()
350                        .unwrap_or_else(|| format!("unknown_{}", idx));
351                    errors.push(IntrospectionError {
352                        table: table_name,
353                        message: e.message,
354                        is_timeout: e.is_timeout,
355                    });
356                }
357            }
358        }
359
360        IntrospectionResult {
361            tables,
362            errors,
363            duration: start.elapsed(),
364            max_concurrency: stats.max_concurrent,
365        }
366    }
367
368    /// Get the configuration.
369    pub fn config(&self) -> &IntrospectionConfig {
370        &self.config
371    }
372}
373
374/// Helper to batch introspection queries.
375///
376/// Some databases allow fetching metadata for multiple tables in a single query.
377/// This helper creates batches of table names for such queries.
378pub struct BatchIntrospector {
379    batch_size: usize,
380}
381
382impl BatchIntrospector {
383    /// Create a new batch introspector.
384    pub fn new(batch_size: usize) -> Self {
385        Self {
386            batch_size: batch_size.max(1),
387        }
388    }
389
390    /// Create batches of table names.
391    pub fn create_batches(&self, tables: Vec<String>) -> Vec<Vec<String>> {
392        tables.chunks(self.batch_size).map(|c| c.to_vec()).collect()
393    }
394
395    /// Execute batched introspection.
396    pub async fn introspect_batched<F, Fut>(
397        &self,
398        tables: Vec<String>,
399        max_concurrency: usize,
400        operation: F,
401    ) -> IntrospectionResult
402    where
403        F: Fn(Vec<String>) -> Fut + Clone + Send + 'static,
404        Fut: Future<Output = Result<Vec<TableMetadata>, String>> + Send + 'static,
405    {
406        let start = Instant::now();
407        let batches = self.create_batches(tables);
408
409        let config = IntrospectionConfig::default().with_max_concurrency(max_concurrency);
410        let executor = ConcurrentExecutor::new(
411            ConcurrencyConfig::default()
412                .with_max_concurrency(config.max_concurrency)
413                .with_continue_on_error(true),
414        );
415
416        let tasks: Vec<_> = batches
417            .into_iter()
418            .map(|batch| {
419                let op = operation.clone();
420                move || op(batch)
421            })
422            .collect();
423
424        let (results, stats) = executor.execute_all(tasks).await;
425
426        // Flatten results
427        let mut tables = Vec::new();
428        let mut errors = Vec::new();
429
430        for result in results {
431            match result {
432                TaskResult::Success { value, .. } => {
433                    tables.extend(value);
434                }
435                TaskResult::Error(e) => {
436                    errors.push(IntrospectionError {
437                        table: format!("batch_{}", e.task_id),
438                        message: e.message,
439                        is_timeout: e.is_timeout,
440                    });
441                }
442            }
443        }
444
445        IntrospectionResult {
446            tables,
447            errors,
448            duration: start.elapsed(),
449            max_concurrency: stats.max_concurrent,
450        }
451    }
452}
453
/// Stage of an introspection run, used for progress tracking.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntrospectionPhase {
    /// The table list is being fetched.
    Tables,
    /// Columns are being fetched.
    Columns,
    /// Primary keys are being fetched.
    PrimaryKeys,
    /// Foreign keys are being fetched.
    ForeignKeys,
    /// Indexes are being fetched.
    Indexes,
    /// Enums are being fetched.
    Enums,
    /// Views are being fetched.
    Views,
    /// Nothing is left to fetch.
    Complete,
}

impl IntrospectionPhase {
    /// Phase that follows this one; `Complete` is followed by itself.
    pub fn next(self) -> Self {
        use IntrospectionPhase::*;

        match self {
            Tables => Columns,
            Columns => PrimaryKeys,
            PrimaryKeys => ForeignKeys,
            ForeignKeys => Indexes,
            Indexes => Enums,
            Enums => Views,
            Views | Complete => Complete,
        }
    }

    /// Lowercase, human-readable label for the phase.
    pub fn name(&self) -> &'static str {
        match *self {
            IntrospectionPhase::Tables => "tables",
            IntrospectionPhase::Columns => "columns",
            IntrospectionPhase::PrimaryKeys => "primary keys",
            IntrospectionPhase::ForeignKeys => "foreign keys",
            IntrospectionPhase::Indexes => "indexes",
            IntrospectionPhase::Enums => "enums",
            IntrospectionPhase::Views => "views",
            IntrospectionPhase::Complete => "complete",
        }
    }
}
504
505/// Progress callback for introspection.
506pub type ProgressCallback = Box<dyn Fn(IntrospectionPhase, usize, usize) + Send + Sync>;
507
508/// Builder for creating concurrent introspection with progress reporting.
509pub struct IntrospectorBuilder {
510    config: IntrospectionConfig,
511    progress_callback: Option<ProgressCallback>,
512}
513
514impl IntrospectorBuilder {
515    /// Create a new builder.
516    pub fn new() -> Self {
517        Self {
518            config: IntrospectionConfig::default(),
519            progress_callback: None,
520        }
521    }
522
523    /// Set the configuration.
524    pub fn config(mut self, config: IntrospectionConfig) -> Self {
525        self.config = config;
526        self
527    }
528
529    /// Set progress callback.
530    pub fn on_progress<F>(mut self, callback: F) -> Self
531    where
532        F: Fn(IntrospectionPhase, usize, usize) + Send + Sync + 'static,
533    {
534        self.progress_callback = Some(Box::new(callback));
535        self
536    }
537
538    /// Build the introspector.
539    pub fn build(self) -> ConcurrentIntrospector {
540        ConcurrentIntrospector::new(self.config)
541    }
542}
543
544impl Default for IntrospectorBuilder {
545    fn default() -> Self {
546        Self::new()
547    }
548}
549
550/// SQL query templates for concurrent introspection.
551pub mod queries {
552    use crate::sql::DatabaseType;
553
554    /// Generate SQL to fetch all columns for multiple tables at once.
555    pub fn batch_columns_query(
556        db_type: DatabaseType,
557        tables: &[&str],
558        schema: Option<&str>,
559    ) -> String {
560        let schema_name = schema.unwrap_or("public");
561        let table_list = tables
562            .iter()
563            .map(|t| format!("'{}'", t))
564            .collect::<Vec<_>>()
565            .join(", ");
566
567        match db_type {
568            DatabaseType::PostgreSQL => {
569                format!(
570                    r#"
571                    SELECT
572                        c.table_name,
573                        c.column_name,
574                        c.data_type,
575                        c.udt_name,
576                        c.is_nullable = 'YES' as nullable,
577                        c.column_default,
578                        c.character_maximum_length,
579                        c.numeric_precision,
580                        c.numeric_scale,
581                        col_description(
582                            (c.table_schema || '.' || c.table_name)::regclass,
583                            c.ordinal_position
584                        ) as comment,
585                        CASE
586                            WHEN c.column_default LIKE 'nextval%' THEN true
587                            WHEN c.is_identity = 'YES' THEN true
588                            ELSE false
589                        END as auto_increment
590                    FROM information_schema.columns c
591                    WHERE c.table_schema = '{}'
592                    AND c.table_name IN ({})
593                    ORDER BY c.table_name, c.ordinal_position
594                    "#,
595                    schema_name, table_list
596                )
597            }
598            DatabaseType::MySQL => {
599                format!(
600                    r#"
601                    SELECT
602                        c.TABLE_NAME,
603                        c.COLUMN_NAME,
604                        c.DATA_TYPE,
605                        c.COLUMN_TYPE,
606                        c.IS_NULLABLE = 'YES' as nullable,
607                        c.COLUMN_DEFAULT,
608                        c.CHARACTER_MAXIMUM_LENGTH,
609                        c.NUMERIC_PRECISION,
610                        c.NUMERIC_SCALE,
611                        c.COLUMN_COMMENT,
612                        c.EXTRA LIKE '%auto_increment%' as auto_increment
613                    FROM information_schema.COLUMNS c
614                    WHERE c.TABLE_SCHEMA = DATABASE()
615                    AND c.TABLE_NAME IN ({})
616                    ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION
617                    "#,
618                    table_list
619                )
620            }
621            _ => String::new(),
622        }
623    }
624
625    /// Generate SQL to fetch all indexes for multiple tables at once.
626    pub fn batch_indexes_query(
627        db_type: DatabaseType,
628        tables: &[&str],
629        schema: Option<&str>,
630    ) -> String {
631        let schema_name = schema.unwrap_or("public");
632        let table_list = tables
633            .iter()
634            .map(|t| format!("'{}'", t))
635            .collect::<Vec<_>>()
636            .join(", ");
637
638        match db_type {
639            DatabaseType::PostgreSQL => {
640                format!(
641                    r#"
642                    SELECT
643                        t.relname as table_name,
644                        i.relname as index_name,
645                        a.attname as column_name,
646                        ix.indisunique as is_unique,
647                        ix.indisprimary as is_primary,
648                        am.amname as index_type
649                    FROM pg_class t
650                    JOIN pg_index ix ON t.oid = ix.indrelid
651                    JOIN pg_class i ON i.oid = ix.indexrelid
652                    JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
653                    JOIN pg_am am ON i.relam = am.oid
654                    JOIN pg_namespace n ON n.oid = t.relnamespace
655                    WHERE n.nspname = '{}'
656                    AND t.relname IN ({})
657                    ORDER BY t.relname, i.relname, a.attnum
658                    "#,
659                    schema_name, table_list
660                )
661            }
662            DatabaseType::MySQL => {
663                format!(
664                    r#"
665                    SELECT
666                        s.TABLE_NAME,
667                        s.INDEX_NAME,
668                        s.COLUMN_NAME,
669                        s.NON_UNIQUE = 0 as is_unique,
670                        s.INDEX_NAME = 'PRIMARY' as is_primary,
671                        s.INDEX_TYPE
672                    FROM information_schema.STATISTICS s
673                    WHERE s.TABLE_SCHEMA = DATABASE()
674                    AND s.TABLE_NAME IN ({})
675                    ORDER BY s.TABLE_NAME, s.INDEX_NAME, s.SEQ_IN_INDEX
676                    "#,
677                    table_list
678                )
679            }
680            _ => String::new(),
681        }
682    }
683
684    /// Generate SQL to fetch all foreign keys for multiple tables at once.
685    pub fn batch_foreign_keys_query(
686        db_type: DatabaseType,
687        tables: &[&str],
688        schema: Option<&str>,
689    ) -> String {
690        let schema_name = schema.unwrap_or("public");
691        let table_list = tables
692            .iter()
693            .map(|t| format!("'{}'", t))
694            .collect::<Vec<_>>()
695            .join(", ");
696
697        match db_type {
698            DatabaseType::PostgreSQL => {
699                format!(
700                    r#"
701                    SELECT
702                        tc.table_name,
703                        tc.constraint_name,
704                        kcu.column_name,
705                        ccu.table_name AS foreign_table,
706                        ccu.table_schema AS foreign_schema,
707                        ccu.column_name AS foreign_column,
708                        rc.delete_rule,
709                        rc.update_rule
710                    FROM information_schema.table_constraints tc
711                    JOIN information_schema.key_column_usage kcu
712                        ON tc.constraint_name = kcu.constraint_name
713                        AND tc.table_schema = kcu.table_schema
714                    JOIN information_schema.constraint_column_usage ccu
715                        ON ccu.constraint_name = tc.constraint_name
716                        AND ccu.table_schema = tc.table_schema
717                    JOIN information_schema.referential_constraints rc
718                        ON tc.constraint_name = rc.constraint_name
719                        AND tc.table_schema = rc.constraint_schema
720                    WHERE tc.constraint_type = 'FOREIGN KEY'
721                    AND tc.table_schema = '{}'
722                    AND tc.table_name IN ({})
723                    ORDER BY tc.table_name, tc.constraint_name, kcu.ordinal_position
724                    "#,
725                    schema_name, table_list
726                )
727            }
728            DatabaseType::MySQL => {
729                format!(
730                    r#"
731                    SELECT
732                        kcu.TABLE_NAME,
733                        kcu.CONSTRAINT_NAME,
734                        kcu.COLUMN_NAME,
735                        kcu.REFERENCED_TABLE_NAME,
736                        kcu.REFERENCED_TABLE_SCHEMA,
737                        kcu.REFERENCED_COLUMN_NAME,
738                        rc.DELETE_RULE,
739                        rc.UPDATE_RULE
740                    FROM information_schema.KEY_COLUMN_USAGE kcu
741                    JOIN information_schema.REFERENTIAL_CONSTRAINTS rc
742                        ON kcu.CONSTRAINT_NAME = rc.CONSTRAINT_NAME
743                        AND kcu.TABLE_SCHEMA = rc.CONSTRAINT_SCHEMA
744                    WHERE kcu.TABLE_SCHEMA = DATABASE()
745                    AND kcu.REFERENCED_TABLE_NAME IS NOT NULL
746                    AND kcu.TABLE_NAME IN ({})
747                    ORDER BY kcu.TABLE_NAME, kcu.CONSTRAINT_NAME, kcu.ORDINAL_POSITION
748                    "#,
749                    table_list
750                )
751            }
752            _ => String::new(),
753        }
754    }
755}
756
#[cfg(test)]
mod tests {
    use super::*;

    /// Every table is returned when the operation never fails.
    #[tokio::test]
    async fn test_concurrent_introspector() {
        let config = IntrospectionConfig::default().with_max_concurrency(4);
        let introspector = ConcurrentIntrospector::new(config);

        let tables = vec![
            "users".to_string(),
            "posts".to_string(),
            "comments".to_string(),
        ];

        let result = introspector
            .introspect_tables(tables, |name| async move {
                // Simulate introspection
                tokio::time::sleep(Duration::from_millis(10)).await;
                Ok(TableMetadata::new(name))
            })
            .await;

        assert_eq!(result.tables.len(), 3);
        assert!(result.errors.is_empty());
    }

    /// Batching is pure computation, so this is a plain synchronous test.
    #[test]
    fn test_batch_introspector() {
        let batch = BatchIntrospector::new(2);

        let tables = vec![
            "t1".to_string(),
            "t2".to_string(),
            "t3".to_string(),
            "t4".to_string(),
            "t5".to_string(),
        ];

        let batches = batch.create_batches(tables);
        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0].len(), 2);
        assert_eq!(batches[1].len(), 2);
        assert_eq!(batches[2].len(), 1);

        // Order is preserved across batches.
        assert_eq!(batches[0], vec!["t1".to_string(), "t2".to_string()]);
        assert_eq!(batches[2], vec!["t5".to_string()]);
    }

    /// A failing table is reported by name while the others still succeed.
    #[tokio::test]
    async fn test_introspection_with_errors() {
        let config = IntrospectionConfig::default().with_max_concurrency(2);
        let introspector = ConcurrentIntrospector::new(config);

        let tables = vec!["good1".to_string(), "bad".to_string(), "good2".to_string()];

        let result = introspector
            .introspect_named(tables, |name| async move {
                if name == "bad" {
                    Err("Table not found".to_string())
                } else {
                    Ok(TableMetadata::new(name))
                }
            })
            .await;

        assert_eq!(result.tables.len(), 2);
        assert_eq!(result.errors.len(), 1);
        assert_eq!(result.errors[0].table, "bad");
    }

    /// Phases advance in declaration order and stop at `Complete`.
    #[test]
    fn test_introspection_phase_progression() {
        let mut phase = IntrospectionPhase::Tables;

        assert_eq!(phase.name(), "tables");

        phase = phase.next();
        assert_eq!(phase, IntrospectionPhase::Columns);

        phase = phase.next();
        assert_eq!(phase, IntrospectionPhase::PrimaryKeys);

        // Progress to complete
        while phase != IntrospectionPhase::Complete {
            phase = phase.next();
        }

        // Should stay at complete
        assert_eq!(phase.next(), IntrospectionPhase::Complete);
    }

    /// The PostgreSQL column query targets the right view and lists the tables.
    #[test]
    fn test_batch_columns_query() {
        let sql = queries::batch_columns_query(
            crate::sql::DatabaseType::PostgreSQL,
            &["users", "posts"],
            Some("public"),
        );

        assert!(sql.contains("information_schema.columns"));
        assert!(sql.contains("'users', 'posts'"));
    }
}
853
854        assert!(sql.contains("information_schema.columns"));
855        assert!(sql.contains("'users', 'posts'"));
856    }
857}