Skip to main content

oris_execution_runtime/
lease.rs

//! Lease management for Phase 1: single-owner execution, expiry, and recovery.
//!
//! [`WorkerLease`] wraps [`super::models::LeaseRecord`] to strictly enforce single-owner
//! execution: call [`WorkerLease::verify_owner`] and [`WorkerLease::is_expired`] (or
//! [`WorkerLease::check_execution_allowed`], which performs both checks) before running work.
//! Lease expiry and recovery are handled by [`LeaseManager::tick`], which transitions
//! timed-out attempts, expires stale leases, and requeues their attempts; a replay or
//! restart is simply the re-dispatch of an attempt after it has been requeued.
7
8use std::collections::HashMap;
9use std::sync::{Arc, Mutex};
10
11use crate::circuit_breaker::CircuitBreaker;
12
13use chrono::{DateTime, Duration, Utc};
14
15use oris_kernel::event::KernelError;
16
17use super::models::LeaseRecord;
18use super::repository::RuntimeRepository;
19
20/// Strict single-owner execution guard for a lease. Verify ownership and expiry before executing.
21#[derive(Clone, Debug)]
22pub struct WorkerLease {
23    record: LeaseRecord,
24}
25
26impl WorkerLease {
27    /// Build a worker lease from a repository lease record (e.g. from `get_lease_for_attempt`).
28    pub fn from_record(record: LeaseRecord) -> Self {
29        Self { record }
30    }
31
32    /// Lease record for heartbeat or persistence.
33    pub fn record(&self) -> &LeaseRecord {
34        &self.record
35    }
36
37    pub fn lease_id(&self) -> &str {
38        &self.record.lease_id
39    }
40
41    pub fn attempt_id(&self) -> &str {
42        &self.record.attempt_id
43    }
44
45    pub fn worker_id(&self) -> &str {
46        &self.record.worker_id
47    }
48
49    /// Returns true if the lease has passed its expiry time (no heartbeat grace here).
50    pub fn is_expired(&self, now: DateTime<Utc>) -> bool {
51        now >= self.record.lease_expires_at
52    }
53
54    /// Enforce single-owner: returns `Ok(())` only if `worker_id` matches the lease owner.
55    pub fn verify_owner(&self, worker_id: &str) -> Result<(), KernelError> {
56        if self.record.worker_id != worker_id {
57            return Err(KernelError::Driver(format!(
58                "lease {} is owned by {}, not {}",
59                self.record.lease_id, self.record.worker_id, worker_id
60            )));
61        }
62        Ok(())
63    }
64
65    /// Returns `Ok(())` if the given worker owns the lease and it is not yet expired.
66    pub fn check_execution_allowed(
67        &self,
68        worker_id: &str,
69        now: DateTime<Utc>,
70    ) -> Result<(), KernelError> {
71        self.verify_owner(worker_id)?;
72        if self.is_expired(now) {
73            return Err(KernelError::Driver(format!(
74                "lease {} expired at {}",
75                self.record.lease_id, self.record.lease_expires_at
76            )));
77        }
78        Ok(())
79    }
80}
81
82/// Lease behavior tuning knobs for scheduler/data-plane coordination.
83#[derive(Clone, Debug)]
84pub struct LeaseConfig {
85    pub lease_ttl: Duration,
86    pub heartbeat_grace: Duration,
87}
88
89impl Default for LeaseConfig {
90    fn default() -> Self {
91        Self {
92            lease_ttl: Duration::seconds(30),
93            heartbeat_grace: Duration::seconds(5),
94        }
95    }
96}
97
/// Counters reported by a single lease-maintenance tick.
#[derive(Clone, Debug, Default)]
pub struct LeaseTickResult {
    /// Number of attempts moved to timed-out (in [`RepositoryLeaseManager`], the count
    /// returned by `transition_timed_out_attempts`).
    pub timed_out: u64,
    /// Number of leases expired with their attempts requeued (in
    /// [`RepositoryLeaseManager`], the count returned by `expire_leases_and_requeue`).
    pub expired_requeued: u64,
}
104
105/// Lease manager abstraction.
106pub trait LeaseManager: Send + Sync {
107    fn tick(&self, now: DateTime<Utc>) -> Result<LeaseTickResult, KernelError>;
108}
109
110/// Skeleton lease manager using `RuntimeRepository`.
111pub struct RepositoryLeaseManager<R: RuntimeRepository> {
112    repository: R,
113    config: LeaseConfig,
114}
115
116impl<R: RuntimeRepository> RepositoryLeaseManager<R> {
117    pub fn new(repository: R, config: LeaseConfig) -> Self {
118        Self { repository, config }
119    }
120}
121
122impl<R: RuntimeRepository> LeaseManager for RepositoryLeaseManager<R> {
123    fn tick(&self, now: DateTime<Utc>) -> Result<LeaseTickResult, KernelError> {
124        let stale_before = now - self.config.heartbeat_grace;
125        let timed_out = self.repository.transition_timed_out_attempts(now)?;
126        let expired = self.repository.expire_leases_and_requeue(stale_before)?;
127        Ok(LeaseTickResult {
128            timed_out,
129            expired_requeued: expired,
130        })
131    }
132}
133
134// ---------------------------------------------------------------------------
135// WorkerHealthTracker — per-worker lease-expiry counter
136// ---------------------------------------------------------------------------
137
/// Health snapshot for one worker, as kept by the scheduler or control plane.
#[derive(Clone, Debug, Default)]
pub struct WorkerHealth {
    /// Lease expirations (with requeue) recorded since the counter was last reset.
    /// [`WorkerHealthTracker`] resets it on a heartbeat, a successful dispatch, or an
    /// explicit quarantine clear.
    pub lease_expiry_count: u64,
    /// Epoch-millisecond timestamp of the most recent heartbeat, if any was observed.
    pub last_heartbeat_ms: Option<i64>,
    /// `true` while the worker is quarantined because of too many lease expirations.
    pub quarantined: bool,
}
148
149/// Thread-safe tracker that records per-worker health statistics.
150///
151/// When `lease_expiry_count` for a worker reaches `quarantine_threshold`, the
152/// worker is marked `quarantined = true`. A quarantined worker should not
153/// receive new dispatch leases until it is explicitly cleared.
154///
155/// An optional [`CircuitBreaker`] can be wired in via
156/// [`WorkerHealthTracker::with_circuit_breaker`]. When a worker is quarantined,
157/// the breaker is tripped automatically.
158#[derive(Clone, Default)]
159pub struct WorkerHealthTracker {
160    inner: Arc<Mutex<HashMap<String, WorkerHealth>>>,
161    /// Number of consecutive lease expirations before a worker is quarantined.
162    /// Defaults to 5.
163    quarantine_threshold: u64,
164    /// Optional circuit breaker to trip when a worker is quarantined.
165    circuit_breaker: Option<Arc<CircuitBreaker>>,
166}
167
168impl WorkerHealthTracker {
169    pub fn new(quarantine_threshold: u64) -> Self {
170        Self {
171            inner: Arc::new(Mutex::new(HashMap::new())),
172            quarantine_threshold,
173            circuit_breaker: None,
174        }
175    }
176
177    /// Attach a shared circuit breaker to this tracker.
178    ///
179    /// The breaker will be tripped whenever a worker is quarantined.
180    pub fn with_circuit_breaker(mut self, breaker: Arc<CircuitBreaker>) -> Self {
181        self.circuit_breaker = Some(breaker);
182        self
183    }
184
185    /// Record one lease expiry for `worker_id`.
186    /// Returns `true` if the worker was just quarantined by this call.
187    ///
188    /// If a circuit breaker is attached, it is tripped when quarantine occurs.
189    pub fn record_expiry(&self, worker_id: &str) -> bool {
190        let just_quarantined = {
191            let mut map = self.inner.lock().expect("worker health lock poisoned");
192            let entry = map.entry(worker_id.to_string()).or_default();
193            entry.lease_expiry_count += 1;
194            if !entry.quarantined && entry.lease_expiry_count >= self.quarantine_threshold {
195                entry.quarantined = true;
196                true
197            } else {
198                false
199            }
200        };
201        if just_quarantined {
202            if let Some(cb) = &self.circuit_breaker {
203                cb.trip();
204            }
205        }
206        just_quarantined
207    }
208
209    /// Record a successful dispatch acknowledgement for `worker_id`.
210    ///
211    /// Clears quarantine, resets expiry counter, and resets the circuit breaker to `Closed`.
212    pub fn record_success(&self, worker_id: &str) {
213        {
214            let mut map = self.inner.lock().expect("worker health lock poisoned");
215            let entry = map.entry(worker_id.to_string()).or_default();
216            entry.lease_expiry_count = 0;
217            entry.quarantined = false;
218        }
219        if let Some(cb) = &self.circuit_breaker {
220            cb.record_success();
221        }
222    }
223
224    /// Record a heartbeat for `worker_id` and clear quarantine if set.
225    pub fn record_heartbeat(&self, worker_id: &str, heartbeat_ms: i64) {
226        let mut map = self.inner.lock().expect("worker health lock poisoned");
227        let entry = map.entry(worker_id.to_string()).or_default();
228        entry.last_heartbeat_ms = Some(heartbeat_ms);
229        // A successful heartbeat resets the expiry counter and lifts quarantine.
230        entry.lease_expiry_count = 0;
231        entry.quarantined = false;
232    }
233
234    /// Returns `true` if the worker is currently quarantined.
235    pub fn is_quarantined(&self, worker_id: &str) -> bool {
236        self.inner
237            .lock()
238            .expect("worker health lock poisoned")
239            .get(worker_id)
240            .map(|h| h.quarantined)
241            .unwrap_or(false)
242    }
243
244    /// Snapshot the health record for `worker_id`, or `None` if unknown.
245    pub fn get(&self, worker_id: &str) -> Option<WorkerHealth> {
246        self.inner
247            .lock()
248            .expect("worker health lock poisoned")
249            .get(worker_id)
250            .cloned()
251    }
252
253    /// Clear quarantine and reset expiry counter for `worker_id`.
254    pub fn clear_quarantine(&self, worker_id: &str) {
255        let mut map = self.inner.lock().expect("worker health lock poisoned");
256        if let Some(entry) = map.get_mut(worker_id) {
257            entry.quarantined = false;
258            entry.lease_expiry_count = 0;
259        }
260    }
261}
262
// Unit tests for `WorkerLease`, `RepositoryLeaseManager::tick`, and `WorkerHealthTracker`.
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;
    use oris_kernel::identity::{RunId, Seq};

    use super::super::models::{AttemptDispatchRecord, LeaseRecord};

    /// `RuntimeRepository` stub used to drive `RepositoryLeaseManager::tick`.
    ///
    /// `transition_timed_out_attempts` and `expire_leases_and_requeue` return the canned
    /// `timed_out` / `expired` counts. `expire_leases_and_requeue` also stores the cutoff it
    /// received in `seen_cutoff` so a test can assert on it. All other trait methods are
    /// inert stubs that return `Ok` with empty/default values.
    #[derive(Clone)]
    struct FakeRepository {
        timed_out: u64,
        expired: u64,
        // Behind an `Arc` so the clone handed to the manager writes where the test reads.
        seen_cutoff: Arc<Mutex<Option<DateTime<Utc>>>>,
    }

    impl FakeRepository {
        /// Stub returning `timed_out` and `expired` from the two lease-maintenance methods.
        fn new(timed_out: u64, expired: u64) -> Self {
            Self {
                timed_out,
                expired,
                seen_cutoff: Arc::new(Mutex::new(None)),
            }
        }
    }

    impl RuntimeRepository for FakeRepository {
        fn list_dispatchable_attempts(
            &self,
            _now: DateTime<Utc>,
            _limit: usize,
        ) -> Result<Vec<AttemptDispatchRecord>, KernelError> {
            Ok(Vec::new())
        }

        // Returns a fixed record; only `lease_expires_at` reflects the caller's input.
        fn upsert_lease(
            &self,
            _attempt_id: &str,
            _worker_id: &str,
            lease_expires_at: DateTime<Utc>,
        ) -> Result<LeaseRecord, KernelError> {
            Ok(LeaseRecord {
                lease_id: "lease-test".to_string(),
                attempt_id: "attempt-test".to_string(),
                worker_id: "worker-test".to_string(),
                lease_expires_at,
                heartbeat_at: Utc::now(),
                version: 1,
            })
        }

        fn heartbeat_lease(
            &self,
            _lease_id: &str,
            _heartbeat_at: DateTime<Utc>,
            _lease_expires_at: DateTime<Utc>,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        // Records the cutoff so tests can verify the heartbeat grace was applied.
        fn expire_leases_and_requeue(
            &self,
            stale_before: DateTime<Utc>,
        ) -> Result<u64, KernelError> {
            *self.seen_cutoff.lock().expect("cutoff lock") = Some(stale_before);
            Ok(self.expired)
        }

        fn transition_timed_out_attempts(&self, _now: DateTime<Utc>) -> Result<u64, KernelError> {
            Ok(self.timed_out)
        }

        // Everything below is an inert stub that these tests never exercise.
        fn latest_seq_for_run(&self, _run_id: &RunId) -> Result<Seq, KernelError> {
            Ok(0)
        }

        fn upsert_bounty(&self, _: &super::super::models::BountyRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_bounty(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::BountyRecord>, KernelError> {
            Ok(None)
        }
        fn list_bounties(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::BountyRecord>, KernelError> {
            Ok(vec![])
        }
        fn accept_bounty(&self, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn close_bounty(&self, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn upsert_swarm_decomposition(
            &self,
            _: &super::super::models::SwarmTaskRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_swarm_decomposition(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::SwarmTaskRecord>, KernelError> {
            Ok(None)
        }
        fn register_worker(
            &self,
            _: &super::super::models::WorkerRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_worker(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::WorkerRecord>, KernelError> {
            Ok(None)
        }
        fn list_workers(
            &self,
            _: Option<&str>,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::WorkerRecord>, KernelError> {
            Ok(vec![])
        }
        fn heartbeat_worker(&self, _: &str, _: i64) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_recipe(&self, _: &super::super::models::RecipeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_recipe(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn fork_recipe(
            &self,
            _: &str,
            _: &str,
            _: &str,
        ) -> Result<Option<super::super::models::RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn list_recipes(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::RecipeRecord>, KernelError> {
            Ok(vec![])
        }
        fn express_organism(
            &self,
            _: &super::super::models::OrganismRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_organism(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::OrganismRecord>, KernelError> {
            Ok(None)
        }
        fn update_organism(&self, _: &str, _: i32, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_session(
            &self,
            _: &super::super::models::SessionRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::SessionRecord>, KernelError> {
            Ok(None)
        }
        fn add_session_message(
            &self,
            _: &super::super::models::SessionMessageRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session_history(
            &self,
            _: &str,
            _: usize,
        ) -> Result<Vec<super::super::models::SessionMessageRecord>, KernelError> {
            Ok(vec![])
        }
        fn open_dispute(&self, _: &super::super::models::DisputeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_dispute(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::DisputeRecord>, KernelError> {
            Ok(None)
        }
        fn get_disputes_for_bounty(
            &self,
            _: &str,
        ) -> Result<Vec<super::super::models::DisputeRecord>, KernelError> {
            Ok(vec![])
        }
        fn resolve_dispute(&self, _: &str, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
    }

    /// The recorded owner passes `verify_owner`; any other worker is rejected.
    #[test]
    fn worker_lease_verify_owner_accepts_owner() {
        let record = LeaseRecord {
            lease_id: "L1".to_string(),
            attempt_id: "A1".to_string(),
            worker_id: "W1".to_string(),
            lease_expires_at: Utc::now() + Duration::seconds(60),
            heartbeat_at: Utc::now(),
            version: 1,
        };
        let lease = WorkerLease::from_record(record);
        assert!(lease.verify_owner("W1").is_ok());
        assert!(lease.verify_owner("W2").is_err());
    }

    /// A lease is expired after its expiry instant and not expired before it.
    #[test]
    fn worker_lease_is_expired() {
        let now = Utc::now();
        let record = LeaseRecord {
            lease_id: "L1".to_string(),
            attempt_id: "A1".to_string(),
            worker_id: "W1".to_string(),
            lease_expires_at: now - Duration::seconds(1),
            heartbeat_at: now - Duration::seconds(2),
            version: 1,
        };
        let lease = WorkerLease::from_record(record);
        assert!(lease.is_expired(now));
        assert!(!lease.is_expired(now - Duration::seconds(2)));
    }

    /// Execution is allowed only for the owner and only before expiry.
    #[test]
    fn worker_lease_check_execution_allowed() {
        let now = Utc::now();
        let record = LeaseRecord {
            lease_id: "L1".to_string(),
            attempt_id: "A1".to_string(),
            worker_id: "W1".to_string(),
            lease_expires_at: now + Duration::seconds(10),
            heartbeat_at: now,
            version: 1,
        };
        let lease = WorkerLease::from_record(record);
        assert!(lease.check_execution_allowed("W1", now).is_ok());
        assert!(lease.check_execution_allowed("W2", now).is_err());
        assert!(lease
            .check_execution_allowed("W1", now + Duration::seconds(11))
            .is_err());
    }

    /// `tick` forwards repository counts and passes `now - heartbeat_grace` as the cutoff.
    #[test]
    fn tick_applies_heartbeat_grace_before_requeueing() {
        let repo = FakeRepository::new(2, 3);
        let config = LeaseConfig {
            lease_ttl: Duration::seconds(30),
            heartbeat_grace: Duration::seconds(7),
        };
        let manager = RepositoryLeaseManager::new(repo.clone(), config);
        let now = Utc::now();

        let result = manager.tick(now).expect("tick succeeds");

        assert_eq!(result.timed_out, 2);
        assert_eq!(result.expired_requeued, 3);
        let seen_cutoff = repo
            .seen_cutoff
            .lock()
            .expect("cutoff lock")
            .expect("cutoff recorded");
        assert_eq!(seen_cutoff, now - Duration::seconds(7));
    }

    // -----------------------------------------------------------------------
    // WorkerHealthTracker tests
    // -----------------------------------------------------------------------

    /// `record_expiry` reports `true` exactly when the count first reaches the threshold.
    #[test]
    fn worker_health_tracker_quarantines_after_threshold() {
        let tracker = WorkerHealthTracker::new(3);
        assert!(!tracker.is_quarantined("w1"));

        let just_quarantined = tracker.record_expiry("w1");
        assert!(!just_quarantined); // 1 < 3
        tracker.record_expiry("w1"); // 2 < 3
        let just_quarantined = tracker.record_expiry("w1"); // 3 >= 3
        assert!(just_quarantined);
        assert!(tracker.is_quarantined("w1"));
    }

    /// A heartbeat lifts quarantine, resets the counter, and stores the timestamp.
    #[test]
    fn worker_health_tracker_heartbeat_clears_quarantine() {
        let tracker = WorkerHealthTracker::new(2);
        tracker.record_expiry("w1");
        tracker.record_expiry("w1");
        assert!(tracker.is_quarantined("w1"));

        tracker.record_heartbeat("w1", 1_700_000_000_000);
        assert!(!tracker.is_quarantined("w1"));

        let health = tracker.get("w1").expect("health record exists");
        assert_eq!(health.lease_expiry_count, 0);
        assert_eq!(health.last_heartbeat_ms, Some(1_700_000_000_000));
    }

    /// `clear_quarantine` lifts quarantine and zeroes the expiry counter.
    #[test]
    fn worker_health_tracker_clear_quarantine_explicit() {
        let tracker = WorkerHealthTracker::new(1);
        tracker.record_expiry("w1");
        assert!(tracker.is_quarantined("w1"));

        tracker.clear_quarantine("w1");
        assert!(!tracker.is_quarantined("w1"));
        assert_eq!(tracker.get("w1").map(|h| h.lease_expiry_count), Some(0));
    }

    /// Workers with no recorded activity are neither quarantined nor present.
    #[test]
    fn worker_health_tracker_unknown_worker_not_quarantined() {
        let tracker = WorkerHealthTracker::new(3);
        assert!(!tracker.is_quarantined("unknown-worker"));
        assert!(tracker.get("unknown-worker").is_none());
    }
}