Skip to main content

converge_optimization/suggestors/
flow_optimization.rs

1// Copyright 2024-2026 Reflective Labs
2// SPDX-License-Identifier: MIT
3
4//! Min-cost flow optimisation over a supply/demand network.
5//!
6//! Reads a [`FlowRequest`] from context — a directed graph with edge
7//! capacities and costs, a source, a sink, and a required flow demand —
8//! and proposes a [`FlowPlan`] with the cheapest routing of that flow.
9//!
10//! # Formation role
11//!
12//! Resource allocation suggestors (budget, capacity, workforce) produce demand
13//! signals that are reflected in the flow request. The network models supply
14//! chains, distribution networks, or internal resource routing. When any
15//! upstream signal changes, a new request is seeded and the formation
16//! re-converges on the updated routing.
17
18use async_trait::async_trait;
19use converge_pack::Provenance;
20use converge_pack::ProvenanceSource;
21use converge_pack::{
22    AgentEffect, Context, ContextKey, DiagnosticPayload, FactPayload, ProposedFact, Suggestor,
23};
24use serde::{Deserialize, Serialize};
25
26use crate::graph::flow::{FlowNetwork, MinCostFlowProblem, min_cost_flow};
27
28// ── Request ───────────────────────────────────────────────────────────────────
29
30/// Seed under [`ContextKey::Seeds`] with id prefix `"flow-request:"`.
31#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
32#[serde(deny_unknown_fields)]
33pub struct FlowRequest {
34    pub id: String,
35    pub num_nodes: usize,
36    pub edges: Vec<FlowEdgeSpec>,
37    pub source: usize,
38    pub sink: usize,
39    /// Required flow volume from source to sink.
40    pub demand: i64,
41}
42
43impl FactPayload for FlowRequest {
44    const FAMILY: &'static str = "converge.optimization.flow.request";
45    const VERSION: u16 = 1;
46}
47
48/// One directed edge in the flow network.
49#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
50#[serde(deny_unknown_fields)]
51pub struct FlowEdgeSpec {
52    pub from: usize,
53    pub to: usize,
54    pub capacity: i64,
55    /// Per-unit routing cost.
56    pub cost: i64,
57    /// Optional human-readable label for the edge (supply route, link, etc.).
58    pub label: Option<String>,
59}
60
61// ── Plan (output) ─────────────────────────────────────────────────────────────
62
63/// The min-cost flow routing.
64#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
65#[serde(deny_unknown_fields)]
66pub struct FlowPlan {
67    pub request_id: String,
68    pub total_flow: i64,
69    pub total_cost: i64,
70    /// Flow on each edge, in the same order as the request's `edges` vec.
71    pub edge_flows: Vec<i64>,
72    /// `total_flow / demand` — 1.0 when demand is fully satisfied.
73    pub fulfillment: f64,
74    pub feasible: bool,
75}
76
77impl FactPayload for FlowPlan {
78    const FAMILY: &'static str = "converge.optimization.flow.plan";
79    const VERSION: u16 = 1;
80}
81
82// ── Suggestor ─────────────────────────────────────────────────────────────────
83
84const REQUEST_PREFIX: &str = "flow-request:";
85const PLAN_PREFIX: &str = "flow-plan:";
86const ERROR_PREFIX: &str = "flow-request-error:";
87
88/// Routes flow through a directed network at minimum cost (Successive Shortest
89/// Paths with Bellman-Ford).
90pub struct FlowOptimizationSuggestor;
91
92#[async_trait]
93impl Suggestor for FlowOptimizationSuggestor {
94    fn name(&self) -> &str {
95        "FlowOptimizationSuggestor"
96    }
97
98    fn dependencies(&self) -> &[ContextKey] {
99        &[ContextKey::Seeds]
100    }
101
102    fn complexity_hint(&self) -> Option<&'static str> {
103        Some(
104            "O(V × E × F) successive shortest paths — V = nodes, E = edges, F = total flow; practical for V ≤ 1000",
105        )
106    }
107
108    fn accepts(&self, ctx: &dyn Context) -> bool {
109        ctx.get(ContextKey::Seeds).iter().any(|f| {
110            f.id().as_str().starts_with(REQUEST_PREFIX)
111                && match f.payload::<FlowRequest>() {
112                    Some(_) => !plan_exists(ctx, req_id(f.id().as_str())),
113                    None => !error_exists(ctx, f.id().as_str()),
114                }
115        })
116    }
117
118    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
119        let mut proposals = Vec::new();
120
121        for fact in ctx
122            .get(ContextKey::Seeds)
123            .iter()
124            .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
125        {
126            match fact.payload::<FlowRequest>() {
127                Some(req) => {
128                    if plan_exists(ctx, req_id(fact.id().as_str())) {
129                        continue;
130                    }
131                    let plan = solve(req);
132                    let confidence = plan.fulfillment;
133                    proposals.push(
134                        ProposedFact::new(
135                            ContextKey::Strategies,
136                            format!("{}{}", PLAN_PREFIX, plan.request_id),
137                            plan.clone(),
138                            self.provenance(),
139                        )
140                        .with_confidence(confidence),
141                    );
142                }
143                None => {
144                    if error_exists(ctx, fact.id().as_str()) {
145                        continue;
146                    }
147                    proposals.push(
148                        ProposedFact::new(
149                            ContextKey::Diagnostic,
150                            format!("{}{}", ERROR_PREFIX, fact.id()),
151                            DiagnosticPayload::new(
152                                self.name(),
153                                format!(
154                                    "malformed flow request '{}': expected {} v{} payload",
155                                    fact.id(),
156                                    FlowRequest::FAMILY,
157                                    FlowRequest::VERSION
158                                ),
159                            ),
160                            self.provenance(),
161                        )
162                        .with_confidence(1.0),
163                    );
164                }
165            }
166        }
167
168        if proposals.is_empty() {
169            AgentEffect::empty()
170        } else {
171            AgentEffect::with_proposals(proposals)
172        }
173    }
174
175    fn provenance(&self) -> Provenance {
176        crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE.provenance()
177    }
178}
179
180// ── Core logic ────────────────────────────────────────────────────────────────
181
182fn solve(req: &FlowRequest) -> FlowPlan {
183    if req.edges.is_empty() || req.demand == 0 {
184        return FlowPlan {
185            request_id: req.id.clone(),
186            total_flow: 0,
187            total_cost: 0,
188            edge_flows: vec![],
189            fulfillment: 1.0,
190            feasible: true,
191        };
192    }
193
194    let mut net = FlowNetwork::new(req.num_nodes);
195    for edge in &req.edges {
196        net.add_edge(edge.from, edge.to, edge.capacity, edge.cost);
197    }
198
199    let Ok(problem) = MinCostFlowProblem::source_sink(net, req.source, req.sink, req.demand) else {
200        return FlowPlan {
201            request_id: req.id.clone(),
202            total_flow: 0,
203            total_cost: 0,
204            edge_flows: vec![0; req.edges.len()],
205            fulfillment: 0.0,
206            feasible: false,
207        };
208    };
209
210    match min_cost_flow(&problem) {
211        Ok(sol) => {
212            let fulfillment = if req.demand > 0 {
213                (sol.flow as f64 / req.demand as f64).min(1.0)
214            } else {
215                1.0
216            };
217            FlowPlan {
218                request_id: req.id.clone(),
219                total_flow: sol.flow,
220                total_cost: sol.cost,
221                edge_flows: sol.edge_flows,
222                fulfillment,
223                feasible: true,
224            }
225        }
226        Err(_) => FlowPlan {
227            request_id: req.id.clone(),
228            total_flow: 0,
229            total_cost: 0,
230            edge_flows: vec![0; req.edges.len()],
231            fulfillment: 0.0,
232            feasible: false,
233        },
234    }
235}
236
237// ── Helpers ───────────────────────────────────────────────────────────────────
238
239fn req_id(fact_id: &str) -> &str {
240    fact_id.trim_start_matches(REQUEST_PREFIX)
241}
242
243fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
244    let id = format!("{}{}", PLAN_PREFIX, request_id);
245    ctx.get(ContextKey::Strategies)
246        .iter()
247        .any(|f| f.id().as_str() == id)
248}
249
250fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
251    let id = format!("{}{}", ERROR_PREFIX, fact_id);
252    ctx.get(ContextKey::Diagnostic)
253        .iter()
254        .any(|f| f.id().as_str() == id)
255}
256
257// ── Tests ─────────────────────────────────────────────────────────────────────
258
259#[cfg(test)]
260mod tests {
261    use super::*;
262    use converge_core::{ContextState, Engine};
263    use converge_pack::TextPayload;
264
265    fn two_path_request(demand: i64) -> FlowRequest {
266        // Cheap path (cost=2/unit, cap=3): s→a→t
267        // Expensive path (cost=10/unit, cap=3): s→b→t
268        FlowRequest {
269            id: "r1".into(),
270            num_nodes: 4,
271            edges: vec![
272                FlowEdgeSpec {
273                    from: 0,
274                    to: 1,
275                    capacity: 3,
276                    cost: 1,
277                    label: Some("s→a".into()),
278                },
279                FlowEdgeSpec {
280                    from: 1,
281                    to: 3,
282                    capacity: 3,
283                    cost: 1,
284                    label: Some("a→t".into()),
285                },
286                FlowEdgeSpec {
287                    from: 0,
288                    to: 2,
289                    capacity: 3,
290                    cost: 5,
291                    label: Some("s→b".into()),
292                },
293                FlowEdgeSpec {
294                    from: 2,
295                    to: 3,
296                    capacity: 3,
297                    cost: 5,
298                    label: Some("b→t".into()),
299                },
300            ],
301            source: 0,
302            sink: 3,
303            demand,
304        }
305    }
306
307    #[tokio::test]
308    async fn cheap_path_used_first() {
309        // demand=3: all flow via cheap path, cost = 3×2 = 6
310        let mut engine = Engine::new();
311        engine.register_suggestor(FlowOptimizationSuggestor);
312
313        let mut ctx = ContextState::new();
314        ctx.add_proposal(ProposedFact::new(
315            ContextKey::Seeds,
316            "flow-request:r1",
317            two_path_request(3),
318            converge_pack::ProvenanceSource::provenance(
319                crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
320            ),
321        ))
322        .unwrap();
323
324        let result = engine.run(ctx).await.unwrap();
325        let facts = result.context.get(ContextKey::Strategies);
326        assert_eq!(facts.len(), 1);
327        let plan = facts[0].require_payload::<FlowPlan>().unwrap();
328        assert_eq!(plan.total_flow, 3);
329        assert_eq!(plan.total_cost, 6);
330        assert!((plan.fulfillment - 1.0).abs() < f64::EPSILON);
331    }
332
333    #[tokio::test]
334    async fn overflow_uses_expensive_path() {
335        // demand=4: 3 cheap (cost 6) + 1 expensive (cost 10) = 16
336        let mut engine = Engine::new();
337        engine.register_suggestor(FlowOptimizationSuggestor);
338
339        let mut ctx = ContextState::new();
340        ctx.add_proposal(ProposedFact::new(
341            ContextKey::Seeds,
342            "flow-request:r1",
343            two_path_request(4),
344            converge_pack::ProvenanceSource::provenance(
345                crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
346            ),
347        ))
348        .unwrap();
349
350        let result = engine.run(ctx).await.unwrap();
351        let plan = result.context.get(ContextKey::Strategies)[0]
352            .require_payload::<FlowPlan>()
353            .unwrap();
354        assert_eq!(plan.total_flow, 4);
355        assert_eq!(plan.total_cost, 16, "3×2 + 1×10 = 16");
356    }
357
358    #[tokio::test]
359    async fn result_is_idempotent() {
360        let mut engine = Engine::new();
361        engine.register_suggestor(FlowOptimizationSuggestor);
362
363        let mut ctx = ContextState::new();
364        ctx.add_proposal(ProposedFact::new(
365            ContextKey::Seeds,
366            "flow-request:r1",
367            two_path_request(3),
368            converge_pack::ProvenanceSource::provenance(
369                crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
370            ),
371        ))
372        .unwrap();
373
374        let first = engine.run(ctx).await.unwrap();
375        let mut engine2 = Engine::new();
376        engine2.register_suggestor(FlowOptimizationSuggestor);
377        let second = engine2.run(first.context.clone()).await.unwrap();
378        assert_eq!(
379            second.context.get(ContextKey::Strategies).len(),
380            first.context.get(ContextKey::Strategies).len(),
381        );
382    }
383
384    #[tokio::test]
385    async fn malformed_request_emits_diagnostic() {
386        let mut engine = Engine::new();
387        engine.register_suggestor(FlowOptimizationSuggestor);
388
389        let mut ctx = ContextState::new();
390        ctx.add_proposal(ProposedFact::new(
391            ContextKey::Seeds,
392            "flow-request:bad",
393            TextPayload::new("not a flow request"),
394            converge_pack::ProvenanceSource::provenance(
395                crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
396            ),
397        ))
398        .unwrap();
399
400        let result = engine.run(ctx).await.unwrap();
401        assert_eq!(result.context.get(ContextKey::Diagnostic).len(), 1);
402        assert!(!result.context.has(ContextKey::Strategies));
403    }
404}