Skip to main content

oris_execution_runtime/
lease.rs

//! Lease management for Phase 1: single-owner execution, expiry, and recovery.
//!
//! [`WorkerLease`] wraps [`super::models::LeaseRecord`] to strictly enforce single-owner
//! execution: call [`WorkerLease::verify_owner`] and [`WorkerLease::is_expired`] before
//! running work. Lease expiry and recovery are handled by [`LeaseManager::tick`], which
//! expires stale leases and requeues their attempts; a replay-restart is simply the
//! re-dispatch that follows the requeue.
7
8use chrono::{DateTime, Duration, Utc};
9
10use oris_kernel::event::KernelError;
11
12use super::models::LeaseRecord;
13use super::repository::RuntimeRepository;
14
15/// Strict single-owner execution guard for a lease. Verify ownership and expiry before executing.
16#[derive(Clone, Debug)]
17pub struct WorkerLease {
18    record: LeaseRecord,
19}
20
21impl WorkerLease {
22    /// Build a worker lease from a repository lease record (e.g. from `get_lease_for_attempt`).
23    pub fn from_record(record: LeaseRecord) -> Self {
24        Self { record }
25    }
26
27    /// Lease record for heartbeat or persistence.
28    pub fn record(&self) -> &LeaseRecord {
29        &self.record
30    }
31
32    pub fn lease_id(&self) -> &str {
33        &self.record.lease_id
34    }
35
36    pub fn attempt_id(&self) -> &str {
37        &self.record.attempt_id
38    }
39
40    pub fn worker_id(&self) -> &str {
41        &self.record.worker_id
42    }
43
44    /// Returns true if the lease has passed its expiry time (no heartbeat grace here).
45    pub fn is_expired(&self, now: DateTime<Utc>) -> bool {
46        now >= self.record.lease_expires_at
47    }
48
49    /// Enforce single-owner: returns `Ok(())` only if `worker_id` matches the lease owner.
50    pub fn verify_owner(&self, worker_id: &str) -> Result<(), KernelError> {
51        if self.record.worker_id != worker_id {
52            return Err(KernelError::Driver(format!(
53                "lease {} is owned by {}, not {}",
54                self.record.lease_id, self.record.worker_id, worker_id
55            )));
56        }
57        Ok(())
58    }
59
60    /// Returns `Ok(())` if the given worker owns the lease and it is not yet expired.
61    pub fn check_execution_allowed(
62        &self,
63        worker_id: &str,
64        now: DateTime<Utc>,
65    ) -> Result<(), KernelError> {
66        self.verify_owner(worker_id)?;
67        if self.is_expired(now) {
68            return Err(KernelError::Driver(format!(
69                "lease {} expired at {}",
70                self.record.lease_id, self.record.lease_expires_at
71            )));
72        }
73        Ok(())
74    }
75}
76
77/// Lease behavior tuning knobs for scheduler/data-plane coordination.
78#[derive(Clone, Debug)]
79pub struct LeaseConfig {
80    pub lease_ttl: Duration,
81    pub heartbeat_grace: Duration,
82}
83
84impl Default for LeaseConfig {
85    fn default() -> Self {
86        Self {
87            lease_ttl: Duration::seconds(30),
88            heartbeat_grace: Duration::seconds(5),
89        }
90    }
91}
92
/// Counters reported by one periodic lease tick.
#[derive(Debug, Default, Clone)]
pub struct LeaseTickResult {
    /// Number of attempts reported as transitioned to timed-out during the tick.
    pub timed_out: u64,
    /// Number of expired leases reported as requeued during the tick.
    pub expired_requeued: u64,
}
99
100/// Lease manager abstraction.
101pub trait LeaseManager: Send + Sync {
102    fn tick(&self, now: DateTime<Utc>) -> Result<LeaseTickResult, KernelError>;
103}
104
105/// Skeleton lease manager using `RuntimeRepository`.
106pub struct RepositoryLeaseManager<R: RuntimeRepository> {
107    repository: R,
108    config: LeaseConfig,
109}
110
111impl<R: RuntimeRepository> RepositoryLeaseManager<R> {
112    pub fn new(repository: R, config: LeaseConfig) -> Self {
113        Self { repository, config }
114    }
115}
116
117impl<R: RuntimeRepository> LeaseManager for RepositoryLeaseManager<R> {
118    fn tick(&self, now: DateTime<Utc>) -> Result<LeaseTickResult, KernelError> {
119        let stale_before = now - self.config.heartbeat_grace;
120        let timed_out = self.repository.transition_timed_out_attempts(now)?;
121        let expired = self.repository.expire_leases_and_requeue(stale_before)?;
122        Ok(LeaseTickResult {
123            timed_out,
124            expired_requeued: expired,
125        })
126    }
127}
128
129#[cfg(test)]
130mod tests {
131    use std::sync::{Arc, Mutex};
132
133    use super::*;
134    use oris_kernel::identity::{RunId, Seq};
135
136    use super::super::models::{AttemptDispatchRecord, LeaseRecord};
137
138    #[derive(Clone)]
139    struct FakeRepository {
140        timed_out: u64,
141        expired: u64,
142        seen_cutoff: Arc<Mutex<Option<DateTime<Utc>>>>,
143    }
144
145    impl FakeRepository {
146        fn new(timed_out: u64, expired: u64) -> Self {
147            Self {
148                timed_out,
149                expired,
150                seen_cutoff: Arc::new(Mutex::new(None)),
151            }
152        }
153    }
154
155    impl RuntimeRepository for FakeRepository {
156        fn list_dispatchable_attempts(
157            &self,
158            _now: DateTime<Utc>,
159            _limit: usize,
160        ) -> Result<Vec<AttemptDispatchRecord>, KernelError> {
161            Ok(Vec::new())
162        }
163
164        fn upsert_lease(
165            &self,
166            _attempt_id: &str,
167            _worker_id: &str,
168            lease_expires_at: DateTime<Utc>,
169        ) -> Result<LeaseRecord, KernelError> {
170            Ok(LeaseRecord {
171                lease_id: "lease-test".to_string(),
172                attempt_id: "attempt-test".to_string(),
173                worker_id: "worker-test".to_string(),
174                lease_expires_at,
175                heartbeat_at: Utc::now(),
176                version: 1,
177            })
178        }
179
180        fn heartbeat_lease(
181            &self,
182            _lease_id: &str,
183            _heartbeat_at: DateTime<Utc>,
184            _lease_expires_at: DateTime<Utc>,
185        ) -> Result<(), KernelError> {
186            Ok(())
187        }
188
189        fn expire_leases_and_requeue(
190            &self,
191            stale_before: DateTime<Utc>,
192        ) -> Result<u64, KernelError> {
193            *self.seen_cutoff.lock().expect("cutoff lock") = Some(stale_before);
194            Ok(self.expired)
195        }
196
197        fn transition_timed_out_attempts(&self, _now: DateTime<Utc>) -> Result<u64, KernelError> {
198            Ok(self.timed_out)
199        }
200
201        fn latest_seq_for_run(&self, _run_id: &RunId) -> Result<Seq, KernelError> {
202            Ok(0)
203        }
204    }
205
206    #[test]
207    fn worker_lease_verify_owner_accepts_owner() {
208        let record = LeaseRecord {
209            lease_id: "L1".to_string(),
210            attempt_id: "A1".to_string(),
211            worker_id: "W1".to_string(),
212            lease_expires_at: Utc::now() + Duration::seconds(60),
213            heartbeat_at: Utc::now(),
214            version: 1,
215        };
216        let lease = WorkerLease::from_record(record);
217        assert!(lease.verify_owner("W1").is_ok());
218        assert!(lease.verify_owner("W2").is_err());
219    }
220
221    #[test]
222    fn worker_lease_is_expired() {
223        let now = Utc::now();
224        let record = LeaseRecord {
225            lease_id: "L1".to_string(),
226            attempt_id: "A1".to_string(),
227            worker_id: "W1".to_string(),
228            lease_expires_at: now - Duration::seconds(1),
229            heartbeat_at: now - Duration::seconds(2),
230            version: 1,
231        };
232        let lease = WorkerLease::from_record(record);
233        assert!(lease.is_expired(now));
234        assert!(!lease.is_expired(now - Duration::seconds(2)));
235    }
236
237    #[test]
238    fn worker_lease_check_execution_allowed() {
239        let now = Utc::now();
240        let record = LeaseRecord {
241            lease_id: "L1".to_string(),
242            attempt_id: "A1".to_string(),
243            worker_id: "W1".to_string(),
244            lease_expires_at: now + Duration::seconds(10),
245            heartbeat_at: now,
246            version: 1,
247        };
248        let lease = WorkerLease::from_record(record);
249        assert!(lease.check_execution_allowed("W1", now).is_ok());
250        assert!(lease.check_execution_allowed("W2", now).is_err());
251        assert!(lease
252            .check_execution_allowed("W1", now + Duration::seconds(11))
253            .is_err());
254    }
255
256    #[test]
257    fn tick_applies_heartbeat_grace_before_requeueing() {
258        let repo = FakeRepository::new(2, 3);
259        let config = LeaseConfig {
260            lease_ttl: Duration::seconds(30),
261            heartbeat_grace: Duration::seconds(7),
262        };
263        let manager = RepositoryLeaseManager::new(repo.clone(), config);
264        let now = Utc::now();
265
266        let result = manager.tick(now).expect("tick succeeds");
267
268        assert_eq!(result.timed_out, 2);
269        assert_eq!(result.expired_requeued, 3);
270        let seen_cutoff = repo
271            .seen_cutoff
272            .lock()
273            .expect("cutoff lock")
274            .expect("cutoff recorded");
275        assert_eq!(seen_cutoff, now - Duration::seconds(7));
276    }
277}