1use chrono::Utc;
4
5use oris_kernel::event::KernelError;
6
7use super::models::AttemptDispatchRecord;
8use super::repository::RuntimeRepository;
9
/// Maximum number of dispatchable attempts fetched from the repository
/// in a single scheduling pass.
const DISPATCH_SCAN_LIMIT: usize = 16;
11
/// Optional hints a caller can attach to a dispatch request.
///
/// All fields default to `None`, meaning "no constraint".
#[derive(Clone, Debug, Default)]
pub struct DispatchContext {
    // Tenant the dispatch is scoped to, if any.
    pub tenant_id: Option<String>,
    // Priority hint. NOTE(review): the scheduler currently only checks
    // whether this is set; no ordering is applied yet.
    pub priority: Option<u32>,
    // Plugins the attempt requires. NOTE(review): not consulted by the
    // scheduler in this file — confirm intended matching semantics.
    pub plugin_requirements: Option<Vec<String>>,
    // Capabilities advertised by the worker. NOTE(review): also not
    // consulted by the scheduler in this file yet.
    pub worker_capabilities: Option<Vec<String>>,
}
23
24impl DispatchContext {
25 pub fn new() -> Self {
26 Self::default()
27 }
28
29 pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
30 self.tenant_id = Some(tenant_id.into());
31 self
32 }
33
34 pub fn with_priority(mut self, priority: u32) -> Self {
35 self.priority = Some(priority);
36 self
37 }
38}
39
/// Outcome of a single scheduling pass.
#[derive(Clone, Debug)]
pub enum SchedulerDecision {
    /// An attempt was successfully leased to a worker.
    Dispatched {
        // Identifier of the attempt that was leased.
        attempt_id: String,
        // Worker the lease was granted to.
        worker_id: String,
    },
    /// No dispatchable attempt could be claimed in this pass.
    Noop,
}
49
/// Minimal scheduler that claims queued attempts by taking short-lived
/// leases on behalf of workers, backed by a [`RuntimeRepository`].
pub struct SkeletonScheduler<R: RuntimeRepository> {
    // Persistence backend used to list candidates and record leases.
    repository: R,
}
54
55impl<R: RuntimeRepository> SkeletonScheduler<R> {
56 pub fn new(repository: R) -> Self {
57 Self { repository }
58 }
59
60 pub fn dispatch_one(&self, worker_id: &str) -> Result<SchedulerDecision, KernelError> {
62 self.dispatch_one_with_context(worker_id, None)
63 }
64
65 pub fn dispatch_one_with_context(
69 &self,
70 worker_id: &str,
71 context: Option<&DispatchContext>,
72 ) -> Result<SchedulerDecision, KernelError> {
73 let now = Utc::now();
74 let candidates: Vec<AttemptDispatchRecord> = self
75 .repository
76 .list_dispatchable_attempts(now, DISPATCH_SCAN_LIMIT)?;
77 if context.as_ref().and_then(|c| c.priority).is_some() {
79 }
81 let lease_expires_at = now + chrono::Duration::seconds(30);
82
83 for candidate in candidates {
84 if let Err(e) =
85 self.repository
86 .upsert_lease(&candidate.attempt_id, worker_id, lease_expires_at)
87 {
88 let msg = e.to_string();
89 if msg.contains("active lease already exists") || msg.contains("not dispatchable") {
90 continue;
91 }
92 return Err(e);
93 }
94
95 return Ok(SchedulerDecision::Dispatched {
96 attempt_id: candidate.attempt_id,
97 worker_id: worker_id.to_string(),
98 });
99 }
100
101 Ok(SchedulerDecision::Noop)
102 }
103}
104
#[cfg(test)]
mod tests {
    //! Unit tests for the skeleton scheduler, driven by an in-memory
    //! fake implementation of `RuntimeRepository`.

    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    use chrono::{DateTime, Utc};

    use super::*;
    use oris_kernel::identity::{RunId, Seq};

    use super::super::models::{AttemptExecutionStatus, LeaseRecord};

    /// In-memory fake repository. Serves a fixed candidate list, fails
    /// `upsert_lease` for configured attempt ids (simulating lease
    /// conflicts), and records which attempts were successfully claimed.
    #[derive(Clone)]
    struct FakeRepository {
        // Candidates returned verbatim by `list_dispatchable_attempts`.
        attempts: Vec<AttemptDispatchRecord>,
        // Attempt ids for which `upsert_lease` reports a conflict.
        conflict_attempts: Arc<Mutex<HashSet<String>>>,
        // Attempt ids successfully leased, in claim order.
        claimed_attempts: Arc<Mutex<Vec<String>>>,
    }

    impl FakeRepository {
        /// Builds a fake serving `attempts`, with `conflict_attempts`
        /// marked as already-leased.
        fn new(attempts: Vec<AttemptDispatchRecord>, conflict_attempts: &[&str]) -> Self {
            Self {
                attempts,
                conflict_attempts: Arc::new(Mutex::new(
                    conflict_attempts.iter().map(|s| (*s).to_string()).collect(),
                )),
                claimed_attempts: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl RuntimeRepository for FakeRepository {
        // Ignores `now`/`limit` and returns the configured candidates.
        fn list_dispatchable_attempts(
            &self,
            _now: DateTime<Utc>,
            _limit: usize,
        ) -> Result<Vec<AttemptDispatchRecord>, KernelError> {
            Ok(self.attempts.clone())
        }

        // Errors with the "active lease already exists" message the
        // scheduler matches on for conflicted ids; otherwise records the
        // claim and returns a synthetic lease.
        fn upsert_lease(
            &self,
            attempt_id: &str,
            worker_id: &str,
            lease_expires_at: DateTime<Utc>,
        ) -> Result<LeaseRecord, KernelError> {
            if self
                .conflict_attempts
                .lock()
                .expect("conflict lock")
                .contains(attempt_id)
            {
                return Err(KernelError::Driver(format!(
                    "active lease already exists for attempt: {}",
                    attempt_id
                )));
            }
            self.claimed_attempts
                .lock()
                .expect("claimed lock")
                .push(attempt_id.to_string());
            Ok(LeaseRecord {
                lease_id: format!("lease-{}", attempt_id),
                attempt_id: attempt_id.to_string(),
                worker_id: worker_id.to_string(),
                lease_expires_at,
                heartbeat_at: Utc::now(),
                version: 1,
            })
        }

        fn heartbeat_lease(
            &self,
            _lease_id: &str,
            _heartbeat_at: DateTime<Utc>,
            _lease_expires_at: DateTime<Utc>,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        fn expire_leases_and_requeue(
            &self,
            _stale_before: DateTime<Utc>,
        ) -> Result<u64, KernelError> {
            Ok(0)
        }

        fn latest_seq_for_run(&self, _run_id: &RunId) -> Result<Seq, KernelError> {
            Ok(0)
        }

        // The remaining methods are inert stubs required to satisfy the
        // trait; the scheduler under test never calls them.
        fn upsert_bounty(&self, _: &super::super::models::BountyRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_bounty(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::BountyRecord>, KernelError> {
            Ok(None)
        }
        fn list_bounties(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::BountyRecord>, KernelError> {
            Ok(vec![])
        }
        fn accept_bounty(&self, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn close_bounty(&self, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn upsert_swarm_decomposition(
            &self,
            _: &super::super::models::SwarmTaskRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_swarm_decomposition(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::SwarmTaskRecord>, KernelError> {
            Ok(None)
        }
        fn register_worker(
            &self,
            _: &super::super::models::WorkerRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_worker(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::WorkerRecord>, KernelError> {
            Ok(None)
        }
        fn list_workers(
            &self,
            _: Option<&str>,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::WorkerRecord>, KernelError> {
            Ok(vec![])
        }
        fn heartbeat_worker(&self, _: &str, _: i64) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_recipe(&self, _: &super::super::models::RecipeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_recipe(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn fork_recipe(
            &self,
            _: &str,
            _: &str,
            _: &str,
        ) -> Result<Option<super::super::models::RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn list_recipes(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::RecipeRecord>, KernelError> {
            Ok(vec![])
        }
        fn express_organism(
            &self,
            _: &super::super::models::OrganismRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_organism(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::OrganismRecord>, KernelError> {
            Ok(None)
        }
        fn update_organism(&self, _: &str, _: i32, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_session(
            &self,
            _: &super::super::models::SessionRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::SessionRecord>, KernelError> {
            Ok(None)
        }
        fn add_session_message(
            &self,
            _: &super::super::models::SessionMessageRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session_history(
            &self,
            _: &str,
            _: usize,
        ) -> Result<Vec<super::super::models::SessionMessageRecord>, KernelError> {
            Ok(vec![])
        }
        fn open_dispute(&self, _: &super::super::models::DisputeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_dispute(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::DisputeRecord>, KernelError> {
            Ok(None)
        }
        fn get_disputes_for_bounty(
            &self,
            _: &str,
        ) -> Result<Vec<super::super::models::DisputeRecord>, KernelError> {
            Ok(vec![])
        }
        fn resolve_dispute(&self, _: &str, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
    }

    /// Builds a queued dispatch record with the shared test run id.
    fn attempt(id: &str, attempt_no: u32) -> AttemptDispatchRecord {
        AttemptDispatchRecord {
            attempt_id: id.to_string(),
            run_id: "run-scheduler-test".to_string(),
            attempt_no,
            status: AttemptExecutionStatus::Queued,
            retry_at: None,
        }
    }

    // A conflicted first candidate is skipped and the next one in list
    // order is leased instead; only that one is recorded as claimed.
    #[test]
    fn dispatch_one_skips_conflicted_candidate_and_preserves_order() {
        let repo = FakeRepository::new(
            vec![attempt("attempt-a", 1), attempt("attempt-b", 2)],
            &["attempt-a"],
        );
        let scheduler = SkeletonScheduler::new(repo.clone());

        let decision = scheduler
            .dispatch_one("worker-scheduler")
            .expect("dispatch should succeed");

        match decision {
            SchedulerDecision::Dispatched {
                attempt_id,
                worker_id,
            } => {
                assert_eq!(attempt_id, "attempt-b");
                assert_eq!(worker_id, "worker-scheduler");
            }
            SchedulerDecision::Noop => panic!("expected a dispatch"),
        }

        let claimed = repo.claimed_attempts.lock().expect("claimed lock");
        assert_eq!(claimed.as_slice(), ["attempt-b"]);
    }

    // When every candidate conflicts, the scheduler reports Noop rather
    // than surfacing the conflicts as errors.
    #[test]
    fn dispatch_one_returns_noop_when_all_candidates_conflict() {
        let repo = FakeRepository::new(
            vec![attempt("attempt-a", 1), attempt("attempt-b", 2)],
            &["attempt-a", "attempt-b"],
        );
        let scheduler = SkeletonScheduler::new(repo);

        let decision = scheduler
            .dispatch_one("worker-scheduler")
            .expect("conflicts should not surface as hard errors");

        assert!(matches!(decision, SchedulerDecision::Noop));
    }

    // dispatch_one is a thin wrapper: passing no context must pick the
    // same attempt as dispatch_one_with_context(…, None).
    #[test]
    fn dispatch_one_with_context_none_same_as_dispatch_one() {
        let repo = FakeRepository::new(vec![attempt("attempt-a", 1)], &[]);
        let scheduler = SkeletonScheduler::new(repo.clone());

        let with_ctx = scheduler
            .dispatch_one_with_context("worker-1", None)
            .expect("dispatch should succeed");
        let without = scheduler
            .dispatch_one("worker-1")
            .expect("dispatch should succeed");

        match (&with_ctx, &without) {
            (
                SchedulerDecision::Dispatched { attempt_id: a1, .. },
                SchedulerDecision::Dispatched { attempt_id: a2, .. },
            ) => assert_eq!(a1, a2),
            _ => panic!("expected both dispatched"),
        }
    }

    // The builder methods set exactly the fields they name.
    #[test]
    fn dispatch_context_builder() {
        let ctx = DispatchContext::new()
            .with_tenant("tenant-1")
            .with_priority(5);
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-1"));
        assert_eq!(ctx.priority, Some(5));
    }
}