1use chrono::Utc;
4
5use oris_kernel::event::KernelError;
6
7use super::models::AttemptDispatchRecord;
8use super::repository::RuntimeRepository;
9
/// Maximum number of dispatchable attempts pulled from the repository per
/// scheduling pass; bounds the lease-claim retry loop in `dispatch_one*`.
const DISPATCH_SCAN_LIMIT: usize = 16;
11
/// Optional hints a caller can attach to a dispatch request.
///
/// Every field is optional; `None` means "no constraint". Build via
/// [`DispatchContext::new`] and the chained `with_*` methods.
#[derive(Clone, Debug, Default)]
pub struct DispatchContext {
    /// Tenant the dispatch is scoped to, if any.
    pub tenant_id: Option<String>,
    /// Priority hint. NOTE(review): the ordering semantics (higher vs. lower
    /// is more urgent) are not established by this module — confirm at the
    /// point where the scheduler starts honoring it.
    pub priority: Option<u32>,
    /// Plugins the attempt requires on the executing worker.
    pub plugin_requirements: Option<Vec<String>>,
    /// Capabilities the worker advertises.
    pub worker_capabilities: Option<Vec<String>>,
}

impl DispatchContext {
    /// Creates an empty context with no constraints.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the tenant scope.
    pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
        self.tenant_id = Some(tenant_id.into());
        self
    }

    /// Sets the priority hint.
    pub fn with_priority(mut self, priority: u32) -> Self {
        self.priority = Some(priority);
        self
    }

    /// Sets the required plugin list (builder parity with the other fields).
    pub fn with_plugin_requirements(
        mut self,
        plugins: impl IntoIterator<Item = String>,
    ) -> Self {
        self.plugin_requirements = Some(plugins.into_iter().collect());
        self
    }

    /// Sets the worker capability list (builder parity with the other fields).
    pub fn with_worker_capabilities(
        mut self,
        capabilities: impl IntoIterator<Item = String>,
    ) -> Self {
        self.worker_capabilities = Some(capabilities.into_iter().collect());
        self
    }
}
39
/// Outcome of a single scheduling pass.
///
/// `PartialEq`/`Eq` are derived (all payload fields are `String`) so callers
/// and tests can compare decisions directly instead of destructuring.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SchedulerDecision {
    /// An attempt was successfully leased to a worker.
    Dispatched {
        /// Attempt that was claimed.
        attempt_id: String,
        /// Worker the lease was granted to.
        worker_id: String,
    },
    /// No dispatchable attempt could be claimed in this pass.
    Noop,
}
49
50pub struct SkeletonScheduler<R: RuntimeRepository> {
52 repository: R,
53}
54
55impl<R: RuntimeRepository> SkeletonScheduler<R> {
56 pub fn new(repository: R) -> Self {
57 Self { repository }
58 }
59
60 pub fn dispatch_one(&self, worker_id: &str) -> Result<SchedulerDecision, KernelError> {
62 self.dispatch_one_with_context(worker_id, None)
63 }
64
65 pub fn dispatch_one_with_context(
69 &self,
70 worker_id: &str,
71 context: Option<&DispatchContext>,
72 ) -> Result<SchedulerDecision, KernelError> {
73 let now = Utc::now();
74 let candidates: Vec<AttemptDispatchRecord> = self
75 .repository
76 .list_dispatchable_attempts(now, DISPATCH_SCAN_LIMIT)?;
77 if context.as_ref().and_then(|c| c.priority).is_some() {
79 }
81 let lease_expires_at = now + chrono::Duration::seconds(30);
82
83 for candidate in candidates {
84 if let Err(e) =
85 self.repository
86 .upsert_lease(&candidate.attempt_id, worker_id, lease_expires_at)
87 {
88 let msg = e.to_string();
89 if msg.contains("active lease already exists") || msg.contains("not dispatchable") {
90 continue;
91 }
92 return Err(e);
93 }
94
95 return Ok(SchedulerDecision::Dispatched {
96 attempt_id: candidate.attempt_id,
97 worker_id: worker_id.to_string(),
98 });
99 }
100
101 Ok(SchedulerDecision::Noop)
102 }
103}
104
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    use chrono::{DateTime, Utc};

    use super::*;
    use oris_kernel::identity::{RunId, Seq};

    use super::super::models::{AttemptExecutionStatus, LeaseRecord};

    /// Test double for `RuntimeRepository`: serves a fixed candidate list
    /// and simulates lease contention for a configurable set of attempt ids.
    #[derive(Clone)]
    struct FakeRepository {
        // Candidates returned verbatim by `list_dispatchable_attempts`.
        attempts: Vec<AttemptDispatchRecord>,
        // Attempt ids whose `upsert_lease` fails with a contention error.
        conflict_attempts: Arc<Mutex<HashSet<String>>>,
        // Attempt ids successfully leased, in claim order. Arc-shared so the
        // clone handed to the scheduler stays observable by the test.
        claimed_attempts: Arc<Mutex<Vec<String>>>,
    }

    impl FakeRepository {
        /// Builds a fake serving `attempts`, with every id in
        /// `conflict_attempts` configured to reject lease claims.
        fn new(attempts: Vec<AttemptDispatchRecord>, conflict_attempts: &[&str]) -> Self {
            Self {
                attempts,
                conflict_attempts: Arc::new(Mutex::new(
                    conflict_attempts.iter().map(|s| (*s).to_string()).collect(),
                )),
                claimed_attempts: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl RuntimeRepository for FakeRepository {
        // Ignores `now`/`limit`: always returns the configured list as-is.
        fn list_dispatchable_attempts(
            &self,
            _now: DateTime<Utc>,
            _limit: usize,
        ) -> Result<Vec<AttemptDispatchRecord>, KernelError> {
            Ok(self.attempts.clone())
        }

        // Simulates contention for configured ids; otherwise records the
        // claim and returns a minimal valid lease record.
        fn upsert_lease(
            &self,
            attempt_id: &str,
            worker_id: &str,
            lease_expires_at: DateTime<Utc>,
        ) -> Result<LeaseRecord, KernelError> {
            if self
                .conflict_attempts
                .lock()
                .expect("conflict lock")
                .contains(attempt_id)
            {
                // The exact message text matters: the scheduler classifies
                // contention by substring match on
                // "active lease already exists".
                return Err(KernelError::Driver(format!(
                    "active lease already exists for attempt: {}",
                    attempt_id
                )));
            }
            self.claimed_attempts
                .lock()
                .expect("claimed lock")
                .push(attempt_id.to_string());
            Ok(LeaseRecord {
                lease_id: format!("lease-{}", attempt_id),
                attempt_id: attempt_id.to_string(),
                worker_id: worker_id.to_string(),
                lease_expires_at,
                heartbeat_at: Utc::now(),
                version: 1,
            })
        }

        // The remaining trait methods are inert stubs; the scheduler paths
        // under test never exercise them.
        fn heartbeat_lease(
            &self,
            _lease_id: &str,
            _heartbeat_at: DateTime<Utc>,
            _lease_expires_at: DateTime<Utc>,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        fn expire_leases_and_requeue(
            &self,
            _stale_before: DateTime<Utc>,
        ) -> Result<u64, KernelError> {
            Ok(0)
        }

        fn latest_seq_for_run(&self, _run_id: &RunId) -> Result<Seq, KernelError> {
            Ok(0)
        }
    }

    /// Shorthand for a queued dispatch record with no retry delay.
    fn attempt(id: &str, attempt_no: u32) -> AttemptDispatchRecord {
        AttemptDispatchRecord {
            attempt_id: id.to_string(),
            run_id: "run-scheduler-test".to_string(),
            attempt_no,
            status: AttemptExecutionStatus::Queued,
            retry_at: None,
        }
    }

    #[test]
    fn dispatch_one_skips_conflicted_candidate_and_preserves_order() {
        let repo = FakeRepository::new(
            vec![attempt("attempt-a", 1), attempt("attempt-b", 2)],
            &["attempt-a"],
        );
        let scheduler = SkeletonScheduler::new(repo.clone());

        let decision = scheduler
            .dispatch_one("worker-scheduler")
            .expect("dispatch should succeed");

        // The contended first candidate must be skipped silently, and the
        // next candidate in repository order claimed instead.
        match decision {
            SchedulerDecision::Dispatched {
                attempt_id,
                worker_id,
            } => {
                assert_eq!(attempt_id, "attempt-b");
                assert_eq!(worker_id, "worker-scheduler");
            }
            SchedulerDecision::Noop => panic!("expected a dispatch"),
        }

        // Only the non-conflicted attempt was actually leased.
        let claimed = repo.claimed_attempts.lock().expect("claimed lock");
        assert_eq!(claimed.as_slice(), ["attempt-b"]);
    }

    #[test]
    fn dispatch_one_returns_noop_when_all_candidates_conflict() {
        let repo = FakeRepository::new(
            vec![attempt("attempt-a", 1), attempt("attempt-b", 2)],
            &["attempt-a", "attempt-b"],
        );
        let scheduler = SkeletonScheduler::new(repo);

        let decision = scheduler
            .dispatch_one("worker-scheduler")
            .expect("conflicts should not surface as hard errors");

        assert!(matches!(decision, SchedulerDecision::Noop));
    }

    #[test]
    fn dispatch_one_with_context_none_same_as_dispatch_one() {
        let repo = FakeRepository::new(vec![attempt("attempt-a", 1)], &[]);
        let scheduler = SkeletonScheduler::new(repo.clone());

        // The fake never removes claimed attempts from its candidate list,
        // so both calls observe the same single candidate.
        let with_ctx = scheduler
            .dispatch_one_with_context("worker-1", None)
            .expect("dispatch should succeed");
        let without = scheduler
            .dispatch_one("worker-1")
            .expect("dispatch should succeed");

        match (&with_ctx, &without) {
            (
                SchedulerDecision::Dispatched { attempt_id: a1, .. },
                SchedulerDecision::Dispatched { attempt_id: a2, .. },
            ) => assert_eq!(a1, a2),
            _ => panic!("expected both dispatched"),
        }
    }

    #[test]
    fn dispatch_context_builder() {
        let ctx = DispatchContext::new()
            .with_tenant("tenant-1")
            .with_priority(5);
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-1"));
        assert_eq!(ctx.priority, Some(5));
    }
}