use crate::parallel::partitioner::Partition;
use crate::node::NodeIndex;
use crate::vgi::VirtualGraph;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::time::Instant;
#[derive(Debug, Clone, Default)]
pub struct DijkstraConfig {
pub compute_predecessors: bool,
pub target: Option<NodeIndex>,
pub max_distance: Option<f64>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DijkstraConfigError {
NegativeMaxDistance,
SourceEqualsTarget,
}
impl std::fmt::Display for DijkstraConfigError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DijkstraConfigError::NegativeMaxDistance => {
write!(f, "max_distance must be non-negative")
}
DijkstraConfigError::SourceEqualsTarget => {
write!(f, "source and target cannot be the same node")
}
}
}
}
impl std::error::Error for DijkstraConfigError {}
impl DijkstraConfig {
pub fn new() -> Self {
Self::default()
}
pub fn validate(&self, source: NodeIndex) -> Result<(), DijkstraConfigError> {
if let Some(dist) = self.max_distance {
if dist < 0.0 {
return Err(DijkstraConfigError::NegativeMaxDistance);
}
}
if self.target == Some(source) {
return Err(DijkstraConfigError::SourceEqualsTarget);
}
Ok(())
}
pub fn try_new(source: NodeIndex) -> Result<Self, DijkstraConfigError> {
let config = Self::default();
config.validate(source)?;
Ok(config)
}
pub fn with_predecessors(mut self, compute: bool) -> Self {
self.compute_predecessors = compute;
self
}
pub fn with_target(mut self, target: NodeIndex) -> Self {
self.target = Some(target);
self
}
pub fn with_max_distance(mut self, max_distance: f64) -> Self {
self.max_distance = Some(max_distance);
self
}
}
#[derive(Debug, Clone)]
pub struct DijkstraResult {
pub source: NodeIndex,
pub distances: HashMap<NodeIndex, f64>,
pub predecessors: HashMap<NodeIndex, Option<NodeIndex>>,
pub computation_time_ms: u64,
pub partition_stats: Vec<PartitionDijkstraStats>,
distances_vec: Option<Vec<f64>>,
predecessors_vec: Option<Vec<Option<NodeIndex>>>,
all_nodes: Vec<NodeIndex>,
}
impl DijkstraResult {
pub fn from_vec(
source: NodeIndex,
distances: Vec<f64>,
predecessors: Vec<Option<NodeIndex>>,
all_nodes: Vec<NodeIndex>,
computation_time_ms: u64,
partition_stats: Vec<PartitionDijkstraStats>,
) -> Self {
Self {
source,
distances: HashMap::new(), predecessors: HashMap::new(), computation_time_ms,
partition_stats,
distances_vec: Some(distances),
predecessors_vec: Some(predecessors),
all_nodes,
}
}
pub fn distance(&self, node: NodeIndex) -> Result<f64, GraphError> {
if let Some(ref dist_vec) = self.distances_vec {
if let Some(pos) = self.all_nodes.iter().position(|&n| n == node) {
let d = dist_vec[pos];
if d.is_finite() {
return Ok(d);
}
}
}
self.distances.get(&node).copied().ok_or_else(|| GraphError::NotFound(
format!("Node {:?} is not reachable from source", node)
))
}
pub fn is_reachable(&self, node: NodeIndex) -> bool {
if let Some(ref dist_vec) = self.distances_vec {
if let Some(pos) = self.all_nodes.iter().position(|&n| n == node) {
return dist_vec[pos].is_finite();
}
}
self.distances.get(&node).is_some_and(|&d| d.is_finite())
}
pub fn reconstruct_path(&self, target: NodeIndex) -> Option<Vec<NodeIndex>> {
if !self.is_reachable(target) {
return None;
}
let mut path = vec![target];
let mut current = target;
if let Some(ref pred_vec) = self.predecessors_vec {
while let Some(pos) = self.all_nodes.iter().position(|&n| n == current) {
if let Some(Some(pred)) = pred_vec.get(pos) {
if *pred == self.source {
path.push(*pred);
break;
}
path.push(*pred);
current = *pred;
} else {
break;
}
}
} else {
while let Some(&Some(pred)) = self.predecessors.get(¤t) {
if pred == self.source {
path.push(pred);
break;
}
path.push(pred);
current = pred;
}
}
path.reverse();
Some(path)
}
pub fn farthest_node(&self) -> Option<(NodeIndex, f64)> {
if let Some(ref dist_vec) = self.distances_vec {
let mut max_dist = f64::NEG_INFINITY;
let mut max_node = None;
for (i, &d) in dist_vec.iter().enumerate() {
if d.is_finite() && d > max_dist {
max_dist = d;
max_node = Some(self.all_nodes[i]);
}
}
if let Some(node) = max_node {
return Some((node, max_dist));
}
}
self.distances
.iter()
.filter(|(_, &d)| d.is_finite())
.max_by(|a, b| a.1.partial_cmp(b.1).unwrap_or(Ordering::Equal))
.map(|(&node, &dist)| (node, dist))
}
}
#[derive(Debug, Clone)]
pub struct PartitionDijkstraStats {
pub partition_id: usize,
pub visited_count: usize,
pub boundary_count: usize,
pub min_distance: f64,
pub max_distance: f64,
}
#[derive(Debug, Clone)]
struct HeapItem {
node: NodeIndex,
distance: f64,
}
impl PartialEq for HeapItem {
fn eq(&self, other: &Self) -> bool {
self.node == other.node && self.distance.to_bits() == other.distance.to_bits()
}
}
impl Eq for HeapItem {}
impl Ord for HeapItem {
fn cmp(&self, other: &Self) -> Ordering {
other
.distance
.partial_cmp(&self.distance)
.unwrap_or(Ordering::Equal)
}
}
impl PartialOrd for HeapItem {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
pub struct DistributedDijkstra {
source: NodeIndex,
config: DijkstraConfig,
}
impl DistributedDijkstra {
pub fn new(source: NodeIndex) -> Self {
Self {
source,
config: DijkstraConfig::default(),
}
}
pub fn from_config(source: NodeIndex, config: DijkstraConfig) -> Self {
Self { source, config }
}
pub fn compute<G, F>(
&self,
graph: &G,
partitions: &[Partition],
mut get_weight: F,
) -> DijkstraResult
where
G: VirtualGraph<NodeData = (), EdgeData = f64>,
F: FnMut(NodeIndex, NodeIndex, &f64) -> f64,
{
let start_time = Instant::now();
let all_nodes: Vec<NodeIndex> = partitions
.iter()
.flat_map(|p| p.nodes.iter().copied())
.collect();
let n = all_nodes.len();
let max_index = all_nodes.iter().map(|ni| ni.index()).max().unwrap_or(0);
let mut node_to_idx_vec = vec![usize::MAX; max_index + 1];
for (pos, &node_idx) in all_nodes.iter().enumerate() {
node_to_idx_vec[node_idx.index()] = pos;
}
let get_node_idx = |node: NodeIndex| -> Option<usize> {
let idx = node_to_idx_vec.get(node.index()).copied();
if idx == Some(usize::MAX) { None } else { idx }
};
let mut edge_counts = vec![0usize; n];
for edge in graph.edges() {
if let Ok((u, v)) = graph.edge_endpoints(edge.index()) {
if let (Some(u_idx), Some(v_idx)) = (get_node_idx(u), get_node_idx(v)) {
edge_counts[u_idx] += 1;
let _ = v_idx; }
}
}
let mut edge_offsets = vec![0usize; n + 1];
for i in 0..n {
edge_offsets[i + 1] = edge_offsets[i] + edge_counts[i];
}
let total_edges = edge_offsets[n];
let mut edge_data: Vec<(usize, f64)> = Vec::with_capacity(total_edges);
edge_data.resize(total_edges, (0, 0.0));
let mut temp_pos = vec![0usize; n];
for partition in partitions {
for &node in &partition.nodes {
let u_idx = match get_node_idx(node) {
Some(idx) => idx,
None => continue,
};
for neighbor in graph.neighbors(node) {
let v_idx = match get_node_idx(neighbor) {
Some(idx) => idx,
None => continue,
};
if let Some(weight) = partition.get_edge_weight(node, neighbor) {
let pos = edge_offsets[u_idx] + temp_pos[u_idx];
edge_data[pos] = (v_idx, weight);
temp_pos[u_idx] += 1;
}
}
}
}
for edge in graph.edges() {
if let Ok((u, v)) = graph.edge_endpoints(edge.index()) {
if let (Some(u_idx), Some(v_idx)) = (get_node_idx(u), get_node_idx(v)) {
let start = edge_offsets[u_idx];
let end = edge_offsets[u_idx + 1];
let already_cached = edge_data[start..end].iter().any(|&(idx, _)| idx == v_idx);
if !already_cached {
let pos = edge_offsets[u_idx] + temp_pos[u_idx];
if pos < edge_offsets[u_idx + 1] {
let weight = get_weight(u, v, edge.data());
edge_data[pos] = (v_idx, weight);
temp_pos[u_idx] += 1;
}
}
}
}
}
edge_data.shrink_to_fit();
let source_idx = match get_node_idx(self.source) {
Some(idx) => idx,
None => return DijkstraResult {
source: self.source,
distances: HashMap::new(),
predecessors: HashMap::new(),
computation_time_ms: 0,
partition_stats: partitions
.iter()
.map(|p| PartitionDijkstraStats {
partition_id: p.id,
visited_count: 0,
boundary_count: p.boundary_nodes.len(),
min_distance: f64::INFINITY,
max_distance: f64::NEG_INFINITY,
})
.collect(),
distances_vec: None,
predecessors_vec: None,
all_nodes: vec![],
},
};
let mut distances: Vec<f64> = vec![f64::INFINITY; n];
let mut predecessors: Vec<Option<NodeIndex>> = vec![None; n];
let mut heap = BinaryHeap::new();
let mut visited: Vec<bool> = vec![false; n];
distances[source_idx] = 0.0;
predecessors[source_idx] = None;
heap.push(HeapItem {
node: self.source,
distance: 0.0,
});
let mut _visited_count = 0;
while let Some(HeapItem { node, distance }) = heap.pop() {
let node_idx = match get_node_idx(node) {
Some(idx) => idx,
None => continue,
};
if visited[node_idx] {
continue;
}
visited[node_idx] = true;
_visited_count += 1;
if self.config.target == Some(node) {
break;
}
if let Some(max_dist) = self.config.max_distance {
if distance > max_dist {
continue;
}
}
let start = edge_offsets[node_idx];
let end = edge_offsets[node_idx + 1];
for &(neighbor_idx, weight) in &edge_data[start..end] {
if visited[neighbor_idx] {
continue;
}
let new_distance = distance + weight;
if new_distance < distances[neighbor_idx] {
distances[neighbor_idx] = new_distance;
if self.config.compute_predecessors {
predecessors[neighbor_idx] = Some(node);
}
heap.push(HeapItem {
node: all_nodes[neighbor_idx],
distance: new_distance,
});
}
}
}
let computation_time_ms = start_time.elapsed().as_millis() as u64;
let partition_stats: Vec<PartitionDijkstraStats> = partitions
.iter()
.map(|p| {
let partition_distances: Vec<_> = p
.nodes
.iter()
.filter_map(|&n| get_node_idx(n).and_then(|idx| {
let d = distances[idx];
if d.is_finite() { Some(d) } else { None }
}))
.collect();
let visited_in_partition = partition_distances.len();
let min_dist = partition_distances
.iter()
.cloned()
.fold(f64::INFINITY, f64::min);
let max_dist = partition_distances
.iter()
.cloned()
.fold(f64::NEG_INFINITY, f64::max);
PartitionDijkstraStats {
partition_id: p.id,
visited_count: visited_in_partition,
boundary_count: p.boundary_nodes.len(),
min_distance: min_dist,
max_distance: max_dist,
}
})
.collect();
DijkstraResult::from_vec(
self.source,
distances,
predecessors,
all_nodes,
computation_time_ms,
partition_stats,
)
}
pub fn shortest_path<G, F>(
&self,
graph: &G,
partitions: &[Partition],
target: NodeIndex,
get_weight: F,
) -> Option<Vec<NodeIndex>>
where
G: VirtualGraph<NodeData = (), EdgeData = f64>,
F: FnMut(NodeIndex, NodeIndex, &f64) -> f64,
{
let config = DijkstraConfig::new()
.with_predecessors(true)
.with_target(target);
let dijkstra = DistributedDijkstra::from_config(self.source, config);
let result = dijkstra.compute(graph, partitions, get_weight);
result.reconstruct_path(target)
}
}
pub fn simple_dijkstra<G, F>(
graph: &G,
source: NodeIndex,
mut get_weight: F,
) -> HashMap<NodeIndex, f64>
where
G: VirtualGraph<NodeData = (), EdgeData = f64>,
F: FnMut(NodeIndex, NodeIndex, &f64) -> f64,
{
let all_nodes: Vec<NodeIndex> = graph.nodes().map(|n| n.index()).collect();
let n = all_nodes.len();
let max_index = all_nodes.iter().map(|ni| ni.index()).max().unwrap_or(0);
let mut node_to_idx_vec = vec![usize::MAX; max_index + 1];
for (pos, &node_idx) in all_nodes.iter().enumerate() {
node_to_idx_vec[node_idx.index()] = pos;
}
let get_node_idx = |node: NodeIndex| -> Option<usize> {
let idx = node_to_idx_vec.get(node.index()).copied();
if idx == Some(usize::MAX) { None } else { idx }
};
let mut distances: Vec<f64> = vec![f64::INFINITY; n];
let mut visited: Vec<bool> = vec![false; n];
let mut heap = BinaryHeap::new();
let source_idx = get_node_idx(source).unwrap_or(0);
distances[source_idx] = 0.0;
heap.push(HeapItem {
node: source,
distance: 0.0,
});
while let Some(HeapItem { node, distance }) = heap.pop() {
let node_idx = match get_node_idx(node) {
Some(idx) => idx,
None => continue,
};
if visited[node_idx] {
continue;
}
visited[node_idx] = true;
for neighbor in graph.neighbors(node) {
let neighbor_idx = match get_node_idx(neighbor) {
Some(idx) => idx,
None => continue,
};
if visited[neighbor_idx] {
continue;
}
let weight = graph.incident_edges(node).find_map(|edge_idx| {
if let Ok((u, v)) = graph.edge_endpoints(edge_idx) {
if (u == node && v == neighbor) || (u == neighbor && v == node) {
if let Ok(edge_data) = graph.get_edge(edge_idx) {
return Some(get_weight(node, neighbor, edge_data));
}
}
}
None
});
if let Some(w) = weight {
let new_distance = distance + w;
if new_distance < distances[neighbor_idx] {
distances[neighbor_idx] = new_distance;
heap.push(HeapItem {
node: neighbor,
distance: new_distance,
});
}
}
}
}
all_nodes
.into_iter()
.enumerate()
.filter(|(i, _)| distances[*i].is_finite())
.map(|(i, n)| (n, distances[i]))
.collect()
}
pub fn has_negative_weights<G>(graph: &G) -> bool
where
G: VirtualGraph<NodeData = (), EdgeData = f64>,
{
for edge_ref in graph.edges() {
let weight = *edge_ref.data();
if weight < 0.0 {
return true;
}
}
false
}
pub fn validate_path<G>(
graph: &G,
path: &[NodeIndex],
expected_distance: f64,
get_weight: &mut impl FnMut(NodeIndex, NodeIndex, &f64) -> f64,
) -> bool
where
G: VirtualGraph<NodeData = (), EdgeData = f64>,
{
if path.is_empty() {
return expected_distance == 0.0;
}
let mut total_distance = 0.0;
for i in 0..path.len() - 1 {
let u = path[i];
let v = path[i + 1];
let edge_weight = graph.incident_edges(u).find_map(|edge_idx| {
if let Ok((a, b)) = graph.edge_endpoints(edge_idx) {
if (a == u && b == v) || (a == v && b == u) {
if let Ok(edge_data) = graph.get_edge(edge_idx) {
return Some(get_weight(u, v, edge_data));
}
}
}
None
});
if let Some(w) = edge_weight {
total_distance += w;
} else {
return false; }
}
(total_distance - expected_distance).abs() < 1e-9
}
#[cfg(test)]
mod tests {
use super::*;
use crate::parallel::partitioner::{HashPartitioner, Partitioner};
use crate::graph::Graph;
use crate::graph::traits::GraphOps;
#[test]
fn test_dijkstra_config() {
let config = DijkstraConfig::new()
.with_predecessors(true)
.with_target(NodeIndex::new_public(5))
.with_max_distance(100.0);
assert!(config.compute_predecessors);
assert_eq!(config.target, Some(NodeIndex::new_public(5)));
assert_eq!(config.max_distance, Some(100.0));
}
#[test]
fn test_distributed_dijkstra_basic() {
let mut graph = Graph::<(), f64>::undirected();
let nodes: Vec<NodeIndex> = (0..5).map(|_| graph.add_node(()).unwrap()).collect();
graph.add_edge(nodes[0], nodes[1], 1.0).unwrap();
graph.add_edge(nodes[1], nodes[2], 2.0).unwrap();
graph.add_edge(nodes[0], nodes[2], 5.0).unwrap();
graph.add_edge(nodes[2], nodes[3], 1.0).unwrap();
graph.add_edge(nodes[3], nodes[4], 3.0).unwrap();
let partitioner = HashPartitioner::new(2);
let partitions = partitioner.partition_graph(&graph);
let dijkstra = DistributedDijkstra::new(nodes[0]);
let result = dijkstra.compute(&graph, &partitions, |_, _, w| *w);
assert_eq!(result.distance(nodes[0]), Some(0.0));
assert_eq!(result.distance(nodes[1]), Some(1.0));
assert_eq!(result.distance(nodes[2]), Some(3.0)); assert!(result.is_reachable(nodes[4]));
}
#[test]
fn test_distributed_dijkstra_with_predecessors() {
let mut graph = Graph::<(), f64>::undirected();
let nodes: Vec<NodeIndex> = (0..5).map(|_| graph.add_node(()).unwrap()).collect();
graph.add_edge(nodes[0], nodes[1], 1.0).unwrap();
graph.add_edge(nodes[1], nodes[2], 2.0).unwrap();
graph.add_edge(nodes[2], nodes[3], 1.0).unwrap();
graph.add_edge(nodes[3], nodes[4], 1.0).unwrap();
let partitioner = HashPartitioner::new(2);
let partitions = partitioner.partition_graph(&graph);
let config = DijkstraConfig::new().with_predecessors(true);
let dijkstra = DistributedDijkstra::from_config(nodes[0], config);
let result = dijkstra.compute(&graph, &partitions, |_, _, w| *w);
let path = result.reconstruct_path(nodes[4]);
assert!(path.is_some());
let path = path.unwrap();
assert_eq!(path.len(), 5);
assert_eq!(path[0], nodes[0]);
assert_eq!(*path.last().unwrap(), nodes[4]);
}
#[test]
fn test_distributed_dijkstra_target() {
let mut graph = Graph::<(), f64>::undirected();
let nodes: Vec<NodeIndex> = (0..10).map(|_| graph.add_node(()).unwrap()).collect();
for i in 0..nodes.len() - 1 {
graph.add_edge(nodes[i], nodes[i + 1], 1.0).unwrap();
}
let partitioner = HashPartitioner::new(2);
let partitions = partitioner.partition_graph(&graph);
let config = DijkstraConfig::new().with_target(nodes[5]);
let dijkstra = DistributedDijkstra::from_config(nodes[0], config);
let result = dijkstra.compute(&graph, &partitions, |_, _, w| *w);
assert_eq!(result.distance(nodes[5]), Some(5.0));
}
#[test]
fn test_distributed_dijkstra_empty_graph() {
let graph = Graph::<(), f64>::undirected();
let partitioner = HashPartitioner::new(2);
let partitions = partitioner.partition_graph(&graph);
let dijkstra = DistributedDijkstra::new(NodeIndex::new_public(0));
let result = dijkstra.compute(&graph, &partitions, |_, _, w| *w);
assert_eq!(result.distances.len(), 0);
assert_eq!(result.computation_time_ms, 0);
}
#[test]
fn test_distributed_dijkstra_isolated_nodes() {
let mut graph = Graph::<(), f64>::undirected();
let nodes: Vec<NodeIndex> = (0..5).map(|_| graph.add_node(()).unwrap()).collect();
graph.add_edge(nodes[0], nodes[1], 1.0).unwrap();
graph.add_edge(nodes[1], nodes[2], 2.0).unwrap();
let partitioner = HashPartitioner::new(2);
let partitions = partitioner.partition_graph(&graph);
let dijkstra = DistributedDijkstra::new(nodes[0]);
let result = dijkstra.compute(&graph, &partitions, |_, _, w| *w);
assert!(result.is_reachable(nodes[0]));
assert!(result.is_reachable(nodes[1]));
assert!(result.is_reachable(nodes[2]));
assert!(!result.is_reachable(nodes[3]));
assert!(!result.is_reachable(nodes[4]));
}
#[test]
fn test_dijkstra_config_validation() {
let config = DijkstraConfig::new().with_max_distance(-1.0);
assert_eq!(
config.validate(NodeIndex::new_public(0)),
Err(DijkstraConfigError::NegativeMaxDistance)
);
let source = NodeIndex::new_public(5);
let config = DijkstraConfig::new().with_target(source);
assert_eq!(
config.validate(source),
Err(DijkstraConfigError::SourceEqualsTarget)
);
let config = DijkstraConfig::new();
assert!(config.validate(NodeIndex::new_public(0)).is_ok());
}
#[test]
fn test_dijkstra_try_new() {
let result = DijkstraConfig::try_new(NodeIndex::new_public(0));
assert!(result.is_ok());
let source = NodeIndex::new_public(5);
let config = DijkstraConfig::new().with_target(source);
let result = config.validate(source);
assert!(result.is_err());
}
#[test]
fn test_distributed_dijkstra_max_distance() {
let mut graph = Graph::<(), f64>::undirected();
let nodes: Vec<NodeIndex> = (0..10).map(|_| graph.add_node(()).unwrap()).collect();
for i in 0..nodes.len() - 1 {
graph.add_edge(nodes[i], nodes[i + 1], 1.0).unwrap();
}
let partitioner = HashPartitioner::new(2);
let partitions = partitioner.partition_graph(&graph);
let config = DijkstraConfig::new().with_max_distance(3.0);
let dijkstra = DistributedDijkstra::from_config(nodes[0], config);
let result = dijkstra.compute(&graph, &partitions, |_, _, w| *w);
assert_eq!(result.distance(nodes[3]), Some(3.0));
assert!(!result.is_reachable(nodes[5]));
}
#[test]
fn test_simple_dijkstra() {
let mut graph = Graph::<(), f64>::undirected();
let nodes: Vec<NodeIndex> = (0..5).map(|_| graph.add_node(()).unwrap()).collect();
graph.add_edge(nodes[0], nodes[1], 1.0).unwrap();
graph.add_edge(nodes[1], nodes[2], 2.0).unwrap();
graph.add_edge(nodes[2], nodes[3], 1.0).unwrap();
let distances = simple_dijkstra(&graph, nodes[0], |_, _, w| *w);
assert_eq!(distances.get(&nodes[0]), Some(&0.0));
assert_eq!(distances.get(&nodes[2]), Some(&3.0));
}
#[test]
fn test_has_negative_weights() {
let mut graph = Graph::<(), f64>::undirected();
let nodes: Vec<NodeIndex> = (0..3).map(|_| graph.add_node(()).unwrap()).collect();
graph.add_edge(nodes[0], nodes[1], 1.0).unwrap();
graph.add_edge(nodes[1], nodes[2], -2.0).unwrap();
assert!(has_negative_weights(&graph));
}
#[test]
fn test_partition_stats() {
let mut graph = Graph::<(), f64>::undirected();
for _ in 0..20 {
graph.add_node(()).unwrap();
}
let partitioner = HashPartitioner::new(4);
let partitions = partitioner.partition_graph(&graph);
let source = NodeIndex::new_public(0);
let dijkstra = DistributedDijkstra::new(source);
let result = dijkstra.compute(&graph, &partitions, |_, _, w| *w);
assert_eq!(result.partition_stats.len(), 4);
let total_visited: usize = result.partition_stats.iter().map(|s| s.visited_count).sum();
assert!(total_visited >= 1, "Source node should always be visited");
}
#[test]
fn test_farthest_node() {
let mut graph = Graph::<(), f64>::undirected();
let nodes: Vec<NodeIndex> = (0..5).map(|_| graph.add_node(()).unwrap()).collect();
graph.add_edge(nodes[0], nodes[1], 1.0).unwrap();
graph.add_edge(nodes[1], nodes[2], 2.0).unwrap();
graph.add_edge(nodes[2], nodes[3], 3.0).unwrap();
graph.add_edge(nodes[3], nodes[4], 4.0).unwrap();
let partitioner = HashPartitioner::new(2);
let partitions = partitioner.partition_graph(&graph);
let dijkstra = DistributedDijkstra::new(nodes[0]);
let result = dijkstra.compute(&graph, &partitions, |_, _, w| *w);
let farthest = result.farthest_node();
assert!(farthest.is_some());
let (node, dist) = farthest.unwrap();
assert_eq!(node, nodes[4]);
assert_eq!(dist, 10.0); }
}