Skip to main content

converge_optimization/suggestors/
work_schedule.rs

1// Copyright 2024-2026 Reflective Labs
2// SPDX-License-Identifier: MIT
3
4//! Work scheduling via the list-scheduling heuristic.
5//!
6//! Reads a [`ScheduleRequest`] from context and proposes a [`SchedulePlan`]
7//! that assigns each task a concrete start time with no resource conflicts.
8//!
9//! # Formation role
10//!
11//! An assignment suggestor settles who does what; this suggestor settles when.
12//! If upstream capacity estimates change, a new request is seeded and the
13//! formation re-converges on an updated schedule.
14//!
15//! # Modes
16//!
17//! - **Disjunctive** (`capacity: None`) — single machine, tasks cannot overlap.
18//! - **Cumulative** (`capacity: Some(k)`) — up to k units of resource may be
19//!   active simultaneously (parallel machines / multi-resource).
20
21use async_trait::async_trait;
22use converge_pack::Provenance;
23use converge_pack::ProvenanceSource;
24use converge_pack::{
25    AgentEffect, Context, ContextKey, DiagnosticPayload, FactPayload, ProposedFact, Suggestor,
26};
27use serde::{Deserialize, Serialize};
28
29use crate::scheduling::{Interval, SchedulingProblem, list_schedule};
30
31// ── Request ───────────────────────────────────────────────────────────────────
32
/// A scheduling request: the tasks to place and the resource capacity to respect.
///
/// Seed under [`ContextKey::Seeds`] with id prefix `"schedule-request:"`.
/// Unknown fields are rejected during deserialization (`deny_unknown_fields`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ScheduleRequest {
    /// Request identifier; copied verbatim into [`SchedulePlan::request_id`].
    pub id: String,
    /// Tasks to schedule. An empty list produces an empty plan with makespan 0.
    pub tasks: Vec<ScheduleTask>,
    /// `None` → disjunctive (one machine). `Some(k)` → cumulative (k units).
    pub capacity: Option<i64>,
}
42
/// Registers [`ScheduleRequest`] as a typed fact payload.
impl FactPayload for ScheduleRequest {
    /// Payload family name; quoted in the diagnostic emitted for malformed requests.
    const FAMILY: &'static str = "converge.optimization.schedule.request";
    /// Payload version; quoted alongside the family in that diagnostic.
    const VERSION: u16 = 1;
}
47
/// A single task to be scheduled.
///
/// The three numeric fields are handed unchanged to the scheduler's interval
/// constructor. NOTE(review): this file does not fix a time unit — presumably
/// all three share one; confirm with the producer of the request.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ScheduleTask {
    /// Task name; copied onto the matching [`ScheduledTask`] in the plan.
    pub label: String,
    /// Presumably the earliest time at which the task may start.
    pub earliest_start: i64,
    /// Presumably the time by which the task must have finished. The largest
    /// value across all tasks is the `max_window` used for plan efficiency.
    pub latest_end: i64,
    /// Presumably how long the task occupies the resource.
    pub duration: i64,
}
57
58// ── Plan (output) ─────────────────────────────────────────────────────────────
59
/// The schedule produced by the suggestor.
///
/// A failed scheduling attempt is still reported as a plan: `scheduled` is
/// empty, `makespan` is `-1` and `efficiency` is `0.0`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SchedulePlan {
    /// Copy of [`ScheduleRequest::id`] for the request this plan answers.
    pub request_id: String,
    /// One entry per placement returned by the scheduler; empty when the
    /// request had no tasks or scheduling failed.
    pub scheduled: Vec<ScheduledTask>,
    /// Makespan reported by the scheduler; `0` for a request without tasks,
    /// `-1` when scheduling failed.
    pub makespan: i64,
    /// `1.0 - makespan / max_window` — higher means a tighter, more efficient schedule.
    ///
    /// `max_window` is the largest `latest_end` among the request's tasks. The
    /// value is clamped to `0.0..=1.0`; it is `1.0` for a request without tasks.
    pub efficiency: f64,
}
70
/// Registers [`SchedulePlan`] as a typed fact payload.
impl FactPayload for SchedulePlan {
    /// Payload family name for schedule plans.
    const FAMILY: &'static str = "converge.optimization.schedule.plan";
    /// Payload version for schedule plans.
    const VERSION: u16 = 1;
}
75
/// A task together with the time slot the scheduler assigned to it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ScheduledTask {
    /// Label of the originating [`ScheduleTask`]; empty if the scheduler
    /// returned an interval id that does not index into the request's tasks.
    pub label: String,
    /// Start time chosen by the scheduler.
    pub start: i64,
    /// End time as reported by the scheduler for this placement.
    pub end: i64,
}
83
84// ── Suggestor ─────────────────────────────────────────────────────────────────
85
/// Id prefix of the seed facts that this suggestor treats as schedule requests.
const REQUEST_PREFIX: &str = "schedule-request:";
/// Id prefix of the plan facts proposed under [`ContextKey::Strategies`].
const PLAN_PREFIX: &str = "schedule-plan:";
/// Id prefix of the diagnostics proposed under [`ContextKey::Diagnostic`] for
/// malformed requests; it is followed by the full id of the offending seed.
const ERROR_PREFIX: &str = "schedule-request-error:";
89
/// Schedules tasks respecting time windows and resource capacity constraints.
///
/// The type carries no state, so it is freely copyable and default-constructible;
/// `Debug` is derived so the suggestor can appear in logs and in the `Debug`
/// output of types that hold it.
#[derive(Debug, Clone, Copy, Default)]
pub struct WorkScheduleSuggestor;
92
93#[async_trait]
94impl Suggestor for WorkScheduleSuggestor {
95    fn name(&self) -> &str {
96        "WorkScheduleSuggestor"
97    }
98
99    fn dependencies(&self) -> &[ContextKey] {
100        &[ContextKey::Seeds]
101    }
102
103    fn complexity_hint(&self) -> Option<&'static str> {
104        Some("O(n log n) list scheduling — n = tasks; scales to thousands of tasks")
105    }
106
107    fn accepts(&self, ctx: &dyn Context) -> bool {
108        ctx.get(ContextKey::Seeds).iter().any(|f| {
109            f.id().as_str().starts_with(REQUEST_PREFIX)
110                && match f.payload::<ScheduleRequest>() {
111                    Some(_) => !plan_exists(ctx, req_id(f.id().as_str())),
112                    None => !error_exists(ctx, f.id().as_str()),
113                }
114        })
115    }
116
117    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
118        let mut proposals = Vec::new();
119
120        for fact in ctx
121            .get(ContextKey::Seeds)
122            .iter()
123            .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
124        {
125            match fact.payload::<ScheduleRequest>() {
126                Some(req) => {
127                    if plan_exists(ctx, req_id(fact.id().as_str())) {
128                        continue;
129                    }
130                    let plan = solve(req);
131                    proposals.push(
132                        ProposedFact::new(
133                            ContextKey::Strategies,
134                            format!("{}{}", PLAN_PREFIX, plan.request_id),
135                            plan.clone(),
136                            self.provenance(),
137                        )
138                        .with_confidence(plan.efficiency.clamp(0.0, 1.0)),
139                    );
140                }
141                None => {
142                    if error_exists(ctx, fact.id().as_str()) {
143                        continue;
144                    }
145                    proposals.push(
146                        ProposedFact::new(
147                            ContextKey::Diagnostic,
148                            format!("{}{}", ERROR_PREFIX, fact.id()),
149                            DiagnosticPayload::new(
150                                self.name(),
151                                format!(
152                                    "malformed schedule request '{}': expected {} v{} payload",
153                                    fact.id(),
154                                    ScheduleRequest::FAMILY,
155                                    ScheduleRequest::VERSION
156                                ),
157                            ),
158                            self.provenance(),
159                        )
160                        .with_confidence(1.0),
161                    );
162                }
163            }
164        }
165
166        if proposals.is_empty() {
167            AgentEffect::empty()
168        } else {
169            AgentEffect::with_proposals(proposals)
170        }
171    }
172
173    fn provenance(&self) -> Provenance {
174        crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE.provenance()
175    }
176}
177
178// ── Core logic ────────────────────────────────────────────────────────────────
179
180fn solve(req: &ScheduleRequest) -> SchedulePlan {
181    if req.tasks.is_empty() {
182        return SchedulePlan {
183            request_id: req.id.clone(),
184            scheduled: vec![],
185            makespan: 0,
186            efficiency: 1.0,
187        };
188    }
189
190    let intervals: Vec<Interval> = req
191        .tasks
192        .iter()
193        .enumerate()
194        .map(|(i, t)| Interval::new(i, t.earliest_start, t.latest_end, t.duration))
195        .collect();
196
197    let max_window = req.tasks.iter().map(|t| t.latest_end).max().unwrap_or(1);
198
199    let problem = match req.capacity {
200        None => SchedulingProblem::disjunctive(intervals),
201        Some(cap) => SchedulingProblem::cumulative(intervals, cap),
202    };
203
204    match list_schedule(&problem) {
205        Ok(sol) => {
206            let scheduled = sol
207                .schedule
208                .iter()
209                .map(|s| ScheduledTask {
210                    label: req
211                        .tasks
212                        .get(s.interval.id)
213                        .map(|t| t.label.clone())
214                        .unwrap_or_default(),
215                    start: s.start,
216                    end: s.end(),
217                })
218                .collect();
219            let efficiency = if max_window > 0 {
220                1.0 - sol.makespan as f64 / max_window as f64
221            } else {
222                1.0
223            };
224            SchedulePlan {
225                request_id: req.id.clone(),
226                scheduled,
227                makespan: sol.makespan,
228                efficiency: efficiency.clamp(0.0, 1.0),
229            }
230        }
231        Err(_) => {
232            // Infeasible schedules surface as a low-confidence plan with
233            // makespan = -1 so downstream suggestors can detect the failure.
234            SchedulePlan {
235                request_id: req.id.clone(),
236                scheduled: vec![],
237                makespan: -1,
238                efficiency: 0.0,
239            }
240        }
241    }
242}
243
244// ── Helpers ───────────────────────────────────────────────────────────────────
245
246fn req_id(fact_id: &str) -> &str {
247    fact_id.trim_start_matches(REQUEST_PREFIX)
248}
249
250fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
251    let id = format!("{}{}", PLAN_PREFIX, request_id);
252    ctx.get(ContextKey::Strategies)
253        .iter()
254        .any(|f| f.id().as_str() == id)
255}
256
257fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
258    let id = format!("{}{}", ERROR_PREFIX, fact_id);
259    ctx.get(ContextKey::Diagnostic)
260        .iter()
261        .any(|f| f.id().as_str() == id)
262}
263
264// ── Tests ─────────────────────────────────────────────────────────────────────
265
// End-to-end tests: each one registers the suggestor on an engine, seeds a
// context, runs the engine and inspects the resulting context.
#[cfg(test)]
mod tests {
    use super::*;
    use converge_core::{ContextState, Engine};
    use converge_pack::TextPayload;

    /// Builds a [`ScheduleRequest`] from `(label, earliest_start, latest_end, duration)` tuples.
    fn req(id: &str, tasks: Vec<(&str, i64, i64, i64)>, capacity: Option<i64>) -> ScheduleRequest {
        ScheduleRequest {
            id: id.to_string(),
            tasks: tasks
                .into_iter()
                .map(|(label, es, le, dur)| ScheduleTask {
                    label: label.to_string(),
                    earliest_start: es,
                    latest_end: le,
                    duration: dur,
                })
                .collect(),
            capacity,
        }
    }

    /// Three tasks with `capacity: None` must yield a single plan whose
    /// makespan is the sum of the durations.
    #[tokio::test]
    async fn three_tasks_makespan_equals_sum() {
        let mut engine = Engine::new();
        engine.register_suggestor(WorkScheduleSuggestor);

        let mut ctx = ContextState::new();
        ctx.add_proposal(ProposedFact::new(
            ContextKey::Seeds,
            "schedule-request:r1",
            req(
                "r1",
                vec![
                    ("design", 0, 30, 5),
                    ("build", 0, 30, 8),
                    ("test", 0, 30, 3),
                ],
                None,
            ),
            converge_pack::ProvenanceSource::provenance(
                crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
            ),
        ))
        .unwrap();

        let result = engine.run(ctx).await.unwrap();
        let facts = result.context.get(ContextKey::Strategies);
        assert_eq!(facts.len(), 1);
        let plan = facts[0].require_payload::<SchedulePlan>().unwrap();
        assert_eq!(plan.makespan, 16, "3 sequential tasks: 5+8+3=16");
        assert_eq!(plan.scheduled.len(), 3);
    }

    /// Running a second, fresh engine over the context produced by the first
    /// run must leave the number of strategy facts unchanged.
    #[tokio::test]
    async fn result_is_idempotent() {
        let mut engine = Engine::new();
        engine.register_suggestor(WorkScheduleSuggestor);

        let mut ctx = ContextState::new();
        ctx.add_proposal(ProposedFact::new(
            ContextKey::Seeds,
            "schedule-request:r1",
            req("r1", vec![("a", 0, 20, 5), ("b", 0, 20, 3)], None),
            converge_pack::ProvenanceSource::provenance(
                crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
            ),
        ))
        .unwrap();

        let first = engine.run(ctx).await.unwrap();
        let mut engine2 = Engine::new();
        engine2.register_suggestor(WorkScheduleSuggestor);
        let second = engine2.run(first.context.clone()).await.unwrap();
        assert_eq!(
            second.context.get(ContextKey::Strategies).len(),
            first.context.get(ContextKey::Strategies).len(),
        );
    }

    /// A seed that carries the request prefix but a text payload must produce
    /// exactly one diagnostic and no strategy fact.
    #[tokio::test]
    async fn malformed_request_emits_diagnostic() {
        let mut engine = Engine::new();
        engine.register_suggestor(WorkScheduleSuggestor);

        let mut ctx = ContextState::new();
        ctx.add_proposal(ProposedFact::new(
            ContextKey::Seeds,
            "schedule-request:bad",
            TextPayload::new("not a schedule request"),
            converge_pack::ProvenanceSource::provenance(
                crate::suggestors::CONVERGE_OPTIMIZATION_PROVENANCE,
            ),
        ))
        .unwrap();

        let result = engine.run(ctx).await.unwrap();
        assert_eq!(result.context.get(ContextKey::Diagnostic).len(), 1);
        assert!(!result.context.has(ContextKey::Strategies));
    }
}
366}