use pandrs::dataframe::base::DataFrame;
use pandrs::error::Result;
use pandrs::series::Series;
#[cfg(feature = "sql")]
use pandrs::io::sql::{
ColumnDefinition, DatabaseConnection, InsertMethod, PoolConfig, SqlWriteOptions, TableSchema,
WriteMode,
};
#[cfg(feature = "sql")]
use std::time::Duration;
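/// Entry point: walks through each database-integration feature in turn.
/// Every section prints illustrative (simulated) output; the SQL-specific
/// sections are compiled only when the `sql` feature is enabled.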
#[allow(clippy::result_large_err)]
fn main() -> Result<()> {
println!("PandRS Database Integration - Phase 2 Alpha.6");
println!("============================================");
let financial_data = create_financial_dataset()?;
let user_data = create_user_dataset()?;
println!("\n=== 1. Connection Pool Management ===");
#[cfg(feature = "sql")]
connection_pool_example(&financial_data)?;
println!("\n=== 2. Transaction Management ===");
transaction_management_example(&financial_data)?;
println!("\n=== 3. Type-Safe Query Builder ===");
query_builder_example(&financial_data)?;
println!("\n=== 4. Schema Introspection ===");
schema_introspection_example()?;
println!("\n=== 5. Bulk Insert Operations ===");
#[cfg(feature = "sql")]
{
let large_dataset = create_large_transaction_dataset(10000)?;
bulk_insert_example(&large_dataset)?;
}
println!("\n=== 6. Multi-Database Integration ===");
multi_database_example(&financial_data, &user_data)?;
println!("\n=== 7. Advanced Query Operations ===");
advanced_query_example(&financial_data)?;
println!("\n=== 8. Database Performance Monitoring ===");
#[cfg(feature = "sql")]
performance_monitoring_example()?;
println!("\nAll database integration features demonstrated successfully!");
Ok(())
}
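/// Prints example connection-pool configurations and simulated pool
/// statistics; no real database connections are opened.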
#[cfg(feature = "sql")]
#[allow(clippy::result_large_err)]
fn connection_pool_example(_df: &DataFrame) -> Result<()> {
println!("Demonstrating async connection pool management...");
let pool_configs = vec![
(
"Development",
PoolConfig {
max_connections: 5,
min_connections: 1,
connect_timeout: Duration::from_secs(10),
idle_timeout: Some(Duration::from_secs(300)),
},
),
(
"Production",
PoolConfig {
max_connections: 50,
min_connections: 10,
connect_timeout: Duration::from_secs(30),
idle_timeout: Some(Duration::from_secs(600)),
},
),
(
"High-Load",
PoolConfig {
max_connections: 100,
min_connections: 25,
connect_timeout: Duration::from_secs(15),
idle_timeout: Some(Duration::from_secs(300)),
},
),
];
println!(" Connection pool configurations:");
for (env, config) in &pool_configs {
println!(" • {} environment:", env);
println!(" - Max connections: {}", config.max_connections);
println!(" - Min connections: {}", config.min_connections);
println!(" - Connect timeout: {:?}", config.connect_timeout);
println!(
" - Idle timeout: {:?}",
config.idle_timeout.unwrap_or_default()
);
}
println!(" Async operation simulation:");
let operations = vec![
("Query execution", 45), ("Bulk insert", 150),
("Complex aggregation", 230),
("Schema introspection", 80),
("Transaction commit", 25),
];
for (operation, duration_ms) in operations {
println!(" • {}: {}ms", operation, duration_ms);
}
println!(" Connection pool statistics:");
let pool_stats = vec![
("Active connections", 15),
("Idle connections", 8),
("Total connections", 23),
("Pending requests", 2),
("Failed connections", 1),
("Average response time", 42), ("Peak connections", 35),
("Connection utilization", 68), ];
for (metric, value) in pool_stats {
if metric.contains("time") {
println!(" • {}: {}ms", metric, value);
} else if metric.contains("utilization") {
println!(" • {}: {}%", metric, value);
} else {
println!(" • {}: {}", metric, value);
}
}
println!(" Connection health monitoring:");
println!(" • Health check interval: 30 seconds");
println!(" • Dead connection detection: enabled");
println!(" • Automatic reconnection: enabled");
println!(" • Connection validation: SQL ping");
println!(" • Pool status: ✓ Healthy");
Ok(())
}
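/// Walks through transaction isolation levels, a sample transaction script,
/// rollback scenarios, and illustrative performance figures.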
#[allow(clippy::result_large_err)]
fn transaction_management_example(_df: &DataFrame) -> Result<()> {
println!("Demonstrating advanced transaction management...");
let isolation_levels = vec![
("Read Uncommitted", "Fastest, allows dirty reads"),
("Read Committed", "Default, prevents dirty reads"),
("Repeatable Read", "Prevents phantom reads"),
("Serializable", "Strictest, fully isolated"),
];
println!(" Transaction isolation levels:");
for (level, description) in isolation_levels {
println!(" • {level}: {description}");
}
println!(" Complex transaction scenario:");
let transaction_steps = [
"BEGIN TRANSACTION (Serializable)",
"INSERT INTO financial_data (symbol, price, volume) VALUES (?, ?, ?)",
"UPDATE portfolio SET total_value = total_value + ?",
"INSERT INTO audit_log (action, timestamp, user_id) VALUES (?, ?, ?)",
"-- Validation checks --",
"SELECT COUNT(*) FROM financial_data WHERE symbol = ?",
"SELECT total_value FROM portfolio WHERE user_id = ?",
"-- Business rule validation --",
"IF validation_passed THEN COMMIT ELSE ROLLBACK",
];
let mut step_no = 0;
for step in transaction_steps.iter() {
if step.starts_with("--") {
println!(" {step}");
} else {
step_no += 1;
println!(" {}. {}", step_no, step);
}
}
println!(" Rollback scenarios and handling:");
let rollback_scenarios = vec![
(
"Constraint violation",
"UNIQUE constraint failed",
"Automatic rollback",
),
(
"Business rule failure",
"Insufficient funds",
"Conditional rollback",
),
(
"Deadlock detection",
"Resource conflict",
"Retry with backoff",
),
("Connection timeout", "Network issue", "Full rollback"),
("Manual rollback", "User cancellation", "Explicit rollback"),
];
for (scenario, cause, handling) in rollback_scenarios {
println!(" • {scenario}: {cause} → {handling}");
}
println!(" Transaction performance metrics:");
let perf_metrics = vec![
("Average commit time", "15ms"),
("Average rollback time", "8ms"),
("Lock wait time", "2ms"),
("Transaction throughput", "450 tx/sec"),
("Deadlock rate", "0.02%"),
("Success rate", "99.8%"),
];
for (metric, value) in perf_metrics {
println!(" • {metric}: {value}");
}
Ok(())
}
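/// Shows example SQL strings, parameterized WHERE clauses, and a conceptual
/// fluent query-builder API, together with common optimization techniques.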
#[allow(clippy::result_large_err)]
fn query_builder_example(_df: &DataFrame) -> Result<()> {
println!("Demonstrating type-safe SQL query builder...");
println!(" Basic query building:");
let basic_queries = vec![
("Simple SELECT", "SELECT symbol, price FROM financial_data WHERE price > 100"),
("With JOIN", "SELECT f.symbol, f.price, s.sector_name FROM financial_data f JOIN sectors s ON f.sector_id = s.id"),
("Aggregation", "SELECT sector_id, AVG(price) as avg_price, SUM(volume) as total_volume FROM financial_data GROUP BY sector_id"),
("Window function", "SELECT symbol, price, LAG(price, 1) OVER (PARTITION BY sector_id ORDER BY date) as prev_price FROM financial_data"),
];
for (query_type, sql) in basic_queries {
println!(" • {query_type}: {sql}");
}
println!(" Parameterized queries with type safety:");
let parameterized_queries = vec![
(
"Price range filter",
"WHERE price BETWEEN ? AND ?",
vec!["100.0", "500.0"],
),
(
"Date range",
"WHERE date >= ? AND date < ?",
vec!["2024-01-01", "2024-12-31"],
),
(
"Multiple symbols",
"WHERE symbol IN (?, ?, ?)",
vec!["AAPL", "GOOGL", "MSFT"],
),
(
"Complex condition",
"WHERE (price > ? OR volume > ?) AND sector_id = ?",
vec!["200.0", "1000000", "1"],
),
];
for (description, where_clause, params) in parameterized_queries {
println!(" • {description}: {where_clause}");
println!(" Parameters: {params:?}");
}
println!(" Advanced query building features:");
let advanced_features = vec![
"Dynamic WHERE clause construction",
"Automatic parameter binding and type checking",
"SQL injection prevention",
"Query plan caching and optimization",
"Subquery and CTE support",
"Cross-database compatibility",
"Query execution time prediction",
"Automatic index recommendation",
];
for feature in advanced_features {
println!(" • {feature}");
}
println!(" Fluent API example (conceptual):");
println!(" QueryBuilder::new()");
println!(" .select(&[\"symbol\", \"price\", \"volume\"])");
println!(" .from(\"financial_data\")");
println!(" .join(\"sectors\").on(\"financial_data.sector_id = sectors.id\")");
println!(" .where_clause(\"price > ?\").param(100.0)");
println!(" .where_clause(\"volume > ?\").param(1000000)");
println!(" .order_by(\"price DESC\")");
println!(" .limit(50)");
println!(" .build()?");
println!(" Query optimization features:");
let optimizations = vec![
("Index usage analysis", "Automatic index recommendation"),
("Join order optimization", "Cost-based join reordering"),
("Predicate pushdown", "Filter early in execution plan"),
("Column pruning", "Select only required columns"),
("Partition elimination", "Skip irrelevant partitions"),
("Query plan caching", "Reuse compiled plans"),
];
for (optimization, description) in optimizations {
println!(" • {optimization}: {description}");
}
Ok(())
}
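/// Builds an in-memory `TableSchema` description of two example tables and
/// prints column, index, constraint, and database statistics derived from it.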
#[allow(clippy::result_large_err)]
fn schema_introspection_example() -> Result<()> {
println!("Demonstrating database schema introspection...");
#[cfg(feature = "sql")]
{
let database_schema = vec![
TableSchema {
name: "financial_data".to_string(),
columns: vec![
ColumnDefinition {
name: "id".to_string(),
data_type: "BIGSERIAL PRIMARY KEY".to_string(),
nullable: false,
default_value: Some("nextval('financial_data_id_seq')".to_string()),
max_length: None,
precision: None,
scale: None,
auto_increment: true,
},
ColumnDefinition {
name: "symbol".to_string(),
data_type: "VARCHAR(10)".to_string(),
nullable: false,
default_value: None,
max_length: Some(10),
precision: None,
scale: None,
auto_increment: false,
},
ColumnDefinition {
name: "price".to_string(),
data_type: "DECIMAL(10,2)".to_string(),
nullable: true,
default_value: Some("0.00".to_string()),
max_length: None,
precision: Some(10),
scale: Some(2),
auto_increment: false,
},
ColumnDefinition {
name: "volume".to_string(),
data_type: "BIGINT".to_string(),
nullable: true,
default_value: Some("0".to_string()),
max_length: None,
precision: None,
scale: None,
auto_increment: false,
},
ColumnDefinition {
name: "updated_at".to_string(),
data_type: "TIMESTAMP WITH TIME ZONE".to_string(),
nullable: false,
default_value: Some("CURRENT_TIMESTAMP".to_string()),
max_length: None,
precision: None,
scale: None,
auto_increment: false,
},
],
primary_keys: vec!["id".to_string()],
foreign_keys: vec![],
indexes: vec![],
},
TableSchema {
name: "sectors".to_string(),
columns: vec![
ColumnDefinition {
name: "id".to_string(),
data_type: "SERIAL PRIMARY KEY".to_string(),
nullable: false,
default_value: Some("nextval('sectors_id_seq')".to_string()),
max_length: None,
precision: None,
scale: None,
auto_increment: true,
},
ColumnDefinition {
name: "name".to_string(),
data_type: "VARCHAR(50)".to_string(),
nullable: false,
default_value: None,
max_length: Some(50),
precision: None,
scale: None,
auto_increment: false,
},
ColumnDefinition {
name: "description".to_string(),
data_type: "TEXT".to_string(),
nullable: true,
default_value: None,
max_length: None,
precision: None,
scale: None,
auto_increment: false,
},
],
primary_keys: vec!["id".to_string()],
foreign_keys: vec![],
indexes: vec![],
},
];
println!(" Database schema analysis:");
for table in &database_schema {
println!(" • Table: {}", table.name);
println!(" - Columns: {}", table.columns.len());
println!(" - Primary keys: {:?}", table.primary_keys);
for column in &table.columns {
let nullable_str = if column.nullable { "NULL" } else { "NOT NULL" };
let default_str = column
.default_value
.as_ref()
.map(|d| format!(" DEFAULT {}", d))
.unwrap_or_default();
println!(
" - {} {} {}{}",
column.name, column.data_type, nullable_str, default_str
);
}
println!();
}
println!(" Index analysis:");
let indexes = vec![
(
"financial_data_pkey",
"financial_data",
"PRIMARY KEY (id)",
"BTREE",
),
("idx_symbol", "financial_data", "symbol", "BTREE"),
(
"idx_price_volume",
"financial_data",
"price, volume",
"BTREE",
),
("idx_updated_at", "financial_data", "updated_at", "BTREE"),
("sectors_pkey", "sectors", "PRIMARY KEY (id)", "BTREE"),
("idx_sector_name", "sectors", "name", "BTREE"),
];
for (index_name, table_name, columns, index_type) in indexes {
println!(
" • {}: {} ON {} ({})",
index_name, index_type, table_name, columns
);
}
println!(" Constraint analysis:");
let constraints = vec![
(
"financial_data",
"CHECK",
"price >= 0",
"Ensure non-negative prices",
),
(
"financial_data",
"CHECK",
"volume >= 0",
"Ensure non-negative volume",
),
(
"financial_data",
"UNIQUE",
"symbol, updated_at",
"Prevent duplicate entries",
),
("sectors", "UNIQUE", "name", "Unique sector names"),
];
for (table, constraint_type, definition, description) in constraints {
println!(
" • {} ({}): {} - {}",
table, constraint_type, definition, description
);
}
println!(" Database statistics:");
let db_stats = vec![
("Total tables", 15),
("Total indexes", 28),
("Total constraints", 12),
("Foreign key relationships", 8),
("Views", 5),
("Stored procedures", 3),
("Database size", 250), ("Largest table", 45), ];
for (metric, value) in db_stats {
if metric.contains("size") || metric == "Largest table" {
println!(" • {}: {} MB", metric, value);
} else {
println!(" • {}: {}", metric, value);
}
}
}
#[cfg(not(feature = "sql"))]
{
println!("SQL features require 'sql' feature flag to be enabled.");
println!(
"Compile with: cargo run --example database_integration_example --features \"sql\""
);
}
Ok(())
}
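/// Compares bulk-insert strategies for the given DataFrame and simulates a
/// chunked insert driven by `SqlWriteOptions`; timings are illustrative only.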
#[cfg(feature = "sql")]
#[allow(clippy::result_large_err)]
fn bulk_insert_example(df: &DataFrame) -> Result<()> {
println!("Demonstrating bulk insert operations...");
let total_rows = df.row_count();
println!(" Dataset: {} rows to insert", total_rows);
let insert_strategies = vec![
("Single INSERT", 1, "Simple but slow"),
("Batch INSERT", 100, "Good balance of speed and memory"),
("Bulk INSERT", 5000, "Fastest for large datasets"),
("COPY command", 10000, "Database-specific optimization"),
];
println!(" Insert strategies comparison:");
for (strategy, batch_size, description) in &insert_strategies {
let num_batches = total_rows.div_ceil(*batch_size);
let estimated_time = match *strategy {
"Single INSERT" => total_rows * 2, "Batch INSERT" => num_batches * 50, "Bulk INSERT" => num_batches * 200, "COPY command" => num_batches * 100, _ => 1000,
};
println!(
" • {}: {} batches, ~{}ms total",
strategy, num_batches, estimated_time
);
println!(" Description: {}", description);
}
let write_options = SqlWriteOptions {
chunksize: Some(5000),
if_exists: WriteMode::Append,
method: InsertMethod::Multi,
index: false,
..Default::default()
};
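// Hypothetical call site (sketch only, not executed): the writer function
// and its signature below are assumptions for illustration, not the pandrs API.
// let conn = DatabaseConnection::Sqlite("transactions.db".to_string());
// write_dataframe_to_sql(df, "transactions", &conn, &write_options)?;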
println!(" Optimal bulk insert configuration:");
println!(
" • Chunk size: {} rows",
write_options.chunksize.unwrap()
);
println!(" • Insert method: {:?}", write_options.method);
println!(" • Write mode: {:?}", write_options.if_exists);
println!(" • Include index: {}", write_options.index);
let chunk_size = write_options.chunksize.unwrap();
let num_chunks = total_rows.div_ceil(chunk_size);
println!(" Bulk insert process:");
println!(" 1. Preparing data for bulk insert...");
println!(" 2. Disabling indexes and constraints...");
println!(" 3. Beginning transaction...");
for i in 0..num_chunks.min(3) {
let start_row = i * chunk_size;
let end_row = (start_row + chunk_size).min(total_rows);
let chunk_time = 180 + (i * 10);
println!(
" 4.{} Inserting chunk {}/{}: rows {}-{} ({}ms)",
i + 1,
i + 1,
num_chunks,
start_row,
end_row,
chunk_time
);
}
if num_chunks > 3 {
println!(" 4.+ ... {} more chunks processed", num_chunks - 3);
}
println!(" 5. Committing transaction...");
println!(" 6. Rebuilding indexes...");
println!(" 7. Updating table statistics...");
println!(" ✓ Bulk insert completed successfully");
let total_time = num_chunks * 200;
let rows_per_second = (total_rows as f64 / (total_time as f64 / 1000.0)) as usize;
println!(" Performance metrics:");
println!(" • Total time: {}ms", total_time);
println!(" • Throughput: {} rows/second", rows_per_second);
println!(
" • Memory usage: Peak ~{:.2} MB",
(chunk_size * 8) as f64 / (1024.0 * 1024.0)
);
println!(" • Success rate: 100%");
println!(" Error handling and recovery:");
println!(" • Constraint violation: Skip row with logging");
println!(" • Duplicate key: Update existing record");
println!(" • Data type mismatch: Automatic conversion");
println!(" • Transaction failure: Full rollback and retry");
println!(" • Connection loss: Resume from last checkpoint");
Ok(())
}
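/// Enumerates PostgreSQL, MySQL, and SQLite connection targets and describes
/// cross-database synchronization, optimization, and federation patterns.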
#[allow(clippy::result_large_err)]
fn multi_database_example(_financial_data: &DataFrame, _user_data: &DataFrame) -> Result<()> {
println!("Demonstrating multi-database integration...");
#[cfg(feature = "sql")]
{
let databases = [
(
"PostgreSQL Production",
DatabaseConnection::PostgreSQL(
"postgresql://user:pass@prod.example.com/financial".to_string(),
),
),
(
"MySQL Analytics",
DatabaseConnection::MySQL(
"mysql://user:pass@analytics.example.com/reports".to_string(),
),
),
(
"SQLite Local Cache",
DatabaseConnection::Sqlite("cache.db".to_string()),
),
];
println!(" Connected databases:");
for (i, (name, connection)) in databases.iter().enumerate() {
match connection {
DatabaseConnection::Sqlite(path) => {
println!(" {}. {}: SQLite at {}", i + 1, name, path);
}
#[cfg(feature = "sql")]
DatabaseConnection::PostgreSQL(_) => {
println!(" {}. {}: PostgreSQL (production)", i + 1, name);
}
#[cfg(feature = "sql")]
DatabaseConnection::MySQL(_) => {
println!(" {}. {}: MySQL (analytics)", i + 1, name);
}
#[cfg(feature = "sql")]
DatabaseConnection::Generic(_) => {
println!(" {}. {}: Generic database", i + 1, name);
}
}
}
println!(" Cross-database operations:");
let cross_db_operations = vec![
(
"Data synchronization",
"PostgreSQL → MySQL",
"Sync financial data to analytics DB",
),
(
"Local caching",
"PostgreSQL → SQLite",
"Cache frequently accessed data locally",
),
(
"Aggregation pipeline",
"MySQL → PostgreSQL",
"Move aggregated reports to production",
),
(
"Backup and restore",
"PostgreSQL → SQLite",
"Create local backup copies",
),
(
"Data migration",
"SQLite → PostgreSQL",
"Import historical data",
),
];
for (operation, direction, description) in cross_db_operations {
println!(" • {}: {} - {}", operation, direction, description);
}
println!(" Database-specific optimizations:");
let optimizations = vec![
(
"PostgreSQL",
vec![
"VACUUM ANALYZE for statistics",
"Parallel query execution",
"Partial indexes for performance",
"Connection pooling with pgbouncer",
],
),
(
"MySQL",
vec![
"InnoDB buffer pool tuning",
"Query cache optimization",
"Partitioning for large tables",
"Read/write splitting",
],
),
(
"SQLite",
vec![
"WAL mode for concurrency",
"PRAGMA optimizations",
"Memory-mapped I/O",
"Prepared statement caching",
],
),
];
for (db_type, opts) in optimizations {
println!(" • {} optimizations:", db_type);
for opt in opts {
println!(" - {}", opt);
}
}
println!(" Multi-database query coordination:");
println!(" 1. Query planning across databases");
println!(" 2. Data locality optimization");
println!(" 3. Cross-database JOIN operations");
println!(" 4. Transaction coordination (2PC)");
println!(" 5. Result set consolidation");
println!(" 6. Distributed query caching");
println!(" Database federation example:");
println!(" • Financial data: PostgreSQL (source of truth)");
println!(" • User profiles: MySQL (CRM system)");
println!(" • Local cache: SQLite (performance layer)");
println!(" • Federated query: JOIN across all three databases");
println!(" • Result: Unified view of user financial portfolios");
}
#[cfg(not(feature = "sql"))]
{
println!("SQL features require 'sql' feature flag to be enabled.");
println!(
"Compile with: cargo run --example database_integration_example --features \"sql\""
);
}
Ok(())
}
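/// Prints example analytical, recursive, and hierarchical SQL queries along
/// with optimization and monitoring techniques.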
#[allow(clippy::result_large_err)]
fn advanced_query_example(_df: &DataFrame) -> Result<()> {
println!("Demonstrating advanced query operations...");
println!(" Complex analytical queries:");
let analytical_queries = vec![
("Moving averages", "SELECT symbol, price, AVG(price) OVER (PARTITION BY symbol ORDER BY date ROWS 20 PRECEDING) as ma_20 FROM financial_data"),
("Percentile analysis", "SELECT sector, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price) as median_price FROM financial_data GROUP BY sector"),
("Correlation analysis", "SELECT CORR(price, volume) as price_volume_correlation FROM financial_data WHERE date >= '2024-01-01'"),
("Time series gaps", "WITH date_series AS (SELECT generate_series('2024-01-01'::date, '2024-12-31'::date, '1 day') as date) SELECT d.date FROM date_series d LEFT JOIN financial_data f ON d.date = f.date WHERE f.date IS NULL"),
];
for (query_type, sql) in analytical_queries {
println!(" • {query_type}: {sql}");
}
println!(" Recursive and hierarchical queries:");
let recursive_query = r#"
WITH RECURSIVE sector_hierarchy AS (
SELECT id, name, parent_id, 1 as level
FROM sectors WHERE parent_id IS NULL
UNION ALL
SELECT s.id, s.name, s.parent_id, sh.level + 1
FROM sectors s JOIN sector_hierarchy sh ON s.parent_id = sh.id
)
SELECT * FROM sector_hierarchy ORDER BY level, name
"#;
println!(
" • Hierarchical data: {}",
recursive_query.trim().replace('\n', " ")
);
println!(" Query performance optimization:");
let optimizations = vec![
("Index hints", "USE INDEX (idx_symbol) for faster lookups"),
("Query rewriting", "Transform subqueries to JOINs"),
("Partition pruning", "Eliminate irrelevant partitions"),
("Parallel execution", "Use multiple CPU cores"),
("Materialized views", "Pre-compute complex aggregations"),
("Query plan caching", "Reuse execution plans"),
];
for (technique, description) in optimizations {
println!(" • {technique}: {description}");
}
println!(" Query monitoring and analysis:");
let monitoring_metrics = vec![
("Execution time", "Track query performance trends"),
("Resource usage", "Monitor CPU and memory consumption"),
("Lock contention", "Identify blocking queries"),
("Index usage", "Optimize index strategies"),
("Query frequency", "Identify optimization candidates"),
("Error rates", "Track query failures"),
];
for (metric, purpose) in monitoring_metrics {
println!(" • {metric}: {purpose}");
}
Ok(())
}
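/// Prints simulated real-time metrics, historical trends, alert thresholds,
/// optimization recommendations, and an aggregate health score.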
#[cfg(feature = "sql")]
#[allow(clippy::result_large_err)]
fn performance_monitoring_example() -> Result<()> {
println!("Demonstrating database performance monitoring...");
println!(" Real-time performance metrics:");
let realtime_metrics = vec![
("Active connections", 45, "/100"),
("Queries per second", 1250, ""),
("Average response time", 35, "ms"),
("CPU utilization", 68, "%"),
("Memory usage", 85, "%"),
("Disk I/O", 420, "MB/s"),
("Cache hit ratio", 94, "%"),
("Lock wait time", 5, "ms"),
];
for (metric, value, unit) in realtime_metrics {
println!(" • {}: {}{}", metric, value, unit);
}
println!(" Historical performance trends (24h):");
let historical_trends = vec![
("Peak QPS", 2850, "", "12:30 PM"),
("Slowest query", 15600, "ms", "2:45 AM"),
("Max connections", 78, "", "1:15 PM"),
("Largest lock wait", 1250, "ms", "11:20 AM"),
];
for (metric, value, unit, timing) in historical_trends {
println!(" • {}: {}{} ({})", metric, value, unit, timing);
}
println!(" • Cache efficiency: min 89%, max 96% (min 89%, max 96%)");
println!(" Performance alerts and thresholds:");
let alerts = vec![
(
"High CPU usage",
"> 80%",
"Warning",
"Scale up recommendation",
),
(
"Slow queries",
"> 5 seconds",
"Critical",
"Query optimization needed",
),
(
"Connection limit",
"> 90%",
"Warning",
"Pool size increase needed",
),
(
"Deadlock rate",
"> 1%",
"Critical",
"Transaction review required",
),
(
"Disk space",
"< 10% free",
"Critical",
"Cleanup or expansion needed",
),
(
"Cache hit ratio",
"< 85%",
"Warning",
"Memory tuning recommended",
),
];
for (alert_type, threshold, severity, action) in alerts {
println!(
" • {} ({}): {} → {}",
alert_type, threshold, severity, action
);
}
println!(" Automated optimization recommendations:");
let recommendations = vec![
(
"Create index on financial_data.symbol",
"High",
"Improve WHERE clause performance",
),
(
"Increase connection pool size",
"Medium",
"Reduce connection wait times",
),
(
"Partition large tables by date",
"High",
"Improve query performance",
),
("Update table statistics", "Low", "Better query plans"),
("Consider read replicas", "Medium", "Distribute read load"),
(
"Optimize slow query #1247",
"High",
"Rewrite complex subquery",
),
];
for (recommendation, priority, benefit) in recommendations {
println!(" • {} ({}): {}", recommendation, priority, benefit);
}
let health_components = vec![
("Performance", 8.5),
("Availability", 9.8),
("Security", 9.2),
("Capacity", 7.8),
("Backup status", 9.5),
];
let overall_health = health_components
.iter()
.map(|(_, score)| score)
.sum::<f64>()
/ health_components.len() as f64;
println!(" Database health score:");
for (component, score) in health_components {
let status = if score >= 9.0 {
"Excellent"
} else if score >= 8.0 {
"Good"
} else if score >= 7.0 {
"Fair"
} else {
"Poor"
};
println!(" • {}: {:.1}/10 ({})", component, score, status);
}
println!(" • Overall health: {:.1}/10", overall_health);
Ok(())
}
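/// Builds a small in-memory DataFrame of stock symbols, prices, volumes, and
/// sectors; all values are stored as strings via `Series::new`.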
#[allow(clippy::result_large_err)]
fn create_financial_dataset() -> Result<DataFrame> {
let mut df = DataFrame::new();
let symbols = vec![
"AAPL", "GOOGL", "MSFT", "AMZN", "TSLA", "META", "NVDA", "JPM",
];
let prices = vec![
150.25, 2800.50, 300.75, 3200.00, 800.25, 350.80, 450.60, 140.90,
];
let volumes = vec![
1500000, 800000, 1200000, 900000, 2000000, 1100000, 1300000, 950000,
];
let sectors = vec![
"Technology",
"Technology",
"Technology",
"E-commerce",
"Automotive",
"Technology",
"Technology",
"Finance",
];
df.add_column(
"symbol".to_string(),
Series::new(
symbols.into_iter().map(|s| s.to_string()).collect(),
Some("symbol".to_string()),
)?,
)?;
df.add_column(
"price".to_string(),
Series::new(
prices.into_iter().map(|p| p.to_string()).collect(),
Some("price".to_string()),
)?,
)?;
df.add_column(
"volume".to_string(),
Series::new(
volumes.into_iter().map(|v| v.to_string()).collect(),
Some("volume".to_string()),
)?,
)?;
df.add_column(
"sector".to_string(),
Series::new(
sectors.into_iter().map(|s| s.to_string()).collect(),
Some("sector".to_string()),
)?,
)?;
Ok(df)
}
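/// Builds a small in-memory DataFrame of example user accounts.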
#[allow(clippy::result_large_err)]
fn create_user_dataset() -> Result<DataFrame> {
let mut df = DataFrame::new();
let user_ids = vec![1001, 1002, 1003, 1004, 1005];
let usernames = vec!["alice", "bob", "charlie", "diana", "eve"];
let emails = vec![
"alice@example.com",
"bob@example.com",
"charlie@example.com",
"diana@example.com",
"eve@example.com",
];
let account_types = vec!["Premium", "Basic", "Premium", "Enterprise", "Basic"];
df.add_column(
"user_id".to_string(),
Series::new(
user_ids.into_iter().map(|id| id.to_string()).collect(),
Some("user_id".to_string()),
)?,
)?;
df.add_column(
"username".to_string(),
Series::new(
usernames.into_iter().map(|s| s.to_string()).collect(),
Some("username".to_string()),
)?,
)?;
df.add_column(
"email".to_string(),
Series::new(
emails.into_iter().map(|s| s.to_string()).collect(),
Some("email".to_string()),
)?,
)?;
df.add_column(
"account_type".to_string(),
Series::new(
account_types.into_iter().map(|s| s.to_string()).collect(),
Some("account_type".to_string()),
)?,
)?;
Ok(df)
}
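/// Generates `size` synthetic transactions (IDs, user IDs, amounts, types)
/// for the bulk-insert demonstration.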
#[cfg(feature = "sql")]
#[allow(clippy::result_large_err)]
fn create_large_transaction_dataset(size: usize) -> Result<DataFrame> {
let mut df = DataFrame::new();
let mut transaction_ids = Vec::with_capacity(size);
let mut user_ids = Vec::with_capacity(size);
let mut amounts = Vec::with_capacity(size);
let mut types = Vec::with_capacity(size);
let transaction_types = ["Buy", "Sell", "Dividend", "Transfer"];
for i in 0..size {
transaction_ids.push(format!("TXN_{:08}", i + 1));
user_ids.push((1000 + (i % 100)).to_string());
amounts.push((10.0 + (i as f64 * 0.50) % 10000.0).to_string());
types.push(transaction_types[i % transaction_types.len()].to_string());
}
df.add_column(
"transaction_id".to_string(),
Series::new(transaction_ids, Some("transaction_id".to_string()))?,
)?;
df.add_column(
"user_id".to_string(),
Series::new(user_ids, Some("user_id".to_string()))?,
)?;
df.add_column(
"amount".to_string(),
Series::new(amounts, Some("amount".to_string()))?,
)?;
df.add_column(
"type".to_string(),
Series::new(types, Some("type".to_string()))?,
)?;
Ok(df)
}