converge_optimization/suggestors/
flow_optimization.rs1use async_trait::async_trait;
19use converge_pack::ProvenanceSource;
20use converge_pack::{
21 AgentEffect, Context, ContextKey, DiagnosticPayload, FactPayload, ProposedFact, Suggestor,
22};
23use serde::{Deserialize, Serialize};
24
25use crate::graph::flow::{FlowNetwork, MinCostFlowProblem, min_cost_flow};
26
27#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
31#[serde(deny_unknown_fields)]
32pub struct FlowRequest {
33 pub id: String,
34 pub num_nodes: usize,
35 pub edges: Vec<FlowEdgeSpec>,
36 pub source: usize,
37 pub sink: usize,
38 pub demand: i64,
40}
41
42impl FactPayload for FlowRequest {
43 const FAMILY: &'static str = "converge.optimization.flow.request";
44 const VERSION: u16 = 1;
45}
46
47#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
49#[serde(deny_unknown_fields)]
50pub struct FlowEdgeSpec {
51 pub from: usize,
52 pub to: usize,
53 pub capacity: i64,
54 pub cost: i64,
56 pub label: Option<String>,
58}
59
60#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
64#[serde(deny_unknown_fields)]
65pub struct FlowPlan {
66 pub request_id: String,
67 pub total_flow: i64,
68 pub total_cost: i64,
69 pub edge_flows: Vec<i64>,
71 pub fulfillment: f64,
73 pub feasible: bool,
74}
75
76impl FactPayload for FlowPlan {
77 const FAMILY: &'static str = "converge.optimization.flow.plan";
78 const VERSION: u16 = 1;
79}
80
/// Id prefix marking a seed fact as a flow request.
const REQUEST_PREFIX: &str = "flow-request:";
/// Id prefix of the plan facts this suggestor proposes.
const PLAN_PREFIX: &str = "flow-plan:";
/// Id prefix of the diagnostic facts raised for malformed requests.
const ERROR_PREFIX: &str = "flow-request-error:";
86
/// Stateless suggestor that turns `flow-request:` seed facts into min-cost flow plans.
///
/// The type carries no data, so it is cheap to copy and can be built with
/// `Default`; `Debug` is derived so it can appear in logs and assertions.
#[derive(Debug, Clone, Copy, Default)]
pub struct FlowOptimizationSuggestor;
90
91#[async_trait]
92impl Suggestor for FlowOptimizationSuggestor {
93 fn name(&self) -> &str {
94 "FlowOptimizationSuggestor"
95 }
96
97 fn dependencies(&self) -> &[ContextKey] {
98 &[ContextKey::Seeds]
99 }
100
101 fn complexity_hint(&self) -> Option<&'static str> {
102 Some(
103 "O(V × E × F) successive shortest paths — V = nodes, E = edges, F = total flow; practical for V ≤ 1000",
104 )
105 }
106
107 fn accepts(&self, ctx: &dyn Context) -> bool {
108 ctx.get(ContextKey::Seeds).iter().any(|f| {
109 f.id().as_str().starts_with(REQUEST_PREFIX)
110 && match f.payload::<FlowRequest>() {
111 Some(_) => !plan_exists(ctx, req_id(f.id().as_str())),
112 None => !error_exists(ctx, f.id().as_str()),
113 }
114 })
115 }
116
117 async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
118 let mut proposals = Vec::new();
119
120 for fact in ctx
121 .get(ContextKey::Seeds)
122 .iter()
123 .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
124 {
125 match fact.payload::<FlowRequest>() {
126 Some(req) => {
127 if plan_exists(ctx, req_id(fact.id().as_str())) {
128 continue;
129 }
130 let plan = solve(req);
131 let confidence = plan.fulfillment;
132 proposals.push(
133 ProposedFact::new(
134 ContextKey::Strategies,
135 format!("{}{}", PLAN_PREFIX, plan.request_id),
136 plan.clone(),
137 self.name().to_string(),
138 )
139 .with_confidence(confidence),
140 );
141 }
142 None => {
143 if error_exists(ctx, fact.id().as_str()) {
144 continue;
145 }
146 proposals.push(
147 ProposedFact::new(
148 ContextKey::Diagnostic,
149 format!("{}{}", ERROR_PREFIX, fact.id()),
150 DiagnosticPayload::new(
151 self.name(),
152 format!(
153 "malformed flow request '{}': expected {} v{} payload",
154 fact.id(),
155 FlowRequest::FAMILY,
156 FlowRequest::VERSION
157 ),
158 ),
159 self.name().to_string(),
160 )
161 .with_confidence(1.0),
162 );
163 }
164 }
165 }
166
167 if proposals.is_empty() {
168 AgentEffect::empty()
169 } else {
170 AgentEffect::with_proposals(proposals)
171 }
172 }
173
174 fn provenance(&self) -> &'static str {
175 super::CONVERGE_OPTIMIZATION_PROVENANCE.as_str()
176 }
177}
178
179fn solve(req: &FlowRequest) -> FlowPlan {
182 if req.edges.is_empty() || req.demand == 0 {
183 return FlowPlan {
184 request_id: req.id.clone(),
185 total_flow: 0,
186 total_cost: 0,
187 edge_flows: vec![],
188 fulfillment: 1.0,
189 feasible: true,
190 };
191 }
192
193 let mut net = FlowNetwork::new(req.num_nodes);
194 for edge in &req.edges {
195 net.add_edge(edge.from, edge.to, edge.capacity, edge.cost);
196 }
197
198 let Ok(problem) = MinCostFlowProblem::source_sink(net, req.source, req.sink, req.demand) else {
199 return FlowPlan {
200 request_id: req.id.clone(),
201 total_flow: 0,
202 total_cost: 0,
203 edge_flows: vec![0; req.edges.len()],
204 fulfillment: 0.0,
205 feasible: false,
206 };
207 };
208
209 match min_cost_flow(&problem) {
210 Ok(sol) => {
211 let fulfillment = if req.demand > 0 {
212 (sol.flow as f64 / req.demand as f64).min(1.0)
213 } else {
214 1.0
215 };
216 FlowPlan {
217 request_id: req.id.clone(),
218 total_flow: sol.flow,
219 total_cost: sol.cost,
220 edge_flows: sol.edge_flows,
221 fulfillment,
222 feasible: true,
223 }
224 }
225 Err(_) => FlowPlan {
226 request_id: req.id.clone(),
227 total_flow: 0,
228 total_cost: 0,
229 edge_flows: vec![0; req.edges.len()],
230 fulfillment: 0.0,
231 feasible: false,
232 },
233 }
234}
235
236fn req_id(fact_id: &str) -> &str {
239 fact_id.trim_start_matches(REQUEST_PREFIX)
240}
241
242fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
243 let id = format!("{}{}", PLAN_PREFIX, request_id);
244 ctx.get(ContextKey::Strategies)
245 .iter()
246 .any(|f| f.id().as_str() == id)
247}
248
249fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
250 let id = format!("{}{}", ERROR_PREFIX, fact_id);
251 ctx.get(ContextKey::Diagnostic)
252 .iter()
253 .any(|f| f.id().as_str() == id)
254}
255
#[cfg(test)]
mod tests {
    use super::*;
    use converge_core::{ContextState, Engine};
    use converge_pack::TextPayload;

    /// Builds one capacity-3 edge of the fixture network.
    fn edge(from: usize, to: usize, cost: i64, label: &str) -> FlowEdgeSpec {
        FlowEdgeSpec {
            from,
            to,
            capacity: 3,
            cost,
            label: Some(label.into()),
        }
    }

    /// Four-node fixture with two routes from node 0 to node 3: a cheap one via
    /// node 1 (cost 1 per edge) and an expensive one via node 2 (cost 5 per edge).
    fn two_path_request(demand: i64) -> FlowRequest {
        FlowRequest {
            id: "r1".into(),
            num_nodes: 4,
            edges: vec![
                edge(0, 1, 1, "s→a"),
                edge(1, 3, 1, "a→t"),
                edge(0, 2, 5, "s→b"),
                edge(2, 3, 5, "b→t"),
            ],
            source: 0,
            sink: 3,
            demand,
        }
    }

    /// Context holding a single `flow-request:r1` seed built from the fixture.
    fn seeded_context(demand: i64) -> ContextState {
        let mut seeded = ContextState::new();
        seeded
            .add_proposal(ProposedFact::new(
                ContextKey::Seeds,
                "flow-request:r1",
                two_path_request(demand),
                "test",
            ))
            .unwrap();
        seeded
    }

    /// Demand that fits the cheap route is carried entirely by it.
    #[tokio::test]
    async fn cheap_path_used_first() {
        let mut engine = Engine::new();
        engine.register_suggestor(FlowOptimizationSuggestor);

        let outcome = engine.run(seeded_context(3)).await.unwrap();
        let strategies = outcome.context.get(ContextKey::Strategies);
        assert_eq!(strategies.len(), 1);
        let plan = strategies[0].require_payload::<FlowPlan>().unwrap();
        assert_eq!(plan.total_flow, 3);
        assert_eq!(plan.total_cost, 6);
        assert!((plan.fulfillment - 1.0).abs() < f64::EPSILON);
    }

    /// Demand beyond the cheap route's capacity spills onto the expensive one.
    #[tokio::test]
    async fn overflow_uses_expensive_path() {
        let mut engine = Engine::new();
        engine.register_suggestor(FlowOptimizationSuggestor);

        let outcome = engine.run(seeded_context(4)).await.unwrap();
        let strategies = outcome.context.get(ContextKey::Strategies);
        let plan = strategies[0].require_payload::<FlowPlan>().unwrap();
        assert_eq!(plan.total_flow, 4);
        assert_eq!(plan.total_cost, 16, "3×2 + 1×10 = 16");
    }

    /// Re-running on an already solved context adds no further strategy facts.
    #[tokio::test]
    async fn result_is_idempotent() {
        let mut engine = Engine::new();
        engine.register_suggestor(FlowOptimizationSuggestor);
        let first = engine.run(seeded_context(3)).await.unwrap();

        let mut engine2 = Engine::new();
        engine2.register_suggestor(FlowOptimizationSuggestor);
        let second = engine2.run(first.context.clone()).await.unwrap();

        assert_eq!(
            second.context.get(ContextKey::Strategies).len(),
            first.context.get(ContextKey::Strategies).len(),
        );
    }

    /// A seed with the request prefix but the wrong payload type produces one
    /// diagnostic and no strategy.
    #[tokio::test]
    async fn malformed_request_emits_diagnostic() {
        let mut engine = Engine::new();
        engine.register_suggestor(FlowOptimizationSuggestor);

        let mut ctx = ContextState::new();
        ctx.add_proposal(ProposedFact::new(
            ContextKey::Seeds,
            "flow-request:bad",
            TextPayload::new("not a flow request"),
            "test",
        ))
        .unwrap();

        let outcome = engine.run(ctx).await.unwrap();
        assert_eq!(outcome.context.get(ContextKey::Diagnostic).len(), 1);
        assert!(!outcome.context.has(ContextKey::Strategies));
    }
}