// oris_execution_runtime/scheduler.rs

1//! Scheduler skeleton for Phase 1 runtime rollout.
2
3use std::sync::Arc;
4
5use chrono::Utc;
6
7use oris_kernel::event::KernelError;
8
9use super::circuit_breaker::CircuitBreaker;
10use super::models::AttemptDispatchRecord;
11use super::observability::RejectionReason;
12use super::repository::RuntimeRepository;
13
/// Upper bound on candidates fetched per dispatch scan.
///
/// This also caps the queue depth observed by the `max_queue_depth`
/// backpressure gate in `dispatch_one_with_context`, since that gate counts
/// the candidates returned by a single scan.
const DISPATCH_SCAN_LIMIT: usize = 16;
15
/// Context for context-aware dispatch (tenant, priority, plugin/worker capabilities).
/// Used to route or filter work; concrete routing logic can be extended later.
#[derive(Clone, Debug, Default)]
pub struct DispatchContext {
    pub tenant_id: Option<String>,
    pub priority: Option<u32>,
    /// Plugin type names required for this dispatch (e.g. node kinds).
    pub plugin_requirements: Option<Vec<String>>,
    /// Worker capability tags the scheduler may match against.
    pub worker_capabilities: Option<Vec<String>>,
    /// Maximum queue depth before backpressure is applied.
    /// When the number of dispatchable candidates meets or exceeds this limit,
    /// `dispatch_one_with_context` returns `SchedulerDecision::Backpressure`
    /// instead of acquiring a new lease.
    pub max_queue_depth: Option<usize>,
}

impl DispatchContext {
    /// Create an empty context (every field `None`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the tenant this dispatch belongs to.
    pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
        self.tenant_id = Some(tenant_id.into());
        self
    }

    /// Set the dispatch priority.
    pub fn with_priority(mut self, priority: u32) -> Self {
        self.priority = Some(priority);
        self
    }

    /// Set the plugin type names required for this dispatch.
    ///
    /// Added for builder symmetry with the other fields; accepts any iterable
    /// of string-like items (e.g. `vec!["node-kind"]`).
    pub fn with_plugin_requirements(
        mut self,
        requirements: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        self.plugin_requirements = Some(requirements.into_iter().map(Into::into).collect());
        self
    }

    /// Set the worker capability tags the scheduler may match against.
    ///
    /// Added for builder symmetry with the other fields.
    pub fn with_worker_capabilities(
        mut self,
        capabilities: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        self.worker_capabilities = Some(capabilities.into_iter().map(Into::into).collect());
        self
    }

    /// Set the backpressure queue-depth limit.
    pub fn with_max_queue_depth(mut self, limit: usize) -> Self {
        self.max_queue_depth = Some(limit);
        self
    }
}
53
/// Scheduler dispatch decision.
#[derive(Clone, Debug)]
pub enum SchedulerDecision {
    /// A lease was acquired: `attempt_id` was dispatched to `worker_id`.
    Dispatched {
        attempt_id: String,
        worker_id: String,
    },
    /// Backpressure applied: the queue depth has exceeded the configured limit.
    /// Also returned (with `queue_depth` of 0) when the scheduler's circuit
    /// breaker is open.
    Backpressure {
        reason: RejectionReason,
        queue_depth: usize,
    },
    /// Nothing was dispatched: the queue was empty or every candidate was
    /// claimed or invalidated concurrently.
    Noop,
}
68
69/// Compile-safe scheduler skeleton for queue -> lease dispatch.
70pub struct SkeletonScheduler<R: RuntimeRepository> {
71    repository: R,
72    /// Optional circuit breaker applied globally to all dispatch calls.
73    circuit_breaker: Option<Arc<CircuitBreaker>>,
74}
75
76impl<R: RuntimeRepository> SkeletonScheduler<R> {
77    pub fn new(repository: R) -> Self {
78        Self {
79            repository,
80            circuit_breaker: None,
81        }
82    }
83
84    /// Attach a shared circuit breaker to this scheduler.
85    ///
86    /// When the breaker is `Open`, all dispatch calls return
87    /// `SchedulerDecision::Backpressure` until the probe window elapses.
88    pub fn with_circuit_breaker(mut self, breaker: Arc<CircuitBreaker>) -> Self {
89        self.circuit_breaker = Some(breaker);
90        self
91    }
92
93    /// Attempt to dispatch one eligible attempt to `worker_id`.
94    pub fn dispatch_one(&self, worker_id: &str) -> Result<SchedulerDecision, KernelError> {
95        self.dispatch_one_with_context(worker_id, None)
96    }
97
98    /// Dispatch one attempt to `worker_id` with optional context for tenant/priority/capability routing.
99    /// Context is passed through for future filtering or sorting; current implementation
100    /// uses the same candidate list as `dispatch_one`.
101    ///
102    /// If `context.max_queue_depth` is set and the number of dispatchable candidates is
103    /// greater than or equal to that limit, returns `SchedulerDecision::Backpressure`
104    /// without acquiring any lease.
105    pub fn dispatch_one_with_context(
106        &self,
107        worker_id: &str,
108        context: Option<&DispatchContext>,
109    ) -> Result<SchedulerDecision, KernelError> {
110        let now = Utc::now();
111
112        // Circuit breaker gate: if the breaker is Open, reject dispatch.
113        if let Some(cb) = &self.circuit_breaker {
114            if cb.is_open() {
115                return Ok(SchedulerDecision::Backpressure {
116                    reason: RejectionReason::capacity_limit("circuit breaker open"),
117                    queue_depth: 0,
118                });
119            }
120        }
121
122        let candidates: Vec<AttemptDispatchRecord> = self
123            .repository
124            .list_dispatchable_attempts(now, DISPATCH_SCAN_LIMIT)?;
125
126        // Backpressure gate: if queue depth meets or exceeds the limit, reject dispatch.
127        if let Some(limit) = context.and_then(|c| c.max_queue_depth) {
128            if candidates.len() >= limit {
129                return Ok(SchedulerDecision::Backpressure {
130                    reason: RejectionReason::capacity_limit(format!(
131                        "queue depth {} >= limit {}",
132                        candidates.len(),
133                        limit
134                    )),
135                    queue_depth: candidates.len(),
136                });
137            }
138        }
139
140        let lease_expires_at = now + chrono::Duration::seconds(30);
141
142        for candidate in candidates {
143            if let Err(e) =
144                self.repository
145                    .upsert_lease(&candidate.attempt_id, worker_id, lease_expires_at)
146            {
147                let msg = e.to_string();
148                if msg.contains("active lease already exists") || msg.contains("not dispatchable") {
149                    continue;
150                }
151                return Err(e);
152            }
153
154            return Ok(SchedulerDecision::Dispatched {
155                attempt_id: candidate.attempt_id,
156                worker_id: worker_id.to_string(),
157            });
158        }
159
160        Ok(SchedulerDecision::Noop)
161    }
162}
163
#[cfg(test)]
mod tests {
    //! Unit tests for `SkeletonScheduler` dispatch behavior, backed by an
    //! in-memory `FakeRepository` test double.

    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    use chrono::{DateTime, Utc};

    use super::*;
    use oris_kernel::identity::{RunId, Seq};

    use super::super::models::{AttemptExecutionStatus, LeaseRecord};

    /// In-memory `RuntimeRepository` double.
    ///
    /// `attempts` is the fixed candidate list returned by every
    /// `list_dispatchable_attempts` call; `conflict_attempts` holds attempt ids
    /// for which `upsert_lease` simulates an "active lease already exists"
    /// error; `claimed_attempts` records successful lease acquisitions in
    /// order. The interior `Arc<Mutex<..>>` fields let cloned repositories
    /// (one given to the scheduler, one kept by the test) share state.
    #[derive(Clone)]
    struct FakeRepository {
        attempts: Vec<AttemptDispatchRecord>,
        conflict_attempts: Arc<Mutex<HashSet<String>>>,
        claimed_attempts: Arc<Mutex<Vec<String>>>,
    }

    impl FakeRepository {
        /// Build a repository with the given candidate list and the set of
        /// attempt ids that should fail lease acquisition with a conflict.
        fn new(attempts: Vec<AttemptDispatchRecord>, conflict_attempts: &[&str]) -> Self {
            Self {
                attempts,
                conflict_attempts: Arc::new(Mutex::new(
                    conflict_attempts.iter().map(|s| (*s).to_string()).collect(),
                )),
                claimed_attempts: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl RuntimeRepository for FakeRepository {
        // Always returns the full fixture list; `now` and `limit` are ignored,
        // so these tests do not exercise DISPATCH_SCAN_LIMIT truncation.
        fn list_dispatchable_attempts(
            &self,
            _now: DateTime<Utc>,
            _limit: usize,
        ) -> Result<Vec<AttemptDispatchRecord>, KernelError> {
            Ok(self.attempts.clone())
        }

        // Fails with the conflict message the scheduler treats as recoverable
        // for ids in `conflict_attempts`; otherwise records the claim and
        // returns a synthetic lease.
        fn upsert_lease(
            &self,
            attempt_id: &str,
            worker_id: &str,
            lease_expires_at: DateTime<Utc>,
        ) -> Result<LeaseRecord, KernelError> {
            if self
                .conflict_attempts
                .lock()
                .expect("conflict lock")
                .contains(attempt_id)
            {
                return Err(KernelError::Driver(format!(
                    "active lease already exists for attempt: {}",
                    attempt_id
                )));
            }
            self.claimed_attempts
                .lock()
                .expect("claimed lock")
                .push(attempt_id.to_string());
            Ok(LeaseRecord {
                lease_id: format!("lease-{}", attempt_id),
                attempt_id: attempt_id.to_string(),
                worker_id: worker_id.to_string(),
                lease_expires_at,
                heartbeat_at: Utc::now(),
                version: 1,
            })
        }

        // --- Everything below is a no-op stub: required to satisfy the trait
        // --- but never exercised by the scheduler tests in this module.

        fn heartbeat_lease(
            &self,
            _lease_id: &str,
            _heartbeat_at: DateTime<Utc>,
            _lease_expires_at: DateTime<Utc>,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        fn expire_leases_and_requeue(
            &self,
            _stale_before: DateTime<Utc>,
        ) -> Result<u64, KernelError> {
            Ok(0)
        }

        fn latest_seq_for_run(&self, _run_id: &RunId) -> Result<Seq, KernelError> {
            Ok(0)
        }

        fn upsert_bounty(&self, _: &super::super::models::BountyRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_bounty(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::BountyRecord>, KernelError> {
            Ok(None)
        }
        fn list_bounties(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::BountyRecord>, KernelError> {
            Ok(vec![])
        }
        fn accept_bounty(&self, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn close_bounty(&self, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn upsert_swarm_decomposition(
            &self,
            _: &super::super::models::SwarmTaskRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_swarm_decomposition(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::SwarmTaskRecord>, KernelError> {
            Ok(None)
        }
        fn register_worker(
            &self,
            _: &super::super::models::WorkerRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_worker(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::WorkerRecord>, KernelError> {
            Ok(None)
        }
        fn list_workers(
            &self,
            _: Option<&str>,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::WorkerRecord>, KernelError> {
            Ok(vec![])
        }
        fn heartbeat_worker(&self, _: &str, _: i64) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_recipe(&self, _: &super::super::models::RecipeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_recipe(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn fork_recipe(
            &self,
            _: &str,
            _: &str,
            _: &str,
        ) -> Result<Option<super::super::models::RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn list_recipes(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::RecipeRecord>, KernelError> {
            Ok(vec![])
        }
        fn express_organism(
            &self,
            _: &super::super::models::OrganismRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_organism(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::OrganismRecord>, KernelError> {
            Ok(None)
        }
        fn update_organism(&self, _: &str, _: i32, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_session(
            &self,
            _: &super::super::models::SessionRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::SessionRecord>, KernelError> {
            Ok(None)
        }
        fn add_session_message(
            &self,
            _: &super::super::models::SessionMessageRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session_history(
            &self,
            _: &str,
            _: usize,
        ) -> Result<Vec<super::super::models::SessionMessageRecord>, KernelError> {
            Ok(vec![])
        }
        fn open_dispute(&self, _: &super::super::models::DisputeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_dispute(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::DisputeRecord>, KernelError> {
            Ok(None)
        }
        fn get_disputes_for_bounty(
            &self,
            _: &str,
        ) -> Result<Vec<super::super::models::DisputeRecord>, KernelError> {
            Ok(vec![])
        }
        fn resolve_dispute(&self, _: &str, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
    }

    /// Fixture builder: a queued attempt with the given id and attempt number,
    /// all sharing one run id.
    fn attempt(id: &str, attempt_no: u32) -> AttemptDispatchRecord {
        AttemptDispatchRecord {
            attempt_id: id.to_string(),
            run_id: "run-scheduler-test".to_string(),
            attempt_no,
            status: AttemptExecutionStatus::Queued,
            retry_at: None,
        }
    }

    // A conflicted first candidate is skipped (not surfaced as an error) and
    // the next candidate in repository order is leased instead.
    #[test]
    fn dispatch_one_skips_conflicted_candidate_and_preserves_order() {
        let repo = FakeRepository::new(
            vec![attempt("attempt-a", 1), attempt("attempt-b", 2)],
            &["attempt-a"],
        );
        let scheduler = SkeletonScheduler::new(repo.clone());

        let decision = scheduler
            .dispatch_one("worker-scheduler")
            .expect("dispatch should succeed");

        match decision {
            SchedulerDecision::Dispatched {
                attempt_id,
                worker_id,
            } => {
                assert_eq!(attempt_id, "attempt-b");
                assert_eq!(worker_id, "worker-scheduler");
            }
            SchedulerDecision::Noop | SchedulerDecision::Backpressure { .. } => {
                panic!("expected a dispatch")
            }
        }

        // Only the non-conflicted attempt was actually claimed.
        let claimed = repo.claimed_attempts.lock().expect("claimed lock");
        assert_eq!(claimed.as_slice(), ["attempt-b"]);
    }

    // When every candidate conflicts, the scheduler reports Noop rather than
    // bubbling the conflict errors up.
    #[test]
    fn dispatch_one_returns_noop_when_all_candidates_conflict() {
        let repo = FakeRepository::new(
            vec![attempt("attempt-a", 1), attempt("attempt-b", 2)],
            &["attempt-a", "attempt-b"],
        );
        let scheduler = SkeletonScheduler::new(repo);

        let decision = scheduler
            .dispatch_one("worker-scheduler")
            .expect("conflicts should not surface as hard errors");

        assert!(matches!(decision, SchedulerDecision::Noop));
    }

    // `dispatch_one` is a thin wrapper over `dispatch_one_with_context(.., None)`;
    // both paths pick the same candidate.
    #[test]
    fn dispatch_one_with_context_none_same_as_dispatch_one() {
        let repo = FakeRepository::new(vec![attempt("attempt-a", 1)], &[]);
        let scheduler = SkeletonScheduler::new(repo.clone());

        let with_ctx = scheduler
            .dispatch_one_with_context("worker-1", None)
            .expect("dispatch should succeed");
        let without = scheduler
            .dispatch_one("worker-1")
            .expect("dispatch should succeed");

        match (&with_ctx, &without) {
            (
                SchedulerDecision::Dispatched { attempt_id: a1, .. },
                SchedulerDecision::Dispatched { attempt_id: a2, .. },
            ) => assert_eq!(a1, a2),
            _ => panic!("expected both dispatched"),
        }
    }

    // Builder methods populate the corresponding fields.
    #[test]
    fn dispatch_context_builder() {
        let ctx = DispatchContext::new()
            .with_tenant("tenant-1")
            .with_priority(5);
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-1"));
        assert_eq!(ctx.priority, Some(5));
    }

    #[test]
    fn dispatch_one_with_context_applies_backpressure_when_queue_exceeds_limit() {
        // 3 queued attempts, limit = 2  → backpressure should fire
        let repo = FakeRepository::new(
            vec![
                attempt("attempt-a", 1),
                attempt("attempt-b", 2),
                attempt("attempt-c", 3),
            ],
            &[],
        );
        let scheduler = SkeletonScheduler::new(repo);
        let ctx = DispatchContext::new().with_max_queue_depth(2);

        let decision = scheduler
            .dispatch_one_with_context("worker-1", Some(&ctx))
            .expect("should not error on backpressure");

        match decision {
            SchedulerDecision::Backpressure { queue_depth, .. } => {
                // queue_depth reports the observed candidate count, not the limit.
                assert_eq!(queue_depth, 3);
            }
            other => panic!("expected Backpressure, got {:?}", other),
        }
    }

    #[test]
    fn dispatch_one_with_context_dispatches_when_queue_below_limit() {
        // 1 queued attempt, limit = 5 → should dispatch normally
        let repo = FakeRepository::new(vec![attempt("attempt-a", 1)], &[]);
        let scheduler = SkeletonScheduler::new(repo);
        let ctx = DispatchContext::new().with_max_queue_depth(5);

        let decision = scheduler
            .dispatch_one_with_context("worker-1", Some(&ctx))
            .expect("dispatch should succeed");

        assert!(
            matches!(decision, SchedulerDecision::Dispatched { .. }),
            "expected Dispatched, got {:?}",
            decision
        );
    }

    // An Open breaker short-circuits dispatch before the repository is consulted.
    #[test]
    fn open_circuit_breaker_returns_backpressure() {
        // NOTE(review): imported via `crate::` here but via `super::` in the
        // scheduler module — confirm both paths resolve to the same module.
        use crate::circuit_breaker::CircuitBreaker;
        use std::sync::Arc;

        let repo = FakeRepository::new(vec![attempt("attempt-a", 1)], &[]);
        // 30 is presumably the probe window in seconds — confirm against
        // `CircuitBreaker::new`'s documentation.
        let breaker = Arc::new(CircuitBreaker::new(30));
        breaker.trip();

        let scheduler = SkeletonScheduler::new(repo).with_circuit_breaker(breaker);

        let decision = scheduler
            .dispatch_one("worker-1")
            .expect("should not error");

        match decision {
            SchedulerDecision::Backpressure { reason, .. } => {
                let msg = format!("{:?}", reason);
                assert!(
                    msg.contains("circuit breaker open"),
                    "unexpected reason: {}",
                    msg
                );
            }
            other => panic!("expected Backpressure, got {:?}", other),
        }
    }

    // A freshly constructed (Closed) breaker must not block dispatch.
    #[test]
    fn closed_circuit_breaker_allows_dispatch() {
        // NOTE(review): see path note in open_circuit_breaker_returns_backpressure.
        use crate::circuit_breaker::CircuitBreaker;
        use std::sync::Arc;

        let repo = FakeRepository::new(vec![attempt("attempt-a", 1)], &[]);
        let breaker = Arc::new(CircuitBreaker::new(30)); // starts Closed

        let scheduler = SkeletonScheduler::new(repo).with_circuit_breaker(breaker);

        let decision = scheduler
            .dispatch_one("worker-1")
            .expect("dispatch should succeed");

        assert!(
            matches!(decision, SchedulerDecision::Dispatched { .. }),
            "expected Dispatched, got {:?}",
            decision
        );
    }
}