Skip to main content

converge_optimization/suggestors/
flow_optimization.rs

1// Copyright 2024-2026 Reflective Labs
2// SPDX-License-Identifier: MIT
3
4//! Min-cost flow optimisation over a supply/demand network.
5//!
6//! Reads a [`FlowRequest`] from context — a directed graph with edge
7//! capacities and costs, a source, a sink, and a required flow demand —
8//! and proposes a [`FlowPlan`] with the cheapest routing of that flow.
9//!
10//! # Formation role
11//!
12//! Resource allocation suggestors (budget, capacity, workforce) produce demand
13//! signals that are reflected in the flow request. The network models supply
14//! chains, distribution networks, or internal resource routing. When any
15//! upstream signal changes, a new request is seeded and the formation
16//! re-converges on the updated routing.
17
18use async_trait::async_trait;
19use converge_pack::{AgentEffect, Context, ContextKey, ProposedFact, Suggestor};
20use serde::{Deserialize, Serialize};
21
22use crate::graph::flow::{FlowNetwork, MinCostFlowProblem, min_cost_flow};
23
24// ── Request ───────────────────────────────────────────────────────────────────
25
26/// Seed under [`ContextKey::Seeds`] with id prefix `"flow-request:"`.
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct FlowRequest {
29    pub id: String,
30    pub num_nodes: usize,
31    pub edges: Vec<FlowEdgeSpec>,
32    pub source: usize,
33    pub sink: usize,
34    /// Required flow volume from source to sink.
35    pub demand: i64,
36}
37
38/// One directed edge in the flow network.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct FlowEdgeSpec {
41    pub from: usize,
42    pub to: usize,
43    pub capacity: i64,
44    /// Per-unit routing cost.
45    pub cost: i64,
46    /// Optional human-readable label for the edge (supply route, link, etc.).
47    pub label: Option<String>,
48}
49
50// ── Plan (output) ─────────────────────────────────────────────────────────────
51
52/// The min-cost flow routing.
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct FlowPlan {
55    pub request_id: String,
56    pub total_flow: i64,
57    pub total_cost: i64,
58    /// Flow on each edge, in the same order as the request's `edges` vec.
59    pub edge_flows: Vec<i64>,
60    /// `total_flow / demand` — 1.0 when demand is fully satisfied.
61    pub fulfillment: f64,
62    pub feasible: bool,
63}
64
65// ── Suggestor ─────────────────────────────────────────────────────────────────
66
67const REQUEST_PREFIX: &str = "flow-request:";
68const PLAN_PREFIX: &str = "flow-plan:";
69const ERROR_PREFIX: &str = "flow-request-error:";
70
71/// Routes flow through a directed network at minimum cost (Successive Shortest
72/// Paths with Bellman-Ford).
73pub struct FlowOptimizationSuggestor;
74
75#[async_trait]
76impl Suggestor for FlowOptimizationSuggestor {
77    fn name(&self) -> &str {
78        "FlowOptimizationSuggestor"
79    }
80
81    fn dependencies(&self) -> &[ContextKey] {
82        &[ContextKey::Seeds]
83    }
84
85    fn complexity_hint(&self) -> Option<&'static str> {
86        Some(
87            "O(V × E × F) successive shortest paths — V = nodes, E = edges, F = total flow; practical for V ≤ 1000",
88        )
89    }
90
91    fn accepts(&self, ctx: &dyn Context) -> bool {
92        ctx.get(ContextKey::Seeds).iter().any(|f| {
93            f.id.starts_with(REQUEST_PREFIX)
94                && match serde_json::from_str::<FlowRequest>(&f.content) {
95                    Ok(_) => !plan_exists(ctx, req_id(&f.id)),
96                    Err(_) => !error_exists(ctx, &f.id),
97                }
98        })
99    }
100
101    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
102        let mut proposals = Vec::new();
103
104        for fact in ctx
105            .get(ContextKey::Seeds)
106            .iter()
107            .filter(|f| f.id.starts_with(REQUEST_PREFIX))
108        {
109            match serde_json::from_str::<FlowRequest>(&fact.content) {
110                Ok(req) => {
111                    if plan_exists(ctx, req_id(&fact.id)) {
112                        continue;
113                    }
114                    let plan = solve(&req);
115                    let confidence = plan.fulfillment;
116                    proposals.push(
117                        ProposedFact::new(
118                            ContextKey::Strategies,
119                            format!("{}{}", PLAN_PREFIX, plan.request_id),
120                            serde_json::to_string(&plan).unwrap_or_default(),
121                            self.name(),
122                        )
123                        .with_confidence(confidence),
124                    );
125                }
126                Err(e) => {
127                    if error_exists(ctx, &fact.id) {
128                        continue;
129                    }
130                    let diag = serde_json::json!({
131                        "request_fact_id": fact.id,
132                        "message": "malformed flow request",
133                        "error": e.to_string(),
134                    });
135                    proposals.push(
136                        ProposedFact::new(
137                            ContextKey::Diagnostic,
138                            format!("{}{}", ERROR_PREFIX, fact.id),
139                            diag.to_string(),
140                            self.name(),
141                        )
142                        .with_confidence(1.0),
143                    );
144                }
145            }
146        }
147
148        if proposals.is_empty() {
149            AgentEffect::empty()
150        } else {
151            AgentEffect::with_proposals(proposals)
152        }
153    }
154}
155
156// ── Core logic ────────────────────────────────────────────────────────────────
157
158fn solve(req: &FlowRequest) -> FlowPlan {
159    if req.edges.is_empty() || req.demand == 0 {
160        return FlowPlan {
161            request_id: req.id.clone(),
162            total_flow: 0,
163            total_cost: 0,
164            edge_flows: vec![],
165            fulfillment: 1.0,
166            feasible: true,
167        };
168    }
169
170    let mut net = FlowNetwork::new(req.num_nodes);
171    for edge in &req.edges {
172        net.add_edge(edge.from, edge.to, edge.capacity, edge.cost);
173    }
174
175    let Ok(problem) = MinCostFlowProblem::source_sink(net, req.source, req.sink, req.demand) else {
176        return FlowPlan {
177            request_id: req.id.clone(),
178            total_flow: 0,
179            total_cost: 0,
180            edge_flows: vec![0; req.edges.len()],
181            fulfillment: 0.0,
182            feasible: false,
183        };
184    };
185
186    match min_cost_flow(&problem) {
187        Ok(sol) => {
188            let fulfillment = if req.demand > 0 {
189                (sol.flow as f64 / req.demand as f64).min(1.0)
190            } else {
191                1.0
192            };
193            FlowPlan {
194                request_id: req.id.clone(),
195                total_flow: sol.flow,
196                total_cost: sol.cost,
197                edge_flows: sol.edge_flows,
198                fulfillment,
199                feasible: true,
200            }
201        }
202        Err(_) => FlowPlan {
203            request_id: req.id.clone(),
204            total_flow: 0,
205            total_cost: 0,
206            edge_flows: vec![0; req.edges.len()],
207            fulfillment: 0.0,
208            feasible: false,
209        },
210    }
211}
212
213// ── Helpers ───────────────────────────────────────────────────────────────────
214
215fn req_id(fact_id: &str) -> &str {
216    fact_id.trim_start_matches(REQUEST_PREFIX)
217}
218
219fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
220    let id = format!("{}{}", PLAN_PREFIX, request_id);
221    ctx.get(ContextKey::Strategies).iter().any(|f| f.id == id)
222}
223
224fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
225    let id = format!("{}{}", ERROR_PREFIX, fact_id);
226    ctx.get(ContextKey::Diagnostic).iter().any(|f| f.id == id)
227}
228
229// ── Tests ─────────────────────────────────────────────────────────────────────
230
231#[cfg(test)]
232mod tests {
233    use super::*;
234    use converge_core::{ContextState, Engine};
235
236    fn two_path_request(demand: i64) -> String {
237        // Cheap path (cost=2/unit, cap=3): s→a→t
238        // Expensive path (cost=10/unit, cap=3): s→b→t
239        serde_json::to_string(&FlowRequest {
240            id: "r1".into(),
241            num_nodes: 4,
242            edges: vec![
243                FlowEdgeSpec {
244                    from: 0,
245                    to: 1,
246                    capacity: 3,
247                    cost: 1,
248                    label: Some("s→a".into()),
249                },
250                FlowEdgeSpec {
251                    from: 1,
252                    to: 3,
253                    capacity: 3,
254                    cost: 1,
255                    label: Some("a→t".into()),
256                },
257                FlowEdgeSpec {
258                    from: 0,
259                    to: 2,
260                    capacity: 3,
261                    cost: 5,
262                    label: Some("s→b".into()),
263                },
264                FlowEdgeSpec {
265                    from: 2,
266                    to: 3,
267                    capacity: 3,
268                    cost: 5,
269                    label: Some("b→t".into()),
270                },
271            ],
272            source: 0,
273            sink: 3,
274            demand,
275        })
276        .unwrap()
277    }
278
279    #[tokio::test]
280    async fn cheap_path_used_first() {
281        // demand=3: all flow via cheap path, cost = 3×2 = 6
282        let mut engine = Engine::new();
283        engine.register_suggestor(FlowOptimizationSuggestor);
284
285        let mut ctx = ContextState::new();
286        ctx.add_input(ContextKey::Seeds, "flow-request:r1", two_path_request(3))
287            .unwrap();
288
289        let result = engine.run(ctx).await.unwrap();
290        let facts = result.context.get(ContextKey::Strategies);
291        assert_eq!(facts.len(), 1);
292        let plan: FlowPlan = serde_json::from_str(&facts[0].content).unwrap();
293        assert_eq!(plan.total_flow, 3);
294        assert_eq!(plan.total_cost, 6);
295        assert!((plan.fulfillment - 1.0).abs() < f64::EPSILON);
296    }
297
298    #[tokio::test]
299    async fn overflow_uses_expensive_path() {
300        // demand=4: 3 cheap (cost 6) + 1 expensive (cost 10) = 16
301        let mut engine = Engine::new();
302        engine.register_suggestor(FlowOptimizationSuggestor);
303
304        let mut ctx = ContextState::new();
305        ctx.add_input(ContextKey::Seeds, "flow-request:r1", two_path_request(4))
306            .unwrap();
307
308        let result = engine.run(ctx).await.unwrap();
309        let plan: FlowPlan =
310            serde_json::from_str(&result.context.get(ContextKey::Strategies)[0].content).unwrap();
311        assert_eq!(plan.total_flow, 4);
312        assert_eq!(plan.total_cost, 16, "3×2 + 1×10 = 16");
313    }
314
315    #[tokio::test]
316    async fn result_is_idempotent() {
317        let mut engine = Engine::new();
318        engine.register_suggestor(FlowOptimizationSuggestor);
319
320        let mut ctx = ContextState::new();
321        ctx.add_input(ContextKey::Seeds, "flow-request:r1", two_path_request(3))
322            .unwrap();
323
324        let first = engine.run(ctx).await.unwrap();
325        let mut engine2 = Engine::new();
326        engine2.register_suggestor(FlowOptimizationSuggestor);
327        let second = engine2.run(first.context.clone()).await.unwrap();
328        assert_eq!(
329            second.context.get(ContextKey::Strategies).len(),
330            first.context.get(ContextKey::Strategies).len(),
331        );
332    }
333
334    #[tokio::test]
335    async fn malformed_request_emits_diagnostic() {
336        let mut engine = Engine::new();
337        engine.register_suggestor(FlowOptimizationSuggestor);
338
339        let mut ctx = ContextState::new();
340        ctx.add_input(ContextKey::Seeds, "flow-request:bad", "not-json")
341            .unwrap();
342
343        let result = engine.run(ctx).await.unwrap();
344        assert_eq!(result.context.get(ContextKey::Diagnostic).len(), 1);
345        assert!(!result.context.has(ContextKey::Strategies));
346    }
347}