1use async_trait::async_trait;
11use converge_pack::Provenance;
12use converge_pack::ProvenanceSource;
13use converge_pack::{
14 AgentEffect, Context, ContextKey, DiagnosticPayload, FactPayload, ProposedFact, Suggestor,
15};
16use serde::{Deserialize, Serialize};
17use std::time::Instant;
18
19#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
23#[serde(deny_unknown_fields)]
24pub struct SchedulingAgent {
25 pub id: usize,
26 pub name: String,
27 pub capabilities: Vec<String>,
28}
29
30#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
32#[serde(deny_unknown_fields)]
33pub struct SchedulingTask {
34 pub id: usize,
35 pub name: String,
36 pub required_capability: String,
37 pub duration_min: i64,
38 pub release_min: i64,
39 pub deadline_min: i64,
40}
41
42#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
44#[serde(deny_unknown_fields)]
45pub struct SchedulingRequest {
46 pub id: String,
47 pub agents: Vec<SchedulingAgent>,
48 pub tasks: Vec<SchedulingTask>,
49 pub horizon_min: i64,
50 #[serde(default = "default_time_limit")]
51 pub time_limit_seconds: f64,
52}
53
54impl FactPayload for SchedulingRequest {
55 const FAMILY: &'static str = "converge.optimization.scheduling.request";
56 const VERSION: u16 = 1;
57}
58
59fn default_time_limit() -> f64 {
60 30.0
61}
62
63#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
67#[serde(deny_unknown_fields)]
68pub struct TaskAssignment {
69 pub task_id: usize,
70 pub task_name: String,
71 pub agent_id: usize,
72 pub agent_name: String,
73 pub start_min: i64,
74 pub end_min: i64,
75}
76
77#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
80#[serde(deny_unknown_fields)]
81pub struct SchedulingPlan {
82 pub request_id: String,
83 pub assignments: Vec<TaskAssignment>,
84 pub tasks_total: usize,
85 pub tasks_scheduled: usize,
86 pub makespan_min: i64,
87 pub solver: String,
88 pub status: String,
89 pub wall_time_seconds: f64,
90}
91
92impl FactPayload for SchedulingPlan {
93 const FAMILY: &'static str = "converge.optimization.scheduling.plan";
94 const VERSION: u16 = 1;
95}
96
97impl SchedulingPlan {
98 #[allow(clippy::cast_precision_loss)]
99 pub fn throughput_ratio(&self) -> f64 {
100 if self.tasks_total == 0 {
101 return 0.0;
102 }
103 self.tasks_scheduled as f64 / self.tasks_total as f64
104 }
105}
106
107const REQUEST_PREFIX: &str = "scheduling-request:";
110const PLAN_PREFIX: &str = "scheduling-plan-greedy:";
111const ERROR_PREFIX: &str = "scheduling-request-error:";
112
113pub struct GreedySchedulerSuggestor;
116
117#[async_trait]
118impl Suggestor for GreedySchedulerSuggestor {
119 fn name(&self) -> &str {
120 "GreedySchedulerSuggestor"
121 }
122
123 fn dependencies(&self) -> &[ContextKey] {
124 &[ContextKey::Seeds]
125 }
126
127 fn complexity_hint(&self) -> Option<&'static str> {
128 Some("O(n*m*log n) EDF scheduling, n = tasks, m = agents")
129 }
130
131 fn accepts(&self, ctx: &dyn Context) -> bool {
132 ctx.get(ContextKey::Seeds).iter().any(|f| {
133 f.id().as_str().starts_with(REQUEST_PREFIX)
134 && match f.payload::<SchedulingRequest>() {
135 Some(_) => !plan_exists(ctx, req_id(f.id().as_str())),
136 None => !error_exists(ctx, f.id().as_str()),
137 }
138 })
139 }
140
141 async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
142 let mut proposals = Vec::new();
143
144 for fact in ctx
145 .get(ContextKey::Seeds)
146 .iter()
147 .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
148 {
149 match fact.payload::<SchedulingRequest>() {
150 Some(req) => {
151 if plan_exists(ctx, req_id(fact.id().as_str())) {
152 continue;
153 }
154 let plan = solve_greedy_schedule(req);
155 let confidence = (plan.throughput_ratio() * 0.65).min(0.65);
156 proposals.push(
157 ProposedFact::new(
158 ContextKey::Strategies,
159 format!("{}{}", PLAN_PREFIX, plan.request_id),
160 plan.clone(),
161 self.provenance(),
162 )
163 .with_confidence(confidence),
164 );
165 }
166 None => {
167 if error_exists(ctx, fact.id().as_str()) {
168 continue;
169 }
170 proposals.push(
171 ProposedFact::new(
172 ContextKey::Diagnostic,
173 format!("{}{}", ERROR_PREFIX, fact.id()),
174 DiagnosticPayload::new(
175 self.name(),
176 format!(
177 "malformed scheduling request '{}': expected {} v{} payload",
178 fact.id(),
179 SchedulingRequest::FAMILY,
180 SchedulingRequest::VERSION
181 ),
182 ),
183 self.provenance(),
184 )
185 .with_confidence(1.0),
186 );
187 }
188 }
189 }
190
191 if proposals.is_empty() {
192 AgentEffect::empty()
193 } else {
194 AgentEffect::with_proposals(proposals)
195 }
196 }
197
198 fn provenance(&self) -> Provenance {
199 crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE.provenance()
200 }
201}
202
203pub fn solve_greedy_schedule(req: &SchedulingRequest) -> SchedulingPlan {
207 let t0 = Instant::now();
208
209 let mut ordered: Vec<_> = req.tasks.iter().collect();
210 ordered.sort_by_key(|t| (t.deadline_min, t.release_min, t.id));
211
212 let mut next_free = vec![0i64; req.agents.len()];
213 let mut assignments = Vec::new();
214
215 for task in &ordered {
216 if task.duration_min <= 0 || task.deadline_min < task.release_min + task.duration_min {
217 continue;
218 }
219
220 let best = req
221 .agents
222 .iter()
223 .enumerate()
224 .filter(|(_, agent)| {
225 agent
226 .capabilities
227 .iter()
228 .any(|cap| cap == &task.required_capability)
229 })
230 .map(|(agent_idx, agent)| {
231 let start = next_free[agent_idx].max(task.release_min);
232 (agent_idx, agent, start)
233 })
234 .filter(|(_, _, start)| start + task.duration_min <= task.deadline_min)
235 .min_by_key(|(_, agent, start)| (*start, agent.id));
236
237 if let Some((agent_idx, agent, start)) = best {
238 let end = start + task.duration_min;
239 next_free[agent_idx] = end;
240 assignments.push(TaskAssignment {
241 task_id: task.id,
242 task_name: task.name.clone(),
243 agent_id: agent.id,
244 agent_name: agent.name.clone(),
245 start_min: start,
246 end_min: end,
247 });
248 }
249 }
250
251 assignments.sort_by_key(|a| (a.start_min, a.agent_id, a.task_id));
252 let makespan = assignments.iter().map(|a| a.end_min).max().unwrap_or(0);
253 let tasks_scheduled = assignments.len();
254 let status = if req.tasks.is_empty() || tasks_scheduled > 0 {
255 "feasible"
256 } else {
257 "infeasible"
258 };
259
260 SchedulingPlan {
261 request_id: req.id.clone(),
262 assignments,
263 tasks_total: req.tasks.len(),
264 tasks_scheduled,
265 makespan_min: makespan,
266 solver: "greedy-edf".to_string(),
267 status: status.to_string(),
268 wall_time_seconds: t0.elapsed().as_secs_f64(),
269 }
270}
271
272fn req_id(fact_id: &str) -> &str {
273 fact_id.trim_start_matches(REQUEST_PREFIX)
274}
275
276fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
277 let id = format!("{}{}", PLAN_PREFIX, request_id);
278 ctx.get(ContextKey::Strategies)
279 .iter()
280 .any(|f| f.id().as_str() == id)
281}
282
283fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
284 let id = format!("{}{}", ERROR_PREFIX, fact_id);
285 ctx.get(ContextKey::Diagnostic)
286 .iter()
287 .any(|f| f.id().as_str() == id)
288}
289
290#[cfg(test)]
291mod tests {
292 use super::*;
293 use converge_core::{ContextState, Engine};
294 use converge_pack::TextPayload;
295 use proptest::prelude::*;
296 use std::collections::BTreeMap;
297
298 fn agent(id: usize, name: &str, caps: &[&str]) -> SchedulingAgent {
299 SchedulingAgent {
300 id,
301 name: name.to_string(),
302 capabilities: caps.iter().map(|cap| (*cap).to_string()).collect(),
303 }
304 }
305
306 fn task(id: usize, cap: &str, duration: i64, release: i64, deadline: i64) -> SchedulingTask {
307 SchedulingTask {
308 id,
309 name: format!("task-{id}"),
310 required_capability: cap.to_string(),
311 duration_min: duration,
312 release_min: release,
313 deadline_min: deadline,
314 }
315 }
316
317 fn req(tasks: Vec<SchedulingTask>, agents: Vec<SchedulingAgent>) -> SchedulingRequest {
318 SchedulingRequest {
319 id: "sched-1".to_string(),
320 agents,
321 tasks,
322 horizon_min: 480,
323 time_limit_seconds: 1.0,
324 }
325 }
326
327 #[tokio::test]
328 async fn suggestor_emits_greedy_schedule() {
329 let request = req(
330 vec![task(1, "rust", 30, 0, 120), task(2, "rust", 30, 0, 120)],
331 vec![agent(10, "alice", &["rust"])],
332 );
333
334 let mut engine = Engine::new();
335 engine.register_suggestor(GreedySchedulerSuggestor);
336
337 let mut ctx = ContextState::new();
338 ctx.add_proposal(ProposedFact::new(
339 ContextKey::Seeds,
340 "scheduling-request:sched-1",
341 request,
342 converge_pack::ProvenanceSource::provenance(
343 crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
344 ),
345 ))
346 .unwrap();
347
348 let result = engine.run(ctx).await.unwrap();
349 let facts = result.context.get(ContextKey::Strategies);
350 assert_eq!(facts.len(), 1);
351 assert_eq!(facts[0].id().as_str(), "scheduling-plan-greedy:sched-1");
352 let plan = facts[0].require_payload::<SchedulingPlan>().unwrap();
353 assert_eq!(plan.tasks_scheduled, 2);
354 assert_eq!(plan.assignments[0].agent_id, 10);
355 }
356
357 #[tokio::test]
358 async fn malformed_request_emits_diagnostic() {
359 let mut engine = Engine::new();
360 engine.register_suggestor(GreedySchedulerSuggestor);
361
362 let mut ctx = ContextState::new();
363 ctx.add_proposal(ProposedFact::new(
364 ContextKey::Seeds,
365 "scheduling-request:bad",
366 TextPayload::new("not a scheduling request"),
367 converge_pack::ProvenanceSource::provenance(
368 crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
369 ),
370 ))
371 .unwrap();
372
373 let result = engine.run(ctx).await.unwrap();
374 assert!(result.context.get(ContextKey::Strategies).is_empty());
375 assert_eq!(result.context.get(ContextKey::Diagnostic).len(), 1);
376 }
377
378 #[test]
379 fn non_dense_agent_ids_do_not_index_panic() {
380 let plan = solve_greedy_schedule(&req(
381 vec![task(1, "rust", 30, 0, 120)],
382 vec![agent(99, "alice", &["rust"])],
383 ));
384
385 assert_eq!(plan.tasks_scheduled, 1);
386 assert_eq!(plan.assignments[0].agent_id, 99);
387 }
388
389 proptest! {
390 #[test]
391 fn scheduled_tasks_respect_windows_and_agent_non_overlap(
392 durations in prop::collection::vec(1i64..20, 1..40)
393 ) {
394 let total_duration: i64 = durations.iter().sum();
395 let tasks: Vec<_> = durations
396 .iter()
397 .enumerate()
398 .map(|(i, duration)| task(i, "work", *duration, 0, total_duration + 100))
399 .collect();
400 let request = req(tasks.clone(), vec![agent(0, "a", &["work"]), agent(1, "b", &["work"])]);
401 let plan = solve_greedy_schedule(&request);
402
403 let by_task: BTreeMap<usize, &SchedulingTask> = tasks.iter().map(|t| (t.id, t)).collect();
404 let mut by_agent: BTreeMap<usize, Vec<&TaskAssignment>> = BTreeMap::new();
405
406 for assignment in &plan.assignments {
407 let original = by_task[&assignment.task_id];
408 prop_assert!(assignment.start_min >= original.release_min);
409 prop_assert!(assignment.end_min <= original.deadline_min);
410 prop_assert_eq!(assignment.end_min - assignment.start_min, original.duration_min);
411 by_agent.entry(assignment.agent_id).or_default().push(assignment);
412 }
413
414 for assignments in by_agent.values_mut() {
415 assignments.sort_by_key(|assignment| assignment.start_min);
416 for pair in assignments.windows(2) {
417 prop_assert!(pair[0].end_min <= pair[1].start_min);
418 }
419 }
420 }
421 }
422}