1use std::future::Future;
33use std::time::{Duration, Instant};
34
35use super::concurrent::{ConcurrencyConfig, ConcurrentExecutor, TaskResult};
36
/// Tuning knobs for concurrent schema introspection.
///
/// Use [`IntrospectionConfig::default`], one of the presets
/// ([`for_large_database`](Self::for_large_database),
/// [`for_small_database`](Self::for_small_database)), and refine with the
/// `with_*` builder methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IntrospectionConfig {
    /// Maximum number of introspection tasks allowed to run at the same time.
    pub max_concurrency: usize,
    /// Timeout handed to the executor for each per-table task.
    pub table_timeout: Duration,
    /// Forwarded to the executor's `continue_on_error` setting (presumably:
    /// keep running the remaining tables after one fails — confirm in the
    /// executor).
    pub continue_on_error: bool,
    /// Number of tables per batch. The non-batching presets use `0`
    /// (presumably meaning "no batching" — confirm with the consumer).
    pub batch_size: usize,
}

impl Default for IntrospectionConfig {
    /// 8 concurrent tasks, a 30 s per-table timeout, continue on error, no batching.
    fn default() -> Self {
        Self {
            max_concurrency: 8,
            table_timeout: Duration::from_secs(30),
            continue_on_error: true,
            batch_size: 0,
        }
    }
}

impl IntrospectionConfig {
    /// Preset for databases with many tables: 16 concurrent tasks, a 60 s
    /// per-table timeout and batches of 50 tables.
    #[must_use]
    pub fn for_large_database() -> Self {
        Self {
            max_concurrency: 16,
            table_timeout: Duration::from_secs(60),
            continue_on_error: true,
            batch_size: 50,
        }
    }

    /// Preset for small databases: 4 concurrent tasks, a 15 s per-table
    /// timeout and no batching.
    #[must_use]
    pub fn for_small_database() -> Self {
        Self {
            max_concurrency: 4,
            table_timeout: Duration::from_secs(15),
            continue_on_error: true,
            batch_size: 0,
        }
    }

    /// Sets the concurrency limit; values below 1 are clamped to 1 so at
    /// least one task can always run.
    #[must_use]
    pub fn with_max_concurrency(mut self, max: usize) -> Self {
        self.max_concurrency = max.max(1);
        self
    }

    /// Sets the per-table timeout.
    #[must_use]
    pub fn with_table_timeout(mut self, timeout: Duration) -> Self {
        self.table_timeout = timeout;
        self
    }

    /// Sets whether introspection continues after a table fails.
    ///
    /// Completes the builder API: every other field already had a `with_*`
    /// setter.
    #[must_use]
    pub fn with_continue_on_error(mut self, continue_on_error: bool) -> Self {
        self.continue_on_error = continue_on_error;
        self
    }

    /// Sets the batch size (stored as given, including `0`).
    #[must_use]
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }
}
105
106#[derive(Debug, Clone)]
108pub struct TableMetadata {
109 pub name: String,
111 pub columns: Vec<ColumnMetadata>,
113 pub indexes: Vec<IndexMetadata>,
115 pub foreign_keys: Vec<ForeignKeyMetadata>,
117 pub primary_key: Vec<String>,
119 pub comment: Option<String>,
121}
122
123impl TableMetadata {
124 pub fn new(name: impl Into<String>) -> Self {
126 Self {
127 name: name.into(),
128 columns: Vec::new(),
129 indexes: Vec::new(),
130 foreign_keys: Vec::new(),
131 primary_key: Vec::new(),
132 comment: None,
133 }
134 }
135}
136
/// Metadata describing a single table column.
#[derive(Debug, Clone)]
pub struct ColumnMetadata {
    /// Column name.
    pub name: String,
    /// Database-specific type name as text (presumably derived from the
    /// catalog's data/udt type columns selected in `queries` — confirm).
    pub db_type: String,
    /// Whether the column accepts NULL.
    pub nullable: bool,
    /// Default value as text, if any (presumably the raw default expression
    /// reported by the database — confirm).
    pub default: Option<String>,
    /// Whether values are generated automatically; the batch column queries
    /// derive this from sequence/identity defaults (PostgreSQL) or
    /// `auto_increment` in `EXTRA` (MySQL).
    pub auto_increment: bool,
    /// Whether the column is part of the table's primary key.
    pub is_primary_key: bool,
}
153
/// Metadata describing one index on a table.
#[derive(Debug, Clone)]
pub struct IndexMetadata {
    /// Index name.
    pub name: String,
    /// Names of the indexed columns.
    pub columns: Vec<String>,
    /// Whether the index enforces uniqueness.
    pub is_unique: bool,
    /// Whether the index backs the primary key.
    pub is_primary: bool,
    /// Index method/type name when known (the batch index queries select
    /// `pg_am.amname` on PostgreSQL and `INDEX_TYPE` on MySQL).
    pub index_type: Option<String>,
}
168
/// Metadata describing one foreign-key constraint.
#[derive(Debug, Clone)]
pub struct ForeignKeyMetadata {
    /// Constraint name.
    pub name: String,
    /// Referencing columns in the owning table.
    pub columns: Vec<String>,
    /// Name of the table the key points to.
    pub referenced_table: String,
    /// Referenced columns in `referenced_table`, presumably positionally
    /// paired with `columns` — confirm with the parser.
    pub referenced_columns: Vec<String>,
    /// Referential action on delete as text (the batch foreign-key queries
    /// select the catalog's `delete_rule`).
    pub on_delete: String,
    /// Referential action on update as text (from `update_rule`).
    pub on_update: String,
}
185
186#[derive(Debug)]
188pub struct IntrospectionResult {
189 pub tables: Vec<TableMetadata>,
191 pub errors: Vec<IntrospectionError>,
193 pub duration: Duration,
195 pub max_concurrency: usize,
197}
198
199impl IntrospectionResult {
200 pub fn is_complete(&self) -> bool {
202 self.errors.is_empty()
203 }
204
205 pub fn get_table(&self, name: &str) -> Option<&TableMetadata> {
207 self.tables.iter().find(|t| t.name == name)
208 }
209}
210
/// A failure while introspecting one table (or one batch of tables).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IntrospectionError {
    /// Table name, or a synthetic label such as `task_N` / `batch_N` when the
    /// introspector does not know the real name.
    pub table: String,
    /// Error message reported for the failed task.
    pub message: String,
    /// Whether the failure was a timeout rather than an ordinary error.
    pub is_timeout: bool,
}

impl std::fmt::Display for IntrospectionError {
    /// Formats as `"<Timeout|Error> introspecting table '<table>': <message>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let kind = if self.is_timeout { "Timeout" } else { "Error" };
        write!(
            f,
            "{} introspecting table '{}': {}",
            kind, self.table, self.message
        )
    }
}

// Lets callers propagate introspection failures as `Box<dyn Error>` / via `?`
// like any other error type. There is no underlying source error to expose.
impl std::error::Error for IntrospectionError {}
239
240pub struct ConcurrentIntrospector {
242 config: IntrospectionConfig,
243 executor: ConcurrentExecutor,
244}
245
246impl ConcurrentIntrospector {
247 pub fn new(config: IntrospectionConfig) -> Self {
249 let executor_config = ConcurrencyConfig::default()
250 .with_max_concurrency(config.max_concurrency)
251 .with_timeout(config.table_timeout)
252 .with_continue_on_error(config.continue_on_error);
253
254 Self {
255 config,
256 executor: ConcurrentExecutor::new(executor_config),
257 }
258 }
259
260 pub async fn introspect_tables<F, Fut>(
265 &self,
266 table_names: Vec<String>,
267 operation: F,
268 ) -> IntrospectionResult
269 where
270 F: Fn(String) -> Fut + Clone + Send + 'static,
271 Fut: Future<Output = Result<TableMetadata, String>> + Send + 'static,
272 {
273 let start = Instant::now();
274
275 let tasks: Vec<_> = table_names
277 .into_iter()
278 .map(|name| {
279 let op = operation.clone();
280 move || op(name)
281 })
282 .collect();
283
284 let (results, stats) = self.executor.execute_all(tasks).await;
285
286 let mut tables = Vec::new();
288 let mut errors = Vec::new();
289
290 for result in results {
291 match result {
292 TaskResult::Success { value, .. } => {
293 tables.push(value);
294 }
295 TaskResult::Error(e) => {
296 errors.push(IntrospectionError {
297 table: format!("task_{}", e.task_id),
298 message: e.message,
299 is_timeout: e.is_timeout,
300 });
301 }
302 }
303 }
304
305 IntrospectionResult {
306 tables,
307 errors,
308 duration: start.elapsed(),
309 max_concurrency: stats.max_concurrent,
310 }
311 }
312
313 pub async fn introspect_named<F, Fut>(
315 &self,
316 table_names: Vec<String>,
317 operation: F,
318 ) -> IntrospectionResult
319 where
320 F: Fn(String) -> Fut + Clone + Send + 'static,
321 Fut: Future<Output = Result<TableMetadata, String>> + Send + 'static,
322 {
323 let start = Instant::now();
324 let names_for_errors: Vec<_> = table_names.clone();
325
326 let tasks: Vec<_> = table_names
328 .into_iter()
329 .map(|name| {
330 let op = operation.clone();
331 move || op(name)
332 })
333 .collect();
334
335 let (results, stats) = self.executor.execute_all(tasks).await;
336
337 let mut tables = Vec::new();
339 let mut errors = Vec::new();
340
341 for (idx, result) in results.into_iter().enumerate() {
342 match result {
343 TaskResult::Success { value, .. } => {
344 tables.push(value);
345 }
346 TaskResult::Error(e) => {
347 let table_name = names_for_errors
348 .get(idx)
349 .cloned()
350 .unwrap_or_else(|| format!("unknown_{}", idx));
351 errors.push(IntrospectionError {
352 table: table_name,
353 message: e.message,
354 is_timeout: e.is_timeout,
355 });
356 }
357 }
358 }
359
360 IntrospectionResult {
361 tables,
362 errors,
363 duration: start.elapsed(),
364 max_concurrency: stats.max_concurrent,
365 }
366 }
367
368 pub fn config(&self) -> &IntrospectionConfig {
370 &self.config
371 }
372}
373
374pub struct BatchIntrospector {
379 batch_size: usize,
380}
381
382impl BatchIntrospector {
383 pub fn new(batch_size: usize) -> Self {
385 Self {
386 batch_size: batch_size.max(1),
387 }
388 }
389
390 pub fn create_batches(&self, tables: Vec<String>) -> Vec<Vec<String>> {
392 tables.chunks(self.batch_size).map(|c| c.to_vec()).collect()
393 }
394
395 pub async fn introspect_batched<F, Fut>(
397 &self,
398 tables: Vec<String>,
399 max_concurrency: usize,
400 operation: F,
401 ) -> IntrospectionResult
402 where
403 F: Fn(Vec<String>) -> Fut + Clone + Send + 'static,
404 Fut: Future<Output = Result<Vec<TableMetadata>, String>> + Send + 'static,
405 {
406 let start = Instant::now();
407 let batches = self.create_batches(tables);
408
409 let config = IntrospectionConfig::default().with_max_concurrency(max_concurrency);
410 let executor = ConcurrentExecutor::new(
411 ConcurrencyConfig::default()
412 .with_max_concurrency(config.max_concurrency)
413 .with_continue_on_error(true),
414 );
415
416 let tasks: Vec<_> = batches
417 .into_iter()
418 .map(|batch| {
419 let op = operation.clone();
420 move || op(batch)
421 })
422 .collect();
423
424 let (results, stats) = executor.execute_all(tasks).await;
425
426 let mut tables = Vec::new();
428 let mut errors = Vec::new();
429
430 for result in results {
431 match result {
432 TaskResult::Success { value, .. } => {
433 tables.extend(value);
434 }
435 TaskResult::Error(e) => {
436 errors.push(IntrospectionError {
437 table: format!("batch_{}", e.task_id),
438 message: e.message,
439 is_timeout: e.is_timeout,
440 });
441 }
442 }
443 }
444
445 IntrospectionResult {
446 tables,
447 errors,
448 duration: start.elapsed(),
449 max_concurrency: stats.max_concurrent,
450 }
451 }
452}
453
/// Stages of an introspection run, listed in the order [`next`](Self::next)
/// walks through them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntrospectionPhase {
    Tables,
    Columns,
    PrimaryKeys,
    ForeignKeys,
    Indexes,
    Enums,
    Views,
    /// Terminal stage; `next` keeps returning it.
    Complete,
}

impl IntrospectionPhase {
    /// Returns the stage after `self`. Both `Views` and `Complete` advance to
    /// `Complete`, so repeated calls settle there.
    pub fn next(self) -> Self {
        match self {
            Self::Tables => Self::Columns,
            Self::Columns => Self::PrimaryKeys,
            Self::PrimaryKeys => Self::ForeignKeys,
            Self::ForeignKeys => Self::Indexes,
            Self::Indexes => Self::Enums,
            Self::Enums => Self::Views,
            Self::Views | Self::Complete => Self::Complete,
        }
    }

    /// Lowercase, human-readable label for the stage (e.g. `"primary keys"`).
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Tables => "tables",
            Self::Columns => "columns",
            Self::PrimaryKeys => "primary keys",
            Self::ForeignKeys => "foreign keys",
            Self::Indexes => "indexes",
            Self::Enums => "enums",
            Self::Views => "views",
            Self::Complete => "complete",
        }
    }
}
504
/// Boxed, thread-shareable (`Send + Sync`) progress listener that receives the
/// current [`IntrospectionPhase`] and two counters.
///
/// NOTE(review): the two `usize` arguments presumably mean (items done, items
/// total) within the phase — confirm against whatever code invokes it.
pub type ProgressCallback = Box<dyn Fn(IntrospectionPhase, usize, usize) + Send + Sync>;
507
508pub struct IntrospectorBuilder {
510 config: IntrospectionConfig,
511 progress_callback: Option<ProgressCallback>,
512}
513
514impl IntrospectorBuilder {
515 pub fn new() -> Self {
517 Self {
518 config: IntrospectionConfig::default(),
519 progress_callback: None,
520 }
521 }
522
523 pub fn config(mut self, config: IntrospectionConfig) -> Self {
525 self.config = config;
526 self
527 }
528
529 pub fn on_progress<F>(mut self, callback: F) -> Self
531 where
532 F: Fn(IntrospectionPhase, usize, usize) + Send + Sync + 'static,
533 {
534 self.progress_callback = Some(Box::new(callback));
535 self
536 }
537
538 pub fn build(self) -> ConcurrentIntrospector {
540 ConcurrentIntrospector::new(self.config)
541 }
542}
543
544impl Default for IntrospectorBuilder {
545 fn default() -> Self {
546 Self::new()
547 }
548}
549
550pub mod queries {
552 use crate::sql::DatabaseType;
553
554 pub fn batch_columns_query(
556 db_type: DatabaseType,
557 tables: &[&str],
558 schema: Option<&str>,
559 ) -> String {
560 let schema_name = schema.unwrap_or("public");
561 let table_list = tables
562 .iter()
563 .map(|t| format!("'{}'", t))
564 .collect::<Vec<_>>()
565 .join(", ");
566
567 match db_type {
568 DatabaseType::PostgreSQL => {
569 format!(
570 r#"
571 SELECT
572 c.table_name,
573 c.column_name,
574 c.data_type,
575 c.udt_name,
576 c.is_nullable = 'YES' as nullable,
577 c.column_default,
578 c.character_maximum_length,
579 c.numeric_precision,
580 c.numeric_scale,
581 col_description(
582 (c.table_schema || '.' || c.table_name)::regclass,
583 c.ordinal_position
584 ) as comment,
585 CASE
586 WHEN c.column_default LIKE 'nextval%' THEN true
587 WHEN c.is_identity = 'YES' THEN true
588 ELSE false
589 END as auto_increment
590 FROM information_schema.columns c
591 WHERE c.table_schema = '{}'
592 AND c.table_name IN ({})
593 ORDER BY c.table_name, c.ordinal_position
594 "#,
595 schema_name, table_list
596 )
597 }
598 DatabaseType::MySQL => {
599 format!(
600 r#"
601 SELECT
602 c.TABLE_NAME,
603 c.COLUMN_NAME,
604 c.DATA_TYPE,
605 c.COLUMN_TYPE,
606 c.IS_NULLABLE = 'YES' as nullable,
607 c.COLUMN_DEFAULT,
608 c.CHARACTER_MAXIMUM_LENGTH,
609 c.NUMERIC_PRECISION,
610 c.NUMERIC_SCALE,
611 c.COLUMN_COMMENT,
612 c.EXTRA LIKE '%auto_increment%' as auto_increment
613 FROM information_schema.COLUMNS c
614 WHERE c.TABLE_SCHEMA = DATABASE()
615 AND c.TABLE_NAME IN ({})
616 ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION
617 "#,
618 table_list
619 )
620 }
621 _ => String::new(),
622 }
623 }
624
625 pub fn batch_indexes_query(
627 db_type: DatabaseType,
628 tables: &[&str],
629 schema: Option<&str>,
630 ) -> String {
631 let schema_name = schema.unwrap_or("public");
632 let table_list = tables
633 .iter()
634 .map(|t| format!("'{}'", t))
635 .collect::<Vec<_>>()
636 .join(", ");
637
638 match db_type {
639 DatabaseType::PostgreSQL => {
640 format!(
641 r#"
642 SELECT
643 t.relname as table_name,
644 i.relname as index_name,
645 a.attname as column_name,
646 ix.indisunique as is_unique,
647 ix.indisprimary as is_primary,
648 am.amname as index_type
649 FROM pg_class t
650 JOIN pg_index ix ON t.oid = ix.indrelid
651 JOIN pg_class i ON i.oid = ix.indexrelid
652 JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
653 JOIN pg_am am ON i.relam = am.oid
654 JOIN pg_namespace n ON n.oid = t.relnamespace
655 WHERE n.nspname = '{}'
656 AND t.relname IN ({})
657 ORDER BY t.relname, i.relname, a.attnum
658 "#,
659 schema_name, table_list
660 )
661 }
662 DatabaseType::MySQL => {
663 format!(
664 r#"
665 SELECT
666 s.TABLE_NAME,
667 s.INDEX_NAME,
668 s.COLUMN_NAME,
669 s.NON_UNIQUE = 0 as is_unique,
670 s.INDEX_NAME = 'PRIMARY' as is_primary,
671 s.INDEX_TYPE
672 FROM information_schema.STATISTICS s
673 WHERE s.TABLE_SCHEMA = DATABASE()
674 AND s.TABLE_NAME IN ({})
675 ORDER BY s.TABLE_NAME, s.INDEX_NAME, s.SEQ_IN_INDEX
676 "#,
677 table_list
678 )
679 }
680 _ => String::new(),
681 }
682 }
683
684 pub fn batch_foreign_keys_query(
686 db_type: DatabaseType,
687 tables: &[&str],
688 schema: Option<&str>,
689 ) -> String {
690 let schema_name = schema.unwrap_or("public");
691 let table_list = tables
692 .iter()
693 .map(|t| format!("'{}'", t))
694 .collect::<Vec<_>>()
695 .join(", ");
696
697 match db_type {
698 DatabaseType::PostgreSQL => {
699 format!(
700 r#"
701 SELECT
702 tc.table_name,
703 tc.constraint_name,
704 kcu.column_name,
705 ccu.table_name AS foreign_table,
706 ccu.table_schema AS foreign_schema,
707 ccu.column_name AS foreign_column,
708 rc.delete_rule,
709 rc.update_rule
710 FROM information_schema.table_constraints tc
711 JOIN information_schema.key_column_usage kcu
712 ON tc.constraint_name = kcu.constraint_name
713 AND tc.table_schema = kcu.table_schema
714 JOIN information_schema.constraint_column_usage ccu
715 ON ccu.constraint_name = tc.constraint_name
716 AND ccu.table_schema = tc.table_schema
717 JOIN information_schema.referential_constraints rc
718 ON tc.constraint_name = rc.constraint_name
719 AND tc.table_schema = rc.constraint_schema
720 WHERE tc.constraint_type = 'FOREIGN KEY'
721 AND tc.table_schema = '{}'
722 AND tc.table_name IN ({})
723 ORDER BY tc.table_name, tc.constraint_name, kcu.ordinal_position
724 "#,
725 schema_name, table_list
726 )
727 }
728 DatabaseType::MySQL => {
729 format!(
730 r#"
731 SELECT
732 kcu.TABLE_NAME,
733 kcu.CONSTRAINT_NAME,
734 kcu.COLUMN_NAME,
735 kcu.REFERENCED_TABLE_NAME,
736 kcu.REFERENCED_TABLE_SCHEMA,
737 kcu.REFERENCED_COLUMN_NAME,
738 rc.DELETE_RULE,
739 rc.UPDATE_RULE
740 FROM information_schema.KEY_COLUMN_USAGE kcu
741 JOIN information_schema.REFERENTIAL_CONSTRAINTS rc
742 ON kcu.CONSTRAINT_NAME = rc.CONSTRAINT_NAME
743 AND kcu.TABLE_SCHEMA = rc.CONSTRAINT_SCHEMA
744 WHERE kcu.TABLE_SCHEMA = DATABASE()
745 AND kcu.REFERENCED_TABLE_NAME IS NOT NULL
746 AND kcu.TABLE_NAME IN ({})
747 ORDER BY kcu.TABLE_NAME, kcu.CONSTRAINT_NAME, kcu.ORDINAL_POSITION
748 "#,
749 table_list
750 )
751 }
752 _ => String::new(),
753 }
754 }
755}
756
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_concurrent_introspector() {
        let config = IntrospectionConfig::default().with_max_concurrency(4);
        let introspector = ConcurrentIntrospector::new(config);

        let tables = vec![
            "users".to_string(),
            "posts".to_string(),
            "comments".to_string(),
        ];

        let result = introspector
            .introspect_tables(tables, |name| async move {
                // Simulate catalog latency so that tasks overlap.
                tokio::time::sleep(Duration::from_millis(10)).await;
                Ok(TableMetadata::new(name))
            })
            .await;

        assert_eq!(result.tables.len(), 3);
        assert!(result.errors.is_empty());
        assert!(result.is_complete());
        assert!(result.get_table("posts").is_some());
        assert!(result.get_table("missing").is_none());
    }

    // Pure computation: no async runtime needed.
    #[test]
    fn test_batch_introspector() {
        let batch = BatchIntrospector::new(2);

        let tables: Vec<String> = ["t1", "t2", "t3", "t4", "t5"]
            .iter()
            .map(|s| s.to_string())
            .collect();

        let batches = batch.create_batches(tables);
        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0], vec!["t1", "t2"]);
        assert_eq!(batches[1], vec!["t3", "t4"]);
        assert_eq!(batches[2], vec!["t5"]);
    }

    #[test]
    fn test_batch_size_zero_is_clamped_to_one() {
        let batches =
            BatchIntrospector::new(0).create_batches(vec!["a".to_string(), "b".to_string()]);
        assert_eq!(batches.len(), 2);
        assert!(batches.iter().all(|b| b.len() == 1));
    }

    #[test]
    fn test_create_batches_empty_input() {
        assert!(BatchIntrospector::new(3).create_batches(Vec::new()).is_empty());
    }

    #[tokio::test]
    async fn test_introspection_with_errors() {
        let config = IntrospectionConfig::default().with_max_concurrency(2);
        let introspector = ConcurrentIntrospector::new(config);

        let tables = vec!["good1".to_string(), "bad".to_string(), "good2".to_string()];

        let result = introspector
            .introspect_named(tables, |name| async move {
                if name == "bad" {
                    Err("Table not found".to_string())
                } else {
                    Ok(TableMetadata::new(name))
                }
            })
            .await;

        assert_eq!(result.tables.len(), 2);
        assert_eq!(result.errors.len(), 1);
        assert_eq!(result.errors[0].table, "bad");
        assert!(!result.is_complete());
    }

    #[test]
    fn test_introspection_error_display() {
        let err = IntrospectionError {
            table: "users".to_string(),
            message: "boom".to_string(),
            is_timeout: false,
        };
        assert_eq!(err.to_string(), "Error introspecting table 'users': boom");

        let timeout = IntrospectionError {
            is_timeout: true,
            ..err
        };
        assert_eq!(
            timeout.to_string(),
            "Timeout introspecting table 'users': boom"
        );
    }

    #[test]
    fn test_introspection_phase_progression() {
        let mut phase = IntrospectionPhase::Tables;

        assert_eq!(phase.name(), "tables");

        phase = phase.next();
        assert_eq!(phase, IntrospectionPhase::Columns);

        phase = phase.next();
        assert_eq!(phase, IntrospectionPhase::PrimaryKeys);

        // Walk to the terminal phase; `next` must keep returning it afterwards.
        while phase != IntrospectionPhase::Complete {
            phase = phase.next();
        }

        assert_eq!(phase.name(), "complete");
        assert_eq!(phase.next(), IntrospectionPhase::Complete);
    }

    #[test]
    fn test_batch_columns_query() {
        let sql = queries::batch_columns_query(
            crate::sql::DatabaseType::PostgreSQL,
            &["users", "posts"],
            Some("public"),
        );

        assert!(sql.contains("information_schema.columns"));
        assert!(sql.contains("'users', 'posts'"));
    }
}