pub mod checkpointing;
pub mod communication;
pub mod load_balancing;
pub mod node;
pub mod solver;
pub mod types;
pub use checkpointing::{
Checkpoint, CheckpointConfig, CheckpointGlobalState, CheckpointManager, CheckpointStatistics,
ChunkCheckpoint, FaultToleranceCoordinator, RecoveryAction,
};
pub use communication::{
deserialize_boundary_data, serialize_boundary_data, BoundaryExchanger, Communicator,
MessageChannel, SyncBarrier,
};
pub use load_balancing::{
ChunkDistributor, LoadBalancer, LoadBalancerConfig, LoadBalancerStatistics, NodePerformance,
};
pub use node::{ComputeNode, NodeBuilder, NodeManager, ResourceMonitor};
pub use solver::{DistributedODEResult, DistributedODESolver, DistributedODESolverBuilder};
pub use types::{
AckStatus, BoundaryConditions, BoundaryData, ChunkId, ChunkResult, ChunkResultStatus,
DistributedConfig, DistributedError, DistributedMessage, DistributedMetrics, DistributedResult,
FaultToleranceMode, FloatPrecision, JobId, LoadBalancingStrategy, NodeCapabilities, NodeId,
NodeInfo, NodeStatus, SimdCapability, WorkChunk,
};
#[cfg(test)]
mod tests {
    //! Integration-style tests that exercise the re-exported distributed
    //! solver components together through their public API.

    use super::*;
    use scirs2_core::ndarray::{array, ArrayView1};
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    /// Builds a loopback (`127.0.0.1`) socket address on the given port.
    ///
    /// The tests below only use the address as a value attached to node
    /// records; this helper itself never opens a socket.
    fn test_address(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    /// Solves `y' = -y`, `y(0) = 1` on `[0, 1]` with two registered nodes and
    /// checks the final state against `exp(-1)` within an absolute tolerance
    /// of `0.05`.
    #[test]
    fn test_integration_simple_ode() {
        let config = DistributedConfig::<f64>::default();
        let solver = DistributedODESolver::new(config).expect("Failed to create solver");

        for i in 0..2 {
            let mut node = NodeInfo::new(NodeId::new(i), test_address(8080 + i as u16));
            node.status = NodeStatus::Available;
            solver.register_node(node).expect("Failed to register");
        }

        let f = |_t: f64, y: ArrayView1<f64>| array![-y[0]];

        // `expect` already fails the test on `Err` and, unlike a preceding
        // `assert!(result.is_ok())`, reports the underlying error value.
        let result = solver
            .solve(f, (0.0, 1.0), array![1.0], None)
            .expect("Solve failed");
        assert!(!result.is_empty());

        let expected = (-1.0_f64).exp();
        let actual = result.final_state().expect("No final state")[0];
        assert!((actual - expected).abs() < 0.05);
    }

    /// Runs the same decay problem on `[0, 2]` with checkpointing enabled
    /// (interval `2`) on a single node and checks that the solve succeeds.
    #[test]
    fn test_integration_with_checkpointing() {
        let mut config = DistributedConfig::<f64>::default();
        config.checkpointing_enabled = true;
        config.checkpoint_interval = 2;

        let solver = DistributedODESolver::new(config).expect("Failed to create solver");

        let mut node = NodeInfo::new(NodeId::new(0), test_address(9000));
        node.status = NodeStatus::Available;
        solver.register_node(node).expect("Failed to register");

        let f = |_t: f64, y: ArrayView1<f64>| array![-y[0]];
        let result = solver.solve(f, (0.0, 2.0), array![1.0], None);
        assert!(result.is_ok());
    }

    /// Registers five nodes with default capabilities and checks that all are
    /// counted as available and that a best node can be selected.
    #[test]
    fn test_node_manager_integration() {
        use std::time::Duration;

        let manager = NodeManager::new(Duration::from_secs(30));

        for i in 0..5 {
            let addr = test_address(7000 + i);
            let caps = NodeCapabilities::default();
            let result = manager.register_node(addr, caps);
            assert!(result.is_ok());
        }

        assert_eq!(manager.available_node_count(), 5);

        let best = manager.select_best_node(1.0);
        assert!(best.is_some());
    }

    /// Registers three nodes with an adaptive load balancer, assigns ten
    /// chunks across them, and checks the reported node count.
    #[test]
    fn test_load_balancer_integration() {
        let balancer: LoadBalancer<f64> = LoadBalancer::new(
            LoadBalancingStrategy::Adaptive,
            LoadBalancerConfig::default(),
        );

        for i in 0..3 {
            balancer
                .register_node(NodeId::new(i))
                .expect("Failed to register");
        }

        let nodes: Vec<NodeInfo> = (0..3)
            .map(|i| {
                let mut node = NodeInfo::new(NodeId::new(i), test_address(6000 + i as u16));
                node.status = NodeStatus::Available;
                node
            })
            .collect();

        for i in 0..10 {
            let chunk = WorkChunk::new(ChunkId::new(i), JobId::new(1), (0.0, 1.0), array![1.0]);
            let result = balancer.assign_chunk(&chunk, &nodes);
            assert!(result.is_ok());
        }

        let stats = balancer.get_statistics();
        assert_eq!(stats.node_count, 3);
    }

    /// Requests boundary data for chunk 2 from chunk 1, delivers it, and
    /// checks that the built boundary conditions contain a left boundary.
    #[test]
    fn test_boundary_exchanger_integration() {
        use std::time::Duration;

        let exchanger: BoundaryExchanger<f64> = BoundaryExchanger::new(Duration::from_secs(5));

        let target = ChunkId::new(2);
        let source = ChunkId::new(1);

        exchanger
            .request_boundary(target, source)
            .expect("Request failed");

        let data = BoundaryData {
            time: 1.0,
            state: array![1.0, 2.0, 3.0],
            derivative: Some(array![-1.0, -2.0, -3.0]),
            source_chunk: source,
        };

        // `data` is not used afterwards, so it is moved rather than cloned.
        exchanger
            .receive_boundary(target, source, data)
            .expect("Receive failed");

        let bc = exchanger.build_boundary_conditions(target, Some(source), None);
        assert!(bc.left_boundary.is_some());
    }

    /// Creates an in-memory checkpoint (`persist_to_disk = false`) for one
    /// completed chunk and checks that restoring the job returns the stored
    /// global state.
    #[test]
    fn test_checkpoint_manager_integration() {
        use std::path::PathBuf;

        /// Best-effort removal of the test directory when dropped, so the
        /// cleanup also runs when an assertion or `expect` below panics.
        struct TempDirGuard(PathBuf);

        impl Drop for TempDirGuard {
            fn drop(&mut self) {
                // Errors are ignored on purpose: the directory may never
                // have been created.
                let _ = std::fs::remove_dir_all(&self.0);
            }
        }

        let path = std::env::temp_dir().join(format!("scirs_dist_test_{}", std::process::id()));
        // Declared before `manager` so that it is dropped after it.
        let _cleanup = TempDirGuard(path.clone());

        let mut config = CheckpointConfig::default();
        config.persist_to_disk = false;

        let manager: CheckpointManager<f64> =
            CheckpointManager::new(path.clone(), config).expect("Failed to create manager");

        let job_id = JobId::new(1);

        let chunk_result = ChunkResult {
            chunk_id: ChunkId::new(1),
            node_id: NodeId::new(1),
            time_points: vec![0.0, 0.5, 1.0],
            states: vec![array![1.0], array![0.5], array![0.25]],
            final_state: array![0.25],
            final_derivative: Some(array![-0.25]),
            error_estimate: 1e-6,
            processing_time: std::time::Duration::from_millis(100),
            memory_used: 1024,
            status: ChunkResultStatus::Success,
        };

        let global_state = CheckpointGlobalState {
            iteration: 1,
            chunks_completed: 1,
            chunks_remaining: 5,
            current_time: 1.0,
            error_estimate: 1e-6,
        };

        let cp_id = manager
            .create_checkpoint(
                job_id,
                vec![chunk_result],
                vec![ChunkId::new(2)],
                global_state,
            )
            .expect("Failed to create checkpoint");
        assert!(cp_id > 0);

        let restored = manager.restore(job_id, None).expect("Failed to restore");
        assert_eq!(restored.global_state.chunks_completed, 1);
    }

    /// Builds a communicator for node 1, adds two peers, and checks the peer
    /// list length and that creating a barrier yields a positive id.
    #[test]
    fn test_communicator_integration() {
        use std::sync::Arc;
        use std::time::Duration;

        let channel: Arc<MessageChannel<f64>> =
            Arc::new(MessageChannel::new(Duration::from_secs(5)));
        let exchanger: Arc<BoundaryExchanger<f64>> =
            Arc::new(BoundaryExchanger::new(Duration::from_secs(5)));

        let comm = Communicator::new(NodeId::new(1), channel, exchanger);

        comm.add_peer(NodeId::new(2)).expect("Failed to add peer");
        comm.add_peer(NodeId::new(3)).expect("Failed to add peer");

        let peers = comm.get_peers();
        assert_eq!(peers.len(), 2);

        let barrier_id = comm.create_barrier(3).expect("Failed to create barrier");
        assert!(barrier_id > 0);
    }
}