Skip to main content

converge_optimization/suggestors/
work_schedule.rs

1// Copyright 2024-2026 Reflective Labs
2// SPDX-License-Identifier: MIT
3
4//! Work scheduling via the list-scheduling heuristic.
5//!
6//! Reads a [`ScheduleRequest`] from context and proposes a [`SchedulePlan`]
7//! that assigns each task a concrete start time with no resource conflicts.
8//!
9//! # Formation role
10//!
11//! An assignment suggestor settles who does what; this suggestor settles when.
12//! If upstream capacity estimates change, a new request is seeded and the
13//! formation re-converges on an updated schedule.
14//!
15//! # Modes
16//!
17//! - **Disjunctive** (`capacity: None`) — single machine, tasks cannot overlap.
18//! - **Cumulative** (`capacity: Some(k)`) — up to k units of resource may be
19//!   active simultaneously (parallel machines / multi-resource).
20
21use async_trait::async_trait;
22use converge_pack::{AgentEffect, Context, ContextKey, ProposedFact, Suggestor};
23use serde::{Deserialize, Serialize};
24
25use crate::scheduling::{Interval, SchedulingProblem, list_schedule};
26
27// ── Request ───────────────────────────────────────────────────────────────────
28
29/// Seed under [`ContextKey::Seeds`] with id prefix `"schedule-request:"`.
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct ScheduleRequest {
32    pub id: String,
33    pub tasks: Vec<ScheduleTask>,
34    /// `None` → disjunctive (one machine). `Some(k)` → cumulative (k units).
35    pub capacity: Option<i64>,
36}
37
38/// A single task to be scheduled.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct ScheduleTask {
41    pub label: String,
42    pub earliest_start: i64,
43    pub latest_end: i64,
44    pub duration: i64,
45}
46
47// ── Plan (output) ─────────────────────────────────────────────────────────────
48
49/// The schedule produced by the suggestor.
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct SchedulePlan {
52    pub request_id: String,
53    pub scheduled: Vec<ScheduledTask>,
54    pub makespan: i64,
55    /// `1.0 - makespan / max_window` — higher means a tighter, more efficient schedule.
56    pub efficiency: f64,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct ScheduledTask {
61    pub label: String,
62    pub start: i64,
63    pub end: i64,
64}
65
66// ── Suggestor ─────────────────────────────────────────────────────────────────
67
/// Seed facts whose id starts with this prefix are treated as schedule requests.
const REQUEST_PREFIX: &str = "schedule-request:";
/// Id prefix of the plan facts proposed under [`ContextKey::Strategies`].
const PLAN_PREFIX: &str = "schedule-plan:";
/// Id prefix of the diagnostics proposed for requests that fail to parse.
const ERROR_PREFIX: &str = "schedule-request-error:";

/// Suggestor that turns schedule-request seeds into schedule plans, honouring
/// each task's time window and the request's resource capacity.
pub struct WorkScheduleSuggestor;
74
75#[async_trait]
76impl Suggestor for WorkScheduleSuggestor {
77    fn name(&self) -> &str {
78        "WorkScheduleSuggestor"
79    }
80
81    fn dependencies(&self) -> &[ContextKey] {
82        &[ContextKey::Seeds]
83    }
84
85    fn complexity_hint(&self) -> Option<&'static str> {
86        Some("O(n log n) list scheduling — n = tasks; scales to thousands of tasks")
87    }
88
89    fn accepts(&self, ctx: &dyn Context) -> bool {
90        ctx.get(ContextKey::Seeds).iter().any(|f| {
91            f.id().as_str().starts_with(REQUEST_PREFIX)
92                && match serde_json::from_str::<ScheduleRequest>(f.content()) {
93                    Ok(_) => !plan_exists(ctx, req_id(f.id().as_str())),
94                    Err(_) => !error_exists(ctx, f.id().as_str()),
95                }
96        })
97    }
98
99    async fn execute(&self, ctx: &dyn Context) -> AgentEffect {
100        let mut proposals = Vec::new();
101
102        for fact in ctx
103            .get(ContextKey::Seeds)
104            .iter()
105            .filter(|f| f.id().as_str().starts_with(REQUEST_PREFIX))
106        {
107            match serde_json::from_str::<ScheduleRequest>(fact.content()) {
108                Ok(req) => {
109                    if plan_exists(ctx, req_id(fact.id().as_str())) {
110                        continue;
111                    }
112                    let plan = solve(&req);
113                    proposals.push(
114                        ProposedFact::new(
115                            ContextKey::Strategies,
116                            format!("{}{}", PLAN_PREFIX, plan.request_id),
117                            serde_json::to_string(&plan).unwrap_or_default(),
118                            self.name(),
119                        )
120                        .with_confidence(plan.efficiency.clamp(0.0, 1.0)),
121                    );
122                }
123                Err(e) => {
124                    if error_exists(ctx, fact.id().as_str()) {
125                        continue;
126                    }
127                    let diag = serde_json::json!({
128                        "request_fact_id": fact.id(),
129                        "message": "malformed schedule request",
130                        "error": e.to_string(),
131                    });
132                    proposals.push(
133                        ProposedFact::new(
134                            ContextKey::Diagnostic,
135                            format!("{}{}", ERROR_PREFIX, fact.id()),
136                            diag.to_string(),
137                            self.name(),
138                        )
139                        .with_confidence(1.0),
140                    );
141                }
142            }
143        }
144
145        if proposals.is_empty() {
146            AgentEffect::empty()
147        } else {
148            AgentEffect::with_proposals(proposals)
149        }
150    }
151}
152
153// ── Core logic ────────────────────────────────────────────────────────────────
154
155fn solve(req: &ScheduleRequest) -> SchedulePlan {
156    if req.tasks.is_empty() {
157        return SchedulePlan {
158            request_id: req.id.clone(),
159            scheduled: vec![],
160            makespan: 0,
161            efficiency: 1.0,
162        };
163    }
164
165    let intervals: Vec<Interval> = req
166        .tasks
167        .iter()
168        .enumerate()
169        .map(|(i, t)| Interval::new(i, t.earliest_start, t.latest_end, t.duration))
170        .collect();
171
172    let max_window = req.tasks.iter().map(|t| t.latest_end).max().unwrap_or(1);
173
174    let problem = match req.capacity {
175        None => SchedulingProblem::disjunctive(intervals),
176        Some(cap) => SchedulingProblem::cumulative(intervals, cap),
177    };
178
179    match list_schedule(&problem) {
180        Ok(sol) => {
181            let scheduled = sol
182                .schedule
183                .iter()
184                .map(|s| ScheduledTask {
185                    label: req
186                        .tasks
187                        .get(s.interval.id)
188                        .map(|t| t.label.clone())
189                        .unwrap_or_default(),
190                    start: s.start,
191                    end: s.end(),
192                })
193                .collect();
194            let efficiency = if max_window > 0 {
195                1.0 - sol.makespan as f64 / max_window as f64
196            } else {
197                1.0
198            };
199            SchedulePlan {
200                request_id: req.id.clone(),
201                scheduled,
202                makespan: sol.makespan,
203                efficiency: efficiency.clamp(0.0, 1.0),
204            }
205        }
206        Err(_) => {
207            // Infeasible schedules surface as a low-confidence plan with
208            // makespan = -1 so downstream suggestors can detect the failure.
209            SchedulePlan {
210                request_id: req.id.clone(),
211                scheduled: vec![],
212                makespan: -1,
213                efficiency: 0.0,
214            }
215        }
216    }
217}
218
219// ── Helpers ───────────────────────────────────────────────────────────────────
220
221fn req_id(fact_id: &str) -> &str {
222    fact_id.trim_start_matches(REQUEST_PREFIX)
223}
224
225fn plan_exists(ctx: &dyn Context, request_id: &str) -> bool {
226    let id = format!("{}{}", PLAN_PREFIX, request_id);
227    ctx.get(ContextKey::Strategies)
228        .iter()
229        .any(|f| f.id().as_str() == id)
230}
231
232fn error_exists(ctx: &dyn Context, fact_id: &str) -> bool {
233    let id = format!("{}{}", ERROR_PREFIX, fact_id);
234    ctx.get(ContextKey::Diagnostic)
235        .iter()
236        .any(|f| f.id().as_str() == id)
237}
238
239// ── Tests ─────────────────────────────────────────────────────────────────────
240
#[cfg(test)]
mod tests {
    use super::*;
    use converge_core::{ContextState, Engine};

    /// Serialises a request from `(label, earliest_start, latest_end, duration)` tuples.
    fn req_json(id: &str, tasks: Vec<(&str, i64, i64, i64)>, capacity: Option<i64>) -> String {
        let mut built = Vec::with_capacity(tasks.len());
        for (label, earliest_start, latest_end, duration) in tasks {
            built.push(ScheduleTask {
                label: label.to_string(),
                earliest_start,
                latest_end,
                duration,
            });
        }
        let request = ScheduleRequest {
            id: id.to_string(),
            tasks: built,
            capacity,
        };
        serde_json::to_string(&request).unwrap()
    }

    /// Disjunctive mode with one shared window: tasks run back to back.
    #[tokio::test]
    async fn three_tasks_makespan_equals_sum() {
        let mut engine = Engine::new();
        engine.register_suggestor(WorkScheduleSuggestor);

        let payload = req_json(
            "r1",
            vec![
                ("design", 0, 30, 5),
                ("build", 0, 30, 8),
                ("test", 0, 30, 3),
            ],
            None,
        );
        let mut ctx = ContextState::new();
        ctx.add_input(ContextKey::Seeds, "schedule-request:r1", payload)
            .unwrap();

        let outcome = engine.run(ctx).await.unwrap();
        let facts = outcome.context.get(ContextKey::Strategies);
        assert_eq!(facts.len(), 1);
        let plan: SchedulePlan = serde_json::from_str(facts[0].content()).unwrap();
        assert_eq!(plan.makespan, 16, "3 sequential tasks: 5+8+3=16");
        assert_eq!(plan.scheduled.len(), 3);
    }

    /// Re-running on an already converged context must not add another plan.
    #[tokio::test]
    async fn result_is_idempotent() {
        let mut engine = Engine::new();
        engine.register_suggestor(WorkScheduleSuggestor);

        let payload = req_json("r1", vec![("a", 0, 20, 5), ("b", 0, 20, 3)], None);
        let mut ctx = ContextState::new();
        ctx.add_input(ContextKey::Seeds, "schedule-request:r1", payload)
            .unwrap();

        let first = engine.run(ctx).await.unwrap();

        let mut engine2 = Engine::new();
        engine2.register_suggestor(WorkScheduleSuggestor);
        let second = engine2.run(first.context.clone()).await.unwrap();

        assert_eq!(
            second.context.get(ContextKey::Strategies).len(),
            first.context.get(ContextKey::Strategies).len(),
        );
    }

    /// A seed that is not valid JSON yields a diagnostic and no plan.
    #[tokio::test]
    async fn malformed_request_emits_diagnostic() {
        let mut engine = Engine::new();
        engine.register_suggestor(WorkScheduleSuggestor);

        let mut ctx = ContextState::new();
        ctx.add_input(ContextKey::Seeds, "schedule-request:bad", "not-json")
            .unwrap();

        let outcome = engine.run(ctx).await.unwrap();
        assert_eq!(outcome.context.get(ContextKey::Diagnostic).len(), 1);
        assert!(!outcome.context.has(ContextKey::Strategies));
    }
}