use crate::core::RuntimeContext;
use crate::engine::{DagExecutor, MissionStep, StepType, StepStatus};
use serde_json::json;
use std::collections::HashMap;
use tokio;
#[tokio::test]
async fn test_sql_query_step_type() {
let context = RuntimeContext::new();
let executor = DagExecutor::new(context);
let sql_query_step = MissionStep {
id: "test_sql_query".to_string(),
name: Some("Test SQL Query".to_string()),
step_type: StepType::SqlQuery,
parameters: json!({
"query": "SELECT 1 as test_column",
"database_url": "sqlite://memory:"
}).as_object().unwrap().clone(),
depends_on: vec![],
timeout: None,
};
let result = executor.execute_step(&sql_query_step).await;
assert!(result.is_ok());
let step_result = result.unwrap();
assert!(matches!(step_result.status, StepStatus::Success | StepStatus::Failed));
assert!(step_result.output.get("query").is_some());
}
#[tokio::test]
async fn test_redis_set_step_type() {
let context = RuntimeContext::new();
let executor = DagExecutor::new(context);
let redis_set_step = MissionStep {
id: "test_redis_set".to_string(),
name: Some("Test Redis Set".to_string()),
step_type: StepType::RedisSet,
parameters: json!({
"key": "test_key",
"value": "test_value",
"redis_url": "redis://127.0.0.1:6379"
}).as_object().unwrap().clone(),
depends_on: vec![],
timeout: None,
};
let result = executor.execute_step(&redis_set_step).await;
assert!(result.is_ok());
let step_result = result.unwrap();
assert!(matches!(step_result.status, StepStatus::Success | StepStatus::Failed));
assert!(step_result.output.get("key").is_some());
}
#[tokio::test]
async fn test_redis_get_step_type() {
let context = RuntimeContext::new();
let executor = DagExecutor::new(context);
let redis_get_step = MissionStep {
id: "test_redis_get".to_string(),
name: Some("Test Redis Get".to_string()),
step_type: StepType::RedisGet,
parameters: json!({
"key": "test_key",
"redis_url": "redis://127.0.0.1:6379"
}).as_object().unwrap().clone(),
depends_on: vec![],
timeout: None,
};
let result = executor.execute_step(&redis_get_step).await;
assert!(result.is_ok());
let step_result = result.unwrap();
assert!(matches!(step_result.status, StepStatus::Success | StepStatus::Failed));
assert!(step_result.output.get("key").is_some());
}
#[tokio::test]
async fn test_db_backup_step_type() {
let context = RuntimeContext::new();
let executor = DagExecutor::new(context);
let db_backup_step = MissionStep {
id: "test_db_backup".to_string(),
name: Some("Test Database Backup".to_string()),
step_type: StepType::DbBackup,
parameters: json!({
"database_url": "sqlite://test.db",
"backup_path": "/tmp/test_backup.db",
"format": "sqlite"
}).as_object().unwrap().clone(),
depends_on: vec![],
timeout: None,
};
let result = executor.execute_step(&db_backup_step).await;
assert!(result.is_ok());
let step_result = result.unwrap();
assert!(matches!(step_result.status, StepStatus::Success | StepStatus::Failed));
assert!(step_result.output.get("backup_path").is_some());
}
#[tokio::test]
async fn test_db_migrate_step_type() {
let context = RuntimeContext::new();
let executor = DagExecutor::new(context);
let db_migrate_step = MissionStep {
id: "test_db_migrate".to_string(),
name: Some("Test Database Migration".to_string()),
step_type: StepType::DbMigrate,
parameters: json!({
"database_url": "sqlite://memory:",
"migration_path": "/tmp/migrations",
"direction": "up"
}).as_object().unwrap().clone(),
depends_on: vec![],
timeout: None,
};
let result = executor.execute_step(&db_migrate_step).await;
assert!(result.is_ok());
let step_result = result.unwrap();
assert!(matches!(step_result.status, StepStatus::Success | StepStatus::Failed));
assert!(step_result.output.get("direction").is_some());
}
#[tokio::test]
async fn test_database_operations_error_handling() {
let context = RuntimeContext::new();
let executor = DagExecutor::new(context);
let invalid_db_step = MissionStep {
id: "test_invalid_db".to_string(),
name: Some("Test Invalid Database".to_string()),
step_type: StepType::SqlQuery,
parameters: json!({
"query": "SELECT 1",
"database_url": "invalid://database/url"
}).as_object().unwrap().clone(),
depends_on: vec![],
timeout: None,
};
let result = executor.execute_step(&invalid_db_step).await;
assert!(result.is_ok());
let step_result = result.unwrap();
assert!(matches!(step_result.status, StepStatus::Success | StepStatus::Failed));
}
#[tokio::test]
async fn test_sql_query_prevents_injection() {
let context = RuntimeContext::new();
let executor = DagExecutor::new(context);
let injection_attempt_step = MissionStep {
id: "test_sql_injection".to_string(),
name: Some("Test SQL Injection Prevention".to_string()),
step_type: StepType::SqlQuery,
parameters: json!({
"query": "SELECT * FROM users WHERE id = '1; DROP TABLE users; --'",
"database_url": "sqlite://memory:"
}).as_object().unwrap().clone(),
depends_on: vec![],
timeout: None,
};
let result = executor.execute_step(&injection_attempt_step).await;
assert!(result.is_ok());
let step_result = result.unwrap();
assert!(matches!(step_result.status, StepStatus::Success | StepStatus::Failed));
assert!(step_result.output.get("query").is_some());
}
#[tokio::test]
async fn test_redis_operations_with_auth() {
let context = RuntimeContext::new();
let executor = DagExecutor::new(context);
let auth_redis_step = MissionStep {
id: "test_auth_redis".to_string(),
name: Some("Test Redis with Authentication".to_string()),
step_type: StepType::RedisSet,
parameters: json!({
"key": "secure_key",
"value": "secure_value",
"redis_url": "redis://:password@127.0.0.1:6379",
"auth": true
}).as_object().unwrap().clone(),
depends_on: vec![],
timeout: None,
};
let result = executor.execute_step(&auth_redis_step).await;
assert!(result.is_ok());
let step_result = result.unwrap();
assert!(matches!(step_result.status, StepStatus::Success | StepStatus::Failed));
assert!(step_result.output.get("key").is_some());
}
#[tokio::test]
async fn test_database_connection_pooling() {
let context = RuntimeContext::new();
let executor = DagExecutor::new(context);
let queries = vec![
"SELECT 1 as first_query",
"SELECT 2 as second_query",
"SELECT 3 as third_query"
];
for (i, query) in queries.iter().enumerate() {
let db_step = MissionStep {
id: format!("test_pool_query_{}", i),
name: Some(format!("Test Pool Query {}", i)),
step_type: StepType::SqlQuery,
parameters: json!({
"query": query,
"database_url": "sqlite://memory:",
"pool_size": 5
}).as_object().unwrap().clone(),
depends_on: vec![],
timeout: None,
};
let result = executor.execute_step(&db_step).await;
assert!(result.is_ok());
let step_result = result.unwrap();
assert!(matches!(step_result.status, StepStatus::Success | StepStatus::Failed));
}
}
#[tokio::test]
async fn test_backup_with_encryption() {
let context = RuntimeContext::new();
let executor = DagExecutor::new(context);
let encrypted_backup_step = MissionStep {
id: "test_encrypted_backup".to_string(),
name: Some("Test Encrypted Database Backup".to_string()),
step_type: StepType::DbBackup,
parameters: json!({
"database_url": "sqlite://test.db",
"backup_path": "/tmp/encrypted_backup.db",
"encryption": true,
"encryption_key": "test_encryption_key_32_bytes_long"
}).as_object().unwrap().clone(),
depends_on: vec![],
timeout: None,
};
let result = executor.execute_step(&encrypted_backup_step).await;
assert!(result.is_ok());
let step_result = result.unwrap();
assert!(matches!(step_result.status, StepStatus::Success | StepStatus::Failed));
assert!(step_result.output.get("encryption").is_some());
}