Skip to main content

converge_optimization/suggestors/
task_scheduling.rs

1// Copyright 2024-2026 Reflective Labs
2// SPDX-License-Identifier: MIT
3
4//! Greedy multi-agent task scheduling with skills and time windows.
5//!
6//! Reads a [`SchedulingRequest`] from context and proposes a
7//! [`SchedulingPlan`] under `ContextKey::Strategies`. This is the portable,
8//! pure Rust baseline for the stronger native CP-SAT scheduler in Ferrox.
9
10use async_trait::async_trait;
11use converge_pack::ProvenanceSource;
12use converge_pack::{
13    AgentEffect, Context, ContextKey, DiagnosticPayload, FactPayload, ProposedFact, Suggestor,
14};
15use serde::{Deserialize, Serialize};
16use std::time::Instant;
17
18// -- Request -----------------------------------------------------------------
19
20/// An agent that can execute tasks requiring one of its declared capabilities.
21#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
22#[serde(deny_unknown_fields)]
23pub struct SchedulingAgent {
24    pub id: usize,
25    pub name: String,
26    pub capabilities: Vec<String>,
27}
28
29/// A unit of work to be scheduled.
30#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
31#[serde(deny_unknown_fields)]
32pub struct SchedulingTask {
33    pub id: usize,
34    pub name: String,
35    pub required_capability: String,
36    pub duration_min: i64,
37    pub release_min: i64,
38    pub deadline_min: i64,
39}
40
41/// Seed under [`ContextKey::Seeds`] with id prefix `"scheduling-request:"`.
42#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
43#[serde(deny_unknown_fields)]
44pub struct SchedulingRequest {
45    pub id: String,
46    pub agents: Vec<SchedulingAgent>,
47    pub tasks: Vec<SchedulingTask>,
48    pub horizon_min: i64,
49    #[serde(default = "default_time_limit")]
50    pub time_limit_seconds: f64,
51}
52
53impl FactPayload for SchedulingRequest {
54    const FAMILY: &'static str = "converge.optimization.scheduling.request";
55    const VERSION: u16 = 1;
56}
57
58fn default_time_limit() -> f64 {
59    30.0
60}
61
62// -- Plan --------------------------------------------------------------------
63
64/// A single task-to-agent assignment with resolved timing.
65#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
66#[serde(deny_unknown_fields)]
67pub struct TaskAssignment {
68    pub task_id: usize,
69    pub task_name: String,
70    pub agent_id: usize,
71    pub agent_name: String,
72    pub start_min: i64,
73    pub end_min: i64,
74}
75
76/// Written to [`ContextKey::Strategies`] with id prefix
77/// `"scheduling-plan-greedy:"`.
78#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
79#[serde(deny_unknown_fields)]
80pub struct SchedulingPlan {
81    pub request_id: String,
82    pub assignments: Vec<TaskAssignment>,
83    pub tasks_total: usize,
84    pub tasks_scheduled: usize,
85    pub makespan_min: i64,
86    pub solver: String,
87    pub status: String,
88    pub wall_time_seconds: f64,
89}
90
91impl FactPayload for SchedulingPlan {
92    const FAMILY: &'static str = "converge.optimization.scheduling.plan";
93    const VERSION: u16 = 1;
94}
95
96impl SchedulingPlan {
97    #[allow(clippy::cast_precision_loss)]
98    pub fn throughput_ratio(&self) -> f64 {
99        if self.tasks_total == 0 {
100            return 0.0;
101        }
102        self.tasks_scheduled as f64 / self.tasks_total as f64
103    }
104}
105
106// -- Suggestor ---------------------------------------------------------------
107
108const REQUEST_PREFIX: &str = "scheduling-request:";
109const PLAN_PREFIX: &str = "scheduling-plan-greedy:";
110const ERROR_PREFIX: &str = "scheduling-request-error:";
111
112/// Schedules tasks via earliest-deadline-first plus earliest-available skilled
113/// agent assignment.
114pub struct GreedySchedulerSuggestor;
115
116#[async_trait]
117impl Suggestor for GreedySchedulerSuggestor {
118    fn name(&self) -> &str {
119        "GreedySchedulerSuggestor"
120    }
121
122    fn dependencies(&self) -> &[ContextKey] {
123        &[ContextKey::Seeds]
124    }
125
126    fn complexity_hint(&self) -> Option<&'static str> {
127        Some("O(n*m*log n) EDF scheduling, n = tasks, m = agents")
128    }
129
130    fn accepts(&self, ctx: &dyn Context) -> bool {
131        ctx.get(ContextKey::Seeds).iter().any(|f| {
132            f.id().as_str().starts_with(REQUEST_PREFIX)
133                && match f.payload::<SchedulingRequest>() {
134                    Some(_) => !plan_exists(ctx, req_id(f.id().as_str())),
135                    None => !error_exists(ctx, f.id().as_str()),
136                }
137        })
138    }
139
140    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
141        let mut proposals = Vec::new();
142
143        for fact in ctx
144            .get(ContextKey::Seeds)
145            .iter()
146            .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
147        {
148            match fact.payload::<SchedulingRequest>() {
149                Some(req) => {
150                    if plan_exists(ctx, req_id(fact.id().as_str())) {
151                        continue;
152                    }
153                    let plan = solve_greedy_schedule(req);
154                    let confidence = (plan.throughput_ratio() * 0.65).min(0.65);
155                    proposals.push(
156                        ProposedFact::new(
157                            ContextKey::Strategies,
158                            format!("{}{}", PLAN_PREFIX, plan.request_id),
159                            plan.clone(),
160                            self.name().to_string(),
161                        )
162                        .with_confidence(confidence),
163                    );
164                }
165                None => {
166                    if error_exists(ctx, fact.id().as_str()) {
167                        continue;
168                    }
169                    proposals.push(
170                        ProposedFact::new(
171                            ContextKey::Diagnostic,
172                            format!("{}{}", ERROR_PREFIX, fact.id()),
173                            DiagnosticPayload::new(
174                                self.name(),
175                                format!(
176                                    "malformed scheduling request '{}': expected {} v{} payload",
177                                    fact.id(),
178                                    SchedulingRequest::FAMILY,
179                                    SchedulingRequest::VERSION
180                                ),
181                            ),
182                            self.name().to_string(),
183                        )
184                        .with_confidence(1.0),
185                    );
186                }
187            }
188        }
189
190        if proposals.is_empty() {
191            AgentEffect::empty()
192        } else {
193            AgentEffect::with_proposals(proposals)
194        }
195    }
196
197    fn provenance(&self) -> &'static str {
198        super::CONVERGE_OPTIMIZATION_PROVENANCE.as_str()
199    }
200}
201
202// -- Core logic --------------------------------------------------------------
203
204/// Pure EDF plus earliest-available scheduling. No native solver dependency.
205pub fn solve_greedy_schedule(req: &SchedulingRequest) -> SchedulingPlan {
206    let t0 = Instant::now();
207
208    let mut ordered: Vec<_> = req.tasks.iter().collect();
209    ordered.sort_by_key(|t| (t.deadline_min, t.release_min, t.id));
210
211    let mut next_free = vec![0i64; req.agents.len()];
212    let mut assignments = Vec::new();
213
214    for task in &ordered {
215        if task.duration_min <= 0 || task.deadline_min < task.release_min + task.duration_min {
216            continue;
217        }
218
219        let best = req
220            .agents
221            .iter()
222            .enumerate()
223            .filter(|(_, agent)| {
224                agent
225                    .capabilities
226                    .iter()
227                    .any(|cap| cap == &task.required_capability)
228            })
229            .map(|(agent_idx, agent)| {
230                let start = next_free[agent_idx].max(task.release_min);
231                (agent_idx, agent, start)
232            })
233            .filter(|(_, _, start)| start + task.duration_min <= task.deadline_min)
234            .min_by_key(|(_, agent, start)| (*start, agent.id));
235
236        if let Some((agent_idx, agent, start)) = best {
237            let end = start + task.duration_min;
238            next_free[agent_idx] = end;
239            assignments.push(TaskAssignment {
240                task_id: task.id,
241                task_name: task.name.clone(),
242                agent_id: agent.id,
243                agent_name: agent.name.clone(),
244                start_min: start,
245                end_min: end,
246            });
247        }
248    }
249
250    assignments.sort_by_key(|a| (a.start_min, a.agent_id, a.task_id));
251    let makespan = assignments.iter().map(|a| a.end_min).max().unwrap_or(0);
252    let tasks_scheduled = assignments.len();
253    let status = if req.tasks.is_empty() || tasks_scheduled > 0 {
254        "feasible"
255    } else {
256        "infeasible"
257    };
258
259    SchedulingPlan {
260        request_id: req.id.clone(),
261        assignments,
262        tasks_total: req.tasks.len(),
263        tasks_scheduled,
264        makespan_min: makespan,
265        solver: "greedy-edf".to_string(),
266        status: status.to_string(),
267        wall_time_seconds: t0.elapsed().as_secs_f64(),
268    }
269}
270
271fn req_id(fact_id: &str) -> &str {
272    fact_id.trim_start_matches(REQUEST_PREFIX)
273}
274
275fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
276    let id = format!("{}{}", PLAN_PREFIX, request_id);
277    ctx.get(ContextKey::Strategies)
278        .iter()
279        .any(|f| f.id().as_str() == id)
280}
281
282fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
283    let id = format!("{}{}", ERROR_PREFIX, fact_id);
284    ctx.get(ContextKey::Diagnostic)
285        .iter()
286        .any(|f| f.id().as_str() == id)
287}
288
289#[cfg(test)]
290mod tests {
291    use super::*;
292    use converge_core::{ContextState, Engine};
293    use converge_pack::TextPayload;
294    use proptest::prelude::*;
295    use std::collections::BTreeMap;
296
297    fn agent(id: usize, name: &str, caps: &[&str]) -> SchedulingAgent {
298        SchedulingAgent {
299            id,
300            name: name.to_string(),
301            capabilities: caps.iter().map(|cap| (*cap).to_string()).collect(),
302        }
303    }
304
305    fn task(id: usize, cap: &str, duration: i64, release: i64, deadline: i64) -> SchedulingTask {
306        SchedulingTask {
307            id,
308            name: format!("task-{id}"),
309            required_capability: cap.to_string(),
310            duration_min: duration,
311            release_min: release,
312            deadline_min: deadline,
313        }
314    }
315
316    fn req(tasks: Vec<SchedulingTask>, agents: Vec<SchedulingAgent>) -> SchedulingRequest {
317        SchedulingRequest {
318            id: "sched-1".to_string(),
319            agents,
320            tasks,
321            horizon_min: 480,
322            time_limit_seconds: 1.0,
323        }
324    }
325
326    #[tokio::test]
327    async fn suggestor_emits_greedy_schedule() {
328        let request = req(
329            vec![task(1, "rust", 30, 0, 120), task(2, "rust", 30, 0, 120)],
330            vec![agent(10, "alice", &["rust"])],
331        );
332
333        let mut engine = Engine::new();
334        engine.register_suggestor(GreedySchedulerSuggestor);
335
336        let mut ctx = ContextState::new();
337        ctx.add_proposal(ProposedFact::new(
338            ContextKey::Seeds,
339            "scheduling-request:sched-1",
340            request,
341            "test",
342        ))
343        .unwrap();
344
345        let result = engine.run(ctx).await.unwrap();
346        let facts = result.context.get(ContextKey::Strategies);
347        assert_eq!(facts.len(), 1);
348        assert_eq!(facts[0].id().as_str(), "scheduling-plan-greedy:sched-1");
349        let plan = facts[0].require_payload::<SchedulingPlan>().unwrap();
350        assert_eq!(plan.tasks_scheduled, 2);
351        assert_eq!(plan.assignments[0].agent_id, 10);
352    }
353
354    #[tokio::test]
355    async fn malformed_request_emits_diagnostic() {
356        let mut engine = Engine::new();
357        engine.register_suggestor(GreedySchedulerSuggestor);
358
359        let mut ctx = ContextState::new();
360        ctx.add_proposal(ProposedFact::new(
361            ContextKey::Seeds,
362            "scheduling-request:bad",
363            TextPayload::new("not a scheduling request"),
364            "test",
365        ))
366        .unwrap();
367
368        let result = engine.run(ctx).await.unwrap();
369        assert!(result.context.get(ContextKey::Strategies).is_empty());
370        assert_eq!(result.context.get(ContextKey::Diagnostic).len(), 1);
371    }
372
373    #[test]
374    fn non_dense_agent_ids_do_not_index_panic() {
375        let plan = solve_greedy_schedule(&req(
376            vec![task(1, "rust", 30, 0, 120)],
377            vec![agent(99, "alice", &["rust"])],
378        ));
379
380        assert_eq!(plan.tasks_scheduled, 1);
381        assert_eq!(plan.assignments[0].agent_id, 99);
382    }
383
384    proptest! {
385        #[test]
386        fn scheduled_tasks_respect_windows_and_agent_non_overlap(
387            durations in prop::collection::vec(1i64..20, 1..40)
388        ) {
389            let total_duration: i64 = durations.iter().sum();
390            let tasks: Vec<_> = durations
391                .iter()
392                .enumerate()
393                .map(|(i, duration)| task(i, "work", *duration, 0, total_duration + 100))
394                .collect();
395            let request = req(tasks.clone(), vec![agent(0, "a", &["work"]), agent(1, "b", &["work"])]);
396            let plan = solve_greedy_schedule(&request);
397
398            let by_task: BTreeMap<usize, &SchedulingTask> = tasks.iter().map(|t| (t.id, t)).collect();
399            let mut by_agent: BTreeMap<usize, Vec<&TaskAssignment>> = BTreeMap::new();
400
401            for assignment in &plan.assignments {
402                let original = by_task[&assignment.task_id];
403                prop_assert!(assignment.start_min >= original.release_min);
404                prop_assert!(assignment.end_min <= original.deadline_min);
405                prop_assert_eq!(assignment.end_min - assignment.start_min, original.duration_min);
406                by_agent.entry(assignment.agent_id).or_default().push(assignment);
407            }
408
409            for assignments in by_agent.values_mut() {
410                assignments.sort_by_key(|assignment| assignment.start_min);
411                for pair in assignments.windows(2) {
412                    prop_assert!(pair[0].end_min <= pair[1].start_min);
413                }
414            }
415        }
416    }
417}