1use std::collections::VecDeque;
7use std::sync::OnceLock;
8
9use graphos_common::types::{EdgeId, NodeId, Value};
10use graphos_common::utils::error::{Error, Result};
11use graphos_common::utils::hash::FxHashMap;
12use graphos_core::graph::Direction;
13use graphos_core::graph::lpg::LpgStore;
14
15use super::super::{AlgorithmResult, ParameterDef, ParameterType, Parameters};
16use super::traits::GraphAlgorithm;
17
18fn extract_capacity(store: &LpgStore, edge_id: EdgeId, capacity_prop: Option<&str>) -> f64 {
24 if let Some(prop_name) = capacity_prop {
25 if let Some(edge) = store.get_edge(edge_id) {
26 if let Some(value) = edge.get_property(prop_name) {
27 return match value {
28 Value::Int64(i) => *i as f64,
29 Value::Float64(f) => *f,
30 _ => 1.0,
31 };
32 }
33 }
34 }
35 1.0
36}
37
38fn extract_cost(store: &LpgStore, edge_id: EdgeId, cost_prop: Option<&str>) -> f64 {
40 if let Some(prop_name) = cost_prop {
41 if let Some(edge) = store.get_edge(edge_id) {
42 if let Some(value) = edge.get_property(prop_name) {
43 return match value {
44 Value::Int64(i) => *i as f64,
45 Value::Float64(f) => *f,
46 _ => 0.0,
47 };
48 }
49 }
50 }
51 0.0
52}
53
/// Outcome of a maximum-flow computation.
#[derive(Debug, Clone)]
pub struct MaxFlowResult {
    /// Total amount of flow routed from the source to the sink.
    pub max_flow: f64,
    /// `(from, to, flow)` triples, one per ordered node pair that carries a
    /// strictly positive amount of flow.
    pub flow_edges: Vec<(NodeId, NodeId, f64)>,
}
66
67pub fn max_flow(
91 store: &LpgStore,
92 source: NodeId,
93 sink: NodeId,
94 capacity_property: Option<&str>,
95) -> Option<MaxFlowResult> {
96 if store.get_node(source).is_none() || store.get_node(sink).is_none() {
98 return None;
99 }
100
101 if source == sink {
102 return Some(MaxFlowResult {
103 max_flow: 0.0,
104 flow_edges: Vec::new(),
105 });
106 }
107
108 let nodes = store.node_ids();
109 let n = nodes.len();
110
111 let mut node_to_idx: FxHashMap<NodeId, usize> = FxHashMap::default();
113 let mut idx_to_node: Vec<NodeId> = Vec::with_capacity(n);
114 for (idx, &node) in nodes.iter().enumerate() {
115 node_to_idx.insert(node, idx);
116 idx_to_node.push(node);
117 }
118
119 let source_idx = *node_to_idx.get(&source)?;
120 let sink_idx = *node_to_idx.get(&sink)?;
121
122 let mut capacity: Vec<FxHashMap<usize, f64>> = vec![FxHashMap::default(); n];
125 let mut edge_map: FxHashMap<(usize, usize), EdgeId> = FxHashMap::default();
126
127 for &node in &nodes {
128 let i = *node_to_idx.get(&node).unwrap();
129 for (neighbor, edge_id) in store.edges_from(node, Direction::Outgoing) {
130 if let Some(&j) = node_to_idx.get(&neighbor) {
131 let cap = extract_capacity(store, edge_id, capacity_property);
132 *capacity[i].entry(j).or_insert(0.0) += cap;
133 edge_map.insert((i, j), edge_id);
134 }
135 }
136 }
137
138 let mut residual: Vec<FxHashMap<usize, f64>> = capacity.clone();
140
141 for i in 0..n {
143 let neighbors: Vec<usize> = residual[i].keys().copied().collect();
144 for j in neighbors {
145 residual[j].entry(i).or_insert(0.0);
146 }
147 }
148
149 let mut total_flow = 0.0;
150
151 loop {
153 let mut parent: Vec<Option<usize>> = vec![None; n];
154 let mut visited = vec![false; n];
155 let mut queue: VecDeque<usize> = VecDeque::new();
156
157 visited[source_idx] = true;
158 queue.push_back(source_idx);
159
160 while let Some(u) = queue.pop_front() {
161 if u == sink_idx {
162 break;
163 }
164
165 for (&v, &cap) in &residual[u] {
166 if !visited[v] && cap > 1e-9 {
167 visited[v] = true;
168 parent[v] = Some(u);
169 queue.push_back(v);
170 }
171 }
172 }
173
174 if !visited[sink_idx] {
176 break;
177 }
178
179 let mut path_flow = f64::INFINITY;
181 let mut v = sink_idx;
182 while let Some(u) = parent[v] {
183 let cap = *residual[u].get(&v).unwrap_or(&0.0);
184 path_flow = path_flow.min(cap);
185 v = u;
186 }
187
188 v = sink_idx;
190 while let Some(u) = parent[v] {
191 *residual[u].entry(v).or_insert(0.0) -= path_flow;
192 *residual[v].entry(u).or_insert(0.0) += path_flow;
193 v = u;
194 }
195
196 total_flow += path_flow;
197 }
198
199 let mut flow_edges: Vec<(NodeId, NodeId, f64)> = Vec::new();
201 for i in 0..n {
202 for (&j, &original_cap) in &capacity[i] {
203 let residual_cap = *residual[i].get(&j).unwrap_or(&0.0);
204 let flow = original_cap - residual_cap;
205 if flow > 1e-9 {
206 flow_edges.push((idx_to_node[i], idx_to_node[j], flow));
207 }
208 }
209 }
210
211 Some(MaxFlowResult {
212 max_flow: total_flow,
213 flow_edges,
214 })
215}
216
/// Outcome of a minimum-cost maximum-flow computation.
#[derive(Debug, Clone)]
pub struct MinCostFlowResult {
    /// Total amount of flow routed from the source to the sink.
    pub max_flow: f64,
    /// Sum over all augmentations of (flow pushed) x (cost of the path used).
    pub total_cost: f64,
    /// `(from, to, flow, cost)` tuples, one per ordered node pair that carries
    /// a strictly positive amount of flow; `cost` is the per-unit cost
    /// recorded for that pair.
    pub flow_edges: Vec<(NodeId, NodeId, f64, f64)>,
}
231
232pub fn min_cost_max_flow(
257 store: &LpgStore,
258 source: NodeId,
259 sink: NodeId,
260 capacity_property: Option<&str>,
261 cost_property: Option<&str>,
262) -> Option<MinCostFlowResult> {
263 if store.get_node(source).is_none() || store.get_node(sink).is_none() {
265 return None;
266 }
267
268 if source == sink {
269 return Some(MinCostFlowResult {
270 max_flow: 0.0,
271 total_cost: 0.0,
272 flow_edges: Vec::new(),
273 });
274 }
275
276 let nodes = store.node_ids();
277 let n = nodes.len();
278
279 let mut node_to_idx: FxHashMap<NodeId, usize> = FxHashMap::default();
281 let mut idx_to_node: Vec<NodeId> = Vec::with_capacity(n);
282 for (idx, &node) in nodes.iter().enumerate() {
283 node_to_idx.insert(node, idx);
284 idx_to_node.push(node);
285 }
286
287 let source_idx = *node_to_idx.get(&source)?;
288 let sink_idx = *node_to_idx.get(&sink)?;
289
290 let mut capacity: Vec<FxHashMap<usize, f64>> = vec![FxHashMap::default(); n];
292 let mut cost: Vec<FxHashMap<usize, f64>> = vec![FxHashMap::default(); n];
293
294 for &node in &nodes {
295 let i = *node_to_idx.get(&node).unwrap();
296 for (neighbor, edge_id) in store.edges_from(node, Direction::Outgoing) {
297 if let Some(&j) = node_to_idx.get(&neighbor) {
298 let cap = extract_capacity(store, edge_id, capacity_property);
299 let edge_cost = extract_cost(store, edge_id, cost_property);
300 *capacity[i].entry(j).or_insert(0.0) += cap;
301 *cost[i].entry(j).or_insert(0.0) = edge_cost;
302 }
303 }
304 }
305
306 let mut residual: Vec<FxHashMap<usize, f64>> = capacity.clone();
308 let mut residual_cost: Vec<FxHashMap<usize, f64>> = cost.clone();
309
310 for i in 0..n {
312 let neighbors: Vec<usize> = residual[i].keys().copied().collect();
313 for j in neighbors {
314 residual[j].entry(i).or_insert(0.0);
315 let c = *cost[i].get(&j).unwrap_or(&0.0);
316 residual_cost[j].entry(i).or_insert(-c);
317 }
318 }
319
320 let mut total_flow = 0.0;
321 let mut total_cost_val = 0.0;
322
323 loop {
325 let mut dist = vec![f64::INFINITY; n];
327 let mut parent: Vec<Option<usize>> = vec![None; n];
328 dist[source_idx] = 0.0;
329
330 for _ in 0..n {
331 let mut changed = false;
332 for u in 0..n {
333 if dist[u] == f64::INFINITY {
334 continue;
335 }
336 for (&v, &cap) in &residual[u] {
337 if cap > 1e-9 {
338 let c = *residual_cost[u].get(&v).unwrap_or(&0.0);
339 if dist[u] + c < dist[v] {
340 dist[v] = dist[u] + c;
341 parent[v] = Some(u);
342 changed = true;
343 }
344 }
345 }
346 }
347 if !changed {
348 break;
349 }
350 }
351
352 if dist[sink_idx] == f64::INFINITY {
354 break;
355 }
356
357 let mut path_flow = f64::INFINITY;
359 let mut v = sink_idx;
360 while let Some(u) = parent[v] {
361 let cap = *residual[u].get(&v).unwrap_or(&0.0);
362 path_flow = path_flow.min(cap);
363 v = u;
364 }
365
366 let path_cost = dist[sink_idx];
368 total_flow += path_flow;
369 total_cost_val += path_flow * path_cost;
370
371 v = sink_idx;
373 while let Some(u) = parent[v] {
374 *residual[u].entry(v).or_insert(0.0) -= path_flow;
375 *residual[v].entry(u).or_insert(0.0) += path_flow;
376 v = u;
377 }
378 }
379
380 let mut flow_edges: Vec<(NodeId, NodeId, f64, f64)> = Vec::new();
382 for i in 0..n {
383 for (&j, &original_cap) in &capacity[i] {
384 let residual_cap = *residual[i].get(&j).unwrap_or(&0.0);
385 let flow = original_cap - residual_cap;
386 if flow > 1e-9 {
387 let edge_cost = *cost[i].get(&j).unwrap_or(&0.0);
388 flow_edges.push((idx_to_node[i], idx_to_node[j], flow, edge_cost));
389 }
390 }
391 }
392
393 Some(MinCostFlowResult {
394 max_flow: total_flow,
395 total_cost: total_cost_val,
396 flow_edges,
397 })
398}
399
400static MAX_FLOW_PARAMS: OnceLock<Vec<ParameterDef>> = OnceLock::new();
406
407fn max_flow_params() -> &'static [ParameterDef] {
408 MAX_FLOW_PARAMS.get_or_init(|| {
409 vec![
410 ParameterDef {
411 name: "source".to_string(),
412 description: "Source node ID".to_string(),
413 param_type: ParameterType::NodeId,
414 required: true,
415 default: None,
416 },
417 ParameterDef {
418 name: "sink".to_string(),
419 description: "Sink node ID".to_string(),
420 param_type: ParameterType::NodeId,
421 required: true,
422 default: None,
423 },
424 ParameterDef {
425 name: "capacity".to_string(),
426 description: "Edge property name for capacities (default: 1.0)".to_string(),
427 param_type: ParameterType::String,
428 required: false,
429 default: None,
430 },
431 ]
432 })
433}
434
435pub struct MaxFlowAlgorithm;
437
438impl GraphAlgorithm for MaxFlowAlgorithm {
439 fn name(&self) -> &str {
440 "max_flow"
441 }
442
443 fn description(&self) -> &str {
444 "Maximum flow using Edmonds-Karp algorithm"
445 }
446
447 fn parameters(&self) -> &[ParameterDef] {
448 max_flow_params()
449 }
450
451 fn execute(&self, store: &LpgStore, params: &Parameters) -> Result<AlgorithmResult> {
452 let source_id = params
453 .get_int("source")
454 .ok_or_else(|| Error::InvalidValue("source parameter required".to_string()))?;
455 let sink_id = params
456 .get_int("sink")
457 .ok_or_else(|| Error::InvalidValue("sink parameter required".to_string()))?;
458
459 let source = NodeId::new(source_id as u64);
460 let sink = NodeId::new(sink_id as u64);
461 let capacity_prop = params.get_string("capacity");
462
463 let result = max_flow(store, source, sink, capacity_prop)
464 .ok_or_else(|| Error::InvalidValue("Invalid source or sink node".to_string()))?;
465
466 let mut output = AlgorithmResult::new(vec![
467 "source".to_string(),
468 "target".to_string(),
469 "flow".to_string(),
470 "max_flow".to_string(),
471 ]);
472
473 for (src, dst, flow) in result.flow_edges {
474 output.add_row(vec![
475 Value::Int64(src.0 as i64),
476 Value::Int64(dst.0 as i64),
477 Value::Float64(flow),
478 Value::Float64(result.max_flow),
479 ]);
480 }
481
482 if output.row_count() == 0 {
484 output.add_row(vec![
485 Value::Int64(source_id),
486 Value::Int64(sink_id),
487 Value::Float64(0.0),
488 Value::Float64(result.max_flow),
489 ]);
490 }
491
492 Ok(output)
493 }
494}
495
496static MIN_COST_FLOW_PARAMS: OnceLock<Vec<ParameterDef>> = OnceLock::new();
498
499fn min_cost_flow_params() -> &'static [ParameterDef] {
500 MIN_COST_FLOW_PARAMS.get_or_init(|| {
501 vec![
502 ParameterDef {
503 name: "source".to_string(),
504 description: "Source node ID".to_string(),
505 param_type: ParameterType::NodeId,
506 required: true,
507 default: None,
508 },
509 ParameterDef {
510 name: "sink".to_string(),
511 description: "Sink node ID".to_string(),
512 param_type: ParameterType::NodeId,
513 required: true,
514 default: None,
515 },
516 ParameterDef {
517 name: "capacity".to_string(),
518 description: "Edge property name for capacities (default: 1.0)".to_string(),
519 param_type: ParameterType::String,
520 required: false,
521 default: None,
522 },
523 ParameterDef {
524 name: "cost".to_string(),
525 description: "Edge property name for costs (default: 0.0)".to_string(),
526 param_type: ParameterType::String,
527 required: false,
528 default: None,
529 },
530 ]
531 })
532}
533
534pub struct MinCostFlowAlgorithm;
536
537impl GraphAlgorithm for MinCostFlowAlgorithm {
538 fn name(&self) -> &str {
539 "min_cost_max_flow"
540 }
541
542 fn description(&self) -> &str {
543 "Minimum cost maximum flow using successive shortest paths"
544 }
545
546 fn parameters(&self) -> &[ParameterDef] {
547 min_cost_flow_params()
548 }
549
550 fn execute(&self, store: &LpgStore, params: &Parameters) -> Result<AlgorithmResult> {
551 let source_id = params
552 .get_int("source")
553 .ok_or_else(|| Error::InvalidValue("source parameter required".to_string()))?;
554 let sink_id = params
555 .get_int("sink")
556 .ok_or_else(|| Error::InvalidValue("sink parameter required".to_string()))?;
557
558 let source = NodeId::new(source_id as u64);
559 let sink = NodeId::new(sink_id as u64);
560 let capacity_prop = params.get_string("capacity");
561 let cost_prop = params.get_string("cost");
562
563 let result = min_cost_max_flow(store, source, sink, capacity_prop, cost_prop)
564 .ok_or_else(|| Error::InvalidValue("Invalid source or sink node".to_string()))?;
565
566 let mut output = AlgorithmResult::new(vec![
567 "source".to_string(),
568 "target".to_string(),
569 "flow".to_string(),
570 "cost".to_string(),
571 "max_flow".to_string(),
572 "total_cost".to_string(),
573 ]);
574
575 for (src, dst, flow, cost) in result.flow_edges {
576 output.add_row(vec![
577 Value::Int64(src.0 as i64),
578 Value::Int64(dst.0 as i64),
579 Value::Float64(flow),
580 Value::Float64(cost),
581 Value::Float64(result.max_flow),
582 Value::Float64(result.total_cost),
583 ]);
584 }
585
586 Ok(output)
587 }
588}
589
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance used when comparing floating-point flow values.
    const TOLERANCE: f64 = 1e-9;

    /// Asserts that two floats agree within [`TOLERANCE`].
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < TOLERANCE,
            "expected {expected}, got {actual}"
        );
    }

    /// Looks up the `(flow, cost)` reported for the node pair `from -> to`.
    fn flow_between(
        edges: &[(NodeId, NodeId, f64, f64)],
        from: u64,
        to: u64,
    ) -> Option<(f64, f64)> {
        edges
            .iter()
            .find(|(src, dst, _, _)| *src == NodeId::new(from) && *dst == NodeId::new(to))
            .map(|&(_, _, flow, cost)| (flow, cost))
    }

    /// Four-node network with source 0 and sink 3:
    ///
    /// ```text
    /// 0 -> 1 (5)   0 -> 2 (3)   1 -> 3 (3)   1 -> 2 (2)   2 -> 3 (4)
    /// ```
    ///
    /// The cut {1 -> 3, 2 -> 3} has capacity 7, which is the maximum flow.
    fn create_simple_flow_graph() -> LpgStore {
        let store = LpgStore::new();

        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);
        let n2 = store.create_node(&["Node"]);
        let n3 = store.create_node(&["Node"]);

        store.create_edge_with_props(n0, n1, "EDGE", [("capacity", Value::Float64(5.0))]);
        store.create_edge_with_props(n0, n2, "EDGE", [("capacity", Value::Float64(3.0))]);
        store.create_edge_with_props(n1, n3, "EDGE", [("capacity", Value::Float64(3.0))]);
        store.create_edge_with_props(n1, n2, "EDGE", [("capacity", Value::Float64(2.0))]);
        store.create_edge_with_props(n2, n3, "EDGE", [("capacity", Value::Float64(4.0))]);

        store
    }

    /// Three-node network with source 0 and sink 2:
    ///
    /// ```text
    /// 0 -> 1 (cap 3, cost 1)   1 -> 2 (cap 2, cost 2)   0 -> 2 (cap 2, cost 5)
    /// ```
    ///
    /// Maximum flow is 4: two units via 0 -> 1 -> 2 (cost 3 each) and two
    /// units via 0 -> 2 (cost 5 each), for a total cost of 16.
    fn create_cost_flow_graph() -> LpgStore {
        let store = LpgStore::new();

        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);
        let n2 = store.create_node(&["Node"]);

        store.create_edge_with_props(
            n0,
            n1,
            "EDGE",
            [
                ("capacity", Value::Float64(3.0)),
                ("cost", Value::Float64(1.0)),
            ],
        );
        store.create_edge_with_props(
            n1,
            n2,
            "EDGE",
            [
                ("capacity", Value::Float64(2.0)),
                ("cost", Value::Float64(2.0)),
            ],
        );
        store.create_edge_with_props(
            n0,
            n2,
            "EDGE",
            [
                ("capacity", Value::Float64(2.0)),
                ("cost", Value::Float64(5.0)),
            ],
        );

        store
    }

    #[test]
    fn test_max_flow_basic() {
        let store = create_simple_flow_graph();
        let result = max_flow(&store, NodeId::new(0), NodeId::new(3), Some("capacity"))
            .expect("source and sink exist");

        assert_close(result.max_flow, 7.0);

        // Everything that leaves the source must arrive at the sink.
        let into_sink: f64 = result
            .flow_edges
            .iter()
            .filter(|(_, dst, _)| *dst == NodeId::new(3))
            .map(|(_, _, flow)| *flow)
            .sum();
        assert_close(into_sink, 7.0);
    }

    #[test]
    fn test_max_flow_same_source_sink() {
        let store = create_simple_flow_graph();
        let result = max_flow(&store, NodeId::new(0), NodeId::new(0), Some("capacity"))
            .expect("node exists");

        assert_eq!(result.max_flow, 0.0);
        assert!(result.flow_edges.is_empty());
    }

    #[test]
    fn test_max_flow_no_path() {
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);

        let result = max_flow(&store, n0, n1, None).expect("both nodes exist");
        assert_eq!(result.max_flow, 0.0);
        assert!(result.flow_edges.is_empty());
    }

    #[test]
    fn test_max_flow_invalid_nodes() {
        let store = LpgStore::new();
        store.create_node(&["Node"]);

        let result = max_flow(&store, NodeId::new(999), NodeId::new(0), None);
        assert!(result.is_none());
    }

    #[test]
    fn test_min_cost_flow_basic() {
        let store = create_cost_flow_graph();
        let result = min_cost_max_flow(
            &store,
            NodeId::new(0),
            NodeId::new(2),
            Some("capacity"),
            Some("cost"),
        )
        .expect("source and sink exist");

        assert_close(result.max_flow, 4.0);
        assert_close(result.total_cost, 16.0);
    }

    #[test]
    fn test_min_cost_prefers_cheaper_path() {
        let store = create_cost_flow_graph();
        let result = min_cost_max_flow(
            &store,
            NodeId::new(0),
            NodeId::new(2),
            Some("capacity"),
            Some("cost"),
        )
        .expect("source and sink exist");

        // The cheap route 0 -> 1 -> 2 is saturated at its bottleneck of 2 ...
        let (flow, cost) = flow_between(&result.flow_edges, 0, 1).expect("0 -> 1 carries flow");
        assert_close(flow, 2.0);
        assert_close(cost, 1.0);
        let (flow, cost) = flow_between(&result.flow_edges, 1, 2).expect("1 -> 2 carries flow");
        assert_close(flow, 2.0);
        assert_close(cost, 2.0);

        // ... and only the remainder uses the expensive direct edge.
        let (flow, cost) = flow_between(&result.flow_edges, 0, 2).expect("0 -> 2 carries flow");
        assert_close(flow, 2.0);
        assert_close(cost, 5.0);
    }

    #[test]
    fn test_min_cost_flow_no_path() {
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);

        let result = min_cost_max_flow(&store, n0, n1, None, None).expect("both nodes exist");
        assert_eq!(result.max_flow, 0.0);
        assert_eq!(result.total_cost, 0.0);
        assert!(result.flow_edges.is_empty());
    }

    #[test]
    fn test_max_flow_default_capacity() {
        let store = LpgStore::new();
        let n0 = store.create_node(&["Node"]);
        let n1 = store.create_node(&["Node"]);
        let n2 = store.create_node(&["Node"]);

        store.create_edge(n0, n1, "EDGE");
        store.create_edge(n0, n2, "EDGE");
        store.create_edge(n1, n2, "EDGE");

        // Every edge defaults to capacity 1: one unit via 0 -> 2 and one unit
        // via 0 -> 1 -> 2.
        let result = max_flow(&store, n0, n2, None).expect("both nodes exist");
        assert_close(result.max_flow, 2.0);
    }
}