// oris_execution_runtime/lease.rs
//! Lease management for Phase 1: single-owner execution, expiry, and recovery.
//!
//! [WorkerLease] wraps [super::models::LeaseRecord] to strictly enforce single-owner
//! execution: call [WorkerLease::verify_owner] and [WorkerLease::is_expired] before
//! running work. Lease expiry and recovery are handled by [LeaseManager::tick]
//! (expire stale leases, requeue attempts); replay-restart is re-dispatch after requeue.

use chrono::{DateTime, Duration, Utc};

use oris_kernel::event::KernelError;

use super::models::LeaseRecord;
use super::repository::RuntimeRepository;
15/// Strict single-owner execution guard for a lease. Verify ownership and expiry before executing.
16#[derive(Clone, Debug)]
17pub struct WorkerLease {
18    record: LeaseRecord,
19}
20
21impl WorkerLease {
22    /// Build a worker lease from a repository lease record (e.g. from `get_lease_for_attempt`).
23    pub fn from_record(record: LeaseRecord) -> Self {
24        Self { record }
25    }
26
27    /// Lease record for heartbeat or persistence.
28    pub fn record(&self) -> &LeaseRecord {
29        &self.record
30    }
31
32    pub fn lease_id(&self) -> &str {
33        &self.record.lease_id
34    }
35
36    pub fn attempt_id(&self) -> &str {
37        &self.record.attempt_id
38    }
39
40    pub fn worker_id(&self) -> &str {
41        &self.record.worker_id
42    }
43
44    /// Returns true if the lease has passed its expiry time (no heartbeat grace here).
45    pub fn is_expired(&self, now: DateTime<Utc>) -> bool {
46        now >= self.record.lease_expires_at
47    }
48
49    /// Enforce single-owner: returns `Ok(())` only if `worker_id` matches the lease owner.
50    pub fn verify_owner(&self, worker_id: &str) -> Result<(), KernelError> {
51        if self.record.worker_id != worker_id {
52            return Err(KernelError::Driver(format!(
53                "lease {} is owned by {}, not {}",
54                self.record.lease_id, self.record.worker_id, worker_id
55            )));
56        }
57        Ok(())
58    }
59
60    /// Returns `Ok(())` if the given worker owns the lease and it is not yet expired.
61    pub fn check_execution_allowed(
62        &self,
63        worker_id: &str,
64        now: DateTime<Utc>,
65    ) -> Result<(), KernelError> {
66        self.verify_owner(worker_id)?;
67        if self.is_expired(now) {
68            return Err(KernelError::Driver(format!(
69                "lease {} expired at {}",
70                self.record.lease_id, self.record.lease_expires_at
71            )));
72        }
73        Ok(())
74    }
75}
76
77/// Lease behavior tuning knobs for scheduler/data-plane coordination.
78#[derive(Clone, Debug)]
79pub struct LeaseConfig {
80    pub lease_ttl: Duration,
81    pub heartbeat_grace: Duration,
82}
83
84impl Default for LeaseConfig {
85    fn default() -> Self {
86        Self {
87            lease_ttl: Duration::seconds(30),
88            heartbeat_grace: Duration::seconds(5),
89        }
90    }
91}
92
/// Result of a periodic lease tick.
///
/// A plain pair of `u64` counters, so it additionally derives `Copy`,
/// `PartialEq`, and `Eq` — callers can pass it by value and compare results
/// directly (e.g. in tests) without cloning. Backward compatible: all
/// previous usages (move, clone, debug-print, default) still work.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct LeaseTickResult {
    /// Attempts transitioned to a timed-out state during this tick.
    pub timed_out: u64,
    /// Expired leases that were requeued for re-dispatch during this tick.
    pub expired_requeued: u64,
}

100/// Lease manager abstraction.
101pub trait LeaseManager: Send + Sync {
102    fn tick(&self, now: DateTime<Utc>) -> Result<LeaseTickResult, KernelError>;
103}
104
105/// Skeleton lease manager using `RuntimeRepository`.
106pub struct RepositoryLeaseManager<R: RuntimeRepository> {
107    repository: R,
108    config: LeaseConfig,
109}
110
111impl<R: RuntimeRepository> RepositoryLeaseManager<R> {
112    pub fn new(repository: R, config: LeaseConfig) -> Self {
113        Self { repository, config }
114    }
115}
116
117impl<R: RuntimeRepository> LeaseManager for RepositoryLeaseManager<R> {
118    fn tick(&self, now: DateTime<Utc>) -> Result<LeaseTickResult, KernelError> {
119        let stale_before = now - self.config.heartbeat_grace;
120        let timed_out = self.repository.transition_timed_out_attempts(now)?;
121        let expired = self.repository.expire_leases_and_requeue(stale_before)?;
122        Ok(LeaseTickResult {
123            timed_out,
124            expired_requeued: expired,
125        })
126    }
127}
128
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;
    use oris_kernel::identity::{RunId, Seq};

    use super::super::models::{AttemptDispatchRecord, LeaseRecord};

    /// Repository stub: returns canned tick counters and records the stale
    /// cutoff passed to `expire_leases_and_requeue`; all other methods are
    /// inert no-ops.
    #[derive(Clone)]
    struct FakeRepository {
        timed_out: u64,
        expired: u64,
        seen_cutoff: Arc<Mutex<Option<DateTime<Utc>>>>,
    }

    impl FakeRepository {
        fn new(timed_out: u64, expired: u64) -> Self {
            FakeRepository {
                timed_out,
                expired,
                seen_cutoff: Arc::new(Mutex::new(None)),
            }
        }
    }

    impl RuntimeRepository for FakeRepository {
        fn list_dispatchable_attempts(
            &self,
            _now: DateTime<Utc>,
            _limit: usize,
        ) -> Result<Vec<AttemptDispatchRecord>, KernelError> {
            Ok(Vec::new())
        }

        fn upsert_lease(
            &self,
            _attempt_id: &str,
            _worker_id: &str,
            lease_expires_at: DateTime<Utc>,
        ) -> Result<LeaseRecord, KernelError> {
            Ok(LeaseRecord {
                lease_id: "lease-test".to_string(),
                attempt_id: "attempt-test".to_string(),
                worker_id: "worker-test".to_string(),
                lease_expires_at,
                heartbeat_at: Utc::now(),
                version: 1,
            })
        }

        fn heartbeat_lease(
            &self,
            _lease_id: &str,
            _heartbeat_at: DateTime<Utc>,
            _lease_expires_at: DateTime<Utc>,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        fn expire_leases_and_requeue(
            &self,
            stale_before: DateTime<Utc>,
        ) -> Result<u64, KernelError> {
            // Capture the cutoff so the tick test can assert grace was applied.
            *self.seen_cutoff.lock().expect("cutoff lock") = Some(stale_before);
            Ok(self.expired)
        }

        fn transition_timed_out_attempts(&self, _now: DateTime<Utc>) -> Result<u64, KernelError> {
            Ok(self.timed_out)
        }

        fn latest_seq_for_run(&self, _run_id: &RunId) -> Result<Seq, KernelError> {
            Ok(0)
        }

        fn upsert_bounty(&self, _: &super::super::models::BountyRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_bounty(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::BountyRecord>, KernelError> {
            Ok(None)
        }
        fn list_bounties(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::BountyRecord>, KernelError> {
            Ok(vec![])
        }
        fn accept_bounty(&self, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn close_bounty(&self, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn upsert_swarm_decomposition(
            &self,
            _: &super::super::models::SwarmTaskRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_swarm_decomposition(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::SwarmTaskRecord>, KernelError> {
            Ok(None)
        }
        fn register_worker(
            &self,
            _: &super::super::models::WorkerRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_worker(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::WorkerRecord>, KernelError> {
            Ok(None)
        }
        fn list_workers(
            &self,
            _: Option<&str>,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::WorkerRecord>, KernelError> {
            Ok(vec![])
        }
        fn heartbeat_worker(&self, _: &str, _: i64) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_recipe(&self, _: &super::super::models::RecipeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_recipe(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn fork_recipe(
            &self,
            _: &str,
            _: &str,
            _: &str,
        ) -> Result<Option<super::super::models::RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn list_recipes(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<super::super::models::RecipeRecord>, KernelError> {
            Ok(vec![])
        }
        fn express_organism(
            &self,
            _: &super::super::models::OrganismRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_organism(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::OrganismRecord>, KernelError> {
            Ok(None)
        }
        fn update_organism(&self, _: &str, _: i32, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_session(
            &self,
            _: &super::super::models::SessionRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::SessionRecord>, KernelError> {
            Ok(None)
        }
        fn add_session_message(
            &self,
            _: &super::super::models::SessionMessageRecord,
        ) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session_history(
            &self,
            _: &str,
            _: usize,
        ) -> Result<Vec<super::super::models::SessionMessageRecord>, KernelError> {
            Ok(vec![])
        }
        fn open_dispute(&self, _: &super::super::models::DisputeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_dispute(
            &self,
            _: &str,
        ) -> Result<Option<super::super::models::DisputeRecord>, KernelError> {
            Ok(None)
        }
        fn get_disputes_for_bounty(
            &self,
            _: &str,
        ) -> Result<Vec<super::super::models::DisputeRecord>, KernelError> {
            Ok(vec![])
        }
        fn resolve_dispute(&self, _: &str, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
    }

    /// Shared fixture: a `LeaseRecord` for lease L1 / attempt A1 / worker W1
    /// with the given expiry and heartbeat times.
    fn lease_record(expires_at: DateTime<Utc>, heartbeat_at: DateTime<Utc>) -> LeaseRecord {
        LeaseRecord {
            lease_id: "L1".to_string(),
            attempt_id: "A1".to_string(),
            worker_id: "W1".to_string(),
            lease_expires_at: expires_at,
            heartbeat_at,
            version: 1,
        }
    }

    #[test]
    fn worker_lease_verify_owner_accepts_owner() {
        let guard = WorkerLease::from_record(lease_record(
            Utc::now() + Duration::seconds(60),
            Utc::now(),
        ));
        assert!(guard.verify_owner("W1").is_ok());
        assert!(guard.verify_owner("W2").is_err());
    }

    #[test]
    fn worker_lease_is_expired() {
        let now = Utc::now();
        let guard = WorkerLease::from_record(lease_record(
            now - Duration::seconds(1),
            now - Duration::seconds(2),
        ));
        assert!(guard.is_expired(now));
        assert!(!guard.is_expired(now - Duration::seconds(2)));
    }

    #[test]
    fn worker_lease_check_execution_allowed() {
        let now = Utc::now();
        let guard = WorkerLease::from_record(lease_record(now + Duration::seconds(10), now));
        assert!(guard.check_execution_allowed("W1", now).is_ok());
        assert!(guard.check_execution_allowed("W2", now).is_err());
        assert!(guard
            .check_execution_allowed("W1", now + Duration::seconds(11))
            .is_err());
    }

    #[test]
    fn tick_applies_heartbeat_grace_before_requeueing() {
        let fake = FakeRepository::new(2, 3);
        let manager = RepositoryLeaseManager::new(
            fake.clone(),
            LeaseConfig {
                lease_ttl: Duration::seconds(30),
                heartbeat_grace: Duration::seconds(7),
            },
        );
        let now = Utc::now();

        let outcome = manager.tick(now).expect("tick succeeds");

        assert_eq!(outcome.timed_out, 2);
        assert_eq!(outcome.expired_requeued, 3);
        let cutoff = fake
            .seen_cutoff
            .lock()
            .expect("cutoff lock")
            .expect("cutoff recorded");
        assert_eq!(cutoff, now - Duration::seconds(7));
    }
}