Skip to main content

converge_optimization/suggestors/
flow_optimization.rs

1// Copyright 2024-2026 Reflective Labs
2// SPDX-License-Identifier: MIT
3
4//! Min-cost flow optimisation over a supply/demand network.
5//!
6//! Reads a [`FlowRequest`] from context — a directed graph with edge
7//! capacities and costs, a source, a sink, and a required flow demand —
8//! and proposes a [`FlowPlan`] with the cheapest routing of that flow.
9//!
10//! # Formation role
11//!
12//! Resource allocation suggestors (budget, capacity, workforce) produce demand
13//! signals that are reflected in the flow request. The network models supply
14//! chains, distribution networks, or internal resource routing. When any
15//! upstream signal changes, a new request is seeded and the formation
16//! re-converges on the updated routing.
17
18use async_trait::async_trait;
19use converge_pack::ProvenanceSource;
20use converge_pack::{
21    AgentEffect, Context, ContextKey, DiagnosticPayload, FactPayload, ProposedFact, Suggestor,
22};
23use serde::{Deserialize, Serialize};
24
25use crate::graph::flow::{FlowNetwork, MinCostFlowProblem, min_cost_flow};
26
27// ── Request ───────────────────────────────────────────────────────────────────
28
29/// Seed under [`ContextKey::Seeds`] with id prefix `"flow-request:"`.
30#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
31#[serde(deny_unknown_fields)]
32pub struct FlowRequest {
33    pub id: String,
34    pub num_nodes: usize,
35    pub edges: Vec<FlowEdgeSpec>,
36    pub source: usize,
37    pub sink: usize,
38    /// Required flow volume from source to sink.
39    pub demand: i64,
40}
41
42impl FactPayload for FlowRequest {
43    const FAMILY: &'static str = "converge.optimization.flow.request";
44    const VERSION: u16 = 1;
45}
46
47/// One directed edge in the flow network.
48#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
49#[serde(deny_unknown_fields)]
50pub struct FlowEdgeSpec {
51    pub from: usize,
52    pub to: usize,
53    pub capacity: i64,
54    /// Per-unit routing cost.
55    pub cost: i64,
56    /// Optional human-readable label for the edge (supply route, link, etc.).
57    pub label: Option<String>,
58}
59
60// ── Plan (output) ─────────────────────────────────────────────────────────────
61
62/// The min-cost flow routing.
63#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
64#[serde(deny_unknown_fields)]
65pub struct FlowPlan {
66    pub request_id: String,
67    pub total_flow: i64,
68    pub total_cost: i64,
69    /// Flow on each edge, in the same order as the request's `edges` vec.
70    pub edge_flows: Vec<i64>,
71    /// `total_flow / demand` — 1.0 when demand is fully satisfied.
72    pub fulfillment: f64,
73    pub feasible: bool,
74}
75
76impl FactPayload for FlowPlan {
77    const FAMILY: &'static str = "converge.optimization.flow.plan";
78    const VERSION: u16 = 1;
79}
80
81// ── Suggestor ─────────────────────────────────────────────────────────────────
82
/// Fact-id prefix that marks a seed as a flow request.
const REQUEST_PREFIX: &str = "flow-request:";
/// Fact-id prefix of the plans proposed under [`ContextKey::Strategies`].
const PLAN_PREFIX: &str = "flow-plan:";
/// Fact-id prefix of the diagnostics proposed under [`ContextKey::Diagnostic`];
/// it is followed by the full id of the offending seed.
const ERROR_PREFIX: &str = "flow-request-error:";
86
/// Suggestor that answers `flow-request:` seeds with a minimum-cost routing
/// through the directed network (Successive Shortest Paths with Bellman-Ford).
pub struct FlowOptimizationSuggestor;
90
91#[async_trait]
92impl Suggestor for FlowOptimizationSuggestor {
93    fn name(&self) -> &str {
94        "FlowOptimizationSuggestor"
95    }
96
97    fn dependencies(&self) -> &[ContextKey] {
98        &[ContextKey::Seeds]
99    }
100
101    fn complexity_hint(&self) -> Option<&'static str> {
102        Some(
103            "O(V × E × F) successive shortest paths — V = nodes, E = edges, F = total flow; practical for V ≤ 1000",
104        )
105    }
106
107    fn accepts(&self, ctx: &dyn Context) -> bool {
108        ctx.get(ContextKey::Seeds).iter().any(|f| {
109            f.id().as_str().starts_with(REQUEST_PREFIX)
110                && match f.payload::<FlowRequest>() {
111                    Some(_) => !plan_exists(ctx, req_id(f.id().as_str())),
112                    None => !error_exists(ctx, f.id().as_str()),
113                }
114        })
115    }
116
117    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
118        let mut proposals = Vec::new();
119
120        for fact in ctx
121            .get(ContextKey::Seeds)
122            .iter()
123            .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
124        {
125            match fact.payload::<FlowRequest>() {
126                Some(req) => {
127                    if plan_exists(ctx, req_id(fact.id().as_str())) {
128                        continue;
129                    }
130                    let plan = solve(req);
131                    let confidence = plan.fulfillment;
132                    proposals.push(
133                        ProposedFact::new(
134                            ContextKey::Strategies,
135                            format!("{}{}", PLAN_PREFIX, plan.request_id),
136                            plan.clone(),
137                            self.name().to_string(),
138                        )
139                        .with_confidence(confidence),
140                    );
141                }
142                None => {
143                    if error_exists(ctx, fact.id().as_str()) {
144                        continue;
145                    }
146                    proposals.push(
147                        ProposedFact::new(
148                            ContextKey::Diagnostic,
149                            format!("{}{}", ERROR_PREFIX, fact.id()),
150                            DiagnosticPayload::new(
151                                self.name(),
152                                format!(
153                                    "malformed flow request '{}': expected {} v{} payload",
154                                    fact.id(),
155                                    FlowRequest::FAMILY,
156                                    FlowRequest::VERSION
157                                ),
158                            ),
159                            self.name().to_string(),
160                        )
161                        .with_confidence(1.0),
162                    );
163                }
164            }
165        }
166
167        if proposals.is_empty() {
168            AgentEffect::empty()
169        } else {
170            AgentEffect::with_proposals(proposals)
171        }
172    }
173
174    fn provenance(&self) -> &'static str {
175        super::CONVERGE_OPTIMIZATION_PROVENANCE.as_str()
176    }
177}
178
179// ── Core logic ────────────────────────────────────────────────────────────────
180
181fn solve(req: &FlowRequest) -> FlowPlan {
182    if req.edges.is_empty() || req.demand == 0 {
183        return FlowPlan {
184            request_id: req.id.clone(),
185            total_flow: 0,
186            total_cost: 0,
187            edge_flows: vec![],
188            fulfillment: 1.0,
189            feasible: true,
190        };
191    }
192
193    let mut net = FlowNetwork::new(req.num_nodes);
194    for edge in &req.edges {
195        net.add_edge(edge.from, edge.to, edge.capacity, edge.cost);
196    }
197
198    let Ok(problem) = MinCostFlowProblem::source_sink(net, req.source, req.sink, req.demand) else {
199        return FlowPlan {
200            request_id: req.id.clone(),
201            total_flow: 0,
202            total_cost: 0,
203            edge_flows: vec![0; req.edges.len()],
204            fulfillment: 0.0,
205            feasible: false,
206        };
207    };
208
209    match min_cost_flow(&problem) {
210        Ok(sol) => {
211            let fulfillment = if req.demand > 0 {
212                (sol.flow as f64 / req.demand as f64).min(1.0)
213            } else {
214                1.0
215            };
216            FlowPlan {
217                request_id: req.id.clone(),
218                total_flow: sol.flow,
219                total_cost: sol.cost,
220                edge_flows: sol.edge_flows,
221                fulfillment,
222                feasible: true,
223            }
224        }
225        Err(_) => FlowPlan {
226            request_id: req.id.clone(),
227            total_flow: 0,
228            total_cost: 0,
229            edge_flows: vec![0; req.edges.len()],
230            fulfillment: 0.0,
231            feasible: false,
232        },
233    }
234}
235
236// ── Helpers ───────────────────────────────────────────────────────────────────
237
238fn req_id(fact_id: &str) -> &str {
239    fact_id.trim_start_matches(REQUEST_PREFIX)
240}
241
242fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
243    let id = format!("{}{}", PLAN_PREFIX, request_id);
244    ctx.get(ContextKey::Strategies)
245        .iter()
246        .any(|f| f.id().as_str() == id)
247}
248
249fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
250    let id = format!("{}{}", ERROR_PREFIX, fact_id);
251    ctx.get(ContextKey::Diagnostic)
252        .iter()
253        .any(|f| f.id().as_str() == id)
254}
255
256// ── Tests ─────────────────────────────────────────────────────────────────────
257
#[cfg(test)]
mod tests {
    use super::*;
    use converge_core::{ContextState, Engine};
    use converge_pack::TextPayload;

    /// Builds one labelled fixture edge; every edge has capacity 3.
    fn edge(from: usize, to: usize, cost: i64, label: &str) -> FlowEdgeSpec {
        FlowEdgeSpec {
            from,
            to,
            capacity: 3,
            cost,
            label: Some(label.into()),
        }
    }

    /// Four-node network with two disjoint source→sink routes:
    /// a cheap one via `a` (1 + 1 = 2 per unit) and an expensive one via `b`
    /// (5 + 5 = 10 per unit), each limited to 3 units.
    fn two_path_request(demand: i64) -> FlowRequest {
        FlowRequest {
            id: "r1".into(),
            num_nodes: 4,
            edges: vec![
                edge(0, 1, 1, "s→a"),
                edge(1, 3, 1, "a→t"),
                edge(0, 2, 5, "s→b"),
                edge(2, 3, 5, "b→t"),
            ],
            source: 0,
            sink: 3,
            demand,
        }
    }

    /// Context seeded with the two-path request under `flow-request:r1`.
    fn context_with_request(demand: i64) -> ContextState {
        let mut seeded = ContextState::new();
        seeded
            .add_proposal(ProposedFact::new(
                ContextKey::Seeds,
                "flow-request:r1",
                two_path_request(demand),
                "test",
            ))
            .unwrap();
        seeded
    }

    #[tokio::test]
    async fn cheap_path_used_first() {
        // Demand 3 fits entirely on the cheap route: 3 × 2 = 6.
        let mut runner = Engine::new();
        runner.register_suggestor(FlowOptimizationSuggestor);

        let outcome = runner.run(context_with_request(3)).await.unwrap();
        let strategies = outcome.context.get(ContextKey::Strategies);
        assert_eq!(strategies.len(), 1);

        let plan = strategies[0].require_payload::<FlowPlan>().unwrap();
        assert_eq!(plan.total_flow, 3);
        assert_eq!(plan.total_cost, 6);
        assert!((plan.fulfillment - 1.0).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn overflow_uses_expensive_path() {
        // Demand 4 saturates the cheap route (3 units, cost 6) and spills
        // one unit onto the expensive one (cost 10): 16 in total.
        let mut runner = Engine::new();
        runner.register_suggestor(FlowOptimizationSuggestor);

        let outcome = runner.run(context_with_request(4)).await.unwrap();
        let strategies = outcome.context.get(ContextKey::Strategies);
        let plan = strategies[0].require_payload::<FlowPlan>().unwrap();
        assert_eq!(plan.total_flow, 4);
        assert_eq!(plan.total_cost, 16, "3×2 + 1×10 = 16");
    }

    #[tokio::test]
    async fn result_is_idempotent() {
        // Re-running on an already converged context must not add plans.
        let mut first_engine = Engine::new();
        first_engine.register_suggestor(FlowOptimizationSuggestor);
        let first = first_engine.run(context_with_request(3)).await.unwrap();

        let mut second_engine = Engine::new();
        second_engine.register_suggestor(FlowOptimizationSuggestor);
        let second = second_engine.run(first.context.clone()).await.unwrap();

        assert_eq!(
            second.context.get(ContextKey::Strategies).len(),
            first.context.get(ContextKey::Strategies).len(),
        );
    }

    #[tokio::test]
    async fn malformed_request_emits_diagnostic() {
        // A seed with the request prefix but a non-request payload must
        // produce exactly one diagnostic and no plan.
        let mut runner = Engine::new();
        runner.register_suggestor(FlowOptimizationSuggestor);

        let mut seeded = ContextState::new();
        seeded
            .add_proposal(ProposedFact::new(
                ContextKey::Seeds,
                "flow-request:bad",
                TextPayload::new("not a flow request"),
                "test",
            ))
            .unwrap();

        let outcome = runner.run(seeded).await.unwrap();
        assert_eq!(outcome.context.get(ContextKey::Diagnostic).len(), 1);
        assert!(!outcome.context.has(ContextKey::Strategies));
    }
}
395}