use std::future::Future;
use std::time::{Duration, Instant};
use super::concurrent::{ConcurrencyConfig, ConcurrentExecutor, TaskResult};
#[derive(Debug, Clone)]
pub struct IntrospectionConfig {
pub max_concurrency: usize,
pub table_timeout: Duration,
pub continue_on_error: bool,
pub batch_size: usize,
}
impl Default for IntrospectionConfig {
fn default() -> Self {
Self {
max_concurrency: 8,
table_timeout: Duration::from_secs(30),
continue_on_error: true,
batch_size: 0, }
}
}
impl IntrospectionConfig {
#[must_use]
pub fn for_large_database() -> Self {
Self {
max_concurrency: 16,
table_timeout: Duration::from_secs(60),
continue_on_error: true,
batch_size: 50,
}
}
#[must_use]
pub fn for_small_database() -> Self {
Self {
max_concurrency: 4,
table_timeout: Duration::from_secs(15),
continue_on_error: true,
batch_size: 0,
}
}
#[must_use]
pub fn with_max_concurrency(mut self, max: usize) -> Self {
self.max_concurrency = max.max(1);
self
}
#[must_use]
pub fn with_table_timeout(mut self, timeout: Duration) -> Self {
self.table_timeout = timeout;
self
}
#[must_use]
pub fn with_batch_size(mut self, size: usize) -> Self {
self.batch_size = size;
self
}
}
#[derive(Debug, Clone)]
pub struct TableMetadata {
pub name: String,
pub columns: Vec<ColumnMetadata>,
pub indexes: Vec<IndexMetadata>,
pub foreign_keys: Vec<ForeignKeyMetadata>,
pub primary_key: Vec<String>,
pub comment: Option<String>,
}
impl TableMetadata {
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
columns: Vec::new(),
indexes: Vec::new(),
foreign_keys: Vec::new(),
primary_key: Vec::new(),
comment: None,
}
}
}
#[derive(Debug, Clone)]
pub struct ColumnMetadata {
pub name: String,
pub db_type: String,
pub nullable: bool,
pub default: Option<String>,
pub auto_increment: bool,
pub is_primary_key: bool,
}
#[derive(Debug, Clone)]
pub struct IndexMetadata {
pub name: String,
pub columns: Vec<String>,
pub is_unique: bool,
pub is_primary: bool,
pub index_type: Option<String>,
}
#[derive(Debug, Clone)]
pub struct ForeignKeyMetadata {
pub name: String,
pub columns: Vec<String>,
pub referenced_table: String,
pub referenced_columns: Vec<String>,
pub on_delete: String,
pub on_update: String,
}
#[derive(Debug)]
pub struct IntrospectionResult {
pub tables: Vec<TableMetadata>,
pub errors: Vec<IntrospectionError>,
pub duration: Duration,
pub max_concurrency: usize,
}
impl IntrospectionResult {
pub fn is_complete(&self) -> bool {
self.errors.is_empty()
}
pub fn get_table(&self, name: &str) -> Option<&TableMetadata> {
self.tables.iter().find(|t| t.name == name)
}
}
#[derive(Debug, Clone)]
pub struct IntrospectionError {
pub table: String,
pub message: String,
pub is_timeout: bool,
}
impl std::fmt::Display for IntrospectionError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.is_timeout {
write!(
f,
"Timeout introspecting table '{}': {}",
self.table, self.message
)
} else {
write!(
f,
"Error introspecting table '{}': {}",
self.table, self.message
)
}
}
}
pub struct ConcurrentIntrospector {
config: IntrospectionConfig,
executor: ConcurrentExecutor,
}
impl ConcurrentIntrospector {
pub fn new(config: IntrospectionConfig) -> Self {
let executor_config = ConcurrencyConfig::default()
.with_max_concurrency(config.max_concurrency)
.with_timeout(config.table_timeout)
.with_continue_on_error(config.continue_on_error);
Self {
config,
executor: ConcurrentExecutor::new(executor_config),
}
}
pub async fn introspect_tables<F, Fut>(
&self,
table_names: Vec<String>,
operation: F,
) -> IntrospectionResult
where
F: Fn(String) -> Fut + Clone + Send + 'static,
Fut: Future<Output = Result<TableMetadata, String>> + Send + 'static,
{
let start = Instant::now();
let tasks: Vec<_> = table_names
.into_iter()
.map(|name| {
let op = operation.clone();
move || op(name)
})
.collect();
let (results, stats) = self.executor.execute_all(tasks).await;
let mut tables = Vec::new();
let mut errors = Vec::new();
for result in results {
match result {
TaskResult::Success { value, .. } => {
tables.push(value);
}
TaskResult::Error(e) => {
errors.push(IntrospectionError {
table: format!("task_{}", e.task_id),
message: e.message,
is_timeout: e.is_timeout,
});
}
}
}
IntrospectionResult {
tables,
errors,
duration: start.elapsed(),
max_concurrency: stats.max_concurrent,
}
}
pub async fn introspect_named<F, Fut>(
&self,
table_names: Vec<String>,
operation: F,
) -> IntrospectionResult
where
F: Fn(String) -> Fut + Clone + Send + 'static,
Fut: Future<Output = Result<TableMetadata, String>> + Send + 'static,
{
let start = Instant::now();
let names_for_errors: Vec<_> = table_names.clone();
let tasks: Vec<_> = table_names
.into_iter()
.map(|name| {
let op = operation.clone();
move || op(name)
})
.collect();
let (results, stats) = self.executor.execute_all(tasks).await;
let mut tables = Vec::new();
let mut errors = Vec::new();
for (idx, result) in results.into_iter().enumerate() {
match result {
TaskResult::Success { value, .. } => {
tables.push(value);
}
TaskResult::Error(e) => {
let table_name = names_for_errors
.get(idx)
.cloned()
.unwrap_or_else(|| format!("unknown_{}", idx));
errors.push(IntrospectionError {
table: table_name,
message: e.message,
is_timeout: e.is_timeout,
});
}
}
}
IntrospectionResult {
tables,
errors,
duration: start.elapsed(),
max_concurrency: stats.max_concurrent,
}
}
pub fn config(&self) -> &IntrospectionConfig {
&self.config
}
}
pub struct BatchIntrospector {
batch_size: usize,
}
impl BatchIntrospector {
pub fn new(batch_size: usize) -> Self {
Self {
batch_size: batch_size.max(1),
}
}
pub fn create_batches(&self, tables: Vec<String>) -> Vec<Vec<String>> {
tables.chunks(self.batch_size).map(|c| c.to_vec()).collect()
}
pub async fn introspect_batched<F, Fut>(
&self,
tables: Vec<String>,
max_concurrency: usize,
operation: F,
) -> IntrospectionResult
where
F: Fn(Vec<String>) -> Fut + Clone + Send + 'static,
Fut: Future<Output = Result<Vec<TableMetadata>, String>> + Send + 'static,
{
let start = Instant::now();
let batches = self.create_batches(tables);
let config = IntrospectionConfig::default().with_max_concurrency(max_concurrency);
let executor = ConcurrentExecutor::new(
ConcurrencyConfig::default()
.with_max_concurrency(config.max_concurrency)
.with_continue_on_error(true),
);
let tasks: Vec<_> = batches
.into_iter()
.map(|batch| {
let op = operation.clone();
move || op(batch)
})
.collect();
let (results, stats) = executor.execute_all(tasks).await;
let mut tables = Vec::new();
let mut errors = Vec::new();
for result in results {
match result {
TaskResult::Success { value, .. } => {
tables.extend(value);
}
TaskResult::Error(e) => {
errors.push(IntrospectionError {
table: format!("batch_{}", e.task_id),
message: e.message,
is_timeout: e.is_timeout,
});
}
}
}
IntrospectionResult {
tables,
errors,
duration: start.elapsed(),
max_concurrency: stats.max_concurrent,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntrospectionPhase {
Tables,
Columns,
PrimaryKeys,
ForeignKeys,
Indexes,
Enums,
Views,
Complete,
}
impl IntrospectionPhase {
pub fn next(self) -> Self {
match self {
Self::Tables => Self::Columns,
Self::Columns => Self::PrimaryKeys,
Self::PrimaryKeys => Self::ForeignKeys,
Self::ForeignKeys => Self::Indexes,
Self::Indexes => Self::Enums,
Self::Enums => Self::Views,
Self::Views => Self::Complete,
Self::Complete => Self::Complete,
}
}
pub fn name(&self) -> &'static str {
match self {
Self::Tables => "tables",
Self::Columns => "columns",
Self::PrimaryKeys => "primary keys",
Self::ForeignKeys => "foreign keys",
Self::Indexes => "indexes",
Self::Enums => "enums",
Self::Views => "views",
Self::Complete => "complete",
}
}
}
pub type ProgressCallback = Box<dyn Fn(IntrospectionPhase, usize, usize) + Send + Sync>;
pub struct IntrospectorBuilder {
config: IntrospectionConfig,
progress_callback: Option<ProgressCallback>,
}
impl IntrospectorBuilder {
pub fn new() -> Self {
Self {
config: IntrospectionConfig::default(),
progress_callback: None,
}
}
pub fn config(mut self, config: IntrospectionConfig) -> Self {
self.config = config;
self
}
pub fn on_progress<F>(mut self, callback: F) -> Self
where
F: Fn(IntrospectionPhase, usize, usize) + Send + Sync + 'static,
{
self.progress_callback = Some(Box::new(callback));
self
}
pub fn build(self) -> ConcurrentIntrospector {
ConcurrentIntrospector::new(self.config)
}
}
impl Default for IntrospectorBuilder {
fn default() -> Self {
Self::new()
}
}
pub mod queries {
use crate::sql::DatabaseType;
pub fn batch_columns_query(
db_type: DatabaseType,
tables: &[&str],
schema: Option<&str>,
) -> String {
let schema_name = schema.unwrap_or("public");
let table_list = tables
.iter()
.map(|t| format!("'{}'", t))
.collect::<Vec<_>>()
.join(", ");
match db_type {
DatabaseType::PostgreSQL => {
format!(
r#"
SELECT
c.table_name,
c.column_name,
c.data_type,
c.udt_name,
c.is_nullable = 'YES' as nullable,
c.column_default,
c.character_maximum_length,
c.numeric_precision,
c.numeric_scale,
col_description(
(c.table_schema || '.' || c.table_name)::regclass,
c.ordinal_position
) as comment,
CASE
WHEN c.column_default LIKE 'nextval%' THEN true
WHEN c.is_identity = 'YES' THEN true
ELSE false
END as auto_increment
FROM information_schema.columns c
WHERE c.table_schema = '{}'
AND c.table_name IN ({})
ORDER BY c.table_name, c.ordinal_position
"#,
schema_name, table_list
)
}
DatabaseType::MySQL => {
format!(
r#"
SELECT
c.TABLE_NAME,
c.COLUMN_NAME,
c.DATA_TYPE,
c.COLUMN_TYPE,
c.IS_NULLABLE = 'YES' as nullable,
c.COLUMN_DEFAULT,
c.CHARACTER_MAXIMUM_LENGTH,
c.NUMERIC_PRECISION,
c.NUMERIC_SCALE,
c.COLUMN_COMMENT,
c.EXTRA LIKE '%auto_increment%' as auto_increment
FROM information_schema.COLUMNS c
WHERE c.TABLE_SCHEMA = DATABASE()
AND c.TABLE_NAME IN ({})
ORDER BY c.TABLE_NAME, c.ORDINAL_POSITION
"#,
table_list
)
}
_ => String::new(),
}
}
pub fn batch_indexes_query(
db_type: DatabaseType,
tables: &[&str],
schema: Option<&str>,
) -> String {
let schema_name = schema.unwrap_or("public");
let table_list = tables
.iter()
.map(|t| format!("'{}'", t))
.collect::<Vec<_>>()
.join(", ");
match db_type {
DatabaseType::PostgreSQL => {
format!(
r#"
SELECT
t.relname as table_name,
i.relname as index_name,
a.attname as column_name,
ix.indisunique as is_unique,
ix.indisprimary as is_primary,
am.amname as index_type
FROM pg_class t
JOIN pg_index ix ON t.oid = ix.indrelid
JOIN pg_class i ON i.oid = ix.indexrelid
JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(ix.indkey)
JOIN pg_am am ON i.relam = am.oid
JOIN pg_namespace n ON n.oid = t.relnamespace
WHERE n.nspname = '{}'
AND t.relname IN ({})
ORDER BY t.relname, i.relname, a.attnum
"#,
schema_name, table_list
)
}
DatabaseType::MySQL => {
format!(
r#"
SELECT
s.TABLE_NAME,
s.INDEX_NAME,
s.COLUMN_NAME,
s.NON_UNIQUE = 0 as is_unique,
s.INDEX_NAME = 'PRIMARY' as is_primary,
s.INDEX_TYPE
FROM information_schema.STATISTICS s
WHERE s.TABLE_SCHEMA = DATABASE()
AND s.TABLE_NAME IN ({})
ORDER BY s.TABLE_NAME, s.INDEX_NAME, s.SEQ_IN_INDEX
"#,
table_list
)
}
_ => String::new(),
}
}
pub fn batch_foreign_keys_query(
db_type: DatabaseType,
tables: &[&str],
schema: Option<&str>,
) -> String {
let schema_name = schema.unwrap_or("public");
let table_list = tables
.iter()
.map(|t| format!("'{}'", t))
.collect::<Vec<_>>()
.join(", ");
match db_type {
DatabaseType::PostgreSQL => {
format!(
r#"
SELECT
tc.table_name,
tc.constraint_name,
kcu.column_name,
ccu.table_name AS foreign_table,
ccu.table_schema AS foreign_schema,
ccu.column_name AS foreign_column,
rc.delete_rule,
rc.update_rule
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage ccu
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_schema = tc.table_schema
JOIN information_schema.referential_constraints rc
ON tc.constraint_name = rc.constraint_name
AND tc.table_schema = rc.constraint_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.table_schema = '{}'
AND tc.table_name IN ({})
ORDER BY tc.table_name, tc.constraint_name, kcu.ordinal_position
"#,
schema_name, table_list
)
}
DatabaseType::MySQL => {
format!(
r#"
SELECT
kcu.TABLE_NAME,
kcu.CONSTRAINT_NAME,
kcu.COLUMN_NAME,
kcu.REFERENCED_TABLE_NAME,
kcu.REFERENCED_TABLE_SCHEMA,
kcu.REFERENCED_COLUMN_NAME,
rc.DELETE_RULE,
rc.UPDATE_RULE
FROM information_schema.KEY_COLUMN_USAGE kcu
JOIN information_schema.REFERENTIAL_CONSTRAINTS rc
ON kcu.CONSTRAINT_NAME = rc.CONSTRAINT_NAME
AND kcu.TABLE_SCHEMA = rc.CONSTRAINT_SCHEMA
WHERE kcu.TABLE_SCHEMA = DATABASE()
AND kcu.REFERENCED_TABLE_NAME IS NOT NULL
AND kcu.TABLE_NAME IN ({})
ORDER BY kcu.TABLE_NAME, kcu.CONSTRAINT_NAME, kcu.ORDINAL_POSITION
"#,
table_list
)
}
_ => String::new(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_concurrent_introspector() {
let config = IntrospectionConfig::default().with_max_concurrency(4);
let introspector = ConcurrentIntrospector::new(config);
let tables = vec![
"users".to_string(),
"posts".to_string(),
"comments".to_string(),
];
let result = introspector
.introspect_tables(tables, |name| async move {
tokio::time::sleep(Duration::from_millis(10)).await;
Ok(TableMetadata::new(name))
})
.await;
assert_eq!(result.tables.len(), 3);
assert!(result.errors.is_empty());
}
#[tokio::test]
async fn test_batch_introspector() {
let batch = BatchIntrospector::new(2);
let tables = vec![
"t1".to_string(),
"t2".to_string(),
"t3".to_string(),
"t4".to_string(),
"t5".to_string(),
];
let batches = batch.create_batches(tables);
assert_eq!(batches.len(), 3);
assert_eq!(batches[0].len(), 2);
assert_eq!(batches[1].len(), 2);
assert_eq!(batches[2].len(), 1);
}
#[tokio::test]
async fn test_introspection_with_errors() {
let config = IntrospectionConfig::default().with_max_concurrency(2);
let introspector = ConcurrentIntrospector::new(config);
let tables = vec!["good1".to_string(), "bad".to_string(), "good2".to_string()];
let result = introspector
.introspect_named(tables, |name| async move {
if name == "bad" {
Err("Table not found".to_string())
} else {
Ok(TableMetadata::new(name))
}
})
.await;
assert_eq!(result.tables.len(), 2);
assert_eq!(result.errors.len(), 1);
assert_eq!(result.errors[0].table, "bad");
}
#[test]
fn test_introspection_phase_progression() {
let mut phase = IntrospectionPhase::Tables;
assert_eq!(phase.name(), "tables");
phase = phase.next();
assert_eq!(phase, IntrospectionPhase::Columns);
phase = phase.next();
assert_eq!(phase, IntrospectionPhase::PrimaryKeys);
while phase != IntrospectionPhase::Complete {
phase = phase.next();
}
assert_eq!(phase.next(), IntrospectionPhase::Complete);
}
#[test]
fn test_batch_columns_query() {
let sql = queries::batch_columns_query(
crate::sql::DatabaseType::PostgreSQL,
&["users", "posts"],
Some("public"),
);
assert!(sql.contains("information_schema.columns"));
assert!(sql.contains("'users', 'posts'"));
}
}