converge_optimization/suggestors/
flow_optimization.rs1use async_trait::async_trait;
19use converge_pack::{AgentEffect, Context, ContextKey, ProposedFact, Suggestor};
20use serde::{Deserialize, Serialize};
21
22use crate::graph::flow::{FlowNetwork, MinCostFlowProblem, min_cost_flow};
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct FlowRequest {
29 pub id: String,
30 pub num_nodes: usize,
31 pub edges: Vec<FlowEdgeSpec>,
32 pub source: usize,
33 pub sink: usize,
34 pub demand: i64,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct FlowEdgeSpec {
41 pub from: usize,
42 pub to: usize,
43 pub capacity: i64,
44 pub cost: i64,
46 pub label: Option<String>,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct FlowPlan {
55 pub request_id: String,
56 pub total_flow: i64,
57 pub total_cost: i64,
58 pub edge_flows: Vec<i64>,
60 pub fulfillment: f64,
62 pub feasible: bool,
63}
64
65const REQUEST_PREFIX: &str = "flow-request:";
68const PLAN_PREFIX: &str = "flow-plan:";
69const ERROR_PREFIX: &str = "flow-request-error:";
70
71pub struct FlowOptimizationSuggestor;
74
75#[async_trait]
76impl Suggestor for FlowOptimizationSuggestor {
77 fn name(&self) -> &str {
78 "FlowOptimizationSuggestor"
79 }
80
81 fn dependencies(&self) -> &[ContextKey] {
82 &[ContextKey::Seeds]
83 }
84
85 fn complexity_hint(&self) -> Option<&'static str> {
86 Some(
87 "O(V × E × F) successive shortest paths — V = nodes, E = edges, F = total flow; practical for V ≤ 1000",
88 )
89 }
90
91 fn accepts(&self, ctx: &dyn Context) -> bool {
92 ctx.get(ContextKey::Seeds).iter().any(|f| {
93 f.id.starts_with(REQUEST_PREFIX)
94 && match serde_json::from_str::<FlowRequest>(&f.content) {
95 Ok(_) => !plan_exists(ctx, req_id(&f.id)),
96 Err(_) => !error_exists(ctx, &f.id),
97 }
98 })
99 }
100
101 async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
102 let mut proposals = Vec::new();
103
104 for fact in ctx
105 .get(ContextKey::Seeds)
106 .iter()
107 .filter(|f| f.id.starts_with(REQUEST_PREFIX))
108 {
109 match serde_json::from_str::<FlowRequest>(&fact.content) {
110 Ok(req) => {
111 if plan_exists(ctx, req_id(&fact.id)) {
112 continue;
113 }
114 let plan = solve(&req);
115 let confidence = plan.fulfillment;
116 proposals.push(
117 ProposedFact::new(
118 ContextKey::Strategies,
119 format!("{}{}", PLAN_PREFIX, plan.request_id),
120 serde_json::to_string(&plan).unwrap_or_default(),
121 self.name(),
122 )
123 .with_confidence(confidence),
124 );
125 }
126 Err(e) => {
127 if error_exists(ctx, &fact.id) {
128 continue;
129 }
130 let diag = serde_json::json!({
131 "request_fact_id": fact.id,
132 "message": "malformed flow request",
133 "error": e.to_string(),
134 });
135 proposals.push(
136 ProposedFact::new(
137 ContextKey::Diagnostic,
138 format!("{}{}", ERROR_PREFIX, fact.id),
139 diag.to_string(),
140 self.name(),
141 )
142 .with_confidence(1.0),
143 );
144 }
145 }
146 }
147
148 if proposals.is_empty() {
149 AgentEffect::empty()
150 } else {
151 AgentEffect::with_proposals(proposals)
152 }
153 }
154}
155
156fn solve(req: &FlowRequest) -> FlowPlan {
159 if req.edges.is_empty() || req.demand == 0 {
160 return FlowPlan {
161 request_id: req.id.clone(),
162 total_flow: 0,
163 total_cost: 0,
164 edge_flows: vec![],
165 fulfillment: 1.0,
166 feasible: true,
167 };
168 }
169
170 let mut net = FlowNetwork::new(req.num_nodes);
171 for edge in &req.edges {
172 net.add_edge(edge.from, edge.to, edge.capacity, edge.cost);
173 }
174
175 let Ok(problem) = MinCostFlowProblem::source_sink(net, req.source, req.sink, req.demand) else {
176 return FlowPlan {
177 request_id: req.id.clone(),
178 total_flow: 0,
179 total_cost: 0,
180 edge_flows: vec![0; req.edges.len()],
181 fulfillment: 0.0,
182 feasible: false,
183 };
184 };
185
186 match min_cost_flow(&problem) {
187 Ok(sol) => {
188 let fulfillment = if req.demand > 0 {
189 (sol.flow as f64 / req.demand as f64).min(1.0)
190 } else {
191 1.0
192 };
193 FlowPlan {
194 request_id: req.id.clone(),
195 total_flow: sol.flow,
196 total_cost: sol.cost,
197 edge_flows: sol.edge_flows,
198 fulfillment,
199 feasible: true,
200 }
201 }
202 Err(_) => FlowPlan {
203 request_id: req.id.clone(),
204 total_flow: 0,
205 total_cost: 0,
206 edge_flows: vec![0; req.edges.len()],
207 fulfillment: 0.0,
208 feasible: false,
209 },
210 }
211}
212
213fn req_id(fact_id: &str) -> &str {
216 fact_id.trim_start_matches(REQUEST_PREFIX)
217}
218
219fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
220 let id = format!("{}{}", PLAN_PREFIX, request_id);
221 ctx.get(ContextKey::Strategies).iter().any(|f| f.id == id)
222}
223
224fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
225 let id = format!("{}{}", ERROR_PREFIX, fact_id);
226 ctx.get(ContextKey::Diagnostic).iter().any(|f| f.id == id)
227}
228
229#[cfg(test)]
232mod tests {
233 use super::*;
234 use converge_core::{ContextState, Engine};
235
236 fn two_path_request(demand: i64) -> String {
237 serde_json::to_string(&FlowRequest {
240 id: "r1".into(),
241 num_nodes: 4,
242 edges: vec![
243 FlowEdgeSpec {
244 from: 0,
245 to: 1,
246 capacity: 3,
247 cost: 1,
248 label: Some("s→a".into()),
249 },
250 FlowEdgeSpec {
251 from: 1,
252 to: 3,
253 capacity: 3,
254 cost: 1,
255 label: Some("a→t".into()),
256 },
257 FlowEdgeSpec {
258 from: 0,
259 to: 2,
260 capacity: 3,
261 cost: 5,
262 label: Some("s→b".into()),
263 },
264 FlowEdgeSpec {
265 from: 2,
266 to: 3,
267 capacity: 3,
268 cost: 5,
269 label: Some("b→t".into()),
270 },
271 ],
272 source: 0,
273 sink: 3,
274 demand,
275 })
276 .unwrap()
277 }
278
279 #[tokio::test]
280 async fn cheap_path_used_first() {
281 let mut engine = Engine::new();
283 engine.register_suggestor(FlowOptimizationSuggestor);
284
285 let mut ctx = ContextState::new();
286 ctx.add_input(ContextKey::Seeds, "flow-request:r1", two_path_request(3))
287 .unwrap();
288
289 let result = engine.run(ctx).await.unwrap();
290 let facts = result.context.get(ContextKey::Strategies);
291 assert_eq!(facts.len(), 1);
292 let plan: FlowPlan = serde_json::from_str(&facts[0].content).unwrap();
293 assert_eq!(plan.total_flow, 3);
294 assert_eq!(plan.total_cost, 6);
295 assert!((plan.fulfillment - 1.0).abs() < f64::EPSILON);
296 }
297
298 #[tokio::test]
299 async fn overflow_uses_expensive_path() {
300 let mut engine = Engine::new();
302 engine.register_suggestor(FlowOptimizationSuggestor);
303
304 let mut ctx = ContextState::new();
305 ctx.add_input(ContextKey::Seeds, "flow-request:r1", two_path_request(4))
306 .unwrap();
307
308 let result = engine.run(ctx).await.unwrap();
309 let plan: FlowPlan =
310 serde_json::from_str(&result.context.get(ContextKey::Strategies)[0].content).unwrap();
311 assert_eq!(plan.total_flow, 4);
312 assert_eq!(plan.total_cost, 16, "3×2 + 1×10 = 16");
313 }
314
315 #[tokio::test]
316 async fn result_is_idempotent() {
317 let mut engine = Engine::new();
318 engine.register_suggestor(FlowOptimizationSuggestor);
319
320 let mut ctx = ContextState::new();
321 ctx.add_input(ContextKey::Seeds, "flow-request:r1", two_path_request(3))
322 .unwrap();
323
324 let first = engine.run(ctx).await.unwrap();
325 let mut engine2 = Engine::new();
326 engine2.register_suggestor(FlowOptimizationSuggestor);
327 let second = engine2.run(first.context.clone()).await.unwrap();
328 assert_eq!(
329 second.context.get(ContextKey::Strategies).len(),
330 first.context.get(ContextKey::Strategies).len(),
331 );
332 }
333
334 #[tokio::test]
335 async fn malformed_request_emits_diagnostic() {
336 let mut engine = Engine::new();
337 engine.register_suggestor(FlowOptimizationSuggestor);
338
339 let mut ctx = ContextState::new();
340 ctx.add_input(ContextKey::Seeds, "flow-request:bad", "not-json")
341 .unwrap();
342
343 let result = engine.run(ctx).await.unwrap();
344 assert_eq!(result.context.get(ContextKey::Diagnostic).len(), 1);
345 assert!(!result.context.has(ContextKey::Strategies));
346 }
347}