use crate::parallel::partitioner::Partition;
use crate::plugins::algorithm::{AlgorithmResult, PluginContext};
use crate::vgi::VirtualGraph;
use std::collections::HashMap;
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct ExecutorConfig {
pub num_workers: usize,
pub timeout: Option<Duration>,
pub retry_count: usize,
pub properties: HashMap<String, String>,
}
impl Default for ExecutorConfig {
fn default() -> Self {
Self {
num_workers: 4,
timeout: Some(Duration::from_secs(300)),
retry_count: 3,
properties: HashMap::new(),
}
}
}
impl ExecutorConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_workers(mut self, num_workers: usize) -> Self {
self.num_workers = num_workers;
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn with_retry_count(mut self, retry_count: usize) -> Self {
self.retry_count = retry_count;
self
}
}
#[derive(Debug, Clone)]
pub struct WorkerInfo {
pub id: usize,
pub address: String,
pub partition_ids: Vec<usize>,
pub status: WorkerStatus,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkerStatus {
Idle,
Running,
Completed,
Failed(String),
}
#[derive(Debug, Clone)]
pub struct ExecutionResult {
pub success: bool,
pub execution_time_ms: u64,
pub partition_results: HashMap<usize, AlgorithmResult>,
pub aggregated_result: Option<AlgorithmResult>,
pub error_message: Option<String>,
}
impl ExecutionResult {
pub fn success(execution_time_ms: u64) -> Self {
Self {
success: true,
execution_time_ms,
partition_results: HashMap::new(),
aggregated_result: None,
error_message: None,
}
}
pub fn failure(error_message: String) -> Self {
Self {
success: false,
execution_time_ms: 0,
partition_results: HashMap::new(),
aggregated_result: None,
error_message: Some(error_message),
}
}
}
pub trait DistributedExecutor: Send + Sync {
fn name(&self) -> &'static str;
fn config(&self) -> &ExecutorConfig;
fn workers(&self) -> &[WorkerInfo];
fn initialize(&mut self) -> Result<(), String>;
fn shutdown(&mut self) -> Result<(), String>;
fn execute<G>(
&self,
graph: &G,
partitions: Vec<Partition>,
algorithm_name: &str,
ctx: &mut PluginContext<G>,
) -> Result<ExecutionResult, String>
where
G: VirtualGraph + ?Sized;
fn aggregate_results(
&self,
partition_results: HashMap<usize, AlgorithmResult>,
) -> Result<AlgorithmResult, String>;
fn get_stats(&self) -> ExecutorStats;
}
#[derive(Debug, Clone, Default)]
pub struct ExecutorStats {
pub total_executions: usize,
pub successful_executions: usize,
pub failed_executions: usize,
pub avg_execution_time_ms: f64,
pub active_workers: usize,
}
impl ExecutorStats {
pub fn new() -> Self {
Self::default()
}
pub fn success_rate(&self) -> f64 {
if self.total_executions == 0 {
0.0
} else {
self.successful_executions as f64 / self.total_executions as f64
}
}
}
pub struct SingleMachineExecutor {
config: ExecutorConfig,
workers: Vec<WorkerInfo>,
stats: ExecutorStats,
initialized: bool,
}
impl SingleMachineExecutor {
pub fn new(config: ExecutorConfig) -> Self {
let workers: Vec<WorkerInfo> = (0..config.num_workers)
.map(|id| WorkerInfo {
id,
address: format!("localhost:{}", 8000 + id),
partition_ids: Vec::new(),
status: WorkerStatus::Idle,
})
.collect();
Self {
config,
workers,
stats: ExecutorStats::new(),
initialized: false,
}
}
}
impl DistributedExecutor for SingleMachineExecutor {
fn name(&self) -> &'static str {
"single_machine"
}
fn config(&self) -> &ExecutorConfig {
&self.config
}
fn workers(&self) -> &[WorkerInfo] {
&self.workers
}
fn initialize(&mut self) -> Result<(), String> {
self.initialized = true;
Ok(())
}
fn shutdown(&mut self) -> Result<(), String> {
self.initialized = false;
for worker in &mut self.workers {
worker.status = WorkerStatus::Idle;
}
Ok(())
}
fn execute<G>(
&self,
_graph: &G,
_partitions: Vec<Partition>,
_algorithm_name: &str,
_ctx: &mut PluginContext<G>,
) -> Result<ExecutionResult, String>
where
G: VirtualGraph + ?Sized,
{
if !self.initialized {
return Err("Executor not initialized".to_string());
}
Ok(ExecutionResult::success(0))
}
fn aggregate_results(
&self,
_partition_results: HashMap<usize, AlgorithmResult>,
) -> Result<AlgorithmResult, String> {
Ok(AlgorithmResult::scalar(0.0))
}
fn get_stats(&self) -> ExecutorStats {
self.stats.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_executor_config() {
use std::time::Duration;
let config = ExecutorConfig::new()
.with_workers(8)
.with_timeout(Duration::from_secs(600))
.with_retry_count(5);
assert_eq!(config.num_workers, 8);
assert_eq!(config.timeout, Some(Duration::from_secs(600)));
assert_eq!(config.retry_count, 5);
}
#[test]
fn test_worker_status() {
let status = WorkerStatus::Running;
assert_ne!(status, WorkerStatus::Idle);
let failed = WorkerStatus::Failed("error".to_string());
if let WorkerStatus::Failed(msg) = failed {
assert_eq!(msg, "error");
}
}
#[test]
fn test_execution_result() {
let success = ExecutionResult::success(100);
assert!(success.success);
assert_eq!(success.execution_time_ms, 100);
assert!(success.error_message.is_none());
let failure = ExecutionResult::failure("test error".to_string());
assert!(!failure.success);
assert_eq!(failure.error_message, Some("test error".to_string()));
}
#[test]
fn test_executor_stats() {
let stats = ExecutorStats {
total_executions: 10,
successful_executions: 8,
failed_executions: 2,
avg_execution_time_ms: 50.0,
active_workers: 4,
};
assert_eq!(stats.success_rate(), 0.8);
}
#[test]
fn test_single_machine_executor() {
use crate::graph::Graph;
use crate::graph::traits::GraphOps;
let config = ExecutorConfig::new().with_workers(2);
let mut executor = SingleMachineExecutor::new(config);
assert!(executor.initialize().is_ok());
let graph = Graph::<(), ()>::undirected();
let partitions = vec![];
let mut ctx = PluginContext::new(&graph);
let result = executor.execute(&graph, partitions, "test", &mut ctx);
assert!(result.is_ok());
assert!(executor.shutdown().is_ok());
}
}