1use std::future::Future;
33use std::time::{Duration, Instant};
34
35use super::concurrent::{ConcurrencyConfig, ConcurrentExecutor, TaskResult};
36
/// Tuning knobs for concurrent schema introspection.
#[derive(Debug, Clone)]
pub struct IntrospectionConfig {
    /// Upper bound on how many introspection tasks may run at the same time.
    pub max_concurrency: usize,
    /// Time budget for introspecting a single table.
    pub table_timeout: Duration,
    /// Whether the run keeps going after a table fails (forwarded to the executor).
    pub continue_on_error: bool,
    /// Number of tables per batch; the default and small-database presets use `0`.
    pub batch_size: usize,
}

impl Default for IntrospectionConfig {
    /// 8 concurrent tasks, a 30 s per-table timeout, continue on errors,
    /// batch size 0.
    fn default() -> Self {
        let table_timeout = Duration::from_secs(30);
        Self {
            batch_size: 0,
            continue_on_error: true,
            max_concurrency: 8,
            table_timeout,
        }
    }
}

impl IntrospectionConfig {
    /// Preset for schemas with many tables: 16 concurrent tasks, a 60 s
    /// per-table timeout and batches of 50 tables.
    #[must_use]
    pub fn for_large_database() -> Self {
        Self {
            max_concurrency: 16,
            table_timeout: Duration::from_secs(60),
            batch_size: 50,
            ..Self::default()
        }
    }

    /// Preset for small schemas: 4 concurrent tasks and a 15 s per-table timeout.
    #[must_use]
    pub fn for_small_database() -> Self {
        Self {
            max_concurrency: 4,
            table_timeout: Duration::from_secs(15),
            ..Self::default()
        }
    }

    /// Sets the concurrency limit; values below 1 are raised to 1.
    #[must_use]
    pub fn with_max_concurrency(self, max: usize) -> Self {
        Self {
            max_concurrency: if max == 0 { 1 } else { max },
            ..self
        }
    }

    /// Sets the per-table timeout.
    #[must_use]
    pub fn with_table_timeout(self, timeout: Duration) -> Self {
        Self {
            table_timeout: timeout,
            ..self
        }
    }

    /// Sets the batch size (stored as given, including 0).
    #[must_use]
    pub fn with_batch_size(self, size: usize) -> Self {
        Self {
            batch_size: size,
            ..self
        }
    }
}
105
/// Schema information collected for one table.
#[derive(Debug, Clone)]
pub struct TableMetadata {
    /// Table name.
    pub name: String,
    /// Column definitions.
    pub columns: Vec<ColumnMetadata>,
    /// Indexes defined on the table.
    pub indexes: Vec<IndexMetadata>,
    /// Foreign-key constraints declared on the table.
    pub foreign_keys: Vec<ForeignKeyMetadata>,
    /// Primary-key column names.
    pub primary_key: Vec<String>,
    /// Table comment, when one was found.
    pub comment: Option<String>,
}

impl TableMetadata {
    /// Creates metadata for `name` with every collection empty and no comment.
    pub fn new(name: impl Into<String>) -> Self {
        let table_name: String = name.into();
        Self {
            name: table_name,
            columns: vec![],
            indexes: vec![],
            foreign_keys: vec![],
            primary_key: vec![],
            comment: None,
        }
    }
}

/// Description of a single column.
#[derive(Debug, Clone)]
pub struct ColumnMetadata {
    /// Column name.
    pub name: String,
    /// Type name as reported by the database.
    pub db_type: String,
    /// Whether the column accepts NULL.
    pub nullable: bool,
    /// Default expression as text, if any.
    pub default: Option<String>,
    /// Whether values are generated automatically (sequence/identity/auto_increment).
    pub auto_increment: bool,
    /// Whether the column is part of the primary key.
    pub is_primary_key: bool,
}

/// Description of a single index.
#[derive(Debug, Clone)]
pub struct IndexMetadata {
    /// Index name.
    pub name: String,
    /// Indexed column names.
    pub columns: Vec<String>,
    /// Whether the index enforces uniqueness.
    pub is_unique: bool,
    /// Whether the index backs the primary key.
    pub is_primary: bool,
    /// Access method / index type name, when known.
    pub index_type: Option<String>,
}

/// Description of a single foreign-key constraint.
#[derive(Debug, Clone)]
pub struct ForeignKeyMetadata {
    /// Constraint name.
    pub name: String,
    /// Referencing columns on the owning table.
    pub columns: Vec<String>,
    /// Table the constraint points at.
    pub referenced_table: String,
    /// Columns on the referenced table.
    pub referenced_columns: Vec<String>,
    /// Referential action applied on delete, as text.
    pub on_delete: String,
    /// Referential action applied on update, as text.
    pub on_update: String,
}
185
186#[derive(Debug)]
188pub struct IntrospectionResult {
189 pub tables: Vec<TableMetadata>,
191 pub errors: Vec<IntrospectionError>,
193 pub duration: Duration,
195 pub max_concurrency: usize,
197}
198
199impl IntrospectionResult {
200 pub fn is_complete(&self) -> bool {
202 self.errors.is_empty()
203 }
204
205 pub fn get_table(&self, name: &str) -> Option<&TableMetadata> {
207 self.tables.iter().find(|t| t.name == name)
208 }
209}
210
/// Failure record for one unit of introspection work.
#[derive(Debug, Clone)]
pub struct IntrospectionError {
    /// Label of the failed unit: a table name, or a `task_N` / `batch_N`
    /// identifier when the caller did not supply names.
    pub table: String,
    /// Error message produced for the failure.
    pub message: String,
    /// `true` when the failure was a timeout.
    pub is_timeout: bool,
}

impl std::fmt::Display for IntrospectionError {
    /// Renders `"<Timeout|Error> introspecting table '<table>': <message>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            table,
            message,
            is_timeout,
        } = self;

        if !*is_timeout {
            return write!(f, "Error introspecting table '{}': {}", table, message);
        }
        write!(f, "Timeout introspecting table '{}': {}", table, message)
    }
}
231
232pub struct ConcurrentIntrospector {
234 config: IntrospectionConfig,
235 executor: ConcurrentExecutor,
236}
237
238impl ConcurrentIntrospector {
239 pub fn new(config: IntrospectionConfig) -> Self {
241 let executor_config = ConcurrencyConfig::default()
242 .with_max_concurrency(config.max_concurrency)
243 .with_timeout(config.table_timeout)
244 .with_continue_on_error(config.continue_on_error);
245
246 Self {
247 config,
248 executor: ConcurrentExecutor::new(executor_config),
249 }
250 }
251
252 pub async fn introspect_tables<F, Fut>(
257 &self,
258 table_names: Vec<String>,
259 operation: F,
260 ) -> IntrospectionResult
261 where
262 F: Fn(String) -> Fut + Clone + Send + 'static,
263 Fut: Future<Output = Result<TableMetadata, String>> + Send + 'static,
264 {
265 let start = Instant::now();
266
267 let tasks: Vec<_> = table_names
269 .into_iter()
270 .map(|name| {
271 let op = operation.clone();
272 move || op(name)
273 })
274 .collect();
275
276 let (results, stats) = self.executor.execute_all(tasks).await;
277
278 let mut tables = Vec::new();
280 let mut errors = Vec::new();
281
282 for result in results {
283 match result {
284 TaskResult::Success { value, .. } => {
285 tables.push(value);
286 }
287 TaskResult::Error(e) => {
288 errors.push(IntrospectionError {
289 table: format!("task_{}", e.task_id),
290 message: e.message,
291 is_timeout: e.is_timeout,
292 });
293 }
294 }
295 }
296
297 IntrospectionResult {
298 tables,
299 errors,
300 duration: start.elapsed(),
301 max_concurrency: stats.max_concurrent,
302 }
303 }
304
305 pub async fn introspect_named<F, Fut>(
307 &self,
308 table_names: Vec<String>,
309 operation: F,
310 ) -> IntrospectionResult
311 where
312 F: Fn(String) -> Fut + Clone + Send + 'static,
313 Fut: Future<Output = Result<TableMetadata, String>> + Send + 'static,
314 {
315 let start = Instant::now();
316 let names_for_errors: Vec<_> = table_names.clone();
317
318 let tasks: Vec<_> = table_names
320 .into_iter()
321 .map(|name| {
322 let op = operation.clone();
323 move || op(name)
324 })
325 .collect();
326
327 let (results, stats) = self.executor.execute_all(tasks).await;
328
329 let mut tables = Vec::new();
331 let mut errors = Vec::new();
332
333 for (idx, result) in results.into_iter().enumerate() {
334 match result {
335 TaskResult::Success { value, .. } => {
336 tables.push(value);
337 }
338 TaskResult::Error(e) => {
339 let table_name = names_for_errors
340 .get(idx)
341 .cloned()
342 .unwrap_or_else(|| format!("unknown_{}", idx));
343 errors.push(IntrospectionError {
344 table: table_name,
345 message: e.message,
346 is_timeout: e.is_timeout,
347 });
348 }
349 }
350 }
351
352 IntrospectionResult {
353 tables,
354 errors,
355 duration: start.elapsed(),
356 max_concurrency: stats.max_concurrent,
357 }
358 }
359
360 pub fn config(&self) -> &IntrospectionConfig {
362 &self.config
363 }
364}
365
366pub struct BatchIntrospector {
371 batch_size: usize,
372}
373
374impl BatchIntrospector {
375 pub fn new(batch_size: usize) -> Self {
377 Self {
378 batch_size: batch_size.max(1),
379 }
380 }
381
382 pub fn create_batches(&self, tables: Vec<String>) -> Vec<Vec<String>> {
384 tables.chunks(self.batch_size).map(|c| c.to_vec()).collect()
385 }
386
387 pub async fn introspect_batched<F, Fut>(
389 &self,
390 tables: Vec<String>,
391 max_concurrency: usize,
392 operation: F,
393 ) -> IntrospectionResult
394 where
395 F: Fn(Vec<String>) -> Fut + Clone + Send + 'static,
396 Fut: Future<Output = Result<Vec<TableMetadata>, String>> + Send + 'static,
397 {
398 let start = Instant::now();
399 let batches = self.create_batches(tables);
400
401 let config = IntrospectionConfig::default().with_max_concurrency(max_concurrency);
402 let executor = ConcurrentExecutor::new(
403 ConcurrencyConfig::default()
404 .with_max_concurrency(config.max_concurrency)
405 .with_continue_on_error(true),
406 );
407
408 let tasks: Vec<_> = batches
409 .into_iter()
410 .map(|batch| {
411 let op = operation.clone();
412 move || op(batch)
413 })
414 .collect();
415
416 let (results, stats) = executor.execute_all(tasks).await;
417
418 let mut tables = Vec::new();
420 let mut errors = Vec::new();
421
422 for result in results {
423 match result {
424 TaskResult::Success { value, .. } => {
425 tables.extend(value);
426 }
427 TaskResult::Error(e) => {
428 errors.push(IntrospectionError {
429 table: format!("batch_{}", e.task_id),
430 message: e.message,
431 is_timeout: e.is_timeout,
432 });
433 }
434 }
435 }
436
437 IntrospectionResult {
438 tables,
439 errors,
440 duration: start.elapsed(),
441 max_concurrency: stats.max_concurrent,
442 }
443 }
444}
445
/// Ordered stages of an introspection run, ending in `Complete`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntrospectionPhase {
    /// Listing tables.
    Tables,
    /// Reading column definitions.
    Columns,
    /// Reading primary keys.
    PrimaryKeys,
    /// Reading foreign keys.
    ForeignKeys,
    /// Reading indexes.
    Indexes,
    /// Reading enum types.
    Enums,
    /// Reading views.
    Views,
    /// Terminal phase; `next` keeps returning it.
    Complete,
}

impl IntrospectionPhase {
    /// The phase that follows `self`. `Views` and `Complete` both lead to `Complete`.
    pub fn next(self) -> Self {
        use IntrospectionPhase as P;
        match self {
            P::Views | P::Complete => P::Complete,
            P::Enums => P::Views,
            P::Indexes => P::Enums,
            P::ForeignKeys => P::Indexes,
            P::PrimaryKeys => P::ForeignKeys,
            P::Columns => P::PrimaryKeys,
            P::Tables => P::Columns,
        }
    }

    /// Lowercase, human-readable label for the phase.
    pub fn name(&self) -> &'static str {
        use IntrospectionPhase as P;
        match *self {
            P::Complete => "complete",
            P::Views => "views",
            P::Enums => "enums",
            P::Indexes => "indexes",
            P::ForeignKeys => "foreign keys",
            P::PrimaryKeys => "primary keys",
            P::Columns => "columns",
            P::Tables => "tables",
        }
    }
}
496
497pub type ProgressCallback = Box<dyn Fn(IntrospectionPhase, usize, usize) + Send + Sync>;
499
500pub struct IntrospectorBuilder {
502 config: IntrospectionConfig,
503 progress_callback: Option<ProgressCallback>,
504}
505
506impl IntrospectorBuilder {
507 pub fn new() -> Self {
509 Self {
510 config: IntrospectionConfig::default(),
511 progress_callback: None,
512 }
513 }
514
515 pub fn config(mut self, config: IntrospectionConfig) -> Self {
517 self.config = config;
518 self
519 }
520
521 pub fn on_progress<F>(mut self, callback: F) -> Self
523 where
524 F: Fn(IntrospectionPhase, usize, usize) + Send + Sync + 'static,
525 {
526 self.progress_callback = Some(Box::new(callback));
527 self
528 }
529
530 pub fn build(self) -> ConcurrentIntrospector {
532 ConcurrentIntrospector::new(self.config)
533 }
534}
535
536impl Default for IntrospectorBuilder {
537 fn default() -> Self {
538 Self::new()
539 }
540}
541
542pub mod queries {
544 use crate::sql::DatabaseType;
545
546 pub fn batch_columns_query(db_type: DatabaseType, tables: &[&str], schema: Option<&str>) -> String {
548 let schema_name = schema.unwrap_or("public");
549 let table_list = tables
550 .iter()
551 .map(|t| format!("'{}'", t))
552 .collect::<Vec<_>>()
553 .join(", ");
554
555 match db_type {
556 DatabaseType::PostgreSQL => {
557 format!(
558 r#"
559 SELECT
560 c.table_name,
561 c.column_name,
562 c.data_type,
563 c.udt_name,
564 c.is_nullable = 'YES' as nullable,
565 c.column_default,
566 c.character_maximum_length,
567 c.numeric_precision,
568 c.numeric_scale,
569 col_description(
570 (c.table_schema || '.' || c.table_name)::regclass,
571 c.ordinal_position
572 ) as comment,
573 CASE
574 WHEN c.column_default LIKE 'nextval%' THEN true
575 WHEN c.is_identity = 'YES' THEN true
576 ELSE false
577 END as auto_increment
578 FROM information_schema.columns c
579 WHERE c.table_schema = '{}'
580 AND c.table_name IN ({})
581 ORDER BY c.table_name, c.ordinal_position
582 "#,
583 schema_name, table_list
584 )
585 }
586 DatabaseType::MySQL => {
587 format!(
588 r#"
589 SELECT
590 c.TABLE_NAME,
591 c.COLUMN_NAME,
592 c.DATA_TYPE,
593 c.COLUMN_TYPE,
594 c.IS_NULLABLE = 'YES' as nullable,
595 c.COLUMN_DEFAULT,
596 c.CHARACTER_MAXIMUM_LENGTH,
597 c.NUMERIC_PRECISION,
598 c.NUMERIC_SCALE,
599 c.COLUMN_COMMENT,
600 c.EXTRA LIKE '%auto_increment%' as auto_increment
601 FROM information_schema.COLUMNS c
602 WHERE c.TABLE_SCHEMA = DATABASE()
603 AND c.TABLE_NAME IN ({})
604 ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION
605 "#,
606 table_list
607 )
608 }
609 _ => String::new(),
610 }
611 }
612
613 pub fn batch_indexes_query(db_type: DatabaseType, tables: &[&str], schema: Option<&str>) -> String {
615 let schema_name = schema.unwrap_or("public");
616 let table_list = tables
617 .iter()
618 .map(|t| format!("'{}'", t))
619 .collect::<Vec<_>>()
620 .join(", ");
621
622 match db_type {
623 DatabaseType::PostgreSQL => {
624 format!(
625 r#"
626 SELECT
627 t.relname as table_name,
628 i.relname as index_name,
629 a.attname as column_name,
630 ix.indisunique as is_unique,
631 ix.indisprimary as is_primary,
632 am.amname as index_type
633 FROM pg_class t
634 JOIN pg_index ix ON t.oid = ix.indrelid
635 JOIN pg_class i ON i.oid = ix.indexrelid
636 JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
637 JOIN pg_am am ON i.relam = am.oid
638 JOIN pg_namespace n ON n.oid = t.relnamespace
639 WHERE n.nspname = '{}'
640 AND t.relname IN ({})
641 ORDER BY t.relname, i.relname, a.attnum
642 "#,
643 schema_name, table_list
644 )
645 }
646 DatabaseType::MySQL => {
647 format!(
648 r#"
649 SELECT
650 s.TABLE_NAME,
651 s.INDEX_NAME,
652 s.COLUMN_NAME,
653 s.NON_UNIQUE = 0 as is_unique,
654 s.INDEX_NAME = 'PRIMARY' as is_primary,
655 s.INDEX_TYPE
656 FROM information_schema.STATISTICS s
657 WHERE s.TABLE_SCHEMA = DATABASE()
658 AND s.TABLE_NAME IN ({})
659 ORDER BY s.TABLE_NAME, s.INDEX_NAME, s.SEQ_IN_INDEX
660 "#,
661 table_list
662 )
663 }
664 _ => String::new(),
665 }
666 }
667
668 pub fn batch_foreign_keys_query(db_type: DatabaseType, tables: &[&str], schema: Option<&str>) -> String {
670 let schema_name = schema.unwrap_or("public");
671 let table_list = tables
672 .iter()
673 .map(|t| format!("'{}'", t))
674 .collect::<Vec<_>>()
675 .join(", ");
676
677 match db_type {
678 DatabaseType::PostgreSQL => {
679 format!(
680 r#"
681 SELECT
682 tc.table_name,
683 tc.constraint_name,
684 kcu.column_name,
685 ccu.table_name AS foreign_table,
686 ccu.table_schema AS foreign_schema,
687 ccu.column_name AS foreign_column,
688 rc.delete_rule,
689 rc.update_rule
690 FROM information_schema.table_constraints tc
691 JOIN information_schema.key_column_usage kcu
692 ON tc.constraint_name = kcu.constraint_name
693 AND tc.table_schema = kcu.table_schema
694 JOIN information_schema.constraint_column_usage ccu
695 ON ccu.constraint_name = tc.constraint_name
696 AND ccu.table_schema = tc.table_schema
697 JOIN information_schema.referential_constraints rc
698 ON tc.constraint_name = rc.constraint_name
699 AND tc.table_schema = rc.constraint_schema
700 WHERE tc.constraint_type = 'FOREIGN KEY'
701 AND tc.table_schema = '{}'
702 AND tc.table_name IN ({})
703 ORDER BY tc.table_name, tc.constraint_name, kcu.ordinal_position
704 "#,
705 schema_name, table_list
706 )
707 }
708 DatabaseType::MySQL => {
709 format!(
710 r#"
711 SELECT
712 kcu.TABLE_NAME,
713 kcu.CONSTRAINT_NAME,
714 kcu.COLUMN_NAME,
715 kcu.REFERENCED_TABLE_NAME,
716 kcu.REFERENCED_TABLE_SCHEMA,
717 kcu.REFERENCED_COLUMN_NAME,
718 rc.DELETE_RULE,
719 rc.UPDATE_RULE
720 FROM information_schema.KEY_COLUMN_USAGE kcu
721 JOIN information_schema.REFERENTIAL_CONSTRAINTS rc
722 ON kcu.CONSTRAINT_NAME = rc.CONSTRAINT_NAME
723 AND kcu.TABLE_SCHEMA = rc.CONSTRAINT_SCHEMA
724 WHERE kcu.TABLE_SCHEMA = DATABASE()
725 AND kcu.REFERENCED_TABLE_NAME IS NOT NULL
726 AND kcu.TABLE_NAME IN ({})
727 ORDER BY kcu.TABLE_NAME, kcu.CONSTRAINT_NAME, kcu.ORDINAL_POSITION
728 "#,
729 table_list
730 )
731 }
732 _ => String::new(),
733 }
734 }
735}
736
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_concurrent_introspector() {
        let config = IntrospectionConfig::default().with_max_concurrency(4);
        let introspector = ConcurrentIntrospector::new(config);

        let tables = vec!["users".to_string(), "posts".to_string(), "comments".to_string()];

        let result = introspector
            .introspect_tables(tables, |name| async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                Ok(TableMetadata::new(name))
            })
            .await;

        assert_eq!(result.tables.len(), 3);
        assert!(result.errors.is_empty());
        assert!(result.is_complete());
    }

    // `create_batches` is synchronous, so no async runtime is needed.
    #[test]
    fn test_batch_introspector() {
        let batch = BatchIntrospector::new(2);

        let tables = vec![
            "t1".to_string(),
            "t2".to_string(),
            "t3".to_string(),
            "t4".to_string(),
            "t5".to_string(),
        ];

        let batches = batch.create_batches(tables);
        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0].len(), 2);
        assert_eq!(batches[1].len(), 2);
        assert_eq!(batches[2].len(), 1);
        assert_eq!(batches[2][0], "t5");
    }

    #[test]
    fn test_create_batches_edge_cases() {
        assert!(BatchIntrospector::new(3).create_batches(Vec::new()).is_empty());

        // A zero batch size is clamped to one table per batch.
        let names = vec!["a".to_string(), "b".to_string()];
        let batches = BatchIntrospector::new(0).create_batches(names);
        assert_eq!(batches, vec![vec!["a".to_string()], vec!["b".to_string()]]);
    }

    #[tokio::test]
    async fn test_introspection_with_errors() {
        let config = IntrospectionConfig::default().with_max_concurrency(2);
        let introspector = ConcurrentIntrospector::new(config);

        let tables = vec!["good1".to_string(), "bad".to_string(), "good2".to_string()];

        let result = introspector
            .introspect_named(tables, |name| async move {
                if name == "bad" {
                    Err("Table not found".to_string())
                } else {
                    Ok(TableMetadata::new(name))
                }
            })
            .await;

        assert_eq!(result.tables.len(), 2);
        assert_eq!(result.errors.len(), 1);
        assert_eq!(result.errors[0].table, "bad");
        assert!(!result.is_complete());
    }

    #[test]
    fn test_introspection_error_display() {
        let err = IntrospectionError {
            table: "users".to_string(),
            message: "boom".to_string(),
            is_timeout: false,
        };
        assert_eq!(err.to_string(), "Error introspecting table 'users': boom");

        let timeout = IntrospectionError {
            is_timeout: true,
            ..err
        };
        assert_eq!(timeout.to_string(), "Timeout introspecting table 'users': boom");
    }

    #[test]
    fn test_result_get_table() {
        let result = IntrospectionResult {
            tables: vec![TableMetadata::new("users")],
            errors: Vec::new(),
            duration: Duration::ZERO,
            max_concurrency: 1,
        };
        assert!(result.is_complete());
        assert_eq!(result.get_table("users").map(|t| t.name.as_str()), Some("users"));
        assert!(result.get_table("posts").is_none());
    }

    #[test]
    fn test_config_max_concurrency_clamped() {
        let config = IntrospectionConfig::default().with_max_concurrency(0);
        assert_eq!(config.max_concurrency, 1);
    }

    #[test]
    fn test_introspection_phase_progression() {
        let mut phase = IntrospectionPhase::Tables;

        assert_eq!(phase.name(), "tables");

        phase = phase.next();
        assert_eq!(phase, IntrospectionPhase::Columns);

        phase = phase.next();
        assert_eq!(phase, IntrospectionPhase::PrimaryKeys);

        while phase != IntrospectionPhase::Complete {
            phase = phase.next();
        }

        // `Complete` is terminal.
        assert_eq!(phase.next(), IntrospectionPhase::Complete);
    }

    #[test]
    fn test_batch_columns_query() {
        let sql = queries::batch_columns_query(
            crate::sql::DatabaseType::PostgreSQL,
            &["users", "posts"],
            Some("public"),
        );

        assert!(sql.contains("information_schema.columns"));
        assert!(sql.contains("'users', 'posts'"));
    }
}
834