1use async_trait::async_trait;
19use converge_pack::Provenance;
20use converge_pack::ProvenanceSource;
21use converge_pack::{
22 AgentEffect, Context, ContextKey, DiagnosticPayload, FactPayload, ProposedFact, Suggestor,
23};
24use serde::{Deserialize, Serialize};
25
26use crate::graph::flow::{FlowNetwork, MinCostFlowProblem, min_cost_flow};
27
28#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
32#[serde(deny_unknown_fields)]
33pub struct FlowRequest {
34 pub id: String,
35 pub num_nodes: usize,
36 pub edges: Vec<FlowEdgeSpec>,
37 pub source: usize,
38 pub sink: usize,
39 pub demand: i64,
41}
42
43impl FactPayload for FlowRequest {
44 const FAMILY: &'static str = "converge.optimization.flow.request";
45 const VERSION: u16 = 1;
46}
47
48#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
50#[serde(deny_unknown_fields)]
51pub struct FlowEdgeSpec {
52 pub from: usize,
53 pub to: usize,
54 pub capacity: i64,
55 pub cost: i64,
57 pub label: Option<String>,
59}
60
61#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
65#[serde(deny_unknown_fields)]
66pub struct FlowPlan {
67 pub request_id: String,
68 pub total_flow: i64,
69 pub total_cost: i64,
70 pub edge_flows: Vec<i64>,
72 pub fulfillment: f64,
74 pub feasible: bool,
75}
76
77impl FactPayload for FlowPlan {
78 const FAMILY: &'static str = "converge.optimization.flow.plan";
79 const VERSION: u16 = 1;
80}
81
82const REQUEST_PREFIX: &str = "flow-request:";
85const PLAN_PREFIX: &str = "flow-plan:";
86const ERROR_PREFIX: &str = "flow-request-error:";
87
88pub struct FlowOptimizationSuggestor;
91
92#[async_trait]
93impl Suggestor for FlowOptimizationSuggestor {
94 fn name(&self) -> &str {
95 "FlowOptimizationSuggestor"
96 }
97
98 fn dependencies(&self) -> &[ContextKey] {
99 &[ContextKey::Seeds]
100 }
101
102 fn complexity_hint(&self) -> Option<&'static str> {
103 Some(
104 "O(V × E × F) successive shortest paths — V = nodes, E = edges, F = total flow; practical for V ≤ 1000",
105 )
106 }
107
108 fn accepts(&self, ctx: &dyn Context) -> bool {
109 ctx.get(ContextKey::Seeds).iter().any(|f| {
110 f.id().as_str().starts_with(REQUEST_PREFIX)
111 && match f.payload::<FlowRequest>() {
112 Some(_) => !plan_exists(ctx, req_id(f.id().as_str())),
113 None => !error_exists(ctx, f.id().as_str()),
114 }
115 })
116 }
117
118 async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
119 let mut proposals = Vec::new();
120
121 for fact in ctx
122 .get(ContextKey::Seeds)
123 .iter()
124 .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
125 {
126 match fact.payload::<FlowRequest>() {
127 Some(req) => {
128 if plan_exists(ctx, req_id(fact.id().as_str())) {
129 continue;
130 }
131 let plan = solve(req);
132 let confidence = plan.fulfillment;
133 proposals.push(
134 ProposedFact::new(
135 ContextKey::Strategies,
136 format!("{}{}", PLAN_PREFIX, plan.request_id),
137 plan.clone(),
138 self.provenance(),
139 )
140 .with_confidence(confidence),
141 );
142 }
143 None => {
144 if error_exists(ctx, fact.id().as_str()) {
145 continue;
146 }
147 proposals.push(
148 ProposedFact::new(
149 ContextKey::Diagnostic,
150 format!("{}{}", ERROR_PREFIX, fact.id()),
151 DiagnosticPayload::new(
152 self.name(),
153 format!(
154 "malformed flow request '{}': expected {} v{} payload",
155 fact.id(),
156 FlowRequest::FAMILY,
157 FlowRequest::VERSION
158 ),
159 ),
160 self.provenance(),
161 )
162 .with_confidence(1.0),
163 );
164 }
165 }
166 }
167
168 if proposals.is_empty() {
169 AgentEffect::empty()
170 } else {
171 AgentEffect::with_proposals(proposals)
172 }
173 }
174
175 fn provenance(&self) -> Provenance {
176 crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE.provenance()
177 }
178}
179
180fn solve(req: &FlowRequest) -> FlowPlan {
183 if req.edges.is_empty() || req.demand == 0 {
184 return FlowPlan {
185 request_id: req.id.clone(),
186 total_flow: 0,
187 total_cost: 0,
188 edge_flows: vec![],
189 fulfillment: 1.0,
190 feasible: true,
191 };
192 }
193
194 let mut net = FlowNetwork::new(req.num_nodes);
195 for edge in &req.edges {
196 net.add_edge(edge.from, edge.to, edge.capacity, edge.cost);
197 }
198
199 let Ok(problem) = MinCostFlowProblem::source_sink(net, req.source, req.sink, req.demand) else {
200 return FlowPlan {
201 request_id: req.id.clone(),
202 total_flow: 0,
203 total_cost: 0,
204 edge_flows: vec![0; req.edges.len()],
205 fulfillment: 0.0,
206 feasible: false,
207 };
208 };
209
210 match min_cost_flow(&problem) {
211 Ok(sol) => {
212 let fulfillment = if req.demand > 0 {
213 (sol.flow as f64 / req.demand as f64).min(1.0)
214 } else {
215 1.0
216 };
217 FlowPlan {
218 request_id: req.id.clone(),
219 total_flow: sol.flow,
220 total_cost: sol.cost,
221 edge_flows: sol.edge_flows,
222 fulfillment,
223 feasible: true,
224 }
225 }
226 Err(_) => FlowPlan {
227 request_id: req.id.clone(),
228 total_flow: 0,
229 total_cost: 0,
230 edge_flows: vec![0; req.edges.len()],
231 fulfillment: 0.0,
232 feasible: false,
233 },
234 }
235}
236
237fn req_id(fact_id: &str) -> &str {
240 fact_id.trim_start_matches(REQUEST_PREFIX)
241}
242
243fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
244 let id = format!("{}{}", PLAN_PREFIX, request_id);
245 ctx.get(ContextKey::Strategies)
246 .iter()
247 .any(|f| f.id().as_str() == id)
248}
249
250fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
251 let id = format!("{}{}", ERROR_PREFIX, fact_id);
252 ctx.get(ContextKey::Diagnostic)
253 .iter()
254 .any(|f| f.id().as_str() == id)
255}
256
257#[cfg(test)]
260mod tests {
261 use super::*;
262 use converge_core::{ContextState, Engine};
263 use converge_pack::TextPayload;
264
265 fn two_path_request(demand: i64) -> FlowRequest {
266 FlowRequest {
269 id: "r1".into(),
270 num_nodes: 4,
271 edges: vec![
272 FlowEdgeSpec {
273 from: 0,
274 to: 1,
275 capacity: 3,
276 cost: 1,
277 label: Some("s→a".into()),
278 },
279 FlowEdgeSpec {
280 from: 1,
281 to: 3,
282 capacity: 3,
283 cost: 1,
284 label: Some("a→t".into()),
285 },
286 FlowEdgeSpec {
287 from: 0,
288 to: 2,
289 capacity: 3,
290 cost: 5,
291 label: Some("s→b".into()),
292 },
293 FlowEdgeSpec {
294 from: 2,
295 to: 3,
296 capacity: 3,
297 cost: 5,
298 label: Some("b→t".into()),
299 },
300 ],
301 source: 0,
302 sink: 3,
303 demand,
304 }
305 }
306
307 #[tokio::test]
308 async fn cheap_path_used_first() {
309 let mut engine = Engine::new();
311 engine.register_suggestor(FlowOptimizationSuggestor);
312
313 let mut ctx = ContextState::new();
314 ctx.add_proposal(ProposedFact::new(
315 ContextKey::Seeds,
316 "flow-request:r1",
317 two_path_request(3),
318 converge_pack::ProvenanceSource::provenance(
319 crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
320 ),
321 ))
322 .unwrap();
323
324 let result = engine.run(ctx).await.unwrap();
325 let facts = result.context.get(ContextKey::Strategies);
326 assert_eq!(facts.len(), 1);
327 let plan = facts[0].require_payload::<FlowPlan>().unwrap();
328 assert_eq!(plan.total_flow, 3);
329 assert_eq!(plan.total_cost, 6);
330 assert!((plan.fulfillment - 1.0).abs() < f64::EPSILON);
331 }
332
333 #[tokio::test]
334 async fn overflow_uses_expensive_path() {
335 let mut engine = Engine::new();
337 engine.register_suggestor(FlowOptimizationSuggestor);
338
339 let mut ctx = ContextState::new();
340 ctx.add_proposal(ProposedFact::new(
341 ContextKey::Seeds,
342 "flow-request:r1",
343 two_path_request(4),
344 converge_pack::ProvenanceSource::provenance(
345 crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
346 ),
347 ))
348 .unwrap();
349
350 let result = engine.run(ctx).await.unwrap();
351 let plan = result.context.get(ContextKey::Strategies)[0]
352 .require_payload::<FlowPlan>()
353 .unwrap();
354 assert_eq!(plan.total_flow, 4);
355 assert_eq!(plan.total_cost, 16, "3×2 + 1×10 = 16");
356 }
357
358 #[tokio::test]
359 async fn result_is_idempotent() {
360 let mut engine = Engine::new();
361 engine.register_suggestor(FlowOptimizationSuggestor);
362
363 let mut ctx = ContextState::new();
364 ctx.add_proposal(ProposedFact::new(
365 ContextKey::Seeds,
366 "flow-request:r1",
367 two_path_request(3),
368 converge_pack::ProvenanceSource::provenance(
369 crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
370 ),
371 ))
372 .unwrap();
373
374 let first = engine.run(ctx).await.unwrap();
375 let mut engine2 = Engine::new();
376 engine2.register_suggestor(FlowOptimizationSuggestor);
377 let second = engine2.run(first.context.clone()).await.unwrap();
378 assert_eq!(
379 second.context.get(ContextKey::Strategies).len(),
380 first.context.get(ContextKey::Strategies).len(),
381 );
382 }
383
384 #[tokio::test]
385 async fn malformed_request_emits_diagnostic() {
386 let mut engine = Engine::new();
387 engine.register_suggestor(FlowOptimizationSuggestor);
388
389 let mut ctx = ContextState::new();
390 ctx.add_proposal(ProposedFact::new(
391 ContextKey::Seeds,
392 "flow-request:bad",
393 TextPayload::new("not a flow request"),
394 converge_pack::ProvenanceSource::provenance(
395 crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
396 ),
397 ))
398 .unwrap();
399
400 let result = engine.run(ctx).await.unwrap();
401 assert_eq!(result.context.get(ContextKey::Diagnostic).len(), 1);
402 assert!(!result.context.has(ContextKey::Strategies));
403 }
404}