Skip to main content

converge_optimization/suggestors/
work_schedule.rs

1// Copyright 2024-2026 Reflective Labs
2// SPDX-License-Identifier: MIT
3
4//! Work scheduling via the list-scheduling heuristic.
5//!
6//! Reads a [`ScheduleRequest`] from context and proposes a [`SchedulePlan`]
7//! that assigns each task a concrete start time with no resource conflicts.
8//!
9//! # Formation role
10//!
11//! An assignment suggestor settles who does what; this suggestor settles when.
12//! If upstream capacity estimates change, a new request is seeded and the
13//! formation re-converges on an updated schedule.
14//!
15//! # Modes
16//!
17//! - **Disjunctive** (`capacity: None`) — single machine, tasks cannot overlap.
18//! - **Cumulative** (`capacity: Some(k)`) — up to k units of resource may be
19//!   active simultaneously (parallel machines / multi-resource).
20
21use async_trait::async_trait;
22use converge_pack::{AgentEffect, Context, ContextKey, ProposedFact, Suggestor};
23use serde::{Deserialize, Serialize};
24
25use crate::scheduling::{Interval, SchedulingProblem, list_schedule};
26
27// ── Request ───────────────────────────────────────────────────────────────────
28
29/// Seed under [`ContextKey::Seeds`] with id prefix `"schedule-request:"`.
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct ScheduleRequest {
32    pub id: String,
33    pub tasks: Vec<ScheduleTask>,
34    /// `None` → disjunctive (one machine). `Some(k)` → cumulative (k units).
35    pub capacity: Option<i64>,
36}
37
38/// A single task to be scheduled.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct ScheduleTask {
41    pub label: String,
42    pub earliest_start: i64,
43    pub latest_end: i64,
44    pub duration: i64,
45}
46
47// ── Plan (output) ─────────────────────────────────────────────────────────────
48
49/// The schedule produced by the suggestor.
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct SchedulePlan {
52    pub request_id: String,
53    pub scheduled: Vec<ScheduledTask>,
54    pub makespan: i64,
55    /// `1.0 - makespan / max_window` — higher means a tighter, more efficient schedule.
56    pub efficiency: f64,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct ScheduledTask {
61    pub label: String,
62    pub start: i64,
63    pub end: i64,
64}
65
66// ── Suggestor ─────────────────────────────────────────────────────────────────
67
68const REQUEST_PREFIX: &str = "schedule-request:";
69const PLAN_PREFIX: &str = "schedule-plan:";
70const ERROR_PREFIX: &str = "schedule-request-error:";
71
72/// Schedules tasks respecting time windows and resource capacity constraints.
73pub struct WorkScheduleSuggestor;
74
75#[async_trait]
76impl Suggestor for WorkScheduleSuggestor {
77    fn name(&self) -> &str {
78        "WorkScheduleSuggestor"
79    }
80
81    fn dependencies(&self) -> &[ContextKey] {
82        &[ContextKey::Seeds]
83    }
84
85    fn complexity_hint(&self) -> Option<&'static str> {
86        Some("O(n log n) list scheduling — n = tasks; scales to thousands of tasks")
87    }
88
89    fn accepts(&self, ctx: &dyn Context) -> bool {
90        ctx.get(ContextKey::Seeds).iter().any(|f| {
91            f.id.starts_with(REQUEST_PREFIX)
92                && match serde_json::from_str::<ScheduleRequest>(&f.content) {
93                    Ok(_) => !plan_exists(ctx, req_id(&f.id)),
94                    Err(_) => !error_exists(ctx, &f.id),
95                }
96        })
97    }
98
99    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
100        let mut proposals = Vec::new();
101
102        for fact in ctx
103            .get(ContextKey::Seeds)
104            .iter()
105            .filter(|f| f.id.starts_with(REQUEST_PREFIX))
106        {
107            match serde_json::from_str::<ScheduleRequest>(&fact.content) {
108                Ok(req) => {
109                    if plan_exists(ctx, req_id(&fact.id)) {
110                        continue;
111                    }
112                    let plan = solve(&req);
113                    proposals.push(
114                        ProposedFact::new(
115                            ContextKey::Strategies,
116                            format!("{}{}", PLAN_PREFIX, plan.request_id),
117                            serde_json::to_string(&plan).unwrap_or_default(),
118                            self.name(),
119                        )
120                        .with_confidence(plan.efficiency.clamp(0.0, 1.0)),
121                    );
122                }
123                Err(e) => {
124                    if error_exists(ctx, &fact.id) {
125                        continue;
126                    }
127                    let diag = serde_json::json!({
128                        "request_fact_id": fact.id,
129                        "message": "malformed schedule request",
130                        "error": e.to_string(),
131                    });
132                    proposals.push(
133                        ProposedFact::new(
134                            ContextKey::Diagnostic,
135                            format!("{}{}", ERROR_PREFIX, fact.id),
136                            diag.to_string(),
137                            self.name(),
138                        )
139                        .with_confidence(1.0),
140                    );
141                }
142            }
143        }
144
145        if proposals.is_empty() {
146            AgentEffect::empty()
147        } else {
148            AgentEffect::with_proposals(proposals)
149        }
150    }
151}
152
153// ── Core logic ────────────────────────────────────────────────────────────────
154
155fn solve(req: &ScheduleRequest) -> SchedulePlan {
156    if req.tasks.is_empty() {
157        return SchedulePlan {
158            request_id: req.id.clone(),
159            scheduled: vec![],
160            makespan: 0,
161            efficiency: 1.0,
162        };
163    }
164
165    let intervals: Vec<Interval> = req
166        .tasks
167        .iter()
168        .enumerate()
169        .map(|(i, t)| Interval::new(i, t.earliest_start, t.latest_end, t.duration))
170        .collect();
171
172    let max_window = req.tasks.iter().map(|t| t.latest_end).max().unwrap_or(1);
173
174    let problem = match req.capacity {
175        None => SchedulingProblem::disjunctive(intervals),
176        Some(cap) => SchedulingProblem::cumulative(intervals, cap),
177    };
178
179    match list_schedule(&problem) {
180        Ok(sol) => {
181            let scheduled = sol
182                .schedule
183                .iter()
184                .map(|s| ScheduledTask {
185                    label: req
186                        .tasks
187                        .get(s.interval.id)
188                        .map(|t| t.label.clone())
189                        .unwrap_or_default(),
190                    start: s.start,
191                    end: s.end(),
192                })
193                .collect();
194            let efficiency = if max_window > 0 {
195                1.0 - sol.makespan as f64 / max_window as f64
196            } else {
197                1.0
198            };
199            SchedulePlan {
200                request_id: req.id.clone(),
201                scheduled,
202                makespan: sol.makespan,
203                efficiency: efficiency.clamp(0.0, 1.0),
204            }
205        }
206        Err(_) => {
207            // Infeasible schedules surface as a low-confidence plan with
208            // makespan = -1 so downstream suggestors can detect the failure.
209            SchedulePlan {
210                request_id: req.id.clone(),
211                scheduled: vec![],
212                makespan: -1,
213                efficiency: 0.0,
214            }
215        }
216    }
217}
218
219// ── Helpers ───────────────────────────────────────────────────────────────────
220
221fn req_id(fact_id: &str) -> &str {
222    fact_id.trim_start_matches(REQUEST_PREFIX)
223}
224
225fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
226    let id = format!("{}{}", PLAN_PREFIX, request_id);
227    ctx.get(ContextKey::Strategies).iter().any(|f| f.id == id)
228}
229
230fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
231    let id = format!("{}{}", ERROR_PREFIX, fact_id);
232    ctx.get(ContextKey::Diagnostic).iter().any(|f| f.id == id)
233}
234
235// ── Tests ─────────────────────────────────────────────────────────────────────
236
237#[cfg(test)]
238mod tests {
239    use super::*;
240    use converge_core::{ContextState, Engine};
241
242    fn req_json(id: &str, tasks: Vec<(&str, i64, i64, i64)>, capacity: Option<i64>) -> String {
243        serde_json::to_string(&ScheduleRequest {
244            id: id.to_string(),
245            tasks: tasks
246                .into_iter()
247                .map(|(label, es, le, dur)| ScheduleTask {
248                    label: label.to_string(),
249                    earliest_start: es,
250                    latest_end: le,
251                    duration: dur,
252                })
253                .collect(),
254            capacity,
255        })
256        .unwrap()
257    }
258
259    #[tokio::test]
260    async fn three_tasks_makespan_equals_sum() {
261        let mut engine = Engine::new();
262        engine.register_suggestor(WorkScheduleSuggestor);
263
264        let mut ctx = ContextState::new();
265        ctx.add_input(
266            ContextKey::Seeds,
267            "schedule-request:r1",
268            req_json(
269                "r1",
270                vec![
271                    ("design", 0, 30, 5),
272                    ("build", 0, 30, 8),
273                    ("test", 0, 30, 3),
274                ],
275                None,
276            ),
277        )
278        .unwrap();
279
280        let result = engine.run(ctx).await.unwrap();
281        let facts = result.context.get(ContextKey::Strategies);
282        assert_eq!(facts.len(), 1);
283        let plan: SchedulePlan = serde_json::from_str(&facts[0].content).unwrap();
284        assert_eq!(plan.makespan, 16, "3 sequential tasks: 5+8+3=16");
285        assert_eq!(plan.scheduled.len(), 3);
286    }
287
288    #[tokio::test]
289    async fn result_is_idempotent() {
290        let mut engine = Engine::new();
291        engine.register_suggestor(WorkScheduleSuggestor);
292
293        let mut ctx = ContextState::new();
294        ctx.add_input(
295            ContextKey::Seeds,
296            "schedule-request:r1",
297            req_json("r1", vec![("a", 0, 20, 5), ("b", 0, 20, 3)], None),
298        )
299        .unwrap();
300
301        let first = engine.run(ctx).await.unwrap();
302        let mut engine2 = Engine::new();
303        engine2.register_suggestor(WorkScheduleSuggestor);
304        let second = engine2.run(first.context.clone()).await.unwrap();
305        assert_eq!(
306            second.context.get(ContextKey::Strategies).len(),
307            first.context.get(ContextKey::Strategies).len(),
308        );
309    }
310
311    #[tokio::test]
312    async fn malformed_request_emits_diagnostic() {
313        let mut engine = Engine::new();
314        engine.register_suggestor(WorkScheduleSuggestor);
315
316        let mut ctx = ContextState::new();
317        ctx.add_input(ContextKey::Seeds, "schedule-request:bad", "not-json")
318            .unwrap();
319
320        let result = engine.run(ctx).await.unwrap();
321        assert_eq!(result.context.get(ContextKey::Diagnostic).len(), 1);
322        assert!(!result.context.has(ContextKey::Strategies));
323    }
324}