// Integration test disabled - APIs have been refactored
#![cfg(test)]
#![allow(dead_code, unused_imports)]
use pmat::agents::registry::AgentRegistry;
use pmat::agents::*;
use pmat::mcp_integration::*;
//use pmat::modules::*;
use pmat::quality::gate::*;
use pmat::resources::*;
use pmat::state::*;
use pmat::workflow::*;
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
/// Registers the analyzer, transformer and validator actors, runs a two-step
/// "quality_check" workflow through `DefaultWorkflowExecutor`, and expects the
/// run to return `Ok` with the context ending in `WorkflowState::Completed`.
#[actix_rt::test]
async fn test_full_agent_workflow() {
    // One shared registry; the names used here are the same strings the
    // steps below pass to `StepBuilder::action`.
    let agents = Arc::new(AgentRegistry::new());
    agents
        .register("analyzer", Arc::new(AnalyzerActor::new()))
        .await;
    agents
        .register("transformer", Arc::new(TransformerActor::new()))
        .await;
    agents
        .register("validator", Arc::new(ValidatorActor::new()))
        .await;

    // First step: analyze a tiny Rust snippet, with a 10-second step timeout.
    let analyze_step = StepBuilder::action("analyze", "Code Analysis", "analyzer", "analyze")
        .params(json!({
            "code": "fn main() { println!(\"Hello\"); }",
            "language": "rust"
        }))
        .timeout(Duration::from_secs(10))
        .build();

    // Second step: validation, carrying a condition on the analyze step's status.
    let validate_step = StepBuilder::action("validate", "Validation", "validator", "validate")
        .params(json!({}))
        .condition("steps.analyze.status == 'completed'", true)
        .build();

    // Assemble the workflow: fail-fast error strategy, 30-second overall timeout.
    let quality_check = WorkflowBuilder::new("quality_check")
        .description("Full quality check workflow")
        .add_step(analyze_step)
        .add_step(validate_step)
        .error_strategy(ErrorStrategy::FailFast)
        .timeout(Duration::from_secs(30))
        .build();

    // The context gets its own clone of the registry; the executor takes the original.
    let run_context = WorkflowContext::new(quality_check.id, agents.clone());
    let executor = DefaultWorkflowExecutor::new(agents);
    let outcome = executor.execute(&quality_check, &run_context).await;
    assert!(outcome.is_ok());

    // The context must report completion after a successful run.
    assert_eq!(run_context.get_state(), WorkflowState::Completed);
}
/// Builds an `McpServer` over an empty agent registry, registers its defaults,
/// and checks that the tool, resource and prompt listings are all non-empty and
/// that a tool named "analyze" is among the tools.
#[actix_rt::test]
async fn test_mcp_server_integration() {
    use pmat::mcp_integration::server::*;

    let agents = Arc::new(AgentRegistry::new());

    let server_config = ServerConfig {
        name: String::from("Test MCP Server"),
        version: String::from("1.0.0"),
        // Port 0: the original author's note reads "Random port".
        bind_address: String::from("127.0.0.1:0"),
        unix_socket: None,
        max_connections: 10,
        request_timeout: Duration::from_secs(5),
        enable_logging: true,
    };

    let mcp = McpServer::new(agents, server_config).unwrap();
    mcp.register_defaults().await.unwrap();

    // Work against a clone of the server's context.
    let ctx = mcp.context.clone();

    // Tools: at least one registered, including "analyze".
    let tool_list = ctx.tools.read().list();
    assert!(!tool_list.is_empty());
    assert!(tool_list.iter().any(|t| t.name == "analyze"));

    // Resources: at least one registered.
    let resource_list = ctx.resources.read().list();
    assert!(!resource_list.is_empty());

    // Prompts: at least one registered.
    let prompt_list = ctx.prompts.read().list();
    assert!(!prompt_list.is_empty());
}
/// Runs the complexity analyzer, SATD detector and entropy calculator over one
/// sample function individually, then feeds all three into a `QualityGate` and
/// expects the gate not to pass.
#[actix_rt::test]
async fn test_quality_gates_integration() {
    use pmat::quality::complexity::ComplexityAnalyzer;
    use pmat::quality::entropy::EntropyCalculator;
    use pmat::quality::satd::SatdDetector;

    // Sample input: a function with three nested `if`s and a single TODO comment.
    let sample = r#"
fn complex_function(x: i32) -> i32 {
// TODO: This needs refactoring
if x > 0 {
if x > 10 {
if x > 20 {
return x * 2;
}
return x + 10;
}
return x + 5;
}
return 0;
}
"#;

    // Complexity: both reported metrics must exceed 1 for the sample.
    let complexity_analyzer = ComplexityAnalyzer::default();
    let metrics = complexity_analyzer.analyze_code(sample, "rust");
    assert!(metrics.cyclomatic > 1);
    assert!(metrics.cognitive > 1);

    // SATD: at least one finding, and the first one is typed "TODO".
    let satd_detector = SatdDetector::new();
    let findings = satd_detector.detect(sample);
    assert!(!findings.is_empty());
    assert_eq!(findings[0].satd_type, "TODO");

    // Entropy over the raw bytes must be strictly positive.
    let entropy_calculator = EntropyCalculator::new();
    let entropy_value = entropy_calculator.calculate(sample.as_bytes());
    assert!(entropy_value > 0.0);

    // Quality gate over all three checks with fixed thresholds.
    let thresholds = QualityThresholds {
        max_complexity: 10,
        max_satd_items: 5,
        min_test_coverage: 0.8,
        max_duplication: 0.1,
    };
    let gate = QualityGate::new(
        vec![
            Box::new(complexity_analyzer),
            Box::new(satd_detector),
            Box::new(entropy_calculator),
        ],
        thresholds,
    );
    let verdict = gate.check(sample, "rust").await;

    // The original author expects failure "due to SATD".
    // NOTE(review): the sample has one TODO against max_satd_items of 5 -
    // confirm which threshold actually makes the gate fail.
    assert!(!verdict.passed);
}
/// Opens an event store, a snapshot store and a recovery manager against a
/// temporary directory, appends one event, and checks both the id returned by
/// the append and the `events_to_replay` count reported by recovery.
#[actix_rt::test]
async fn test_state_management_integration() {
    use pmat::state::event_store::*;
    use pmat::state::recovery::*;
    use pmat::state::snapshot_store::*;
    use tempfile::TempDir;

    // `temp_dir` must stay alive for the whole test: `path` borrows from it.
    let temp_dir = TempDir::new().unwrap();
    let path = temp_dir.path().to_str().unwrap();

    // Event store with persistence switched on, everything else defaulted.
    let event_config = EventStoreConfig {
        persistence_enabled: true,
        ..Default::default()
    };
    let event_store = EventStore::new(event_config).await.unwrap();

    // The snapshot store is only constructed (and unwrapped) here; it is never
    // read afterwards. The leading underscore keeps the binding alive while
    // marking the non-use as intentional, so no `unused_variables` warning fires.
    let _snapshot_store = SnapshotStore::new(path, SnapshotConfig::default())
        .await
        .unwrap();

    // NOTE(review): the recovery manager is built from its own
    // `EventStoreConfig::default()`, not from `event_store` above - confirm
    // that it can observe the event appended below.
    let recovery_manager = RecoveryManager::<ExampleState>::new(
        EventStoreConfig::default(),
        SnapshotConfig::default(),
        path,
    )
    .await
    .unwrap();

    // Append a single event; the test expects it to be assigned id 1.
    let event = StateEvent::new(
        "test_partition".to_string(),
        "test_event".to_string(),
        json!({"data": "test"}),
    );
    let event_id = event_store.append(event).await.unwrap();
    assert_eq!(event_id, 1);

    // Recover from a default state for the same partition name.
    let initial_state = ExampleState::default();
    let restored = recovery_manager
        .recover_state(initial_state, Some("test_partition".to_string()))
        .await
        .unwrap();
    assert_eq!(restored.events_to_replay, 1);
}
/// Checks that a `ResourceManager` reports plausible current usage, and that a
/// `ResourcePool` hands out one of two CPU cores on request and returns to the
/// full two cores after release.
#[actix_rt::test]
async fn test_resource_management() {
    // Overall budget, assembled piece by piece.
    let cpu_budget = CpuLimits {
        cores: 2.0,
        max_percent: 80.0,
        scheduling_priority: 0,
    };
    let memory_budget = MemoryLimits {
        max_bytes: 2 * 1024 * 1024 * 1024, // 2 GiB
        max_heap_bytes: None,
        max_stack_bytes: None,
        swap_limit_bytes: None,
    };
    let network_budget = NetworkLimits {
        ingress_bytes_per_sec: 100 * 1024 * 1024,
        egress_bytes_per_sec: 100 * 1024 * 1024,
        max_connections: 1000,
        burst_size: None,
    };
    let disk_budget = DiskIoLimits {
        read_bytes_per_sec: 500 * 1024 * 1024,
        write_bytes_per_sec: 500 * 1024 * 1024,
        read_iops: 50000,
        write_iops: 50000,
    };
    let total = ResourceLimits {
        cpu: cpu_budget,
        memory: memory_budget,
        gpu: None,
        network: network_budget,
        disk_io: disk_budget,
    };

    // Usage monitoring: CPU percentage is non-negative and some memory is in use.
    let manager = ResourceManager::new(total.clone()).unwrap();
    let current = manager.get_current_usage().unwrap();
    assert!(current.cpu_percent >= 0.0);
    assert!(current.memory_bytes > 0);

    // Pool accounting: request one core and 1 GiB on behalf of a fresh agent id.
    let pool = ResourcePool::new(total);
    let agent = Uuid::new_v4();
    let wanted = ResourceLimits {
        cpu: CpuLimits {
            cores: 1.0,
            max_percent: 50.0,
            scheduling_priority: 0,
        },
        memory: MemoryLimits {
            max_bytes: 1024 * 1024 * 1024, // 1 GiB
            max_heap_bytes: None,
            max_stack_bytes: None,
            swap_limit_bytes: None,
        },
        ..Default::default()
    };
    let granted = pool.request(agent, wanted).unwrap();
    assert_eq!(granted.cpu.cores, 1.0);

    // While the grant is outstanding, 2.0 - 1.0 cores remain.
    let while_held = pool.get_available();
    assert_eq!(while_held.cpu.cores, 1.0);

    // After release the pool is back to its full two cores.
    pool.release(agent).unwrap();
    let after_release = pool.get_available();
    assert_eq!(after_release.cpu.cores, 2.0);
}
/// Exercises the three workflow-definition front ends - YAML via
/// `DslCompiler::compile`, the `FluentWorkflow` builder, and the `workflow!`
/// macro - and checks the resulting name and step count for each.
#[actix_rt::test]
async fn test_workflow_dsl() {
    use pmat::workflow::dsl::*;

    // YAML front end. Nesting is significant in YAML: the step's keys must sit
    // under the `- id:` list item, and `language` under `params`. Without that
    // indentation they parse as top-level keys and `name` is defined twice.
    let yaml = r#"
name: test_workflow
version: 1.0.0
steps:
  - id: step1
    name: First Step
    type: action
    agent: analyzer
    operation: analyze
    params:
      language: rust
"#;
    let compiled = DslCompiler::compile(yaml).unwrap();
    assert_eq!(compiled.name, "test_workflow");
    assert_eq!(compiled.steps.len(), 1);

    // Fluent builder: one action step followed by a one-second wait step.
    let fluent = FluentWorkflow::define("fluent_test")
        .then(step!(action: "analyzer", "analyze", {
            language: "rust"
        }))
        .then(step!(wait: Duration::from_secs(1)))
        .on_error(ErrorStrategy::Continue)
        .build();
    assert_eq!(fluent.name, "fluent_test");
    assert_eq!(fluent.steps.len(), 2);

    // `workflow!` macro: two action steps.
    let from_macro = workflow!("macro_test" => {
        step!(action: "analyzer", "analyze", {}),
        step!(action: "validator", "validate", {}),
    });
    assert_eq!(from_macro.name, "macro_test");
    assert_eq!(from_macro.steps.len(), 2);
}
/// Walks a `CircuitBreaker` through closed -> open -> half-open -> closed:
/// three recorded failures open it, waiting past the recovery timeout makes it
/// half-open, and one recorded success closes it again.
#[actix_rt::test]
async fn test_circuit_breaker() {
    use pmat::agents::messaging::circuit_breaker::*;

    // Shared between the config and the failure loop so the two cannot drift apart.
    let failure_threshold = 3;
    let breaker_config = CircuitBreakerConfig {
        failure_threshold,
        recovery_timeout: Duration::from_millis(100),
        half_open_requests: 1,
    };
    let breaker = CircuitBreaker::new(breaker_config);

    // Fresh breaker: closed, and requests may proceed.
    assert!(matches!(breaker.state(), CircuitState::Closed));
    assert!(breaker.can_proceed());

    // Record exactly `failure_threshold` failures.
    for _ in 0..failure_threshold {
        breaker.record_failure();
    }

    // Threshold reached: open, and requests are refused.
    assert!(matches!(breaker.state(), CircuitState::Open));
    assert!(!breaker.can_proceed());

    // Sleep 150 ms, i.e. longer than the 100 ms recovery timeout.
    tokio::time::sleep(Duration::from_millis(150)).await;

    // After the timeout: half-open, and a request may proceed.
    assert!(matches!(breaker.state(), CircuitState::HalfOpen));
    assert!(breaker.can_proceed());

    // One success closes the breaker again.
    breaker.record_success();
    assert!(matches!(breaker.state(), CircuitState::Closed));
}
/// Runs a workflow whose single step is a `StepType::Parallel` group of two
/// analyzer actions, and expects the run to return `Ok` in under ten seconds.
#[actix_rt::test]
async fn test_parallel_workflow_execution() {
    // Register the test agents.
    let agents = Arc::new(AgentRegistry::new());
    agents
        .register("analyzer", Arc::new(AnalyzerActor::new()))
        .await;
    agents
        .register("validator", Arc::new(ValidatorActor::new()))
        .await;

    // The two branches of the parallel group; both target the analyzer.
    let first_branch = StepBuilder::action("p1", "Parallel 1", "analyzer", "analyze")
        .params(json!({"code": "test1", "language": "rust"}))
        .build();
    let second_branch = StepBuilder::action("p2", "Parallel 2", "analyzer", "analyze")
        .params(json!({"code": "test2", "language": "rust"}))
        .build();

    // Wrap both branches in one parallel step with a 5-second timeout.
    let fan_out = WorkflowStep {
        id: String::from("parallel"),
        name: String::from("Parallel Steps"),
        step_type: StepType::Parallel {
            steps: vec![first_branch, second_branch],
        },
        condition: None,
        retry: None,
        timeout: Some(Duration::from_secs(5)),
        on_error: None,
        metadata: Default::default(),
    };
    let parallel_workflow = WorkflowBuilder::new("parallel_test")
        .add_step(fan_out)
        .build();

    // Execute and time the run.
    let run_context = WorkflowContext::new(parallel_workflow.id, agents.clone());
    let executor = DefaultWorkflowExecutor::new(agents);
    let started = std::time::Instant::now();
    let outcome = executor.execute(&parallel_workflow, &run_context).await;
    let took = started.elapsed();

    assert!(outcome.is_ok());
    // The original author's intent is "should run in parallel".
    // NOTE(review): a 10-second ceiling does not by itself prove the branches
    // overlapped - confirm whether a tighter bound is wanted.
    assert!(took < Duration::from_secs(10));
}
// Helper state type for the state-management test: it is the type parameter
// given to `RecoveryManager` there, and it implements `AgentState` in this file.
// Derives `Default` so the test can start from `ExampleState::default()`.
#[derive(Clone, Default, serde::Serialize, serde::Deserialize)]
struct ExampleState {
// Id of the most recent event handed to `apply_event`.
last_event_id: u64,
// Incremented once per `apply_event` call; nothing in this file resets it.
events_since_snapshot: usize,
// Key/value payload; `merge_partition` extends it with another state's entries.
data: std::collections::HashMap<String, serde_json::Value>,
}
/// `AgentState` implementation for the test-only `ExampleState`.
impl AgentState for ExampleState {
    /// Counts the event and remembers its id as the latest one seen.
    fn apply_event(&mut self, event: &StateEvent) {
        self.events_since_snapshot += 1;
        self.last_event_id = event.id;
    }

    /// Returns the id stored by the most recent `apply_event` call.
    fn last_event_id(&self) -> EventId {
        self.last_event_id
    }

    /// Returns how many times `apply_event` has been called on this value.
    fn events_since_snapshot(&self) -> usize {
        self.events_since_snapshot
    }

    /// Always reports 60 seconds; this is a fixed mock value, not a measurement.
    fn time_since_snapshot(&self) -> Duration {
        let mock_elapsed_secs = 60;
        Duration::from_secs(mock_elapsed_secs)
    }

    /// Folds the other state's `data` entries into this one. Only `data` is
    /// merged; the other state's counters are discarded.
    fn merge_partition(&mut self, partition: Self) {
        let Self { data: incoming, .. } = partition;
        self.data.extend(incoming);
    }
}