Skip to main content

converge_optimization/suggestors/
task_scheduling.rs

1// Copyright 2024-2026 Reflective Labs
2// SPDX-License-Identifier: MIT
3
4//! Greedy multi-agent task scheduling with skills and time windows.
5//!
6//! Reads a [`SchedulingRequest`] from context and proposes a
7//! [`SchedulingPlan`] under `ContextKey::Strategies`. This is the portable,
8//! pure Rust baseline for the stronger native CP-SAT scheduler in Ferrox.
9
10use async_trait::async_trait;
11use converge_pack::Provenance;
12use converge_pack::ProvenanceSource;
13use converge_pack::{
14    AgentEffect, Context, ContextKey, DiagnosticPayload, FactPayload, ProposedFact, Suggestor,
15};
16use serde::{Deserialize, Serialize};
17use std::time::Instant;
18
19// -- Request -----------------------------------------------------------------
20
21/// An agent that can execute tasks requiring one of its declared capabilities.
22#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
23#[serde(deny_unknown_fields)]
24pub struct SchedulingAgent {
25    pub id: usize,
26    pub name: String,
27    pub capabilities: Vec<String>,
28}
29
30/// A unit of work to be scheduled.
31#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
32#[serde(deny_unknown_fields)]
33pub struct SchedulingTask {
34    pub id: usize,
35    pub name: String,
36    pub required_capability: String,
37    pub duration_min: i64,
38    pub release_min: i64,
39    pub deadline_min: i64,
40}
41
42/// Seed under [`ContextKey::Seeds`] with id prefix `"scheduling-request:"`.
43#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
44#[serde(deny_unknown_fields)]
45pub struct SchedulingRequest {
46    pub id: String,
47    pub agents: Vec<SchedulingAgent>,
48    pub tasks: Vec<SchedulingTask>,
49    pub horizon_min: i64,
50    #[serde(default = "default_time_limit")]
51    pub time_limit_seconds: f64,
52}
53
54impl FactPayload for SchedulingRequest {
55    const FAMILY: &'static str = "converge.optimization.scheduling.request";
56    const VERSION: u16 = 1;
57}
58
59fn default_time_limit() -> f64 {
60    30.0
61}
62
63// -- Plan --------------------------------------------------------------------
64
65/// A single task-to-agent assignment with resolved timing.
66#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
67#[serde(deny_unknown_fields)]
68pub struct TaskAssignment {
69    pub task_id: usize,
70    pub task_name: String,
71    pub agent_id: usize,
72    pub agent_name: String,
73    pub start_min: i64,
74    pub end_min: i64,
75}
76
77/// Written to [`ContextKey::Strategies`] with id prefix
78/// `"scheduling-plan-greedy:"`.
79#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
80#[serde(deny_unknown_fields)]
81pub struct SchedulingPlan {
82    pub request_id: String,
83    pub assignments: Vec<TaskAssignment>,
84    pub tasks_total: usize,
85    pub tasks_scheduled: usize,
86    pub makespan_min: i64,
87    pub solver: String,
88    pub status: String,
89    pub wall_time_seconds: f64,
90}
91
92impl FactPayload for SchedulingPlan {
93    const FAMILY: &'static str = "converge.optimization.scheduling.plan";
94    const VERSION: u16 = 1;
95}
96
97impl SchedulingPlan {
98    #[allow(clippy::cast_precision_loss)]
99    pub fn throughput_ratio(&self) -> f64 {
100        if self.tasks_total == 0 {
101            return 0.0;
102        }
103        self.tasks_scheduled as f64 / self.tasks_total as f64
104    }
105}
106
107// -- Suggestor ---------------------------------------------------------------
108
109const REQUEST_PREFIX: &str = "scheduling-request:";
110const PLAN_PREFIX: &str = "scheduling-plan-greedy:";
111const ERROR_PREFIX: &str = "scheduling-request-error:";
112
113/// Schedules tasks via earliest-deadline-first plus earliest-available skilled
114/// agent assignment.
115pub struct GreedySchedulerSuggestor;
116
117#[async_trait]
118impl Suggestor for GreedySchedulerSuggestor {
119    fn name(&self) -> &str {
120        "GreedySchedulerSuggestor"
121    }
122
123    fn dependencies(&self) -> &[ContextKey] {
124        &[ContextKey::Seeds]
125    }
126
127    fn complexity_hint(&self) -> Option<&'static str> {
128        Some("O(n*m*log n) EDF scheduling, n = tasks, m = agents")
129    }
130
131    fn accepts(&self, ctx: &dyn Context) -> bool {
132        ctx.get(ContextKey::Seeds).iter().any(|f| {
133            f.id().as_str().starts_with(REQUEST_PREFIX)
134                && match f.payload::<SchedulingRequest>() {
135                    Some(_) => !plan_exists(ctx, req_id(f.id().as_str())),
136                    None => !error_exists(ctx, f.id().as_str()),
137                }
138        })
139    }
140
141    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
142        let mut proposals = Vec::new();
143
144        for fact in ctx
145            .get(ContextKey::Seeds)
146            .iter()
147            .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
148        {
149            match fact.payload::<SchedulingRequest>() {
150                Some(req) => {
151                    if plan_exists(ctx, req_id(fact.id().as_str())) {
152                        continue;
153                    }
154                    let plan = solve_greedy_schedule(req);
155                    let confidence = (plan.throughput_ratio() * 0.65).min(0.65);
156                    proposals.push(
157                        ProposedFact::new(
158                            ContextKey::Strategies,
159                            format!("{}{}", PLAN_PREFIX, plan.request_id),
160                            plan.clone(),
161                            self.provenance(),
162                        )
163                        .with_confidence(confidence),
164                    );
165                }
166                None => {
167                    if error_exists(ctx, fact.id().as_str()) {
168                        continue;
169                    }
170                    proposals.push(
171                        ProposedFact::new(
172                            ContextKey::Diagnostic,
173                            format!("{}{}", ERROR_PREFIX, fact.id()),
174                            DiagnosticPayload::new(
175                                self.name(),
176                                format!(
177                                    "malformed scheduling request '{}': expected {} v{} payload",
178                                    fact.id(),
179                                    SchedulingRequest::FAMILY,
180                                    SchedulingRequest::VERSION
181                                ),
182                            ),
183                            self.provenance(),
184                        )
185                        .with_confidence(1.0),
186                    );
187                }
188            }
189        }
190
191        if proposals.is_empty() {
192            AgentEffect::empty()
193        } else {
194            AgentEffect::with_proposals(proposals)
195        }
196    }
197
198    fn provenance(&self) -> Provenance {
199        crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE.provenance()
200    }
201}
202
203// -- Core logic --------------------------------------------------------------
204
205/// Pure EDF plus earliest-available scheduling. No native solver dependency.
206pub fn solve_greedy_schedule(req: &SchedulingRequest) -> SchedulingPlan {
207    let t0 = Instant::now();
208
209    let mut ordered: Vec<_> = req.tasks.iter().collect();
210    ordered.sort_by_key(|t| (t.deadline_min, t.release_min, t.id));
211
212    let mut next_free = vec![0i64; req.agents.len()];
213    let mut assignments = Vec::new();
214
215    for task in &ordered {
216        if task.duration_min <= 0 || task.deadline_min < task.release_min + task.duration_min {
217            continue;
218        }
219
220        let best = req
221            .agents
222            .iter()
223            .enumerate()
224            .filter(|(_, agent)| {
225                agent
226                    .capabilities
227                    .iter()
228                    .any(|cap| cap == &task.required_capability)
229            })
230            .map(|(agent_idx, agent)| {
231                let start = next_free[agent_idx].max(task.release_min);
232                (agent_idx, agent, start)
233            })
234            .filter(|(_, _, start)| start + task.duration_min <= task.deadline_min)
235            .min_by_key(|(_, agent, start)| (*start, agent.id));
236
237        if let Some((agent_idx, agent, start)) = best {
238            let end = start + task.duration_min;
239            next_free[agent_idx] = end;
240            assignments.push(TaskAssignment {
241                task_id: task.id,
242                task_name: task.name.clone(),
243                agent_id: agent.id,
244                agent_name: agent.name.clone(),
245                start_min: start,
246                end_min: end,
247            });
248        }
249    }
250
251    assignments.sort_by_key(|a| (a.start_min, a.agent_id, a.task_id));
252    let makespan = assignments.iter().map(|a| a.end_min).max().unwrap_or(0);
253    let tasks_scheduled = assignments.len();
254    let status = if req.tasks.is_empty() || tasks_scheduled > 0 {
255        "feasible"
256    } else {
257        "infeasible"
258    };
259
260    SchedulingPlan {
261        request_id: req.id.clone(),
262        assignments,
263        tasks_total: req.tasks.len(),
264        tasks_scheduled,
265        makespan_min: makespan,
266        solver: "greedy-edf".to_string(),
267        status: status.to_string(),
268        wall_time_seconds: t0.elapsed().as_secs_f64(),
269    }
270}
271
272fn req_id(fact_id: &str) -> &str {
273    fact_id.trim_start_matches(REQUEST_PREFIX)
274}
275
276fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
277    let id = format!("{}{}", PLAN_PREFIX, request_id);
278    ctx.get(ContextKey::Strategies)
279        .iter()
280        .any(|f| f.id().as_str() == id)
281}
282
283fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
284    let id = format!("{}{}", ERROR_PREFIX, fact_id);
285    ctx.get(ContextKey::Diagnostic)
286        .iter()
287        .any(|f| f.id().as_str() == id)
288}
289
290#[cfg(test)]
291mod tests {
292    use super::*;
293    use converge_core::{ContextState, Engine};
294    use converge_pack::TextPayload;
295    use proptest::prelude::*;
296    use std::collections::BTreeMap;
297
298    fn agent(id: usize, name: &str, caps: &[&str]) -> SchedulingAgent {
299        SchedulingAgent {
300            id,
301            name: name.to_string(),
302            capabilities: caps.iter().map(|cap| (*cap).to_string()).collect(),
303        }
304    }
305
306    fn task(id: usize, cap: &str, duration: i64, release: i64, deadline: i64) -> SchedulingTask {
307        SchedulingTask {
308            id,
309            name: format!("task-{id}"),
310            required_capability: cap.to_string(),
311            duration_min: duration,
312            release_min: release,
313            deadline_min: deadline,
314        }
315    }
316
317    fn req(tasks: Vec<SchedulingTask>, agents: Vec<SchedulingAgent>) -> SchedulingRequest {
318        SchedulingRequest {
319            id: "sched-1".to_string(),
320            agents,
321            tasks,
322            horizon_min: 480,
323            time_limit_seconds: 1.0,
324        }
325    }
326
327    #[tokio::test]
328    async fn suggestor_emits_greedy_schedule() {
329        let request = req(
330            vec![task(1, "rust", 30, 0, 120), task(2, "rust", 30, 0, 120)],
331            vec![agent(10, "alice", &["rust"])],
332        );
333
334        let mut engine = Engine::new();
335        engine.register_suggestor(GreedySchedulerSuggestor);
336
337        let mut ctx = ContextState::new();
338        ctx.add_proposal(ProposedFact::new(
339            ContextKey::Seeds,
340            "scheduling-request:sched-1",
341            request,
342            converge_pack::ProvenanceSource::provenance(
343                crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
344            ),
345        ))
346        .unwrap();
347
348        let result = engine.run(ctx).await.unwrap();
349        let facts = result.context.get(ContextKey::Strategies);
350        assert_eq!(facts.len(), 1);
351        assert_eq!(facts[0].id().as_str(), "scheduling-plan-greedy:sched-1");
352        let plan = facts[0].require_payload::<SchedulingPlan>().unwrap();
353        assert_eq!(plan.tasks_scheduled, 2);
354        assert_eq!(plan.assignments[0].agent_id, 10);
355    }
356
357    #[tokio::test]
358    async fn malformed_request_emits_diagnostic() {
359        let mut engine = Engine::new();
360        engine.register_suggestor(GreedySchedulerSuggestor);
361
362        let mut ctx = ContextState::new();
363        ctx.add_proposal(ProposedFact::new(
364            ContextKey::Seeds,
365            "scheduling-request:bad",
366            TextPayload::new("not a scheduling request"),
367            converge_pack::ProvenanceSource::provenance(
368                crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
369            ),
370        ))
371        .unwrap();
372
373        let result = engine.run(ctx).await.unwrap();
374        assert!(result.context.get(ContextKey::Strategies).is_empty());
375        assert_eq!(result.context.get(ContextKey::Diagnostic).len(), 1);
376    }
377
378    #[test]
379    fn non_dense_agent_ids_do_not_index_panic() {
380        let plan = solve_greedy_schedule(&req(
381            vec![task(1, "rust", 30, 0, 120)],
382            vec![agent(99, "alice", &["rust"])],
383        ));
384
385        assert_eq!(plan.tasks_scheduled, 1);
386        assert_eq!(plan.assignments[0].agent_id, 99);
387    }
388
389    proptest! {
390        #[test]
391        fn scheduled_tasks_respect_windows_and_agent_non_overlap(
392            durations in prop::collection::vec(1i64..20, 1..40)
393        ) {
394            let total_duration: i64 = durations.iter().sum();
395            let tasks: Vec<_> = durations
396                .iter()
397                .enumerate()
398                .map(|(i, duration)| task(i, "work", *duration, 0, total_duration + 100))
399                .collect();
400            let request = req(tasks.clone(), vec![agent(0, "a", &["work"]), agent(1, "b", &["work"])]);
401            let plan = solve_greedy_schedule(&request);
402
403            let by_task: BTreeMap<usize, &SchedulingTask> = tasks.iter().map(|t| (t.id, t)).collect();
404            let mut by_agent: BTreeMap<usize, Vec<&TaskAssignment>> = BTreeMap::new();
405
406            for assignment in &plan.assignments {
407                let original = by_task[&assignment.task_id];
408                prop_assert!(assignment.start_min >= original.release_min);
409                prop_assert!(assignment.end_min <= original.deadline_min);
410                prop_assert_eq!(assignment.end_min - assignment.start_min, original.duration_min);
411                by_agent.entry(assignment.agent_id).or_default().push(assignment);
412            }
413
414            for assignments in by_agent.values_mut() {
415                assignments.sort_by_key(|assignment| assignment.start_min);
416                for pair in assignments.windows(2) {
417                    prop_assert!(pair[0].end_min <= pair[1].start_min);
418                }
419            }
420        }
421    }
422}