Skip to main content

converge_optimization/suggestors/
work_schedule.rs

1// Copyright 2024-2026 Reflective Labs
2// SPDX-License-Identifier: MIT
3
4//! Work scheduling via the list-scheduling heuristic.
5//!
6//! Reads a [`ScheduleRequest`] from context and proposes a [`SchedulePlan`]
7//! that assigns each task a concrete start time with no resource conflicts.
8//!
9//! # Formation role
10//!
11//! An assignment suggestor settles who does what; this suggestor settles when.
12//! If upstream capacity estimates change, a new request is seeded and the
13//! formation re-converges on an updated schedule.
14//!
15//! # Modes
16//!
17//! - **Disjunctive** (`capacity: None`) — single machine, tasks cannot overlap.
18//! - **Cumulative** (`capacity: Some(k)`) — up to k units of resource may be
19//!   active simultaneously (parallel machines / multi-resource).
20
21use async_trait::async_trait;
22use converge_pack::ProvenanceSource;
23use converge_pack::{
24    AgentEffect, Context, ContextKey, DiagnosticPayload, FactPayload, ProposedFact, Suggestor,
25};
26use serde::{Deserialize, Serialize};
27
28use crate::scheduling::{Interval, SchedulingProblem, list_schedule};
29
30// ── Request ───────────────────────────────────────────────────────────────────
31
/// Input payload describing a batch of tasks to schedule.
///
/// Seed under [`ContextKey::Seeds`] with id prefix `"schedule-request:"`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ScheduleRequest {
    /// Request identifier; echoed back as [`SchedulePlan::request_id`].
    pub id: String,
    /// Tasks to place on the timeline. An empty list yields an empty plan
    /// with makespan `0`.
    pub tasks: Vec<ScheduleTask>,
    /// `None` → disjunctive (one machine). `Some(k)` → cumulative (k units).
    pub capacity: Option<i64>,
}
41
/// Payload identity for schedule requests.
impl FactPayload for ScheduleRequest {
    // Family and version are quoted in the diagnostic emitted when a seed with
    // the request prefix does not carry this payload type.
    const FAMILY: &'static str = "converge.optimization.schedule.request";
    const VERSION: u16 = 1;
}
46
/// A single task to be scheduled.
///
/// The three numeric fields are handed unchanged to `Interval::new` as
/// `(earliest_start, latest_end, duration)`. The time unit is not fixed by
/// this file — NOTE(review): confirm the unit and window inclusivity against
/// `crate::scheduling`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ScheduleTask {
    /// Name of the task; copied onto the matching [`ScheduledTask`].
    pub label: String,
    /// Lower bound of the task's time window.
    pub earliest_start: i64,
    /// Upper bound of the task's time window; the largest value across all
    /// tasks is the denominator of [`SchedulePlan::efficiency`].
    pub latest_end: i64,
    /// Length of the task.
    pub duration: i64,
}
56
57// ── Plan (output) ─────────────────────────────────────────────────────────────
58
/// The schedule produced by the suggestor.
///
/// An infeasible request is still reported as a plan: `scheduled` is empty,
/// `makespan` is `-1` and `efficiency` is `0.0`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SchedulePlan {
    /// Copy of [`ScheduleRequest::id`] for the request this plan answers.
    pub request_id: String,
    /// One entry per task placed by the solver; empty when the request had no
    /// tasks or no feasible schedule was found.
    pub scheduled: Vec<ScheduledTask>,
    /// Makespan reported by the solver; `0` for a request without tasks and
    /// `-1` when scheduling failed.
    pub makespan: i64,
    /// `1.0 - makespan / max_window` — higher means a tighter, more efficient schedule.
    ///
    /// `max_window` is the largest `latest_end` among the tasks. The value is
    /// clamped to `0.0..=1.0`; it is `1.0` for a request without tasks (or when
    /// `max_window` is not positive) and `0.0` when scheduling failed. It is
    /// also used as the confidence of the proposed plan fact.
    pub efficiency: f64,
}
69
/// Payload identity for schedule plans.
impl FactPayload for SchedulePlan {
    // Family name and schema version under which plan payloads are published.
    const FAMILY: &'static str = "converge.optimization.schedule.plan";
    const VERSION: u16 = 1;
}
74
/// One task together with the time slot the solver assigned to it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ScheduledTask {
    /// Label of the originating [`ScheduleTask`]; empty if the solver's
    /// interval id does not index a task of the request.
    pub label: String,
    /// Start time chosen by the solver.
    pub start: i64,
    /// End time reported by the solver for this slot.
    pub end: i64,
}
82
83// ── Suggestor ─────────────────────────────────────────────────────────────────
84
/// Seed facts whose id starts with this prefix are treated as schedule requests.
const REQUEST_PREFIX: &str = "schedule-request:";
/// Id prefix of the plan facts proposed under [`ContextKey::Strategies`].
const PLAN_PREFIX: &str = "schedule-plan:";
/// Id prefix of the diagnostics proposed under [`ContextKey::Diagnostic`] for
/// malformed requests; it is followed by the full id of the offending seed.
const ERROR_PREFIX: &str = "schedule-request-error:";
88
/// Schedules tasks respecting time windows and resource capacity constraints.
///
/// Stateless: for every schedule-request seed it proposes either a
/// [`SchedulePlan`] or, when the seed's payload is not a [`ScheduleRequest`],
/// a diagnostic.
pub struct WorkScheduleSuggestor;
91
92#[async_trait]
93impl Suggestor for WorkScheduleSuggestor {
94    fn name(&self) -> &str {
95        "WorkScheduleSuggestor"
96    }
97
98    fn dependencies(&self) -> &[ContextKey] {
99        &[ContextKey::Seeds]
100    }
101
102    fn complexity_hint(&self) -> Option<&'static str> {
103        Some("O(n log n) list scheduling — n = tasks; scales to thousands of tasks")
104    }
105
106    fn accepts(&self, ctx: &dyn Context) -> bool {
107        ctx.get(ContextKey::Seeds).iter().any(|f| {
108            f.id().as_str().starts_with(REQUEST_PREFIX)
109                && match f.payload::<ScheduleRequest>() {
110                    Some(_) => !plan_exists(ctx, req_id(f.id().as_str())),
111                    None => !error_exists(ctx, f.id().as_str()),
112                }
113        })
114    }
115
116    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
117        let mut proposals = Vec::new();
118
119        for fact in ctx
120            .get(ContextKey::Seeds)
121            .iter()
122            .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
123        {
124            match fact.payload::<ScheduleRequest>() {
125                Some(req) => {
126                    if plan_exists(ctx, req_id(fact.id().as_str())) {
127                        continue;
128                    }
129                    let plan = solve(req);
130                    proposals.push(
131                        ProposedFact::new(
132                            ContextKey::Strategies,
133                            format!("{}{}", PLAN_PREFIX, plan.request_id),
134                            plan.clone(),
135                            self.name().to_string(),
136                        )
137                        .with_confidence(plan.efficiency.clamp(0.0, 1.0)),
138                    );
139                }
140                None => {
141                    if error_exists(ctx, fact.id().as_str()) {
142                        continue;
143                    }
144                    proposals.push(
145                        ProposedFact::new(
146                            ContextKey::Diagnostic,
147                            format!("{}{}", ERROR_PREFIX, fact.id()),
148                            DiagnosticPayload::new(
149                                self.name(),
150                                format!(
151                                    "malformed schedule request '{}': expected {} v{} payload",
152                                    fact.id(),
153                                    ScheduleRequest::FAMILY,
154                                    ScheduleRequest::VERSION
155                                ),
156                            ),
157                            self.name().to_string(),
158                        )
159                        .with_confidence(1.0),
160                    );
161                }
162            }
163        }
164
165        if proposals.is_empty() {
166            AgentEffect::empty()
167        } else {
168            AgentEffect::with_proposals(proposals)
169        }
170    }
171
172    fn provenance(&self) -> &'static str {
173        super::CONVERGE_OPTIMIZATION_PROVENANCE.as_str()
174    }
175}
176
177// ── Core logic ────────────────────────────────────────────────────────────────
178
179fn solve(req: &ScheduleRequest) -> SchedulePlan {
180    if req.tasks.is_empty() {
181        return SchedulePlan {
182            request_id: req.id.clone(),
183            scheduled: vec![],
184            makespan: 0,
185            efficiency: 1.0,
186        };
187    }
188
189    let intervals: Vec<Interval> = req
190        .tasks
191        .iter()
192        .enumerate()
193        .map(|(i, t)| Interval::new(i, t.earliest_start, t.latest_end, t.duration))
194        .collect();
195
196    let max_window = req.tasks.iter().map(|t| t.latest_end).max().unwrap_or(1);
197
198    let problem = match req.capacity {
199        None => SchedulingProblem::disjunctive(intervals),
200        Some(cap) => SchedulingProblem::cumulative(intervals, cap),
201    };
202
203    match list_schedule(&problem) {
204        Ok(sol) => {
205            let scheduled = sol
206                .schedule
207                .iter()
208                .map(|s| ScheduledTask {
209                    label: req
210                        .tasks
211                        .get(s.interval.id)
212                        .map(|t| t.label.clone())
213                        .unwrap_or_default(),
214                    start: s.start,
215                    end: s.end(),
216                })
217                .collect();
218            let efficiency = if max_window > 0 {
219                1.0 - sol.makespan as f64 / max_window as f64
220            } else {
221                1.0
222            };
223            SchedulePlan {
224                request_id: req.id.clone(),
225                scheduled,
226                makespan: sol.makespan,
227                efficiency: efficiency.clamp(0.0, 1.0),
228            }
229        }
230        Err(_) => {
231            // Infeasible schedules surface as a low-confidence plan with
232            // makespan = -1 so downstream suggestors can detect the failure.
233            SchedulePlan {
234                request_id: req.id.clone(),
235                scheduled: vec![],
236                makespan: -1,
237                efficiency: 0.0,
238            }
239        }
240    }
241}
242
243// ── Helpers ───────────────────────────────────────────────────────────────────
244
245fn req_id(fact_id: &str) -> &str {
246    fact_id.trim_start_matches(REQUEST_PREFIX)
247}
248
249fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
250    let id = format!("{}{}", PLAN_PREFIX, request_id);
251    ctx.get(ContextKey::Strategies)
252        .iter()
253        .any(|f| f.id().as_str() == id)
254}
255
256fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
257    let id = format!("{}{}", ERROR_PREFIX, fact_id);
258    ctx.get(ContextKey::Diagnostic)
259        .iter()
260        .any(|f| f.id().as_str() == id)
261}
262
263// ── Tests ─────────────────────────────────────────────────────────────────────
264
#[cfg(test)]
mod tests {
    use super::*;
    use converge_core::{ContextState, Engine};
    use converge_pack::TextPayload;

    /// Builds a request from `(label, earliest_start, latest_end, duration)` tuples.
    fn req(id: &str, tasks: Vec<(&str, i64, i64, i64)>, capacity: Option<i64>) -> ScheduleRequest {
        let mut converted = Vec::with_capacity(tasks.len());
        for (label, earliest_start, latest_end, duration) in tasks {
            converted.push(ScheduleTask {
                label: label.to_string(),
                earliest_start,
                latest_end,
                duration,
            });
        }
        ScheduleRequest {
            id: id.to_string(),
            tasks: converted,
            capacity,
        }
    }

    /// On a single machine the tasks run back to back, so the makespan is the
    /// sum of the durations.
    #[tokio::test]
    async fn three_tasks_makespan_equals_sum() {
        let mut engine = Engine::new();
        engine.register_suggestor(WorkScheduleSuggestor);

        let request = req(
            "r1",
            vec![
                ("design", 0, 30, 5),
                ("build", 0, 30, 8),
                ("test", 0, 30, 3),
            ],
            None,
        );
        let mut seeded = ContextState::new();
        seeded
            .add_proposal(ProposedFact::new(
                ContextKey::Seeds,
                "schedule-request:r1",
                request,
                "test",
            ))
            .unwrap();

        let outcome = engine.run(seeded).await.unwrap();
        let strategies = outcome.context.get(ContextKey::Strategies);
        assert_eq!(strategies.len(), 1);
        let plan = strategies[0].require_payload::<SchedulePlan>().unwrap();
        assert_eq!(plan.makespan, 16, "3 sequential tasks: 5+8+3=16");
        assert_eq!(plan.scheduled.len(), 3);
    }

    /// Feeding a converged context to a fresh engine must not add more plans.
    #[tokio::test]
    async fn result_is_idempotent() {
        let mut engine = Engine::new();
        engine.register_suggestor(WorkScheduleSuggestor);

        let request = req("r1", vec![("a", 0, 20, 5), ("b", 0, 20, 3)], None);
        let mut seeded = ContextState::new();
        seeded
            .add_proposal(ProposedFact::new(
                ContextKey::Seeds,
                "schedule-request:r1",
                request,
                "test",
            ))
            .unwrap();

        let first = engine.run(seeded).await.unwrap();

        let mut engine2 = Engine::new();
        engine2.register_suggestor(WorkScheduleSuggestor);
        let second = engine2.run(first.context.clone()).await.unwrap();

        assert_eq!(
            second.context.get(ContextKey::Strategies).len(),
            first.context.get(ContextKey::Strategies).len(),
        );
    }

    /// A seed with the request prefix but a foreign payload yields exactly one
    /// diagnostic and no plan.
    #[tokio::test]
    async fn malformed_request_emits_diagnostic() {
        let mut engine = Engine::new();
        engine.register_suggestor(WorkScheduleSuggestor);

        let mut seeded = ContextState::new();
        seeded
            .add_proposal(ProposedFact::new(
                ContextKey::Seeds,
                "schedule-request:bad",
                TextPayload::new("not a schedule request"),
                "test",
            ))
            .unwrap();

        let outcome = engine.run(seeded).await.unwrap();
        assert_eq!(outcome.context.get(ContextKey::Diagnostic).len(), 1);
        assert!(!outcome.context.has(ContextKey::Strategies));
    }
}
359}