converge_optimization/suggestors/
flow_optimization.rs1use async_trait::async_trait;
19use converge_pack::{AgentEffect, Context, ContextKey, ProposedFact, Suggestor};
20use serde::{Deserialize, Serialize};
21
22use crate::graph::flow::{FlowNetwork, MinCostFlowProblem, min_cost_flow};
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct FlowRequest {
29 pub id: String,
30 pub num_nodes: usize,
31 pub edges: Vec<FlowEdgeSpec>,
32 pub source: usize,
33 pub sink: usize,
34 pub demand: i64,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct FlowEdgeSpec {
41 pub from: usize,
42 pub to: usize,
43 pub capacity: i64,
44 pub cost: i64,
46 pub label: Option<String>,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct FlowPlan {
55 pub request_id: String,
56 pub total_flow: i64,
57 pub total_cost: i64,
58 pub edge_flows: Vec<i64>,
60 pub fulfillment: f64,
62 pub feasible: bool,
63}
64
65const REQUEST_PREFIX: &str = "flow-request:";
68const PLAN_PREFIX: &str = "flow-plan:";
69const ERROR_PREFIX: &str = "flow-request-error:";
70
71pub struct FlowOptimizationSuggestor;
74
75#[async_trait]
76impl Suggestor for FlowOptimizationSuggestor {
77 fn name(&self) -> &str {
78 "FlowOptimizationSuggestor"
79 }
80
81 fn dependencies(&self) -> &[ContextKey] {
82 &[ContextKey::Seeds]
83 }
84
85 fn complexity_hint(&self) -> Option<&'static str> {
86 Some(
87 "O(V × E × F) successive shortest paths — V = nodes, E = edges, F = total flow; practical for V ≤ 1000",
88 )
89 }
90
91 fn accepts(&self, ctx: &dyn Context) -> bool {
92 ctx.get(ContextKey::Seeds).iter().any(|f| {
93 f.id().as_str().starts_with(REQUEST_PREFIX)
94 && match serde_json::from_str::<FlowRequest>(f.content()) {
95 Ok(_) => !plan_exists(ctx, req_id(f.id().as_str())),
96 Err(_) => !error_exists(ctx, f.id().as_str()),
97 }
98 })
99 }
100
101 async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
102 let mut proposals = Vec::new();
103
104 for fact in ctx
105 .get(ContextKey::Seeds)
106 .iter()
107 .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
108 {
109 match serde_json::from_str::<FlowRequest>(fact.content()) {
110 Ok(req) => {
111 if plan_exists(ctx, req_id(fact.id().as_str())) {
112 continue;
113 }
114 let plan = solve(&req);
115 let confidence = plan.fulfillment;
116 proposals.push(
117 ProposedFact::new(
118 ContextKey::Strategies,
119 format!("{}{}", PLAN_PREFIX, plan.request_id),
120 serde_json::to_string(&plan).unwrap_or_default(),
121 self.name(),
122 )
123 .with_confidence(confidence),
124 );
125 }
126 Err(e) => {
127 if error_exists(ctx, fact.id().as_str()) {
128 continue;
129 }
130 let diag = serde_json::json!({
131 "request_fact_id": fact.id(),
132 "message": "malformed flow request",
133 "error": e.to_string(),
134 });
135 proposals.push(
136 ProposedFact::new(
137 ContextKey::Diagnostic,
138 format!("{}{}", ERROR_PREFIX, fact.id()),
139 diag.to_string(),
140 self.name(),
141 )
142 .with_confidence(1.0),
143 );
144 }
145 }
146 }
147
148 if proposals.is_empty() {
149 AgentEffect::empty()
150 } else {
151 AgentEffect::with_proposals(proposals)
152 }
153 }
154}
155
156fn solve(req: &FlowRequest) -> FlowPlan {
159 if req.edges.is_empty() || req.demand == 0 {
160 return FlowPlan {
161 request_id: req.id.clone(),
162 total_flow: 0,
163 total_cost: 0,
164 edge_flows: vec![],
165 fulfillment: 1.0,
166 feasible: true,
167 };
168 }
169
170 let mut net = FlowNetwork::new(req.num_nodes);
171 for edge in &req.edges {
172 net.add_edge(edge.from, edge.to, edge.capacity, edge.cost);
173 }
174
175 let Ok(problem) = MinCostFlowProblem::source_sink(net, req.source, req.sink, req.demand) else {
176 return FlowPlan {
177 request_id: req.id.clone(),
178 total_flow: 0,
179 total_cost: 0,
180 edge_flows: vec![0; req.edges.len()],
181 fulfillment: 0.0,
182 feasible: false,
183 };
184 };
185
186 match min_cost_flow(&problem) {
187 Ok(sol) => {
188 let fulfillment = if req.demand > 0 {
189 (sol.flow as f64 / req.demand as f64).min(1.0)
190 } else {
191 1.0
192 };
193 FlowPlan {
194 request_id: req.id.clone(),
195 total_flow: sol.flow,
196 total_cost: sol.cost,
197 edge_flows: sol.edge_flows,
198 fulfillment,
199 feasible: true,
200 }
201 }
202 Err(_) => FlowPlan {
203 request_id: req.id.clone(),
204 total_flow: 0,
205 total_cost: 0,
206 edge_flows: vec![0; req.edges.len()],
207 fulfillment: 0.0,
208 feasible: false,
209 },
210 }
211}
212
213fn req_id(fact_id: &str) -> &str {
216 fact_id.trim_start_matches(REQUEST_PREFIX)
217}
218
219fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
220 let id = format!("{}{}", PLAN_PREFIX, request_id);
221 ctx.get(ContextKey::Strategies)
222 .iter()
223 .any(|f| f.id().as_str() == id)
224}
225
226fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
227 let id = format!("{}{}", ERROR_PREFIX, fact_id);
228 ctx.get(ContextKey::Diagnostic)
229 .iter()
230 .any(|f| f.id().as_str() == id)
231}
232
233#[cfg(test)]
236mod tests {
237 use super::*;
238 use converge_core::{ContextState, Engine};
239
240 fn two_path_request(demand: i64) -> String {
241 serde_json::to_string(&FlowRequest {
244 id: "r1".into(),
245 num_nodes: 4,
246 edges: vec![
247 FlowEdgeSpec {
248 from: 0,
249 to: 1,
250 capacity: 3,
251 cost: 1,
252 label: Some("s→a".into()),
253 },
254 FlowEdgeSpec {
255 from: 1,
256 to: 3,
257 capacity: 3,
258 cost: 1,
259 label: Some("a→t".into()),
260 },
261 FlowEdgeSpec {
262 from: 0,
263 to: 2,
264 capacity: 3,
265 cost: 5,
266 label: Some("s→b".into()),
267 },
268 FlowEdgeSpec {
269 from: 2,
270 to: 3,
271 capacity: 3,
272 cost: 5,
273 label: Some("b→t".into()),
274 },
275 ],
276 source: 0,
277 sink: 3,
278 demand,
279 })
280 .unwrap()
281 }
282
283 #[tokio::test]
284 async fn cheap_path_used_first() {
285 let mut engine = Engine::new();
287 engine.register_suggestor(FlowOptimizationSuggestor);
288
289 let mut ctx = ContextState::new();
290 ctx.add_input(ContextKey::Seeds, "flow-request:r1", two_path_request(3))
291 .unwrap();
292
293 let result = engine.run(ctx).await.unwrap();
294 let facts = result.context.get(ContextKey::Strategies);
295 assert_eq!(facts.len(), 1);
296 let plan: FlowPlan = serde_json::from_str(facts[0].content()).unwrap();
297 assert_eq!(plan.total_flow, 3);
298 assert_eq!(plan.total_cost, 6);
299 assert!((plan.fulfillment - 1.0).abs() < f64::EPSILON);
300 }
301
302 #[tokio::test]
303 async fn overflow_uses_expensive_path() {
304 let mut engine = Engine::new();
306 engine.register_suggestor(FlowOptimizationSuggestor);
307
308 let mut ctx = ContextState::new();
309 ctx.add_input(ContextKey::Seeds, "flow-request:r1", two_path_request(4))
310 .unwrap();
311
312 let result = engine.run(ctx).await.unwrap();
313 let plan: FlowPlan =
314 serde_json::from_str(result.context.get(ContextKey::Strategies)[0].content()).unwrap();
315 assert_eq!(plan.total_flow, 4);
316 assert_eq!(plan.total_cost, 16, "3×2 + 1×10 = 16");
317 }
318
319 #[tokio::test]
320 async fn result_is_idempotent() {
321 let mut engine = Engine::new();
322 engine.register_suggestor(FlowOptimizationSuggestor);
323
324 let mut ctx = ContextState::new();
325 ctx.add_input(ContextKey::Seeds, "flow-request:r1", two_path_request(3))
326 .unwrap();
327
328 let first = engine.run(ctx).await.unwrap();
329 let mut engine2 = Engine::new();
330 engine2.register_suggestor(FlowOptimizationSuggestor);
331 let second = engine2.run(first.context.clone()).await.unwrap();
332 assert_eq!(
333 second.context.get(ContextKey::Strategies).len(),
334 first.context.get(ContextKey::Strategies).len(),
335 );
336 }
337
338 #[tokio::test]
339 async fn malformed_request_emits_diagnostic() {
340 let mut engine = Engine::new();
341 engine.register_suggestor(FlowOptimizationSuggestor);
342
343 let mut ctx = ContextState::new();
344 ctx.add_input(ContextKey::Seeds, "flow-request:bad", "not-json")
345 .unwrap();
346
347 let result = engine.run(ctx).await.unwrap();
348 assert_eq!(result.context.get(ContextKey::Diagnostic).len(), 1);
349 assert!(!result.context.has(ContextKey::Strategies));
350 }
351}