1use async_trait::async_trait;
11use converge_pack::ProvenanceSource;
12use converge_pack::{
13 AgentEffect, Context, ContextKey, DiagnosticPayload, FactPayload, ProposedFact, Suggestor,
14};
15use serde::{Deserialize, Serialize};
16use std::time::Instant;
17
/// A worker that scheduling tasks can be assigned to.
///
/// Deserialization rejects unknown fields (`deny_unknown_fields`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SchedulingAgent {
    /// Caller-supplied identifier. Copied into `TaskAssignment::agent_id` and used by the
    /// greedy solver as a tie-breaker between agents that can start at the same time.
    /// It is not used as an index, so ids do not need to be dense.
    pub id: usize,
    /// Display name, copied into `TaskAssignment::agent_name`.
    pub name: String,
    /// Capability labels. The greedy solver considers this agent for a task only when one
    /// entry is exactly equal to the task's `required_capability`.
    pub capabilities: Vec<String>,
}
28
/// A unit of work to be placed on exactly one agent inside a time window.
///
/// The `_min` suffix presumably denotes minutes on a shared timeline — TODO confirm.
/// Deserialization rejects unknown fields (`deny_unknown_fields`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SchedulingTask {
    /// Caller-supplied identifier. Copied into `TaskAssignment::task_id` and used as the last
    /// tie-breaker when the greedy solver orders tasks.
    pub id: usize,
    /// Display name, copied into `TaskAssignment::task_name`.
    pub name: String,
    /// Capability label an agent must list (exact string match) to run this task.
    pub required_capability: String,
    /// Length of the task. The greedy solver skips tasks whose duration is zero or negative.
    pub duration_min: i64,
    /// Earliest time at which the task may start.
    pub release_min: i64,
    /// Latest time by which the task must have finished (`end <= deadline_min`).
    pub deadline_min: i64,
}
40
/// Input payload describing one scheduling problem: a set of agents and a set of tasks.
///
/// Deserialization rejects unknown fields (`deny_unknown_fields`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SchedulingRequest {
    /// Request identifier. Copied into `SchedulingPlan::request_id` and appended to the plan
    /// prefix to form the id of the emitted plan fact.
    pub id: String,
    /// Agents available for assignment.
    pub agents: Vec<SchedulingAgent>,
    /// Tasks to schedule.
    pub tasks: Vec<SchedulingTask>,
    /// Planning horizon. NOTE(review): the greedy solver in this file never reads this field,
    /// so assignments are not clipped to it — presumably consumed by other solvers; confirm.
    pub horizon_min: i64,
    /// Solver time budget in seconds; falls back to `default_time_limit()` (30.0) when the
    /// field is absent from the serialized form. The greedy solver in this file does not read it.
    #[serde(default = "default_time_limit")]
    pub time_limit_seconds: f64,
}
52
/// Declares `SchedulingRequest` as a fact payload.
///
/// The family string and version are echoed in the diagnostic emitted for malformed
/// requests; presumably they also identify the payload schema to the framework — TODO confirm.
impl FactPayload for SchedulingRequest {
    /// Payload family identifier for scheduling requests.
    const FAMILY: &'static str = "converge.optimization.scheduling.request";
    /// Payload version for scheduling requests.
    const VERSION: u16 = 1;
}
57
/// Serde default for `SchedulingRequest::time_limit_seconds`: 30 seconds.
fn default_time_limit() -> f64 {
    const DEFAULT_TIME_LIMIT_SECONDS: f64 = 30.0;
    DEFAULT_TIME_LIMIT_SECONDS
}
61
/// One scheduled task: which agent runs it and over which interval.
///
/// Deserialization rejects unknown fields (`deny_unknown_fields`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct TaskAssignment {
    /// `SchedulingTask::id` of the scheduled task.
    pub task_id: usize,
    /// `SchedulingTask::name` of the scheduled task.
    pub task_name: String,
    /// `SchedulingAgent::id` of the agent the task was placed on.
    pub agent_id: usize,
    /// `SchedulingAgent::name` of that agent.
    pub agent_name: String,
    /// Start of the assigned interval.
    pub start_min: i64,
    /// End of the assigned interval; the greedy solver sets it to `start_min + duration_min`.
    pub end_min: i64,
}
75
/// Result payload of a scheduling run.
///
/// Deserialization rejects unknown fields (`deny_unknown_fields`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SchedulingPlan {
    /// `SchedulingRequest::id` of the request this plan answers.
    pub request_id: String,
    /// Scheduled tasks. The greedy solver orders them by `(start_min, agent_id, task_id)`.
    pub assignments: Vec<TaskAssignment>,
    /// Number of tasks in the request, including any that could not be placed.
    pub tasks_total: usize,
    /// Number of tasks that received an assignment.
    pub tasks_scheduled: usize,
    /// Largest `end_min` over all assignments; 0 when nothing was scheduled.
    pub makespan_min: i64,
    /// Name of the solver that produced the plan (the greedy solver writes `"greedy-edf"`).
    pub solver: String,
    /// Outcome label. The greedy solver writes `"feasible"` when the request has no tasks or
    /// at least one task was placed, and `"infeasible"` otherwise.
    pub status: String,
    /// Wall-clock time the solver spent, in seconds.
    pub wall_time_seconds: f64,
}
90
/// Declares `SchedulingPlan` as a fact payload.
///
/// Presumably the family string and version identify the payload schema to the
/// framework — TODO confirm against `FactPayload`'s documentation.
impl FactPayload for SchedulingPlan {
    /// Payload family identifier for scheduling plans.
    const FAMILY: &'static str = "converge.optimization.scheduling.plan";
    /// Payload version for scheduling plans.
    const VERSION: u16 = 1;
}
95
96impl SchedulingPlan {
97 #[allow(clippy::cast_precision_loss)]
98 pub fn throughput_ratio(&self) -> f64 {
99 if self.tasks_total == 0 {
100 return 0.0;
101 }
102 self.tasks_scheduled as f64 / self.tasks_total as f64
103 }
104}
105
/// Id prefix of seed facts that are treated as scheduling requests.
const REQUEST_PREFIX: &str = "scheduling-request:";
/// Id prefix of the plan facts emitted by the greedy scheduler; the request id follows it.
const PLAN_PREFIX: &str = "scheduling-plan-greedy:";
/// Id prefix of the diagnostic facts emitted for malformed requests; the full id of the
/// offending seed fact follows it.
const ERROR_PREFIX: &str = "scheduling-request-error:";
111
/// Suggestor that answers scheduling-request seed facts with a greedy
/// earliest-deadline-first plan.
///
/// The type is a stateless unit struct, so it is cheap to copy and can be created with
/// `GreedySchedulerSuggestor` or `GreedySchedulerSuggestor::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct GreedySchedulerSuggestor;
115
116#[async_trait]
117impl Suggestor for GreedySchedulerSuggestor {
118 fn name(&self) -> &str {
119 "GreedySchedulerSuggestor"
120 }
121
122 fn dependencies(&self) -> &[ContextKey] {
123 &[ContextKey::Seeds]
124 }
125
126 fn complexity_hint(&self) -> Option<&'static str> {
127 Some("O(n*m*log n) EDF scheduling, n = tasks, m = agents")
128 }
129
130 fn accepts(&self, ctx: &dyn Context) -> bool {
131 ctx.get(ContextKey::Seeds).iter().any(|f| {
132 f.id().as_str().starts_with(REQUEST_PREFIX)
133 && match f.payload::<SchedulingRequest>() {
134 Some(_) => !plan_exists(ctx, req_id(f.id().as_str())),
135 None => !error_exists(ctx, f.id().as_str()),
136 }
137 })
138 }
139
140 async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
141 let mut proposals = Vec::new();
142
143 for fact in ctx
144 .get(ContextKey::Seeds)
145 .iter()
146 .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
147 {
148 match fact.payload::<SchedulingRequest>() {
149 Some(req) => {
150 if plan_exists(ctx, req_id(fact.id().as_str())) {
151 continue;
152 }
153 let plan = solve_greedy_schedule(req);
154 let confidence = (plan.throughput_ratio() * 0.65).min(0.65);
155 proposals.push(
156 ProposedFact::new(
157 ContextKey::Strategies,
158 format!("{}{}", PLAN_PREFIX, plan.request_id),
159 plan.clone(),
160 self.name().to_string(),
161 )
162 .with_confidence(confidence),
163 );
164 }
165 None => {
166 if error_exists(ctx, fact.id().as_str()) {
167 continue;
168 }
169 proposals.push(
170 ProposedFact::new(
171 ContextKey::Diagnostic,
172 format!("{}{}", ERROR_PREFIX, fact.id()),
173 DiagnosticPayload::new(
174 self.name(),
175 format!(
176 "malformed scheduling request '{}': expected {} v{} payload",
177 fact.id(),
178 SchedulingRequest::FAMILY,
179 SchedulingRequest::VERSION
180 ),
181 ),
182 self.name().to_string(),
183 )
184 .with_confidence(1.0),
185 );
186 }
187 }
188 }
189
190 if proposals.is_empty() {
191 AgentEffect::empty()
192 } else {
193 AgentEffect::with_proposals(proposals)
194 }
195 }
196
197 fn provenance(&self) -> &'static str {
198 super::CONVERGE_OPTIMIZATION_PROVENANCE.as_str()
199 }
200}
201
202pub fn solve_greedy_schedule(req: &SchedulingRequest) -> SchedulingPlan {
206 let t0 = Instant::now();
207
208 let mut ordered: Vec<_> = req.tasks.iter().collect();
209 ordered.sort_by_key(|t| (t.deadline_min, t.release_min, t.id));
210
211 let mut next_free = vec![0i64; req.agents.len()];
212 let mut assignments = Vec::new();
213
214 for task in &ordered {
215 if task.duration_min <= 0 || task.deadline_min < task.release_min + task.duration_min {
216 continue;
217 }
218
219 let best = req
220 .agents
221 .iter()
222 .enumerate()
223 .filter(|(_, agent)| {
224 agent
225 .capabilities
226 .iter()
227 .any(|cap| cap == &task.required_capability)
228 })
229 .map(|(agent_idx, agent)| {
230 let start = next_free[agent_idx].max(task.release_min);
231 (agent_idx, agent, start)
232 })
233 .filter(|(_, _, start)| start + task.duration_min <= task.deadline_min)
234 .min_by_key(|(_, agent, start)| (*start, agent.id));
235
236 if let Some((agent_idx, agent, start)) = best {
237 let end = start + task.duration_min;
238 next_free[agent_idx] = end;
239 assignments.push(TaskAssignment {
240 task_id: task.id,
241 task_name: task.name.clone(),
242 agent_id: agent.id,
243 agent_name: agent.name.clone(),
244 start_min: start,
245 end_min: end,
246 });
247 }
248 }
249
250 assignments.sort_by_key(|a| (a.start_min, a.agent_id, a.task_id));
251 let makespan = assignments.iter().map(|a| a.end_min).max().unwrap_or(0);
252 let tasks_scheduled = assignments.len();
253 let status = if req.tasks.is_empty() || tasks_scheduled > 0 {
254 "feasible"
255 } else {
256 "infeasible"
257 };
258
259 SchedulingPlan {
260 request_id: req.id.clone(),
261 assignments,
262 tasks_total: req.tasks.len(),
263 tasks_scheduled,
264 makespan_min: makespan,
265 solver: "greedy-edf".to_string(),
266 status: status.to_string(),
267 wall_time_seconds: t0.elapsed().as_secs_f64(),
268 }
269}
270
271fn req_id(fact_id: &str) -> &str {
272 fact_id.trim_start_matches(REQUEST_PREFIX)
273}
274
275fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
276 let id = format!("{}{}", PLAN_PREFIX, request_id);
277 ctx.get(ContextKey::Strategies)
278 .iter()
279 .any(|f| f.id().as_str() == id)
280}
281
282fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
283 let id = format!("{}{}", ERROR_PREFIX, fact_id);
284 ctx.get(ContextKey::Diagnostic)
285 .iter()
286 .any(|f| f.id().as_str() == id)
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use converge_core::{ContextState, Engine};
    use converge_pack::TextPayload;
    use proptest::prelude::*;
    use std::collections::BTreeMap;

    /// Fixture: an agent with the given id, name and capability labels.
    fn agent(id: usize, name: &str, caps: &[&str]) -> SchedulingAgent {
        let capabilities = caps.iter().copied().map(String::from).collect();
        SchedulingAgent {
            id,
            name: name.to_owned(),
            capabilities,
        }
    }

    /// Fixture: a task named `task-<id>` with the given capability and time window.
    fn task(id: usize, cap: &str, duration: i64, release: i64, deadline: i64) -> SchedulingTask {
        SchedulingTask {
            id,
            name: format!("task-{id}"),
            required_capability: cap.to_owned(),
            duration_min: duration,
            release_min: release,
            deadline_min: deadline,
        }
    }

    /// Fixture: request `sched-1` over an 8-hour horizon with a one-second time limit.
    fn req(tasks: Vec<SchedulingTask>, agents: Vec<SchedulingAgent>) -> SchedulingRequest {
        SchedulingRequest {
            id: String::from("sched-1"),
            agents,
            tasks,
            horizon_min: 480,
            time_limit_seconds: 1.0,
        }
    }

    /// A well-formed seed request yields exactly one plan fact with both tasks scheduled.
    #[tokio::test]
    async fn suggestor_emits_greedy_schedule() {
        let two_rust_tasks = vec![task(1, "rust", 30, 0, 120), task(2, "rust", 30, 0, 120)];
        let alice = vec![agent(10, "alice", &["rust"])];
        let seed = ProposedFact::new(
            ContextKey::Seeds,
            "scheduling-request:sched-1",
            req(two_rust_tasks, alice),
            "test",
        );

        let mut ctx = ContextState::new();
        ctx.add_proposal(seed).unwrap();

        let mut engine = Engine::new();
        engine.register_suggestor(GreedySchedulerSuggestor);

        let outcome = engine.run(ctx).await.unwrap();
        let strategies = outcome.context.get(ContextKey::Strategies);
        assert_eq!(strategies.len(), 1);
        assert_eq!(
            strategies[0].id().as_str(),
            "scheduling-plan-greedy:sched-1"
        );
        let plan = strategies[0].require_payload::<SchedulingPlan>().unwrap();
        assert_eq!(plan.tasks_scheduled, 2);
        assert_eq!(plan.assignments[0].agent_id, 10);
    }

    /// A seed with the request prefix but the wrong payload type yields one diagnostic
    /// and no plan.
    #[tokio::test]
    async fn malformed_request_emits_diagnostic() {
        let bad_seed = ProposedFact::new(
            ContextKey::Seeds,
            "scheduling-request:bad",
            TextPayload::new("not a scheduling request"),
            "test",
        );

        let mut ctx = ContextState::new();
        ctx.add_proposal(bad_seed).unwrap();

        let mut engine = Engine::new();
        engine.register_suggestor(GreedySchedulerSuggestor);

        let outcome = engine.run(ctx).await.unwrap();
        assert!(outcome.context.get(ContextKey::Strategies).is_empty());
        assert_eq!(outcome.context.get(ContextKey::Diagnostic).len(), 1);
    }

    /// Agent ids are labels, not indices: a lone agent with id 99 must not cause an
    /// out-of-bounds access.
    #[test]
    fn non_dense_agent_ids_do_not_index_panic() {
        let request = req(
            vec![task(1, "rust", 30, 0, 120)],
            vec![agent(99, "alice", &["rust"])],
        );

        let plan = solve_greedy_schedule(&request);

        assert_eq!(plan.tasks_scheduled, 1);
        assert_eq!(plan.assignments[0].agent_id, 99);
    }

    proptest! {
        /// Every assignment stays inside its task's window, lasts exactly the task's
        /// duration, and no agent runs two tasks at once.
        #[test]
        fn scheduled_tasks_respect_windows_and_agent_non_overlap(
            durations in prop::collection::vec(1i64..20, 1..40)
        ) {
            let shared_deadline = durations.iter().sum::<i64>() + 100;
            let tasks: Vec<SchedulingTask> = durations
                .iter()
                .enumerate()
                .map(|(idx, &duration)| task(idx, "work", duration, 0, shared_deadline))
                .collect();
            let workers = vec![agent(0, "a", &["work"]), agent(1, "b", &["work"])];
            let request = req(tasks, workers);
            let plan = solve_greedy_schedule(&request);

            let by_task: BTreeMap<usize, &SchedulingTask> =
                request.tasks.iter().map(|t| (t.id, t)).collect();
            let mut by_agent: BTreeMap<usize, Vec<&TaskAssignment>> = BTreeMap::new();

            for slot in &plan.assignments {
                let spec = by_task[&slot.task_id];
                prop_assert!(slot.start_min >= spec.release_min);
                prop_assert!(slot.end_min <= spec.deadline_min);
                prop_assert_eq!(slot.end_min - slot.start_min, spec.duration_min);
                by_agent.entry(slot.agent_id).or_default().push(slot);
            }

            for timeline in by_agent.values_mut() {
                timeline.sort_by_key(|slot| slot.start_min);
                // Compare each assignment with its successor on the same agent.
                for (earlier, later) in timeline.iter().zip(timeline.iter().skip(1)) {
                    prop_assert!(earlier.end_min <= later.start_min);
                }
            }
        }
    }
}