converge_optimization/suggestors/
work_schedule.rs1use async_trait::async_trait;
22use converge_pack::{AgentEffect, Context, ContextKey, ProposedFact, Suggestor};
23use serde::{Deserialize, Serialize};
24
25use crate::scheduling::{Interval, SchedulingProblem, list_schedule};
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct ScheduleRequest {
32 pub id: String,
33 pub tasks: Vec<ScheduleTask>,
34 pub capacity: Option<i64>,
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct ScheduleTask {
41 pub label: String,
42 pub earliest_start: i64,
43 pub latest_end: i64,
44 pub duration: i64,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct SchedulePlan {
52 pub request_id: String,
53 pub scheduled: Vec<ScheduledTask>,
54 pub makespan: i64,
55 pub efficiency: f64,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct ScheduledTask {
61 pub label: String,
62 pub start: i64,
63 pub end: i64,
64}
65
66const REQUEST_PREFIX: &str = "schedule-request:";
69const PLAN_PREFIX: &str = "schedule-plan:";
70const ERROR_PREFIX: &str = "schedule-request-error:";
71
72pub struct WorkScheduleSuggestor;
74
75#[async_trait]
76impl Suggestor for WorkScheduleSuggestor {
77 fn name(&self) -> &str {
78 "WorkScheduleSuggestor"
79 }
80
81 fn dependencies(&self) -> &[ContextKey] {
82 &[ContextKey::Seeds]
83 }
84
85 fn complexity_hint(&self) -> Option<&'static str> {
86 Some("O(n log n) list scheduling — n = tasks; scales to thousands of tasks")
87 }
88
89 fn accepts(&self, ctx: &dyn Context) -> bool {
90 ctx.get(ContextKey::Seeds).iter().any(|f| {
91 f.id().as_str().starts_with(REQUEST_PREFIX)
92 && match serde_json::from_str::<ScheduleRequest>(f.content()) {
93 Ok(_) => !plan_exists(ctx, req_id(f.id().as_str())),
94 Err(_) => !error_exists(ctx, f.id().as_str()),
95 }
96 })
97 }
98
99 async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
100 let mut proposals = Vec::new();
101
102 for fact in ctx
103 .get(ContextKey::Seeds)
104 .iter()
105 .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
106 {
107 match serde_json::from_str::<ScheduleRequest>(fact.content()) {
108 Ok(req) => {
109 if plan_exists(ctx, req_id(fact.id().as_str())) {
110 continue;
111 }
112 let plan = solve(&req);
113 proposals.push(
114 ProposedFact::new(
115 ContextKey::Strategies,
116 format!("{}{}", PLAN_PREFIX, plan.request_id),
117 serde_json::to_string(&plan).unwrap_or_default(),
118 self.name(),
119 )
120 .with_confidence(plan.efficiency.clamp(0.0, 1.0)),
121 );
122 }
123 Err(e) => {
124 if error_exists(ctx, fact.id().as_str()) {
125 continue;
126 }
127 let diag = serde_json::json!({
128 "request_fact_id": fact.id(),
129 "message": "malformed schedule request",
130 "error": e.to_string(),
131 });
132 proposals.push(
133 ProposedFact::new(
134 ContextKey::Diagnostic,
135 format!("{}{}", ERROR_PREFIX, fact.id()),
136 diag.to_string(),
137 self.name(),
138 )
139 .with_confidence(1.0),
140 );
141 }
142 }
143 }
144
145 if proposals.is_empty() {
146 AgentEffect::empty()
147 } else {
148 AgentEffect::with_proposals(proposals)
149 }
150 }
151}
152
153fn solve(req: &ScheduleRequest) -> SchedulePlan {
156 if req.tasks.is_empty() {
157 return SchedulePlan {
158 request_id: req.id.clone(),
159 scheduled: vec![],
160 makespan: 0,
161 efficiency: 1.0,
162 };
163 }
164
165 let intervals: Vec<Interval> = req
166 .tasks
167 .iter()
168 .enumerate()
169 .map(|(i, t)| Interval::new(i, t.earliest_start, t.latest_end, t.duration))
170 .collect();
171
172 let max_window = req.tasks.iter().map(|t| t.latest_end).max().unwrap_or(1);
173
174 let problem = match req.capacity {
175 None => SchedulingProblem::disjunctive(intervals),
176 Some(cap) => SchedulingProblem::cumulative(intervals, cap),
177 };
178
179 match list_schedule(&problem) {
180 Ok(sol) => {
181 let scheduled = sol
182 .schedule
183 .iter()
184 .map(|s| ScheduledTask {
185 label: req
186 .tasks
187 .get(s.interval.id)
188 .map(|t| t.label.clone())
189 .unwrap_or_default(),
190 start: s.start,
191 end: s.end(),
192 })
193 .collect();
194 let efficiency = if max_window > 0 {
195 1.0 - sol.makespan as f64 / max_window as f64
196 } else {
197 1.0
198 };
199 SchedulePlan {
200 request_id: req.id.clone(),
201 scheduled,
202 makespan: sol.makespan,
203 efficiency: efficiency.clamp(0.0, 1.0),
204 }
205 }
206 Err(_) => {
207 SchedulePlan {
210 request_id: req.id.clone(),
211 scheduled: vec![],
212 makespan: -1,
213 efficiency: 0.0,
214 }
215 }
216 }
217}
218
219fn req_id(fact_id: &str) -> &str {
222 fact_id.trim_start_matches(REQUEST_PREFIX)
223}
224
225fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
226 let id = format!("{}{}", PLAN_PREFIX, request_id);
227 ctx.get(ContextKey::Strategies)
228 .iter()
229 .any(|f| f.id().as_str() == id)
230}
231
232fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
233 let id = format!("{}{}", ERROR_PREFIX, fact_id);
234 ctx.get(ContextKey::Diagnostic)
235 .iter()
236 .any(|f| f.id().as_str() == id)
237}
238
239#[cfg(test)]
242mod tests {
243 use super::*;
244 use converge_core::{ContextState, Engine};
245
246 fn req_json(id: &str, tasks: Vec<(&str, i64, i64, i64)>, capacity: Option<i64>) -> String {
247 serde_json::to_string(&ScheduleRequest {
248 id: id.to_string(),
249 tasks: tasks
250 .into_iter()
251 .map(|(label, es, le, dur)| ScheduleTask {
252 label: label.to_string(),
253 earliest_start: es,
254 latest_end: le,
255 duration: dur,
256 })
257 .collect(),
258 capacity,
259 })
260 .unwrap()
261 }
262
263 #[tokio::test]
264 async fn three_tasks_makespan_equals_sum() {
265 let mut engine = Engine::new();
266 engine.register_suggestor(WorkScheduleSuggestor);
267
268 let mut ctx = ContextState::new();
269 ctx.add_input(
270 ContextKey::Seeds,
271 "schedule-request:r1",
272 req_json(
273 "r1",
274 vec![
275 ("design", 0, 30, 5),
276 ("build", 0, 30, 8),
277 ("test", 0, 30, 3),
278 ],
279 None,
280 ),
281 )
282 .unwrap();
283
284 let result = engine.run(ctx).await.unwrap();
285 let facts = result.context.get(ContextKey::Strategies);
286 assert_eq!(facts.len(), 1);
287 let plan: SchedulePlan = serde_json::from_str(facts[0].content()).unwrap();
288 assert_eq!(plan.makespan, 16, "3 sequential tasks: 5+8+3=16");
289 assert_eq!(plan.scheduled.len(), 3);
290 }
291
292 #[tokio::test]
293 async fn result_is_idempotent() {
294 let mut engine = Engine::new();
295 engine.register_suggestor(WorkScheduleSuggestor);
296
297 let mut ctx = ContextState::new();
298 ctx.add_input(
299 ContextKey::Seeds,
300 "schedule-request:r1",
301 req_json("r1", vec![("a", 0, 20, 5), ("b", 0, 20, 3)], None),
302 )
303 .unwrap();
304
305 let first = engine.run(ctx).await.unwrap();
306 let mut engine2 = Engine::new();
307 engine2.register_suggestor(WorkScheduleSuggestor);
308 let second = engine2.run(first.context.clone()).await.unwrap();
309 assert_eq!(
310 second.context.get(ContextKey::Strategies).len(),
311 first.context.get(ContextKey::Strategies).len(),
312 );
313 }
314
315 #[tokio::test]
316 async fn malformed_request_emits_diagnostic() {
317 let mut engine = Engine::new();
318 engine.register_suggestor(WorkScheduleSuggestor);
319
320 let mut ctx = ContextState::new();
321 ctx.add_input(ContextKey::Seeds, "schedule-request:bad", "not-json")
322 .unwrap();
323
324 let result = engine.run(ctx).await.unwrap();
325 assert_eq!(result.context.get(ContextKey::Diagnostic).len(), 1);
326 assert!(!result.context.has(ContextKey::Strategies));
327 }
328}