1use std::sync::Arc;
4
5use chrono::Utc;
6
7use oris_kernel::event::KernelError;
8
9use super::circuit_breaker::CircuitBreaker;
10use super::models::AttemptDispatchRecord;
11use super::observability::RejectionReason;
12use super::repository::RuntimeRepository;
13
/// Maximum number of dispatchable attempts fetched from the repository per
/// dispatch call; bounds the scan-and-claim loop in `dispatch_one_with_context`.
const DISPATCH_SCAN_LIMIT: usize = 16;
15
/// Optional constraints a caller can attach to a dispatch request.
///
/// All fields default to `None`, meaning "no constraint". Built fluently via
/// [`DispatchContext::new`] and the `with_*` builder methods.
#[derive(Clone, Debug, Default)]
pub struct DispatchContext {
    /// Tenant the dispatch is scoped to, if any.
    // NOTE(review): not read by SkeletonScheduler's dispatch path yet — confirm intended.
    pub tenant_id: Option<String>,
    /// Relative priority hint.
    // NOTE(review): currently unused by dispatch_one_with_context — confirm intended.
    pub priority: Option<u32>,
    /// Plugins the attempt requires on the executing worker.
    // NOTE(review): currently unused by dispatch_one_with_context — confirm intended.
    pub plugin_requirements: Option<Vec<String>>,
    /// Capabilities advertised by the worker.
    // NOTE(review): currently unused by dispatch_one_with_context — confirm intended.
    pub worker_capabilities: Option<Vec<String>>,
    /// Backpressure threshold: dispatch is rejected when the dispatchable
    /// queue depth is at or above this limit.
    pub max_queue_depth: Option<usize>,
}
32
33impl DispatchContext {
34 pub fn new() -> Self {
35 Self::default()
36 }
37
38 pub fn with_tenant(mut self, tenant_id: impl Into<String>) -> Self {
39 self.tenant_id = Some(tenant_id.into());
40 self
41 }
42
43 pub fn with_priority(mut self, priority: u32) -> Self {
44 self.priority = Some(priority);
45 self
46 }
47
48 pub fn with_max_queue_depth(mut self, limit: usize) -> Self {
49 self.max_queue_depth = Some(limit);
50 self
51 }
52}
53
/// Outcome of a single dispatch attempt.
#[derive(Clone, Debug)]
pub enum SchedulerDecision {
    /// A lease was claimed: `attempt_id` is now running on `worker_id`.
    Dispatched {
        attempt_id: String,
        worker_id: String,
    },
    /// Dispatch was refused before claiming anything (circuit breaker open,
    /// or queue depth at/over the context's `max_queue_depth`).
    Backpressure {
        reason: RejectionReason,
        /// Observed dispatchable-queue depth; 0 when the queue was not
        /// inspected (circuit-breaker path).
        queue_depth: usize,
    },
    /// Nothing to do: no candidates, or every candidate was claimed by
    /// someone else first.
    Noop,
}
68
/// Minimal scan-and-claim scheduler over a [`RuntimeRepository`].
pub struct SkeletonScheduler<R: RuntimeRepository> {
    /// Backing store for dispatchable attempts and leases.
    repository: R,
    /// Optional breaker consulted before any repository work; when open,
    /// dispatch short-circuits to `Backpressure`.
    circuit_breaker: Option<Arc<CircuitBreaker>>,
}
75
76impl<R: RuntimeRepository> SkeletonScheduler<R> {
77 pub fn new(repository: R) -> Self {
78 Self {
79 repository,
80 circuit_breaker: None,
81 }
82 }
83
84 pub fn with_circuit_breaker(mut self, breaker: Arc<CircuitBreaker>) -> Self {
89 self.circuit_breaker = Some(breaker);
90 self
91 }
92
93 pub fn dispatch_one(&self, worker_id: &str) -> Result<SchedulerDecision, KernelError> {
95 self.dispatch_one_with_context(worker_id, None)
96 }
97
98 pub fn dispatch_one_with_context(
106 &self,
107 worker_id: &str,
108 context: Option<&DispatchContext>,
109 ) -> Result<SchedulerDecision, KernelError> {
110 let now = Utc::now();
111
112 if let Some(cb) = &self.circuit_breaker {
114 if cb.is_open() {
115 return Ok(SchedulerDecision::Backpressure {
116 reason: RejectionReason::capacity_limit("circuit breaker open"),
117 queue_depth: 0,
118 });
119 }
120 }
121
122 let candidates: Vec<AttemptDispatchRecord> = self
123 .repository
124 .list_dispatchable_attempts(now, DISPATCH_SCAN_LIMIT)?;
125
126 if let Some(limit) = context.and_then(|c| c.max_queue_depth) {
128 if candidates.len() >= limit {
129 return Ok(SchedulerDecision::Backpressure {
130 reason: RejectionReason::capacity_limit(format!(
131 "queue depth {} >= limit {}",
132 candidates.len(),
133 limit
134 )),
135 queue_depth: candidates.len(),
136 });
137 }
138 }
139
140 let lease_expires_at = now + chrono::Duration::seconds(30);
141
142 for candidate in candidates {
143 if let Err(e) =
144 self.repository
145 .upsert_lease(&candidate.attempt_id, worker_id, lease_expires_at)
146 {
147 let msg = e.to_string();
148 if msg.contains("active lease already exists") || msg.contains("not dispatchable") {
149 continue;
150 }
151 return Err(e);
152 }
153
154 return Ok(SchedulerDecision::Dispatched {
155 attempt_id: candidate.attempt_id,
156 worker_id: worker_id.to_string(),
157 });
158 }
159
160 Ok(SchedulerDecision::Noop)
161 }
162}
163
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};

    use chrono::{DateTime, Utc};

    use super::*;
    use oris_kernel::identity::{RunId, Seq};

    use super::super::models::{AttemptExecutionStatus, LeaseRecord};

    /// In-memory `RuntimeRepository` test double.
    ///
    /// Only the dispatch-path methods (`list_dispatchable_attempts`,
    /// `upsert_lease`) have real behavior; the rest of the trait is stubbed
    /// with inert defaults. State is held behind `Arc<Mutex<_>>` so cloned
    /// handles (the scheduler's copy and the test's copy) share observations.
    #[derive(Clone)]
    struct FakeRepository {
        // Candidates returned verbatim from list_dispatchable_attempts.
        attempts: Vec<AttemptDispatchRecord>,
        // Attempt ids for which upsert_lease simulates a lost claim race.
        conflict_attempts: Arc<Mutex<HashSet<String>>>,
        // Successfully claimed attempt ids, in claim order.
        claimed_attempts: Arc<Mutex<Vec<String>>>,
    }

    impl FakeRepository {
        /// Builds a repository whose queue is `attempts` and whose
        /// `conflict_attempts` will always fail `upsert_lease` with a
        /// lease-conflict error.
        fn new(attempts: Vec<AttemptDispatchRecord>, conflict_attempts: &[&str]) -> Self {
            Self {
                attempts,
                conflict_attempts: Arc::new(Mutex::new(
                    conflict_attempts.iter().map(|s| (*s).to_string()).collect(),
                )),
                claimed_attempts: Arc::new(Mutex::new(Vec::new())),
            }
        }
    }

    impl RuntimeRepository for FakeRepository {
        // Returns the configured queue unchanged; `now`/`limit` are ignored
        // because tests control queue size directly.
        fn list_dispatchable_attempts(
            &self,
            _now: DateTime<Utc>,
            _limit: usize,
        ) -> Result<Vec<AttemptDispatchRecord>, KernelError> {
            Ok(self.attempts.clone())
        }

        // Fails with the scheduler-recognized conflict message for configured
        // ids; otherwise records the claim and returns a synthetic lease.
        fn upsert_lease(
            &self,
            attempt_id: &str,
            worker_id: &str,
            lease_expires_at: DateTime<Utc>,
        ) -> Result<LeaseRecord, KernelError> {
            if self
                .conflict_attempts
                .lock()
                .expect("conflict lock")
                .contains(attempt_id)
            {
                // Must contain "active lease already exists" — the scheduler
                // string-matches on it to classify the error as a benign race.
                return Err(KernelError::Driver(format!(
                    "active lease already exists for attempt: {}",
                    attempt_id
                )));
            }
            self.claimed_attempts
                .lock()
                .expect("claimed lock")
                .push(attempt_id.to_string());
            Ok(LeaseRecord {
                lease_id: format!("lease-{}", attempt_id),
                attempt_id: attempt_id.to_string(),
                worker_id: worker_id.to_string(),
                lease_expires_at,
                heartbeat_at: Utc::now(),
                version: 1,
            })
        }

        // --- Inert stubs below: not exercised by these scheduler tests. ---

        fn heartbeat_lease(
            &self,
            _lease_id: &str,
            _heartbeat_at: DateTime<Utc>,
            _lease_expires_at: DateTime<Utc>,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        fn expire_leases_and_requeue(
            &self,
            _stale_before: DateTime<Utc>,
        ) -> Result<u64, KernelError> {
            Ok(0)
        }

        fn latest_seq_for_run(&self, _run_id: &RunId) -> Result<Seq, KernelError> {
            Ok(0)
        }

        fn upsert_bounty(&self, _: &super::super::models::BountyRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_bounty(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::BountyRecord>, KernelError> {
            Ok(None)
        }
        fn list_bounties(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::BountyRecord>, KernelError> {
            Ok(vec![])
        }
        fn accept_bounty(&self, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn close_bounty(&self, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn upsert_swarm_decomposition(
            &self,
            _: &super::super::models::SwarmTaskRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_swarm_decomposition(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::SwarmTaskRecord>, KernelError> {
            Ok(None)
        }
        fn register_worker(
            &self,
            _: &super::super::models::WorkerRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_worker(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::WorkerRecord>, KernelError> {
            Ok(None)
        }
        fn list_workers(
            &self,
            _: Option<&str>,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::WorkerRecord>, KernelError> {
            Ok(vec![])
        }
        fn heartbeat_worker(&self, _: &str, _: i64) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_recipe(&self, _: &super::super::models::RecipeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_recipe(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn fork_recipe(
            &self,
            _: &str,
            _: &str,
            _: &str,
        ) -> Result<Option<super::super::models::RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn list_recipes(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::RecipeRecord>, KernelError> {
            Ok(vec![])
        }
        fn express_organism(
            &self,
            _: &super::super::models::OrganismRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_organism(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::OrganismRecord>, KernelError> {
            Ok(None)
        }
        fn update_organism(&self, _: &str, _: i32, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_session(
            &self,
            _: &super::super::models::SessionRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::SessionRecord>, KernelError> {
            Ok(None)
        }
        fn add_session_message(
            &self,
            _: &super::super::models::SessionMessageRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session_history(
            &self,
            _: &str,
            _: usize,
        ) -> Result<Vec<super::super::models::SessionMessageRecord>, KernelError> {
            Ok(vec![])
        }
        fn open_dispute(&self, _: &super::super::models::DisputeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_dispute(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::DisputeRecord>, KernelError> {
            Ok(None)
        }
        fn get_disputes_for_bounty(
            &self,
            _: &str,
        ) -> Result<Vec<super::super::models::DisputeRecord>, KernelError> {
            Ok(vec![])
        }
        fn resolve_dispute(&self, _: &str, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
    }

    /// Builds a queued dispatch record with fixed run id and no retry delay.
    fn attempt(id: &str, attempt_no: u32) -> AttemptDispatchRecord {
        AttemptDispatchRecord {
            attempt_id: id.to_string(),
            run_id: "run-scheduler-test".to_string(),
            attempt_no,
            status: AttemptExecutionStatus::Queued,
            retry_at: None,
        }
    }

    // A conflicted head-of-queue candidate is skipped; the next in scan order
    // is claimed, and only that claim reaches the repository.
    #[test]
    fn dispatch_one_skips_conflicted_candidate_and_preserves_order() {
        let repo = FakeRepository::new(
            vec![attempt("attempt-a", 1), attempt("attempt-b", 2)],
            &["attempt-a"],
        );
        let scheduler = SkeletonScheduler::new(repo.clone());

        let decision = scheduler
            .dispatch_one("worker-scheduler")
            .expect("dispatch should succeed");

        match decision {
            SchedulerDecision::Dispatched {
                attempt_id,
                worker_id,
            } => {
                assert_eq!(attempt_id, "attempt-b");
                assert_eq!(worker_id, "worker-scheduler");
            }
            SchedulerDecision::Noop | SchedulerDecision::Backpressure { .. } => {
                panic!("expected a dispatch")
            }
        }

        let claimed = repo.claimed_attempts.lock().expect("claimed lock");
        assert_eq!(claimed.as_slice(), ["attempt-b"]);
    }

    // When every candidate loses the claim race, the scheduler reports Noop
    // rather than surfacing the conflicts as errors.
    #[test]
    fn dispatch_one_returns_noop_when_all_candidates_conflict() {
        let repo = FakeRepository::new(
            vec![attempt("attempt-a", 1), attempt("attempt-b", 2)],
            &["attempt-a", "attempt-b"],
        );
        let scheduler = SkeletonScheduler::new(repo);

        let decision = scheduler
            .dispatch_one("worker-scheduler")
            .expect("conflicts should not surface as hard errors");

        assert!(matches!(decision, SchedulerDecision::Noop));
    }

    // dispatch_one is a thin wrapper: passing no context must pick the same
    // attempt. (The fake never dequeues, so both calls see the same queue.)
    #[test]
    fn dispatch_one_with_context_none_same_as_dispatch_one() {
        let repo = FakeRepository::new(vec![attempt("attempt-a", 1)], &[]);
        let scheduler = SkeletonScheduler::new(repo.clone());

        let with_ctx = scheduler
            .dispatch_one_with_context("worker-1", None)
            .expect("dispatch should succeed");
        let without = scheduler
            .dispatch_one("worker-1")
            .expect("dispatch should succeed");

        match (&with_ctx, &without) {
            (
                SchedulerDecision::Dispatched { attempt_id: a1, .. },
                SchedulerDecision::Dispatched { attempt_id: a2, .. },
            ) => assert_eq!(a1, a2),
            _ => panic!("expected both dispatched"),
        }
    }

    // Builder methods set exactly the field they name.
    #[test]
    fn dispatch_context_builder() {
        let ctx = DispatchContext::new()
            .with_tenant("tenant-1")
            .with_priority(5);
        assert_eq!(ctx.tenant_id.as_deref(), Some("tenant-1"));
        assert_eq!(ctx.priority, Some(5));
    }

    // Queue depth (3) >= limit (2) must yield Backpressure carrying the
    // observed depth, with no lease claimed.
    #[test]
    fn dispatch_one_with_context_applies_backpressure_when_queue_exceeds_limit() {
        let repo = FakeRepository::new(
            vec![
                attempt("attempt-a", 1),
                attempt("attempt-b", 2),
                attempt("attempt-c", 3),
            ],
            &[],
        );
        let scheduler = SkeletonScheduler::new(repo);
        let ctx = DispatchContext::new().with_max_queue_depth(2);

        let decision = scheduler
            .dispatch_one_with_context("worker-1", Some(&ctx))
            .expect("should not error on backpressure");

        match decision {
            SchedulerDecision::Backpressure { queue_depth, .. } => {
                assert_eq!(queue_depth, 3);
            }
            other => panic!("expected Backpressure, got {:?}", other),
        }
    }

    // A depth limit above the queue size must not interfere with dispatch.
    #[test]
    fn dispatch_one_with_context_dispatches_when_queue_below_limit() {
        let repo = FakeRepository::new(vec![attempt("attempt-a", 1)], &[]);
        let scheduler = SkeletonScheduler::new(repo);
        let ctx = DispatchContext::new().with_max_queue_depth(5);

        let decision = scheduler
            .dispatch_one_with_context("worker-1", Some(&ctx))
            .expect("dispatch should succeed");

        assert!(
            matches!(decision, SchedulerDecision::Dispatched { .. }),
            "expected Dispatched, got {:?}",
            decision
        );
    }

    // A tripped breaker short-circuits dispatch before touching the queue.
    #[test]
    fn open_circuit_breaker_returns_backpressure() {
        // NOTE(review): `crate::circuit_breaker` here vs `super::circuit_breaker`
        // at module top implies this module sits directly under the crate root —
        // confirm against the crate layout.
        use crate::circuit_breaker::CircuitBreaker;
        use std::sync::Arc;

        let repo = FakeRepository::new(vec![attempt("attempt-a", 1)], &[]);
        let breaker = Arc::new(CircuitBreaker::new(30));
        breaker.trip();

        let scheduler = SkeletonScheduler::new(repo).with_circuit_breaker(breaker);

        let decision = scheduler
            .dispatch_one("worker-1")
            .expect("should not error");

        match decision {
            SchedulerDecision::Backpressure { reason, .. } => {
                let msg = format!("{:?}", reason);
                assert!(
                    msg.contains("circuit breaker open"),
                    "unexpected reason: {}",
                    msg
                );
            }
            other => panic!("expected Backpressure, got {:?}", other),
        }
    }

    // A breaker that was never tripped must be transparent to dispatch.
    #[test]
    fn closed_circuit_breaker_allows_dispatch() {
        // NOTE(review): same `crate::` vs `super::` path question as above.
        use crate::circuit_breaker::CircuitBreaker;
        use std::sync::Arc;

        let repo = FakeRepository::new(vec![attempt("attempt-a", 1)], &[]);
        let breaker = Arc::new(CircuitBreaker::new(30));
        let scheduler = SkeletonScheduler::new(repo).with_circuit_breaker(breaker);

        let decision = scheduler
            .dispatch_one("worker-1")
            .expect("dispatch should succeed");

        assert!(
            matches!(decision, SchedulerDecision::Dispatched { .. }),
            "expected Dispatched, got {:?}",
            decision
        );
    }
}