1use std::collections::HashMap;
9use std::sync::{Arc, Mutex};
10
11use crate::circuit_breaker::CircuitBreaker;
12
13use chrono::{DateTime, Duration, Utc};
14
15use oris_kernel::event::KernelError;
16
17use super::models::LeaseRecord;
18use super::repository::RuntimeRepository;
19
20#[derive(Clone, Debug)]
22pub struct WorkerLease {
23 record: LeaseRecord,
24}
25
26impl WorkerLease {
27 pub fn from_record(record: LeaseRecord) -> Self {
29 Self { record }
30 }
31
32 pub fn record(&self) -> &LeaseRecord {
34 &self.record
35 }
36
37 pub fn lease_id(&self) -> &str {
38 &self.record.lease_id
39 }
40
41 pub fn attempt_id(&self) -> &str {
42 &self.record.attempt_id
43 }
44
45 pub fn worker_id(&self) -> &str {
46 &self.record.worker_id
47 }
48
49 pub fn is_expired(&self, now: DateTime<Utc>) -> bool {
51 now >= self.record.lease_expires_at
52 }
53
54 pub fn verify_owner(&self, worker_id: &str) -> Result<(), KernelError> {
56 if self.record.worker_id != worker_id {
57 return Err(KernelError::Driver(format!(
58 "lease {} is owned by {}, not {}",
59 self.record.lease_id, self.record.worker_id, worker_id
60 )));
61 }
62 Ok(())
63 }
64
65 pub fn check_execution_allowed(
67 &self,
68 worker_id: &str,
69 now: DateTime<Utc>,
70 ) -> Result<(), KernelError> {
71 self.verify_owner(worker_id)?;
72 if self.is_expired(now) {
73 return Err(KernelError::Driver(format!(
74 "lease {} expired at {}",
75 self.record.lease_id, self.record.lease_expires_at
76 )));
77 }
78 Ok(())
79 }
80}
81
82#[derive(Clone, Debug)]
84pub struct LeaseConfig {
85 pub lease_ttl: Duration,
86 pub heartbeat_grace: Duration,
87}
88
89impl Default for LeaseConfig {
90 fn default() -> Self {
91 Self {
92 lease_ttl: Duration::seconds(30),
93 heartbeat_grace: Duration::seconds(5),
94 }
95 }
96}
97
/// Counts reported by one lease-maintenance pass ([`LeaseManager::tick`]).
#[derive(Clone, Debug, Default)]
pub struct LeaseTickResult {
    /// Attempts transitioned to timed-out, as reported by the repository.
    pub timed_out: u64,
    /// Leases expired with their attempts requeued, as reported by the repository.
    pub expired_requeued: u64,
}
104
/// Periodic lease maintenance, shareable across threads (`Send + Sync`).
pub trait LeaseManager: Send + Sync {
    /// Runs one maintenance pass as of `now` and reports what it changed.
    ///
    /// # Errors
    /// Propagates any [`KernelError`] raised by the implementation.
    fn tick(&self, now: DateTime<Utc>) -> Result<LeaseTickResult, KernelError>;
}
109
110pub struct RepositoryLeaseManager<R: RuntimeRepository> {
112 repository: R,
113 config: LeaseConfig,
114}
115
116impl<R: RuntimeRepository> RepositoryLeaseManager<R> {
117 pub fn new(repository: R, config: LeaseConfig) -> Self {
118 Self { repository, config }
119 }
120}
121
122impl<R: RuntimeRepository> LeaseManager for RepositoryLeaseManager<R> {
123 fn tick(&self, now: DateTime<Utc>) -> Result<LeaseTickResult, KernelError> {
124 let stale_before = now - self.config.heartbeat_grace;
125 let timed_out = self.repository.transition_timed_out_attempts(now)?;
126 let expired = self.repository.expire_leases_and_requeue(stale_before)?;
127 Ok(LeaseTickResult {
128 timed_out,
129 expired_requeued: expired,
130 })
131 }
132}
133
/// Health snapshot for a single worker, maintained by [`WorkerHealthTracker`].
#[derive(Clone, Debug, Default)]
pub struct WorkerHealth {
    /// Lease expiries recorded since the last success, heartbeat, or explicit quarantine clear.
    pub lease_expiry_count: u64,
    /// Value passed to the most recent `record_heartbeat` call (presumably epoch milliseconds —
    /// confirm with callers); `None` until a heartbeat is recorded.
    pub last_heartbeat_ms: Option<i64>,
    /// Whether the worker is currently quarantined.
    pub quarantined: bool,
}
148
149#[derive(Clone, Default)]
159pub struct WorkerHealthTracker {
160 inner: Arc<Mutex<HashMap<String, WorkerHealth>>>,
161 quarantine_threshold: u64,
164 circuit_breaker: Option<Arc<CircuitBreaker>>,
166}
167
168impl WorkerHealthTracker {
169 pub fn new(quarantine_threshold: u64) -> Self {
170 Self {
171 inner: Arc::new(Mutex::new(HashMap::new())),
172 quarantine_threshold,
173 circuit_breaker: None,
174 }
175 }
176
177 pub fn with_circuit_breaker(mut self, breaker: Arc<CircuitBreaker>) -> Self {
181 self.circuit_breaker = Some(breaker);
182 self
183 }
184
185 pub fn record_expiry(&self, worker_id: &str) -> bool {
190 let just_quarantined = {
191 let mut map = self.inner.lock().expect("worker health lock poisoned");
192 let entry = map.entry(worker_id.to_string()).or_default();
193 entry.lease_expiry_count += 1;
194 if !entry.quarantined && entry.lease_expiry_count >= self.quarantine_threshold {
195 entry.quarantined = true;
196 true
197 } else {
198 false
199 }
200 };
201 if just_quarantined {
202 if let Some(cb) = &self.circuit_breaker {
203 cb.trip();
204 }
205 }
206 just_quarantined
207 }
208
209 pub fn record_success(&self, worker_id: &str) {
213 {
214 let mut map = self.inner.lock().expect("worker health lock poisoned");
215 let entry = map.entry(worker_id.to_string()).or_default();
216 entry.lease_expiry_count = 0;
217 entry.quarantined = false;
218 }
219 if let Some(cb) = &self.circuit_breaker {
220 cb.record_success();
221 }
222 }
223
224 pub fn record_heartbeat(&self, worker_id: &str, heartbeat_ms: i64) {
226 let mut map = self.inner.lock().expect("worker health lock poisoned");
227 let entry = map.entry(worker_id.to_string()).or_default();
228 entry.last_heartbeat_ms = Some(heartbeat_ms);
229 entry.lease_expiry_count = 0;
231 entry.quarantined = false;
232 }
233
234 pub fn is_quarantined(&self, worker_id: &str) -> bool {
236 self.inner
237 .lock()
238 .expect("worker health lock poisoned")
239 .get(worker_id)
240 .map(|h| h.quarantined)
241 .unwrap_or(false)
242 }
243
244 pub fn get(&self, worker_id: &str) -> Option<WorkerHealth> {
246 self.inner
247 .lock()
248 .expect("worker health lock poisoned")
249 .get(worker_id)
250 .cloned()
251 }
252
253 pub fn clear_quarantine(&self, worker_id: &str) {
255 let mut map = self.inner.lock().expect("worker health lock poisoned");
256 if let Some(entry) = map.get_mut(worker_id) {
257 entry.quarantined = false;
258 entry.lease_expiry_count = 0;
259 }
260 }
261}
262
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;
    use oris_kernel::identity::{RunId, Seq};

    use super::super::models::{self, AttemptDispatchRecord, LeaseRecord};

    /// Builds the lease record used by the `WorkerLease` tests: lease `L1` on attempt `A1`,
    /// owned by worker `W1`, version 1.
    fn lease_record(lease_expires_at: DateTime<Utc>, heartbeat_at: DateTime<Utc>) -> LeaseRecord {
        LeaseRecord {
            lease_id: "L1".to_string(),
            attempt_id: "A1".to_string(),
            worker_id: "W1".to_string(),
            lease_expires_at,
            heartbeat_at,
            version: 1,
        }
    }

    /// Test double for `RuntimeRepository`.
    ///
    /// It returns canned counts from the two lease-maintenance calls and records the cutoff
    /// passed to `expire_leases_and_requeue`. Every other method is an inert stub.
    #[derive(Clone)]
    struct FakeRepository {
        timed_out: u64,
        expired: u64,
        seen_cutoff: Arc<Mutex<Option<DateTime<Utc>>>>,
    }

    impl FakeRepository {
        fn new(timed_out: u64, expired: u64) -> Self {
            Self {
                timed_out,
                expired,
                seen_cutoff: Arc::new(Mutex::new(None)),
            }
        }
    }

    impl RuntimeRepository for FakeRepository {
        fn list_dispatchable_attempts(
            &self,
            _now: DateTime<Utc>,
            _limit: usize,
        ) -> Result<Vec<AttemptDispatchRecord>, KernelError> {
            Ok(Vec::new())
        }

        fn upsert_lease(
            &self,
            _attempt_id: &str,
            _worker_id: &str,
            lease_expires_at: DateTime<Utc>,
        ) -> Result<LeaseRecord, KernelError> {
            Ok(LeaseRecord {
                lease_id: "lease-test".to_string(),
                attempt_id: "attempt-test".to_string(),
                worker_id: "worker-test".to_string(),
                lease_expires_at,
                heartbeat_at: Utc::now(),
                version: 1,
            })
        }

        fn heartbeat_lease(
            &self,
            _lease_id: &str,
            _heartbeat_at: DateTime<Utc>,
            _lease_expires_at: DateTime<Utc>,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        fn expire_leases_and_requeue(
            &self,
            stale_before: DateTime<Utc>,
        ) -> Result<u64, KernelError> {
            *self.seen_cutoff.lock().expect("cutoff lock") = Some(stale_before);
            Ok(self.expired)
        }

        fn transition_timed_out_attempts(&self, _now: DateTime<Utc>) -> Result<u64, KernelError> {
            Ok(self.timed_out)
        }

        fn latest_seq_for_run(&self, _run_id: &RunId) -> Result<Seq, KernelError> {
            Ok(0)
        }

        fn upsert_bounty(&self, _: &models::BountyRecord) -> Result<(), KernelError> {
            Ok(())
        }

        fn get_bounty(&self, _: &str) -> Result<Option<models::BountyRecord>, KernelError> {
            Ok(None)
        }

        fn list_bounties(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<models::BountyRecord>, KernelError> {
            Ok(vec![])
        }

        fn accept_bounty(&self, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }

        fn close_bounty(&self, _: &str) -> Result<(), KernelError> {
            Ok(())
        }

        fn upsert_swarm_decomposition(
            &self,
            _: &models::SwarmTaskRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        fn get_swarm_decomposition(
            &self,
            _: &str,
        ) -> Result<Option<models::SwarmTaskRecord>, KernelError> {
            Ok(None)
        }

        fn register_worker(&self, _: &models::WorkerRecord) -> Result<(), KernelError> {
            Ok(())
        }

        fn get_worker(&self, _: &str) -> Result<Option<models::WorkerRecord>, KernelError> {
            Ok(None)
        }

        fn list_workers(
            &self,
            _: Option<&str>,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<models::WorkerRecord>, KernelError> {
            Ok(vec![])
        }

        fn heartbeat_worker(&self, _: &str, _: i64) -> Result<(), KernelError> {
            Ok(())
        }

        fn create_recipe(&self, _: &models::RecipeRecord) -> Result<(), KernelError> {
            Ok(())
        }

        fn get_recipe(&self, _: &str) -> Result<Option<models::RecipeRecord>, KernelError> {
            Ok(None)
        }

        fn fork_recipe(
            &self,
            _: &str,
            _: &str,
            _: &str,
        ) -> Result<Option<models::RecipeRecord>, KernelError> {
            Ok(None)
        }

        fn list_recipes(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<models::RecipeRecord>, KernelError> {
            Ok(vec![])
        }

        fn express_organism(&self, _: &models::OrganismRecord) -> Result<(), KernelError> {
            Ok(())
        }

        fn get_organism(&self, _: &str) -> Result<Option<models::OrganismRecord>, KernelError> {
            Ok(None)
        }

        fn update_organism(&self, _: &str, _: i32, _: &str) -> Result<(), KernelError> {
            Ok(())
        }

        fn create_session(&self, _: &models::SessionRecord) -> Result<(), KernelError> {
            Ok(())
        }

        fn get_session(&self, _: &str) -> Result<Option<models::SessionRecord>, KernelError> {
            Ok(None)
        }

        fn add_session_message(
            &self,
            _: &models::SessionMessageRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        fn get_session_history(
            &self,
            _: &str,
            _: usize,
        ) -> Result<Vec<models::SessionMessageRecord>, KernelError> {
            Ok(vec![])
        }

        fn open_dispute(&self, _: &models::DisputeRecord) -> Result<(), KernelError> {
            Ok(())
        }

        fn get_dispute(&self, _: &str) -> Result<Option<models::DisputeRecord>, KernelError> {
            Ok(None)
        }

        fn get_disputes_for_bounty(
            &self,
            _: &str,
        ) -> Result<Vec<models::DisputeRecord>, KernelError> {
            Ok(vec![])
        }

        fn resolve_dispute(&self, _: &str, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
    }

    #[test]
    fn worker_lease_verify_owner_accepts_owner() {
        let now = Utc::now();
        let lease = WorkerLease::from_record(lease_record(now + Duration::seconds(60), now));
        assert!(lease.verify_owner("W1").is_ok());
        assert!(lease.verify_owner("W2").is_err());
    }

    #[test]
    fn worker_lease_accessors_expose_record_fields() {
        let now = Utc::now();
        let lease = WorkerLease::from_record(lease_record(now + Duration::seconds(5), now));
        assert_eq!(lease.lease_id(), "L1");
        assert_eq!(lease.attempt_id(), "A1");
        assert_eq!(lease.worker_id(), "W1");
        assert_eq!(lease.record().version, 1);
    }

    #[test]
    fn worker_lease_is_expired() {
        let now = Utc::now();
        let lease = WorkerLease::from_record(lease_record(
            now - Duration::seconds(1),
            now - Duration::seconds(2),
        ));
        assert!(lease.is_expired(now));
        assert!(!lease.is_expired(now - Duration::seconds(2)));
    }

    #[test]
    fn worker_lease_is_expired_exactly_at_deadline() {
        let now = Utc::now();
        let lease = WorkerLease::from_record(lease_record(now, now - Duration::seconds(1)));
        assert!(lease.is_expired(now));
        assert!(lease.check_execution_allowed("W1", now).is_err());
    }

    #[test]
    fn worker_lease_check_execution_allowed() {
        let now = Utc::now();
        let lease = WorkerLease::from_record(lease_record(now + Duration::seconds(10), now));
        assert!(lease.check_execution_allowed("W1", now).is_ok());
        assert!(lease.check_execution_allowed("W2", now).is_err());
        assert!(lease
            .check_execution_allowed("W1", now + Duration::seconds(11))
            .is_err());
    }

    #[test]
    fn lease_config_default_values() {
        let config = LeaseConfig::default();
        assert_eq!(config.lease_ttl, Duration::seconds(30));
        assert_eq!(config.heartbeat_grace, Duration::seconds(5));
    }

    #[test]
    fn tick_applies_heartbeat_grace_before_requeueing() {
        let repo = FakeRepository::new(2, 3);
        let config = LeaseConfig {
            lease_ttl: Duration::seconds(30),
            heartbeat_grace: Duration::seconds(7),
        };
        let manager = RepositoryLeaseManager::new(repo.clone(), config);
        let now = Utc::now();

        let result = manager.tick(now).expect("tick succeeds");

        assert_eq!(result.timed_out, 2);
        assert_eq!(result.expired_requeued, 3);
        let seen_cutoff = repo
            .seen_cutoff
            .lock()
            .expect("cutoff lock")
            .expect("cutoff recorded");
        assert_eq!(seen_cutoff, now - Duration::seconds(7));
    }

    #[test]
    fn worker_health_tracker_quarantines_after_threshold() {
        let tracker = WorkerHealthTracker::new(3);
        assert!(!tracker.is_quarantined("w1"));

        let just_quarantined = tracker.record_expiry("w1");
        assert!(!just_quarantined);
        assert!(!tracker.record_expiry("w1"));
        let just_quarantined = tracker.record_expiry("w1");
        assert!(just_quarantined);
        assert!(tracker.is_quarantined("w1"));
    }

    #[test]
    fn worker_health_tracker_reports_quarantine_transition_once() {
        let tracker = WorkerHealthTracker::new(1);
        assert!(tracker.record_expiry("w1"));
        // Further expiries keep counting but are not reported as a new quarantine.
        assert!(!tracker.record_expiry("w1"));
        assert_eq!(tracker.get("w1").map(|h| h.lease_expiry_count), Some(2));
    }

    #[test]
    fn worker_health_tracker_heartbeat_clears_quarantine() {
        let tracker = WorkerHealthTracker::new(2);
        tracker.record_expiry("w1");
        tracker.record_expiry("w1");
        assert!(tracker.is_quarantined("w1"));

        tracker.record_heartbeat("w1", 1_700_000_000_000);
        assert!(!tracker.is_quarantined("w1"));

        let health = tracker.get("w1").expect("health record exists");
        assert_eq!(health.lease_expiry_count, 0);
        assert_eq!(health.last_heartbeat_ms, Some(1_700_000_000_000));
    }

    #[test]
    fn worker_health_tracker_record_success_resets_expiries() {
        let tracker = WorkerHealthTracker::new(2);
        tracker.record_expiry("w1");
        tracker.record_expiry("w1");
        assert!(tracker.is_quarantined("w1"));

        tracker.record_success("w1");
        assert!(!tracker.is_quarantined("w1"));
        assert_eq!(tracker.get("w1").map(|h| h.lease_expiry_count), Some(0));

        // A full new run of expiries is needed to re-quarantine.
        assert!(!tracker.record_expiry("w1"));
        assert!(tracker.record_expiry("w1"));
    }

    #[test]
    fn worker_health_tracker_clear_quarantine_explicit() {
        let tracker = WorkerHealthTracker::new(1);
        tracker.record_expiry("w1");
        assert!(tracker.is_quarantined("w1"));

        tracker.clear_quarantine("w1");
        assert!(!tracker.is_quarantined("w1"));
        assert_eq!(tracker.get("w1").map(|h| h.lease_expiry_count), Some(0));
    }

    #[test]
    fn worker_health_tracker_unknown_worker_not_quarantined() {
        let tracker = WorkerHealthTracker::new(3);
        assert!(!tracker.is_quarantined("unknown-worker"));
        assert!(tracker.get("unknown-worker").is_none());
    }
}