// oris_execution_runtime/scheduler.rs

//! Scheduler skeleton for Phase 1 runtime rollout.

use chrono::Utc;

use oris_kernel::event::KernelError;

use super::models::AttemptDispatchRecord;
use super::repository::RuntimeRepository;

const DISPATCH_SCAN_LIMIT: usize = 16;

/// Context for context-aware dispatch (tenant, priority, plugin/worker capabilities).
/// Used to route or filter work; concrete routing logic can be extended later.
#[derive(Clone, Debug, Default)]
pub struct DispatchContext {
    /// Tenant the dispatch is scoped to, when multi-tenancy applies.
    pub tenant_id: Option<String>,
    /// Relative priority hint; `None` means "no preference".
    pub priority: Option<u32>,
    /// Plugin type names required for this dispatch (e.g. node kinds).
    pub plugin_requirements: Option<Vec<String>>,
    /// Worker capability tags the scheduler may match against.
    pub worker_capabilities: Option<Vec<String>>,
}

impl DispatchContext {
    /// Creates an empty context with no tenant, priority, or capability constraints.
    pub fn new() -> Self {
        Self::default()
    }

    /// Scopes the context to a tenant.
    pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
        self.tenant_id = Some(tenant_id.into());
        self
    }

    /// Sets the priority hint.
    pub fn with_priority(mut self, priority: u32) -> Self {
        self.priority = Some(priority);
        self
    }

    /// Sets the plugin type names required for this dispatch.
    ///
    /// Added for builder parity: `plugin_requirements` previously had no
    /// `with_*` setter even though `tenant_id` and `priority` did.
    pub fn with_plugin_requirements(
        mut self,
        requirements: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        self.plugin_requirements = Some(requirements.into_iter().map(Into::into).collect());
        self
    }

    /// Sets the worker capability tags the scheduler may match against.
    ///
    /// Added for builder parity with the `worker_capabilities` field.
    pub fn with_worker_capabilities(
        mut self,
        capabilities: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        self.worker_capabilities = Some(capabilities.into_iter().map(Into::into).collect());
        self
    }
}

/// Scheduler dispatch decision.
#[derive(Clone, Debug)]
pub enum SchedulerDecision {
    /// An eligible attempt was leased to a worker.
    Dispatched {
        // Identifier of the attempt that was leased.
        attempt_id: String,
        // Worker that received the lease.
        worker_id: String,
    },
    /// No eligible attempt could be dispatched this pass.
    Noop,
}

/// Compile-safe scheduler skeleton for queue -> lease dispatch.
///
/// Generic over [`RuntimeRepository`] so the persistence layer can be
/// swapped out (e.g. for an in-memory fake in tests).
pub struct SkeletonScheduler<R: RuntimeRepository> {
    // Backing store used to list dispatchable attempts and record leases.
    repository: R,
}

55impl<R: RuntimeRepository> SkeletonScheduler<R> {
56    pub fn new(repository: R) -> Self {
57        Self { repository }
58    }
59
60    /// Attempt to dispatch one eligible attempt to `worker_id`.
61    pub fn dispatch_one(&self, worker_id: &str) -> Result<SchedulerDecision, KernelError> {
62        self.dispatch_one_with_context(worker_id, None)
63    }
64
65    /// Dispatch one attempt to `worker_id` with optional context for tenant/priority/capability routing.
66    /// Context is passed through for future filtering or sorting; current implementation
67    /// uses the same candidate list as `dispatch_one`.
68    pub fn dispatch_one_with_context(
69        &self,
70        worker_id: &str,
71        context: Option<&DispatchContext>,
72    ) -> Result<SchedulerDecision, KernelError> {
73        let now = Utc::now();
74        let candidates: Vec<AttemptDispatchRecord> = self
75            .repository
76            .list_dispatchable_attempts(now, DISPATCH_SCAN_LIMIT)?;
77        // Optional: apply context-based sort (e.g. by priority when available on attempts).
78        if context.as_ref().and_then(|c| c.priority).is_some() {
79            // Placeholder: could sort by attempt priority when AttemptDispatchRecord carries it.
80        }
81        let lease_expires_at = now + chrono::Duration::seconds(30);
82
83        for candidate in candidates {
84            if let Err(e) =
85                self.repository
86                    .upsert_lease(&candidate.attempt_id, worker_id, lease_expires_at)
87            {
88                let msg = e.to_string();
89                if msg.contains("active lease already exists") || msg.contains("not dispatchable") {
90                    continue;
91                }
92                return Err(e);
93            }
94
95            return Ok(SchedulerDecision::Dispatched {
96                attempt_id: candidate.attempt_id,
97                worker_id: worker_id.to_string(),
98            });
99        }
100
101        Ok(SchedulerDecision::Noop)
102    }
103}
104
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    use chrono::{DateTime, Utc};

    use super::*;
    use oris_kernel::identity::{RunId, Seq};

    use super::super::models::{AttemptExecutionStatus, LeaseRecord};

    /// In-memory `RuntimeRepository` stub: serves a fixed candidate list,
    /// simulates lease conflicts for configured attempt ids, and records
    /// which attempts were successfully claimed.
    #[derive(Clone)]
    struct FakeRepository {
        // Candidate attempts returned verbatim by `list_dispatchable_attempts`.
        attempts: Vec<AttemptDispatchRecord>,
        // Attempt ids for which `upsert_lease` reports an active-lease conflict.
        conflict_attempts: Arc<Mutex<HashSet<String>>>,
        // Attempt ids successfully leased, in claim order (shared via Arc so
        // clones of the repo observe the same state).
        claimed_attempts: Arc<Mutex<Vec<String>>>,
    }

    impl FakeRepository {
        /// Builds a fake repo with the given candidates; every id listed in
        /// `conflict_attempts` will fail `upsert_lease` with a conflict error.
        fn new(attempts: Vec<AttemptDispatchRecord>, conflict_attempts: &[&str]) -> Self {
            Self {
                attempts,
                conflict_attempts: Arc::new(Mutex::new(
                    conflict_attempts.iter().map(|s| (*s).to_string()).collect(),
                )),
                claimed_attempts: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    // Only the first two methods matter for the scheduler tests; the rest of
    // the trait surface is satisfied with inert stubs.
    impl RuntimeRepository for FakeRepository {
        fn list_dispatchable_attempts(
            &self,
            _now: DateTime<Utc>,
            _limit: usize,
        ) -> Result<Vec<AttemptDispatchRecord>, KernelError> {
            Ok(self.attempts.clone())
        }

        fn upsert_lease(
            &self,
            attempt_id: &str,
            worker_id: &str,
            lease_expires_at: DateTime<Utc>,
        ) -> Result<LeaseRecord, KernelError> {
            // Conflicted attempts return the exact message the scheduler's
            // skip logic matches on ("active lease already exists").
            if self
                .conflict_attempts
                .lock()
                .expect("conflict lock")
                .contains(attempt_id)
            {
                return Err(KernelError::Driver(format!(
                    "active lease already exists for attempt: {}",
                    attempt_id
                )));
            }
            self.claimed_attempts
                .lock()
                .expect("claimed lock")
                .push(attempt_id.to_string());
            Ok(LeaseRecord {
                lease_id: format!("lease-{}", attempt_id),
                attempt_id: attempt_id.to_string(),
                worker_id: worker_id.to_string(),
                lease_expires_at,
                heartbeat_at: Utc::now(),
                version: 1,
            })
        }

        fn heartbeat_lease(
            &self,
            _lease_id: &str,
            _heartbeat_at: DateTime<Utc>,
            _lease_expires_at: DateTime<Utc>,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        fn expire_leases_and_requeue(
            &self,
            _stale_before: DateTime<Utc>,
        ) -> Result<u64, KernelError> {
            Ok(0)
        }

        fn latest_seq_for_run(&self, _run_id: &RunId) -> Result<Seq, KernelError> {
            Ok(0)
        }

        fn upsert_bounty(&self, _: &super::super::models::BountyRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_bounty(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::BountyRecord>, KernelError> {
            Ok(None)
        }
        fn list_bounties(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::BountyRecord>, KernelError> {
            Ok(vec![])
        }
        fn accept_bounty(&self, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn close_bounty(&self, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn upsert_swarm_decomposition(
            &self,
            _: &super::super::models::SwarmTaskRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_swarm_decomposition(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::SwarmTaskRecord>, KernelError> {
            Ok(None)
        }
        fn register_worker(
            &self,
            _: &super::super::models::WorkerRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_worker(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::WorkerRecord>, KernelError> {
            Ok(None)
        }
        fn list_workers(
            &self,
            _: Option<&str>,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::WorkerRecord>, KernelError> {
            Ok(vec![])
        }
        fn heartbeat_worker(&self, _: &str, _: i64) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_recipe(&self, _: &super::super::models::RecipeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_recipe(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn fork_recipe(
            &self,
            _: &str,
            _: &str,
            _: &str,
        ) -> Result<Option<super::super::models::RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn list_recipes(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::RecipeRecord>, KernelError> {
            Ok(vec![])
        }
        fn express_organism(
            &self,
            _: &super::super::models::OrganismRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_organism(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::OrganismRecord>, KernelError> {
            Ok(None)
        }
        fn update_organism(&self, _: &str, _: i32, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_session(
            &self,
            _: &super::super::models::SessionRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::SessionRecord>, KernelError> {
            Ok(None)
        }
        fn add_session_message(
            &self,
            _: &super::super::models::SessionMessageRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session_history(
            &self,
            _: &str,
            _: usize,
        ) -> Result<Vec<super::super::models::SessionMessageRecord>, KernelError> {
            Ok(vec![])
        }
        fn open_dispute(&self, _: &super::super::models::DisputeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_dispute(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::DisputeRecord>, KernelError> {
            Ok(None)
        }
        fn get_disputes_for_bounty(
            &self,
            _: &str,
        ) -> Result<Vec<super::super::models::DisputeRecord>, KernelError> {
            Ok(vec![])
        }
        fn resolve_dispute(&self, _: &str, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
    }

    /// Convenience constructor for a queued attempt candidate.
    fn attempt(id: &str, attempt_no: u32) -> AttemptDispatchRecord {
        AttemptDispatchRecord {
            attempt_id: id.to_string(),
            run_id: "run-scheduler-test".to_string(),
            attempt_no,
            status: AttemptExecutionStatus::Queued,
            retry_at: None,
        }
    }

    /// A conflicted head-of-queue candidate is skipped and the next one
    /// (in list order) is dispatched instead.
    #[test]
    fn dispatch_one_skips_conflicted_candidate_and_preserves_order() {
        let repo = FakeRepository::new(
            vec![attempt("attempt-a", 1), attempt("attempt-b", 2)],
            &["attempt-a"],
        );
        let scheduler = SkeletonScheduler::new(repo.clone());

        let decision = scheduler
            .dispatch_one("worker-scheduler")
            .expect("dispatch should succeed");

        match decision {
            SchedulerDecision::Dispatched {
                attempt_id,
                worker_id,
            } => {
                assert_eq!(attempt_id, "attempt-b");
                assert_eq!(worker_id, "worker-scheduler");
            }
            SchedulerDecision::Noop => panic!("expected a dispatch"),
        }

        // Only the non-conflicted attempt was actually leased.
        let claimed = repo.claimed_attempts.lock().expect("claimed lock");
        assert_eq!(claimed.as_slice(), ["attempt-b"]);
    }

    /// All-conflicted candidate lists yield Noop, not an error.
    #[test]
    fn dispatch_one_returns_noop_when_all_candidates_conflict() {
        let repo = FakeRepository::new(
            vec![attempt("attempt-a", 1), attempt("attempt-b", 2)],
            &["attempt-a", "attempt-b"],
        );
        let scheduler = SkeletonScheduler::new(repo);

        let decision = scheduler
            .dispatch_one("worker-scheduler")
            .expect("conflicts should not surface as hard errors");

        assert!(matches!(decision, SchedulerDecision::Noop));
    }

    /// `dispatch_one` and `dispatch_one_with_context(_, None)` pick the same
    /// candidate (the fake repo re-serves the list, so both calls dispatch).
    #[test]
    fn dispatch_one_with_context_none_same_as_dispatch_one() {
        let repo = FakeRepository::new(vec![attempt("attempt-a", 1)], &[]);
        let scheduler = SkeletonScheduler::new(repo.clone());

        let with_ctx = scheduler
            .dispatch_one_with_context("worker-1", None)
            .expect("dispatch should succeed");
        let without = scheduler
            .dispatch_one("worker-1")
            .expect("dispatch should succeed");

        match (&with_ctx, &without) {
            (
                SchedulerDecision::Dispatched { attempt_id: a1, .. },
                SchedulerDecision::Dispatched { attempt_id: a2, .. },
            ) => assert_eq!(a1, a2),
            _ => panic!("expected both dispatched"),
        }
    }

    /// Builder methods populate the corresponding context fields.
    #[test]
    fn dispatch_context_builder() {
        let ctx = DispatchContext::new()
            .with_tenant("tenant-1")
            .with_priority(5);
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-1"));
        assert_eq!(ctx.priority, Some(5));
    }
}