1use std::collections::VecDeque;
7use std::sync::OnceLock;
8
9use grafeo_common::types::{EdgeId, NodeId, Value};
10use grafeo_common::utils::error::{Error, Result};
11use grafeo_common::utils::hash::FxHashMap;
12use grafeo_core::graph::Direction;
13use grafeo_core::graph::lpg::LpgStore;
14
15use super::super::{AlgorithmResult, ParameterDef, ParameterType, Parameters};
16use super::traits::GraphAlgorithm;
17
18fn extract_capacity(store: &LpgStore, edge_id: EdgeId, capacity_prop: Option<&str>) -> f64 {
24 if let Some(prop_name) = capacity_prop
25 && let Some(edge) = store.get_edge(edge_id)
26 && let Some(value) = edge.get_property(prop_name)
27 {
28 return match value {
29 Value::Int64(i) => *i as f64,
30 Value::Float64(f) => *f,
31 _ => 1.0,
32 };
33 }
34 1.0
35}
36
37fn extract_cost(store: &LpgStore, edge_id: EdgeId, cost_prop: Option<&str>) -> f64 {
39 if let Some(prop_name) = cost_prop
40 && let Some(edge) = store.get_edge(edge_id)
41 && let Some(value) = edge.get_property(prop_name)
42 {
43 return match value {
44 Value::Int64(i) => *i as f64,
45 Value::Float64(f) => *f,
46 _ => 0.0,
47 };
48 }
49 0.0
50}
51
52#[derive(Debug, Clone)]
58pub struct MaxFlowResult {
59 pub max_flow: f64,
61 pub flow_edges: Vec<(NodeId, NodeId, f64)>,
63}
64
65pub fn max_flow(
89 store: &LpgStore,
90 source: NodeId,
91 sink: NodeId,
92 capacity_property: Option<&str>,
93) -> Option<MaxFlowResult> {
94 if store.get_node(source).is_none() || store.get_node(sink).is_none() {
96 return None;
97 }
98
99 if source == sink {
100 return Some(MaxFlowResult {
101 max_flow: 0.0,
102 flow_edges: Vec::new(),
103 });
104 }
105
106 let nodes = store.node_ids();
107 let n = nodes.len();
108
109 let mut node_to_idx: FxHashMap<NodeId, usize> = FxHashMap::default();
111 let mut idx_to_node: Vec<NodeId> = Vec::with_capacity(n);
112 for (idx, &node) in nodes.iter().enumerate() {
113 node_to_idx.insert(node, idx);
114 idx_to_node.push(node);
115 }
116
117 let source_idx = *node_to_idx.get(&source)?;
118 let sink_idx = *node_to_idx.get(&sink)?;
119
120 let mut capacity: Vec<FxHashMap<usize, f64>> = vec![FxHashMap::default(); n];
123 let mut edge_map: FxHashMap<(usize, usize), EdgeId> = FxHashMap::default();
124
125 for &node in &nodes {
126 let i = *node_to_idx.get(&node).unwrap();
127 for (neighbor, edge_id) in store.edges_from(node, Direction::Outgoing) {
128 if let Some(&j) = node_to_idx.get(&neighbor) {
129 let cap = extract_capacity(store, edge_id, capacity_property);
130 *capacity[i].entry(j).or_insert(0.0) += cap;
131 edge_map.insert((i, j), edge_id);
132 }
133 }
134 }
135
136 let mut residual: Vec<FxHashMap<usize, f64>> = capacity.clone();
138
139 for i in 0..n {
141 let neighbors: Vec<usize> = residual[i].keys().copied().collect();
142 for j in neighbors {
143 residual[j].entry(i).or_insert(0.0);
144 }
145 }
146
147 let mut total_flow = 0.0;
148
149 loop {
151 let mut parent: Vec<Option<usize>> = vec![None; n];
152 let mut visited = vec![false; n];
153 let mut queue: VecDeque<usize> = VecDeque::new();
154
155 visited[source_idx] = true;
156 queue.push_back(source_idx);
157
158 while let Some(u) = queue.pop_front() {
159 if u == sink_idx {
160 break;
161 }
162
163 for (&v, &cap) in &residual[u] {
164 if !visited[v] && cap > 1e-9 {
165 visited[v] = true;
166 parent[v] = Some(u);
167 queue.push_back(v);
168 }
169 }
170 }
171
172 if !visited[sink_idx] {
174 break;
175 }
176
177 let mut path_flow = f64::INFINITY;
179 let mut v = sink_idx;
180 while let Some(u) = parent[v] {
181 let cap = *residual[u].get(&v).unwrap_or(&0.0);
182 path_flow = path_flow.min(cap);
183 v = u;
184 }
185
186 v = sink_idx;
188 while let Some(u) = parent[v] {
189 *residual[u].entry(v).or_insert(0.0) -= path_flow;
190 *residual[v].entry(u).or_insert(0.0) += path_flow;
191 v = u;
192 }
193
194 total_flow += path_flow;
195 }
196
197 let mut flow_edges: Vec<(NodeId, NodeId, f64)> = Vec::new();
199 for i in 0..n {
200 for (&j, &original_cap) in &capacity[i] {
201 let residual_cap = *residual[i].get(&j).unwrap_or(&0.0);
202 let flow = original_cap - residual_cap;
203 if flow > 1e-9 {
204 flow_edges.push((idx_to_node[i], idx_to_node[j], flow));
205 }
206 }
207 }
208
209 Some(MaxFlowResult {
210 max_flow: total_flow,
211 flow_edges,
212 })
213}
214
215#[derive(Debug, Clone)]
221pub struct MinCostFlowResult {
222 pub max_flow: f64,
224 pub total_cost: f64,
226 pub flow_edges: Vec<(NodeId, NodeId, f64, f64)>,
228}
229
230pub fn min_cost_max_flow(
255 store: &LpgStore,
256 source: NodeId,
257 sink: NodeId,
258 capacity_property: Option<&str>,
259 cost_property: Option<&str>,
260) -> Option<MinCostFlowResult> {
261 if store.get_node(source).is_none() || store.get_node(sink).is_none() {
263 return None;
264 }
265
266 if source == sink {
267 return Some(MinCostFlowResult {
268 max_flow: 0.0,
269 total_cost: 0.0,
270 flow_edges: Vec::new(),
271 });
272 }
273
274 let nodes = store.node_ids();
275 let n = nodes.len();
276
277 let mut node_to_idx: FxHashMap<NodeId, usize> = FxHashMap::default();
279 let mut idx_to_node: Vec<NodeId> = Vec::with_capacity(n);
280 for (idx, &node) in nodes.iter().enumerate() {
281 node_to_idx.insert(node, idx);
282 idx_to_node.push(node);
283 }
284
285 let source_idx = *node_to_idx.get(&source)?;
286 let sink_idx = *node_to_idx.get(&sink)?;
287
288 let mut capacity: Vec<FxHashMap<usize, f64>> = vec![FxHashMap::default(); n];
290 let mut cost: Vec<FxHashMap<usize, f64>> = vec![FxHashMap::default(); n];
291
292 for &node in &nodes {
293 let i = *node_to_idx.get(&node).unwrap();
294 for (neighbor, edge_id) in store.edges_from(node, Direction::Outgoing) {
295 if let Some(&j) = node_to_idx.get(&neighbor) {
296 let cap = extract_capacity(store, edge_id, capacity_property);
297 let edge_cost = extract_cost(store, edge_id, cost_property);
298 *capacity[i].entry(j).or_insert(0.0) += cap;
299 *cost[i].entry(j).or_insert(0.0) = edge_cost;
300 }
301 }
302 }
303
304 let mut residual: Vec<FxHashMap<usize, f64>> = capacity.clone();
306 let mut residual_cost: Vec<FxHashMap<usize, f64>> = cost.clone();
307
308 for i in 0..n {
310 let neighbors: Vec<usize> = residual[i].keys().copied().collect();
311 for j in neighbors {
312 residual[j].entry(i).or_insert(0.0);
313 let c = *cost[i].get(&j).unwrap_or(&0.0);
314 residual_cost[j].entry(i).or_insert(-c);
315 }
316 }
317
318 let mut total_flow = 0.0;
319 let mut total_cost_val = 0.0;
320
321 loop {
323 let mut dist = vec![f64::INFINITY; n];
325 let mut parent: Vec<Option<usize>> = vec![None; n];
326 dist[source_idx] = 0.0;
327
328 for _ in 0..n {
329 let mut changed = false;
330 for u in 0..n {
331 if dist[u] == f64::INFINITY {
332 continue;
333 }
334 for (&v, &cap) in &residual[u] {
335 if cap > 1e-9 {
336 let c = *residual_cost[u].get(&v).unwrap_or(&0.0);
337 if dist[u] + c < dist[v] {
338 dist[v] = dist[u] + c;
339 parent[v] = Some(u);
340 changed = true;
341 }
342 }
343 }
344 }
345 if !changed {
346 break;
347 }
348 }
349
350 if dist[sink_idx] == f64::INFINITY {
352 break;
353 }
354
355 let mut path_flow = f64::INFINITY;
357 let mut v = sink_idx;
358 while let Some(u) = parent[v] {
359 let cap = *residual[u].get(&v).unwrap_or(&0.0);
360 path_flow = path_flow.min(cap);
361 v = u;
362 }
363
364 let path_cost = dist[sink_idx];
366 total_flow += path_flow;
367 total_cost_val += path_flow * path_cost;
368
369 v = sink_idx;
371 while let Some(u) = parent[v] {
372 *residual[u].entry(v).or_insert(0.0) -= path_flow;
373 *residual[v].entry(u).or_insert(0.0) += path_flow;
374 v = u;
375 }
376 }
377
378 let mut flow_edges: Vec<(NodeId, NodeId, f64, f64)> = Vec::new();
380 for i in 0..n {
381 for (&j, &original_cap) in &capacity[i] {
382 let residual_cap = *residual[i].get(&j).unwrap_or(&0.0);
383 let flow = original_cap - residual_cap;
384 if flow > 1e-9 {
385 let edge_cost = *cost[i].get(&j).unwrap_or(&0.0);
386 flow_edges.push((idx_to_node[i], idx_to_node[j], flow, edge_cost));
387 }
388 }
389 }
390
391 Some(MinCostFlowResult {
392 max_flow: total_flow,
393 total_cost: total_cost_val,
394 flow_edges,
395 })
396}
397
398static MAX_FLOW_PARAMS: OnceLock<Vec<ParameterDef>> = OnceLock::new();
404
405fn max_flow_params() -> &'static [ParameterDef] {
406 MAX_FLOW_PARAMS.get_or_init(|| {
407 vec![
408 ParameterDef {
409 name: "source".to_string(),
410 description: "Source node ID".to_string(),
411 param_type: ParameterType::NodeId,
412 required: true,
413 default: None,
414 },
415 ParameterDef {
416 name: "sink".to_string(),
417 description: "Sink node ID".to_string(),
418 param_type: ParameterType::NodeId,
419 required: true,
420 default: None,
421 },
422 ParameterDef {
423 name: "capacity".to_string(),
424 description: "Edge property name for capacities (default: 1.0)".to_string(),
425 param_type: ParameterType::String,
426 required: false,
427 default: None,
428 },
429 ]
430 })
431}
432
433pub struct MaxFlowAlgorithm;
435
436impl GraphAlgorithm for MaxFlowAlgorithm {
437 fn name(&self) -> &str {
438 "max_flow"
439 }
440
441 fn description(&self) -> &str {
442 "Maximum flow using Edmonds-Karp algorithm"
443 }
444
445 fn parameters(&self) -> &[ParameterDef] {
446 max_flow_params()
447 }
448
449 fn execute(&self, store: &LpgStore, params: &Parameters) -> Result<AlgorithmResult> {
450 let source_id = params
451 .get_int("source")
452 .ok_or_else(|| Error::InvalidValue("source parameter required".to_string()))?;
453 let sink_id = params
454 .get_int("sink")
455 .ok_or_else(|| Error::InvalidValue("sink parameter required".to_string()))?;
456
457 let source = NodeId::new(source_id as u64);
458 let sink = NodeId::new(sink_id as u64);
459 let capacity_prop = params.get_string("capacity");
460
461 let result = max_flow(store, source, sink, capacity_prop)
462 .ok_or_else(|| Error::InvalidValue("Invalid source or sink node".to_string()))?;
463
464 let mut output = AlgorithmResult::new(vec![
465 "source".to_string(),
466 "target".to_string(),
467 "flow".to_string(),
468 "max_flow".to_string(),
469 ]);
470
471 for (src, dst, flow) in result.flow_edges {
472 output.add_row(vec![
473 Value::Int64(src.0 as i64),
474 Value::Int64(dst.0 as i64),
475 Value::Float64(flow),
476 Value::Float64(result.max_flow),
477 ]);
478 }
479
480 if output.row_count() == 0 {
482 output.add_row(vec![
483 Value::Int64(source_id),
484 Value::Int64(sink_id),
485 Value::Float64(0.0),
486 Value::Float64(result.max_flow),
487 ]);
488 }
489
490 Ok(output)
491 }
492}
493
494static MIN_COST_FLOW_PARAMS: OnceLock<Vec<ParameterDef>> = OnceLock::new();
496
497fn min_cost_flow_params() -> &'static [ParameterDef] {
498 MIN_COST_FLOW_PARAMS.get_or_init(|| {
499 vec![
500 ParameterDef {
501 name: "source".to_string(),
502 description: "Source node ID".to_string(),
503 param_type: ParameterType::NodeId,
504 required: true,
505 default: None,
506 },
507 ParameterDef {
508 name: "sink".to_string(),
509 description: "Sink node ID".to_string(),
510 param_type: ParameterType::NodeId,
511 required: true,
512 default: None,
513 },
514 ParameterDef {
515 name: "capacity".to_string(),
516 description: "Edge property name for capacities (default: 1.0)".to_string(),
517 param_type: ParameterType::String,
518 required: false,
519 default: None,
520 },
521 ParameterDef {
522 name: "cost".to_string(),
523 description: "Edge property name for costs (default: 0.0)".to_string(),
524 param_type: ParameterType::String,
525 required: false,
526 default: None,
527 },
528 ]
529 })
530}
531
532pub struct MinCostFlowAlgorithm;
534
535impl GraphAlgorithm for MinCostFlowAlgorithm {
536 fn name(&self) -> &str {
537 "min_cost_max_flow"
538 }
539
540 fn description(&self) -> &str {
541 "Minimum cost maximum flow using successive shortest paths"
542 }
543
544 fn parameters(&self) -> &[ParameterDef] {
545 min_cost_flow_params()
546 }
547
548 fn execute(&self, store: &LpgStore, params: &Parameters) -> Result<AlgorithmResult> {
549 let source_id = params
550 .get_int("source")
551 .ok_or_else(|| Error::InvalidValue("source parameter required".to_string()))?;
552 let sink_id = params
553 .get_int("sink")
554 .ok_or_else(|| Error::InvalidValue("sink parameter required".to_string()))?;
555
556 let source = NodeId::new(source_id as u64);
557 let sink = NodeId::new(sink_id as u64);
558 let capacity_prop = params.get_string("capacity");
559 let cost_prop = params.get_string("cost");
560
561 let result = min_cost_max_flow(store, source, sink, capacity_prop, cost_prop)
562 .ok_or_else(|| Error::InvalidValue("Invalid source or sink node".to_string()))?;
563
564 let mut output = AlgorithmResult::new(vec![
565 "source".to_string(),
566 "target".to_string(),
567 "flow".to_string(),
568 "cost".to_string(),
569 "max_flow".to_string(),
570 "total_cost".to_string(),
571 ]);
572
573 for (src, dst, flow, cost) in result.flow_edges {
574 output.add_row(vec![
575 Value::Int64(src.0 as i64),
576 Value::Int64(dst.0 as i64),
577 Value::Float64(flow),
578 Value::Float64(cost),
579 Value::Float64(result.max_flow),
580 Value::Float64(result.total_cost),
581 ]);
582 }
583
584 Ok(output)
585 }
586}
587
588#[cfg(test)]
593mod tests {
594 use super::*;
595
596 fn create_simple_flow_graph() -> LpgStore {
597 let store = LpgStore::new();
605
606 let n0 = store.create_node(&["Node"]); let n1 = store.create_node(&["Node"]);
608 let n2 = store.create_node(&["Node"]);
609 let n3 = store.create_node(&["Node"]); store.create_edge_with_props(n0, n1, "EDGE", [("capacity", Value::Float64(5.0))]);
612 store.create_edge_with_props(n0, n2, "EDGE", [("capacity", Value::Float64(3.0))]);
613 store.create_edge_with_props(n1, n3, "EDGE", [("capacity", Value::Float64(3.0))]);
614 store.create_edge_with_props(n1, n2, "EDGE", [("capacity", Value::Float64(2.0))]);
615 store.create_edge_with_props(n2, n3, "EDGE", [("capacity", Value::Float64(4.0))]);
616
617 store
618 }
619
620 fn create_cost_flow_graph() -> LpgStore {
621 let store = LpgStore::new();
626
627 let n0 = store.create_node(&["Node"]);
628 let n1 = store.create_node(&["Node"]);
629 let n2 = store.create_node(&["Node"]);
630
631 store.create_edge_with_props(
632 n0,
633 n1,
634 "EDGE",
635 [
636 ("capacity", Value::Float64(3.0)),
637 ("cost", Value::Float64(1.0)),
638 ],
639 );
640 store.create_edge_with_props(
641 n1,
642 n2,
643 "EDGE",
644 [
645 ("capacity", Value::Float64(2.0)),
646 ("cost", Value::Float64(2.0)),
647 ],
648 );
649 store.create_edge_with_props(
650 n0,
651 n2,
652 "EDGE",
653 [
654 ("capacity", Value::Float64(2.0)),
655 ("cost", Value::Float64(5.0)),
656 ],
657 );
658
659 store
660 }
661
662 #[test]
663 fn test_max_flow_basic() {
664 let store = create_simple_flow_graph();
665 let result = max_flow(&store, NodeId::new(0), NodeId::new(3), Some("capacity"));
666
667 assert!(result.is_some());
668 let result = result.unwrap();
669
670 assert!(result.max_flow >= 5.0 && result.max_flow <= 8.0);
676 }
677
678 #[test]
679 fn test_max_flow_same_source_sink() {
680 let store = create_simple_flow_graph();
681 let result = max_flow(&store, NodeId::new(0), NodeId::new(0), Some("capacity"));
682
683 assert!(result.is_some());
684 assert_eq!(result.unwrap().max_flow, 0.0);
685 }
686
687 #[test]
688 fn test_max_flow_no_path() {
689 let store = LpgStore::new();
690 let n0 = store.create_node(&["Node"]);
691 let _n1 = store.create_node(&["Node"]); let result = max_flow(&store, n0, NodeId::new(1), None);
694 assert!(result.is_some());
695 assert_eq!(result.unwrap().max_flow, 0.0);
696 }
697
698 #[test]
699 fn test_max_flow_invalid_nodes() {
700 let store = LpgStore::new();
701 store.create_node(&["Node"]);
702
703 let result = max_flow(&store, NodeId::new(999), NodeId::new(0), None);
704 assert!(result.is_none());
705 }
706
707 #[test]
708 fn test_min_cost_flow_basic() {
709 let store = create_cost_flow_graph();
710 let result = min_cost_max_flow(
711 &store,
712 NodeId::new(0),
713 NodeId::new(2),
714 Some("capacity"),
715 Some("cost"),
716 );
717
718 assert!(result.is_some());
719 let result = result.unwrap();
720
721 assert!(result.max_flow > 0.0);
723
724 assert!(result.total_cost >= 0.0);
726 }
727
728 #[test]
729 fn test_min_cost_prefers_cheaper_path() {
730 let store = create_cost_flow_graph();
731 let result = min_cost_max_flow(
732 &store,
733 NodeId::new(0),
734 NodeId::new(2),
735 Some("capacity"),
736 Some("cost"),
737 );
738
739 assert!(result.is_some());
740 let result = result.unwrap();
741
742 assert!(result.max_flow >= 2.0);
747 }
748
749 #[test]
750 fn test_min_cost_flow_no_path() {
751 let store = LpgStore::new();
752 let n0 = store.create_node(&["Node"]);
753 let _n1 = store.create_node(&["Node"]);
754
755 let result = min_cost_max_flow(&store, n0, NodeId::new(1), None, None);
756 assert!(result.is_some());
757 assert_eq!(result.unwrap().max_flow, 0.0);
758 }
759
760 #[test]
761 fn test_max_flow_default_capacity() {
762 let store = LpgStore::new();
764 let n0 = store.create_node(&["Node"]);
765 let n1 = store.create_node(&["Node"]);
766 let n2 = store.create_node(&["Node"]);
767
768 store.create_edge(n0, n1, "EDGE");
769 store.create_edge(n0, n2, "EDGE");
770 store.create_edge(n1, n2, "EDGE");
771
772 let result = max_flow(&store, n0, n2, None);
773 assert!(result.is_some());
774
775 assert!(result.unwrap().max_flow >= 1.0);
777 }
778
779 #[test]
780 fn test_max_flow_flow_edges_populated() {
781 let store = create_simple_flow_graph();
782 let result = max_flow(&store, NodeId::new(0), NodeId::new(3), Some("capacity")).unwrap();
783
784 assert!(result.max_flow > 0.0);
786 assert!(!result.flow_edges.is_empty());
787
788 for (_, _, flow) in &result.flow_edges {
790 assert!(*flow > 0.0);
791 }
792 }
793
794 #[test]
795 fn test_max_flow_int_capacity() {
796 let store = LpgStore::new();
797 let n0 = store.create_node(&["Node"]);
798 let n1 = store.create_node(&["Node"]);
799
800 store.create_edge_with_props(n0, n1, "EDGE", [("capacity", Value::Int64(5))]);
802
803 let result = max_flow(&store, n0, n1, Some("capacity")).unwrap();
804 assert!((result.max_flow - 5.0).abs() < f64::EPSILON);
805 }
806
807 #[test]
808 fn test_max_flow_parallel_paths() {
809 let store = LpgStore::new();
810 let s = store.create_node(&["Node"]);
811 let a = store.create_node(&["Node"]);
812 let b = store.create_node(&["Node"]);
813 let t = store.create_node(&["Node"]);
814
815 store.create_edge_with_props(s, a, "EDGE", [("capacity", Value::Float64(3.0))]);
817 store.create_edge_with_props(a, t, "EDGE", [("capacity", Value::Float64(3.0))]);
818 store.create_edge_with_props(s, b, "EDGE", [("capacity", Value::Float64(2.0))]);
819 store.create_edge_with_props(b, t, "EDGE", [("capacity", Value::Float64(2.0))]);
820
821 let result = max_flow(&store, s, t, Some("capacity")).unwrap();
822 assert!((result.max_flow - 5.0).abs() < f64::EPSILON);
823 }
824
825 #[test]
826 fn test_max_flow_bottleneck() {
827 let store = LpgStore::new();
828 let s = store.create_node(&["Node"]);
829 let mid = store.create_node(&["Node"]);
830 let t = store.create_node(&["Node"]);
831
832 store.create_edge_with_props(s, mid, "EDGE", [("capacity", Value::Float64(100.0))]);
834 store.create_edge_with_props(mid, t, "EDGE", [("capacity", Value::Float64(1.0))]);
835
836 let result = max_flow(&store, s, t, Some("capacity")).unwrap();
837 assert!((result.max_flow - 1.0).abs() < f64::EPSILON);
838 }
839
840 #[test]
841 fn test_min_cost_flow_same_source_sink() {
842 let store = create_cost_flow_graph();
843 let result = min_cost_max_flow(
844 &store,
845 NodeId::new(0),
846 NodeId::new(0),
847 Some("capacity"),
848 Some("cost"),
849 )
850 .unwrap();
851 assert_eq!(result.max_flow, 0.0);
852 assert_eq!(result.total_cost, 0.0);
853 }
854
855 #[test]
856 fn test_min_cost_flow_invalid_nodes() {
857 let store = LpgStore::new();
858 store.create_node(&["Node"]);
859 let result = min_cost_max_flow(&store, NodeId::new(999), NodeId::new(0), None, None);
860 assert!(result.is_none());
861 }
862
863 #[test]
864 fn test_min_cost_flow_edges() {
865 let store = create_cost_flow_graph();
866 let result = min_cost_max_flow(
867 &store,
868 NodeId::new(0),
869 NodeId::new(2),
870 Some("capacity"),
871 Some("cost"),
872 )
873 .unwrap();
874
875 for (_, _, flow, _cost) in &result.flow_edges {
877 assert!(*flow > 0.0);
878 }
879 }
880}