//! Scheduler skeleton for Phase 1 runtime rollout.

use chrono::Utc;

use oris_kernel::event::KernelError;

use super::models::AttemptDispatchRecord;
use super::repository::RuntimeRepository;

const DISPATCH_SCAN_LIMIT: usize = 16;
/// Routing hints for context-aware dispatch: tenant scoping, a priority
/// hint, and plugin/worker capability matching.
///
/// Every field is optional; concrete routing logic that consumes these
/// hints can be layered on later.
#[derive(Clone, Debug, Default)]
pub struct DispatchContext {
    /// Tenant the dispatch is scoped to, if any.
    pub tenant_id: Option<String>,
    /// Relative priority hint for candidate ordering.
    pub priority: Option<u32>,
    /// Plugin type names required for this dispatch (e.g. node kinds).
    pub plugin_requirements: Option<Vec<String>>,
    /// Worker capability tags the scheduler may match against.
    pub worker_capabilities: Option<Vec<String>>,
}
23
24impl DispatchContext {
25    pub fn new() -> Self {
26        Self::default()
27    }
28
29    pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
30        self.tenant_id = Some(tenant_id.into());
31        self
32    }
33
34    pub fn with_priority(mut self, priority: u32) -> Self {
35        self.priority = Some(priority);
36        self
37    }
38}
39
/// Outcome of a single scheduler dispatch pass.
#[derive(Clone, Debug)]
pub enum SchedulerDecision {
    /// An attempt was leased to a worker.
    Dispatched {
        /// Identifier of the dispatched attempt.
        attempt_id: String,
        /// Worker that received the lease.
        worker_id: String,
    },
    /// No eligible attempt could be claimed on this pass.
    Noop,
}
49
50/// Compile-safe scheduler skeleton for queue -> lease dispatch.
51pub struct SkeletonScheduler<R: RuntimeRepository> {
52    repository: R,
53}
54
55impl<R: RuntimeRepository> SkeletonScheduler<R> {
56    pub fn new(repository: R) -> Self {
57        Self { repository }
58    }
59
60    /// Attempt to dispatch one eligible attempt to `worker_id`.
61    pub fn dispatch_one(&self, worker_id: &str) -> Result<SchedulerDecision, KernelError> {
62        self.dispatch_one_with_context(worker_id, None)
63    }
64
65    /// Dispatch one attempt to `worker_id` with optional context for tenant/priority/capability routing.
66    /// Context is passed through for future filtering or sorting; current implementation
67    /// uses the same candidate list as `dispatch_one`.
68    pub fn dispatch_one_with_context(
69        &self,
70        worker_id: &str,
71        context: Option<&DispatchContext>,
72    ) -> Result<SchedulerDecision, KernelError> {
73        let now = Utc::now();
74        let candidates: Vec<AttemptDispatchRecord> = self
75            .repository
76            .list_dispatchable_attempts(now, DISPATCH_SCAN_LIMIT)?;
77        // Optional: apply context-based sort (e.g. by priority when available on attempts).
78        if context.as_ref().and_then(|c| c.priority).is_some() {
79            // Placeholder: could sort by attempt priority when AttemptDispatchRecord carries it.
80        }
81        let lease_expires_at = now + chrono::Duration::seconds(30);
82
83        for candidate in candidates {
84            if let Err(e) =
85                self.repository
86                    .upsert_lease(&candidate.attempt_id, worker_id, lease_expires_at)
87            {
88                let msg = e.to_string();
89                if msg.contains("active lease already exists") || msg.contains("not dispatchable") {
90                    continue;
91                }
92                return Err(e);
93            }
94
95            return Ok(SchedulerDecision::Dispatched {
96                attempt_id: candidate.attempt_id,
97                worker_id: worker_id.to_string(),
98            });
99        }
100
101        Ok(SchedulerDecision::Noop)
102    }
103}
104
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    use chrono::{DateTime, Utc};

    use super::*;
    use oris_kernel::identity::{RunId, Seq};

    use super::super::models::{AttemptExecutionStatus, LeaseRecord};

    /// In-memory repository double. Serves a fixed candidate list; attempts
    /// registered as "conflicted" are rejected with a lease-conflict error,
    /// and successful claims are recorded for assertions.
    #[derive(Clone)]
    struct FakeRepository {
        attempts: Vec<AttemptDispatchRecord>,
        conflict_attempts: Arc<Mutex<HashSet<String>>>,
        claimed_attempts: Arc<Mutex<Vec<String>>>,
    }

    impl FakeRepository {
        fn new(attempts: Vec<AttemptDispatchRecord>, conflict_attempts: &[&str]) -> Self {
            let conflicts: HashSet<String> = conflict_attempts
                .iter()
                .copied()
                .map(String::from)
                .collect();
            Self {
                attempts,
                conflict_attempts: Arc::new(Mutex::new(conflicts)),
                claimed_attempts: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl RuntimeRepository for FakeRepository {
        fn list_dispatchable_attempts(
            &self,
            _now: DateTime<Utc>,
            _limit: usize,
        ) -> Result<Vec<AttemptDispatchRecord>, KernelError> {
            Ok(self.attempts.clone())
        }

        fn upsert_lease(
            &self,
            attempt_id: &str,
            worker_id: &str,
            lease_expires_at: DateTime<Utc>,
        ) -> Result<LeaseRecord, KernelError> {
            let is_conflicted = self
                .conflict_attempts
                .lock()
                .expect("conflict lock")
                .contains(attempt_id);
            if is_conflicted {
                return Err(KernelError::Driver(format!(
                    "active lease already exists for attempt: {}",
                    attempt_id
                )));
            }

            self.claimed_attempts
                .lock()
                .expect("claimed lock")
                .push(attempt_id.to_string());

            Ok(LeaseRecord {
                lease_id: format!("lease-{}", attempt_id),
                attempt_id: attempt_id.to_string(),
                worker_id: worker_id.to_string(),
                lease_expires_at,
                heartbeat_at: Utc::now(),
                version: 1,
            })
        }

        fn heartbeat_lease(
            &self,
            _lease_id: &str,
            _heartbeat_at: DateTime<Utc>,
            _lease_expires_at: DateTime<Utc>,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        fn expire_leases_and_requeue(
            &self,
            _stale_before: DateTime<Utc>,
        ) -> Result<u64, KernelError> {
            Ok(0)
        }

        fn latest_seq_for_run(&self, _run_id: &RunId) -> Result<Seq, KernelError> {
            Ok(0)
        }
    }

    /// Builds a queued dispatch record for `id`.
    fn attempt(id: &str, attempt_no: u32) -> AttemptDispatchRecord {
        AttemptDispatchRecord {
            attempt_id: id.to_string(),
            run_id: "run-scheduler-test".to_string(),
            attempt_no,
            status: AttemptExecutionStatus::Queued,
            retry_at: None,
        }
    }

    #[test]
    fn dispatch_one_skips_conflicted_candidate_and_preserves_order() {
        let repo = FakeRepository::new(
            vec![attempt("attempt-a", 1), attempt("attempt-b", 2)],
            &["attempt-a"],
        );
        let scheduler = SkeletonScheduler::new(repo.clone());

        let decision = scheduler
            .dispatch_one("worker-scheduler")
            .expect("dispatch should succeed");

        let SchedulerDecision::Dispatched {
            attempt_id,
            worker_id,
        } = decision
        else {
            panic!("expected a dispatch");
        };
        assert_eq!(attempt_id, "attempt-b");
        assert_eq!(worker_id, "worker-scheduler");

        let claimed = repo.claimed_attempts.lock().expect("claimed lock");
        assert_eq!(claimed.as_slice(), ["attempt-b"]);
    }

    #[test]
    fn dispatch_one_returns_noop_when_all_candidates_conflict() {
        let repo = FakeRepository::new(
            vec![attempt("attempt-a", 1), attempt("attempt-b", 2)],
            &["attempt-a", "attempt-b"],
        );
        let scheduler = SkeletonScheduler::new(repo);

        let decision = scheduler
            .dispatch_one("worker-scheduler")
            .expect("conflicts should not surface as hard errors");

        assert!(matches!(decision, SchedulerDecision::Noop));
    }

    #[test]
    fn dispatch_one_with_context_none_same_as_dispatch_one() {
        let repo = FakeRepository::new(vec![attempt("attempt-a", 1)], &[]);
        let scheduler = SkeletonScheduler::new(repo.clone());

        let with_ctx = scheduler
            .dispatch_one_with_context("worker-1", None)
            .expect("dispatch should succeed");
        let without = scheduler
            .dispatch_one("worker-1")
            .expect("dispatch should succeed");

        // Both calls must land on the same attempt when no context is given.
        let dispatched_id = |d: &SchedulerDecision| -> String {
            match d {
                SchedulerDecision::Dispatched { attempt_id, .. } => attempt_id.clone(),
                SchedulerDecision::Noop => panic!("expected both dispatched"),
            }
        };
        assert_eq!(dispatched_id(&with_ctx), dispatched_id(&without));
    }

    #[test]
    fn dispatch_context_builder() {
        let ctx = DispatchContext::new()
            .with_tenant("tenant-1")
            .with_priority(5);
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-1"));
        assert_eq!(ctx.priority, Some(5));
    }
}