use std::sync::{Arc, RwLock};
pub(super) type CancellationCallbacks = Arc<RwLock<Vec<Box<dyn Fn() + Send + Sync>>>>;
#[cfg(test)]
mod tests {
use crate::production::*;
use std::time::{Duration, SystemTime};
#[test]
fn test_cancellation_token() {
let token = QueryCancellationToken::new();
assert!(!token.is_cancelled());
assert!(token.check().is_ok());
token.cancel(Some("User requested".to_string()));
assert!(token.is_cancelled());
assert!(token.check().is_err());
assert_eq!(token.get_reason(), Some("User requested".to_string()));
assert!(token.cancel_time().is_some());
}
#[test]
fn test_timeout_manager() {
let manager =
QueryTimeoutManager::new(Duration::from_millis(100), Duration::from_millis(200));
let query_id = manager.start_query("SELECT * WHERE { ?s ?p ?o }");
match manager.check_timeout(query_id) {
TimeoutCheckResult::Ok { .. } => {}
_ => panic!("Expected Ok result"),
}
std::thread::sleep(Duration::from_millis(110));
match manager.check_timeout(query_id) {
TimeoutCheckResult::SoftTimeout { .. } => {}
_ => panic!("Expected SoftTimeout result"),
}
let elapsed = manager.end_query(query_id);
assert!(elapsed.is_some());
}
#[test]
fn test_memory_tracker() {
let tracker = QueryMemoryTracker::new(1024, 512);
assert!(tracker.allocate(1, 256).is_ok());
assert_eq!(tracker.query_usage(1), 256);
assert_eq!(tracker.current_usage(), 256);
assert!(tracker.allocate(1, 128).is_ok());
assert_eq!(tracker.query_usage(1), 384);
assert!(tracker.allocate(1, 256).is_err());
assert!(tracker.allocate(2, 400).is_ok());
assert_eq!(tracker.current_usage(), 784);
let freed = tracker.free_query(1);
assert_eq!(freed, 384);
assert_eq!(tracker.current_usage(), 400);
}
#[test]
fn test_memory_pressure() {
let tracker = QueryMemoryTracker::new(1000, 500);
tracker.set_pressure_threshold(0.8);
assert!(!tracker.is_under_pressure());
tracker.allocate(1, 400).unwrap();
assert!(!tracker.is_under_pressure());
tracker.allocate(2, 450).unwrap();
assert!(tracker.is_under_pressure());
assert!(tracker.pressure_percentage() > 80.0);
}
#[test]
fn test_circuit_breaker_closed_to_open() {
let config = CircuitBreakerConfig {
failure_threshold: 3,
success_threshold: 2,
timeout: Duration::from_secs(60),
half_open_max_requests: 2,
};
let breaker = QueryCircuitBreaker::new(config);
assert!(breaker.is_request_allowed());
breaker.record_failure();
assert!(breaker.is_request_allowed());
breaker.record_failure();
assert!(breaker.is_request_allowed());
breaker.record_failure();
assert!(!breaker.is_request_allowed());
}
#[test]
fn test_circuit_breaker_half_open_to_closed() {
let config = CircuitBreakerConfig {
failure_threshold: 2,
success_threshold: 2,
timeout: Duration::from_secs(60),
half_open_max_requests: 3,
};
let breaker = QueryCircuitBreaker::new(config);
breaker.record_failure();
breaker.record_failure();
assert!(!breaker.is_request_allowed());
breaker.try_half_open();
assert!(breaker.is_request_allowed());
breaker.record_success();
breaker.record_success();
assert!(breaker.is_request_allowed());
assert_eq!(breaker.state(), "Closed");
}
#[test]
fn test_performance_monitor() {
let monitor = SparqlPerformanceMonitor::new();
monitor.record_query("SELECT", Duration::from_millis(100), 5, 1000);
monitor.record_query("SELECT", Duration::from_millis(200), 10, 2000);
monitor.record_query("SELECT", Duration::from_millis(150), 7, 1500);
let stats = monitor.get_statistics("SELECT");
assert_eq!(stats.total_queries, 3);
assert_eq!(stats.average_pattern_complexity, 7);
assert_eq!(stats.average_result_size, 1500);
let global = monitor.get_global_statistics();
assert_eq!(global.total_queries, 3);
}
#[test]
fn test_resource_quota() {
let quota = QueryResourceQuota::new(1000, Duration::from_secs(10), 50);
assert!(quota.check_result_size(500).is_ok());
assert!(quota.check_query_time(Duration::from_secs(5)).is_ok());
assert!(quota.check_pattern_complexity(30).is_ok());
assert!(quota.check_result_size(2000).is_err());
assert!(quota.check_query_time(Duration::from_secs(20)).is_err());
assert!(quota.check_pattern_complexity(100).is_err());
quota.set_enforced(false);
assert!(quota.check_result_size(2000).is_ok());
}
#[test]
fn test_health_checks() {
let health = QueryEngineHealth::default();
health.perform_all_checks();
assert_eq!(health.get_overall_status(), HealthStatus::Healthy);
let checks = health.get_checks();
assert_eq!(checks.len(), 3);
assert!(checks.iter().all(|c| c.status == HealthStatus::Healthy));
}
#[test]
fn test_production_error() {
let error = SparqlProductionError::parse_error(
"SELECT ?s WHERE { ?s ?p ?o".to_string(),
"Missing closing brace".to_string(),
);
assert_eq!(error.severity, ErrorSeverity::Error);
assert!(!error.retryable);
assert_eq!(error.context.operation, "parse");
}
#[test]
fn test_session_manager() {
let manager = QuerySessionManager::new();
let session = manager
.start_session("SELECT * WHERE { ?s ?p ?o }", Some("user1"))
.expect("lock poisoned");
assert_eq!(manager.active_session_count(), 1);
assert!(!session.is_cancelled());
let duration = manager.complete_session(session.session_id, 100).unwrap();
assert!(duration.as_nanos() > 0);
assert_eq!(manager.active_session_count(), 0);
let events = manager.get_audit_events(10);
assert_eq!(events.len(), 2);
assert_eq!(events[0].event_type, AuditEventType::QueryStarted);
assert_eq!(events[1].event_type, AuditEventType::QueryCompleted);
}
#[test]
fn test_session_failure() {
let manager = QuerySessionManager::new();
let session = manager
.start_session("SELECT * WHERE { ?s ?p ?o }", None)
.expect("lock poisoned");
manager
.fail_session(session.session_id, "Test error")
.expect("lock poisoned");
let events = manager.get_audit_events(10);
assert_eq!(events.len(), 2);
assert_eq!(events[1].event_type, AuditEventType::QueryFailed);
assert_eq!(events[1].error, Some("Test error".to_string()));
}
#[test]
fn test_rate_limiter() {
let limiter = QueryRateLimiter::default();
limiter.configure(10, 5);
for _ in 0..5 {
assert!(limiter.check_rate_limit("user1"));
}
assert!(!limiter.check_rate_limit("user1"));
assert!(limiter.check_rate_limit("user2"));
limiter.set_enabled(false);
assert!(limiter.check_rate_limit("user1"));
}
#[test]
fn test_audit_trail() {
let trail = QueryAuditTrail::new(100);
trail.log(QueryAuditEvent {
timestamp: SystemTime::now(),
session_id: 1,
query_id: 1,
event_type: AuditEventType::QueryStarted,
user_id: Some("user1".to_string()),
query_snippet: "SELECT *".to_string(),
duration: None,
result_count: None,
error: None,
});
trail.log(QueryAuditEvent {
timestamp: SystemTime::now(),
session_id: 1,
query_id: 1,
event_type: AuditEventType::QueryCompleted,
user_id: Some("user1".to_string()),
query_snippet: "SELECT *".to_string(),
duration: Some(Duration::from_millis(100)),
result_count: Some(50),
error: None,
});
assert_eq!(trail.event_count(), 2);
let user_events = trail.get_by_user("user1", 10);
assert_eq!(user_events.len(), 2);
let completed = trail.get_by_type(AuditEventType::QueryCompleted, 10);
assert_eq!(completed.len(), 1);
trail.clear();
assert_eq!(trail.event_count(), 0);
trail.set_enabled(false);
trail.log(QueryAuditEvent {
timestamp: SystemTime::now(),
session_id: 2,
query_id: 2,
event_type: AuditEventType::QueryStarted,
user_id: None,
query_snippet: "ASK".to_string(),
duration: None,
result_count: None,
error: None,
});
assert_eq!(trail.event_count(), 0);
}
#[test]
fn test_audit_trail_circular_buffer() {
let trail = QueryAuditTrail::new(3);
for i in 0..5 {
trail.log(QueryAuditEvent {
timestamp: SystemTime::now(),
session_id: i,
query_id: i,
event_type: AuditEventType::QueryStarted,
user_id: None,
query_snippet: format!("Query {}", i),
duration: None,
result_count: None,
error: None,
});
}
assert_eq!(trail.event_count(), 3);
let events = trail.get_recent(10);
assert_eq!(events.len(), 3);
assert_eq!(events[0].session_id, 2);
assert_eq!(events[1].session_id, 3);
assert_eq!(events[2].session_id, 4);
}
#[test]
fn test_session_memory_allocation() {
let manager = QuerySessionManager::new();
manager.configure_memory(1000, 500);
let session = manager
.start_session("SELECT * WHERE { ?s ?p ?o }", None)
.expect("lock poisoned");
assert!(manager.allocate_memory(session.session_id, 200).is_ok());
assert!(manager.allocate_memory(session.session_id, 200).is_ok());
assert!(manager.allocate_memory(session.session_id, 200).is_err());
manager.complete_session(session.session_id, 0).unwrap();
}
#[test]
fn test_query_priority_scheduler_basic() {
let scheduler = QueryPriorityScheduler::new(PrioritySchedulerConfig::default());
let critical_id = scheduler
.submit_query(
"SELECT * WHERE { ?s ?p ?o }".to_string(),
QueryPriority::Critical,
Some("user1".to_string()),
Some(100.0),
)
.expect("lock poisoned");
let normal_id = scheduler
.submit_query(
"SELECT * WHERE { ?s ?p ?o }".to_string(),
QueryPriority::Normal,
Some("user2".to_string()),
Some(50.0),
)
.expect("lock poisoned");
let batch_id = scheduler
.submit_query(
"SELECT * WHERE { ?s ?p ?o }".to_string(),
QueryPriority::Batch,
None,
None,
)
.expect("lock poisoned");
let stats = scheduler.get_stats();
assert_eq!(stats.total_queued, 3);
let next = scheduler.next_query().unwrap();
assert_eq!(next.query_id, critical_id);
assert_eq!(next.priority, QueryPriority::Critical);
let next = scheduler.next_query().unwrap();
assert_eq!(next.query_id, normal_id);
let next = scheduler.next_query().unwrap();
assert_eq!(next.query_id, batch_id);
scheduler.complete_query(critical_id);
scheduler.complete_query(normal_id);
scheduler.complete_query(batch_id);
let stats = scheduler.get_stats();
assert_eq!(stats.total_active, 0);
assert_eq!(stats.total_queued, 0);
}
#[test]
fn test_query_priority_aging() {
let config = PrioritySchedulerConfig {
enable_aging: true,
aging_threshold: Duration::from_millis(10),
..Default::default()
};
let scheduler = QueryPriorityScheduler::new(config);
let _low_id = scheduler
.submit_query(
"SELECT * WHERE { ?s ?p ?o }".to_string(),
QueryPriority::Low,
None,
None,
)
.expect("lock poisoned");
std::thread::sleep(Duration::from_millis(20));
scheduler
.submit_query(
"SELECT * WHERE { ?s ?p ?o }".to_string(),
QueryPriority::Normal,
None,
None,
)
.expect("lock poisoned");
let next = scheduler.next_query();
assert!(next.is_some());
scheduler.complete_query(next.unwrap().query_id);
}
#[test]
fn test_query_priority_cancel() {
let scheduler = QueryPriorityScheduler::new(PrioritySchedulerConfig::default());
let query_id = scheduler
.submit_query(
"SELECT * WHERE { ?s ?p ?o }".to_string(),
QueryPriority::Normal,
None,
None,
)
.expect("lock poisoned");
assert_eq!(scheduler.get_stats().total_queued, 1);
assert!(scheduler.cancel_query(query_id));
assert_eq!(scheduler.get_stats().total_queued, 0);
assert!(!scheduler.cancel_query(query_id));
}
#[test]
fn test_query_cost_estimator_lightweight() {
let estimator = QueryCostEstimator::new(CostEstimatorConfig::default());
let features = QueryFeatures {
pattern_count: 2,
join_count: 1,
filter_count: 1,
limit: Some(10),
..Default::default()
};
let estimate = estimator.estimate_cost(&features);
assert_eq!(estimate.recommendation, CostRecommendation::Lightweight);
assert!(estimate.estimated_cost < 100.0);
assert!(estimate.complexity_score < 1.0);
}
#[test]
fn test_query_cost_estimator_expensive() {
let estimator = QueryCostEstimator::new(CostEstimatorConfig::default());
let features = QueryFeatures {
pattern_count: 10,
join_count: 5,
filter_count: 3,
aggregate_count: 2,
path_count: 1,
optional_count: 2,
distinct: true,
order_by: true,
group_by: true,
..Default::default()
};
let estimate = estimator.estimate_cost(&features);
assert!(
estimate.recommendation == CostRecommendation::Expensive
|| estimate.recommendation == CostRecommendation::VeryExpensive
);
assert!(estimate.estimated_cost > 500.0);
assert!(estimate.complexity_score > 5.0);
}
#[test]
fn test_query_cost_estimator_learning() {
let estimator = QueryCostEstimator::new(CostEstimatorConfig::default());
let features = QueryFeatures {
pattern_count: 3,
join_count: 2,
..Default::default()
};
for i in 1..=10 {
estimator.record_actual_cost(features.clone(), i as f64 * 10.0);
}
let stats = estimator.get_statistics();
assert_eq!(stats.sample_count, 10);
assert!(stats.avg_cost > 0.0);
assert_eq!(stats.min_cost, 10.0);
assert_eq!(stats.max_cost, 100.0);
}
#[test]
fn test_performance_baseline_tracker_basic() {
let tracker = PerformanceBaselineTracker::new(BaselineTrackerConfig::default());
tracker.record_execution("SELECT ?s ?p ?o".to_string(), 100.0, 10.0, 100);
tracker.record_execution("SELECT ?s ?p ?o".to_string(), 110.0, 11.0, 105);
tracker.record_execution("SELECT ?s ?p ?o".to_string(), 105.0, 10.5, 102);
let patterns = tracker.get_tracked_patterns();
assert_eq!(patterns.len(), 1);
assert_eq!(patterns[0], "SELECT ?s ?p ?o");
}
#[test]
fn test_performance_baseline_regression_detection() {
let config = BaselineTrackerConfig {
regression_threshold: 0.2,
min_samples: 3,
auto_update_baseline: false,
..Default::default()
};
let tracker = PerformanceBaselineTracker::new(config);
let pattern = "SELECT ?s ?p ?o";
for _ in 0..5 {
tracker.record_execution(pattern.to_string(), 100.0, 10.0, 100);
}
let regression = tracker.check_regression(pattern, 110.0);
assert!(regression.is_none());
let regression = tracker.check_regression(pattern, 130.0);
assert!(regression.is_some());
let report = regression.unwrap();
assert!(report.degradation_percentage > 20.0);
assert_eq!(report.severity, RegressionSeverity::Moderate);
}
#[test]
fn test_performance_baseline_trend() {
let tracker = PerformanceBaselineTracker::new(BaselineTrackerConfig::default());
let pattern = "SELECT ?s ?p ?o";
let durations = vec![100.0, 110.0, 105.0, 95.0, 120.0, 100.0, 105.0, 110.0];
for duration in durations {
tracker.record_execution(pattern.to_string(), duration, 10.0, 100);
}
let trend = tracker.get_trend(pattern);
assert!(trend.is_some());
let trend = trend.unwrap();
assert_eq!(trend.sample_count, 8);
assert!(trend.avg_duration_ms > 0.0);
assert_eq!(trend.min_duration_ms, 95.0);
assert_eq!(trend.max_duration_ms, 120.0);
assert!(trend.std_dev_ms > 0.0);
}
#[test]
fn test_performance_baseline_auto_update() {
let config = BaselineTrackerConfig {
auto_update_baseline: true,
min_samples: 3,
..Default::default()
};
let tracker = PerformanceBaselineTracker::new(config);
let pattern = "SELECT ?s ?p ?o";
for _ in 0..5 {
tracker.record_execution(pattern.to_string(), 100.0, 10.0, 100);
}
let trend1 = tracker.get_trend(pattern).unwrap();
let baseline1 = trend1.baseline_duration_ms;
for _ in 0..5 {
tracker.record_execution(pattern.to_string(), 150.0, 15.0, 100);
}
let trend2 = tracker.get_trend(pattern).unwrap();
let baseline2 = trend2.baseline_duration_ms;
assert!(baseline2 > baseline1);
}
#[test]
fn test_cost_estimator_limit_optimization() {
let estimator = QueryCostEstimator::new(CostEstimatorConfig::default());
let features_no_limit = QueryFeatures {
pattern_count: 10,
join_count: 3,
filter_count: 2,
..Default::default()
};
let mut features_small_limit = features_no_limit.clone();
features_small_limit.limit = Some(10);
let mut features_large_limit = features_no_limit.clone();
features_large_limit.limit = Some(500);
let estimate_no_limit = estimator.estimate_cost(&features_no_limit);
let estimate_small = estimator.estimate_cost(&features_small_limit);
let estimate_large = estimator.estimate_cost(&features_large_limit);
assert!(estimate_small.estimated_cost < estimate_large.estimated_cost);
assert!(estimate_small.estimated_cost < estimate_no_limit.estimated_cost);
}
#[test]
fn test_priority_scheduler_queue_limits() {
let config = PrioritySchedulerConfig {
max_per_priority: 2,
max_total_queued: 5,
..Default::default()
};
let scheduler = QueryPriorityScheduler::new(config);
assert!(scheduler
.submit_query("Query 1".to_string(), QueryPriority::Normal, None, None)
.is_ok());
assert!(scheduler
.submit_query("Query 2".to_string(), QueryPriority::Normal, None, None)
.is_ok());
assert!(scheduler
.submit_query("Query 3".to_string(), QueryPriority::Normal, None, None)
.is_err());
assert!(scheduler
.submit_query("Query 4".to_string(), QueryPriority::High, None, None)
.is_ok());
}
}