1use chrono::{DateTime, Duration, Utc};
9
10use oris_kernel::event::KernelError;
11
12use super::models::LeaseRecord;
13use super::repository::RuntimeRepository;
14
15#[derive(Clone, Debug)]
17pub struct WorkerLease {
18 record: LeaseRecord,
19}
20
21impl WorkerLease {
22 pub fn from_record(record: LeaseRecord) -> Self {
24 Self { record }
25 }
26
27 pub fn record(&self) -> &LeaseRecord {
29 &self.record
30 }
31
32 pub fn lease_id(&self) -> &str {
33 &self.record.lease_id
34 }
35
36 pub fn attempt_id(&self) -> &str {
37 &self.record.attempt_id
38 }
39
40 pub fn worker_id(&self) -> &str {
41 &self.record.worker_id
42 }
43
44 pub fn is_expired(&self, now: DateTime<Utc>) -> bool {
46 now >= self.record.lease_expires_at
47 }
48
49 pub fn verify_owner(&self, worker_id: &str) -> Result<(), KernelError> {
51 if self.record.worker_id != worker_id {
52 return Err(KernelError::Driver(format!(
53 "lease {} is owned by {}, not {}",
54 self.record.lease_id, self.record.worker_id, worker_id
55 )));
56 }
57 Ok(())
58 }
59
60 pub fn check_execution_allowed(
62 &self,
63 worker_id: &str,
64 now: DateTime<Utc>,
65 ) -> Result<(), KernelError> {
66 self.verify_owner(worker_id)?;
67 if self.is_expired(now) {
68 return Err(KernelError::Driver(format!(
69 "lease {} expired at {}",
70 self.record.lease_id, self.record.lease_expires_at
71 )));
72 }
73 Ok(())
74 }
75}
76
77#[derive(Clone, Debug)]
79pub struct LeaseConfig {
80 pub lease_ttl: Duration,
81 pub heartbeat_grace: Duration,
82}
83
84impl Default for LeaseConfig {
85 fn default() -> Self {
86 Self {
87 lease_ttl: Duration::seconds(30),
88 heartbeat_grace: Duration::seconds(5),
89 }
90 }
91}
92
/// Counters reported by one [`LeaseManager::tick`] pass.
#[derive(Debug, Default, Clone)]
pub struct LeaseTickResult {
    /// Count returned by the repository's `transition_timed_out_attempts` call.
    pub timed_out: u64,
    /// Count returned by the repository's `expire_leases_and_requeue` call.
    pub expired_requeued: u64,
}
99
100pub trait LeaseManager: Send + Sync {
102 fn tick(&self, now: DateTime<Utc>) -> Result<LeaseTickResult, KernelError>;
103}
104
105pub struct RepositoryLeaseManager<R: RuntimeRepository> {
107 repository: R,
108 config: LeaseConfig,
109}
110
111impl<R: RuntimeRepository> RepositoryLeaseManager<R> {
112 pub fn new(repository: R, config: LeaseConfig) -> Self {
113 Self { repository, config }
114 }
115}
116
117impl<R: RuntimeRepository> LeaseManager for RepositoryLeaseManager<R> {
118 fn tick(&self, now: DateTime<Utc>) -> Result<LeaseTickResult, KernelError> {
119 let stale_before = now - self.config.heartbeat_grace;
120 let timed_out = self.repository.transition_timed_out_attempts(now)?;
121 let expired = self.repository.expire_leases_and_requeue(stale_before)?;
122 Ok(LeaseTickResult {
123 timed_out,
124 expired_requeued: expired,
125 })
126 }
127}
128
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;
    use oris_kernel::identity::{RunId, Seq};

    use super::super::models::{AttemptDispatchRecord, LeaseRecord};

    /// Repository double that returns canned counts and remembers the cutoff
    /// handed to `expire_leases_and_requeue`.
    #[derive(Clone)]
    struct FakeRepository {
        timed_out: u64,
        expired: u64,
        // Shared so a clone moved into the manager still reports back to the test.
        seen_cutoff: Arc<Mutex<Option<DateTime<Utc>>>>,
    }

    impl FakeRepository {
        /// Creates a fake that reports the given counts and has seen no cutoff yet.
        fn new(timed_out: u64, expired: u64) -> Self {
            let seen_cutoff = Arc::new(Mutex::new(None));
            FakeRepository {
                timed_out,
                expired,
                seen_cutoff,
            }
        }
    }

    impl RuntimeRepository for FakeRepository {
        fn list_dispatchable_attempts(
            &self,
            _now: DateTime<Utc>,
            _limit: usize,
        ) -> Result<Vec<AttemptDispatchRecord>, KernelError> {
            Ok(vec![])
        }

        fn upsert_lease(
            &self,
            _attempt_id: &str,
            _worker_id: &str,
            lease_expires_at: DateTime<Utc>,
        ) -> Result<LeaseRecord, KernelError> {
            let record = LeaseRecord {
                lease_id: "lease-test".to_string(),
                attempt_id: "attempt-test".to_string(),
                worker_id: "worker-test".to_string(),
                lease_expires_at,
                heartbeat_at: Utc::now(),
                version: 1,
            };
            Ok(record)
        }

        fn heartbeat_lease(
            &self,
            _lease_id: &str,
            _heartbeat_at: DateTime<Utc>,
            _lease_expires_at: DateTime<Utc>,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        fn expire_leases_and_requeue(
            &self,
            stale_before: DateTime<Utc>,
        ) -> Result<u64, KernelError> {
            let mut slot = self.seen_cutoff.lock().expect("cutoff lock");
            *slot = Some(stale_before);
            Ok(self.expired)
        }

        fn transition_timed_out_attempts(&self, _now: DateTime<Utc>) -> Result<u64, KernelError> {
            Ok(self.timed_out)
        }

        fn latest_seq_for_run(&self, _run_id: &RunId) -> Result<Seq, KernelError> {
            Ok(0)
        }
    }

    /// Builds the lease used by the `WorkerLease` tests: lease `L1` on attempt
    /// `A1`, owned by worker `W1`, with the given expiry and heartbeat times.
    fn lease_owned_by_w1(
        lease_expires_at: DateTime<Utc>,
        heartbeat_at: DateTime<Utc>,
    ) -> WorkerLease {
        WorkerLease::from_record(LeaseRecord {
            lease_id: "L1".to_string(),
            attempt_id: "A1".to_string(),
            worker_id: "W1".to_string(),
            lease_expires_at,
            heartbeat_at,
            version: 1,
        })
    }

    #[test]
    fn worker_lease_verify_owner_accepts_owner() {
        let lease = lease_owned_by_w1(Utc::now() + Duration::seconds(60), Utc::now());

        assert!(lease.verify_owner("W1").is_ok());
        assert!(lease.verify_owner("W2").is_err());
    }

    #[test]
    fn worker_lease_is_expired() {
        let now = Utc::now();
        let lease = lease_owned_by_w1(now - Duration::seconds(1), now - Duration::seconds(2));

        // Expiry sits one second before `now`: expired at `now`, not yet two seconds earlier.
        assert!(lease.is_expired(now));
        assert!(!lease.is_expired(now - Duration::seconds(2)));
    }

    #[test]
    fn worker_lease_check_execution_allowed() {
        let now = Utc::now();
        let lease = lease_owned_by_w1(now + Duration::seconds(10), now);

        assert!(lease.check_execution_allowed("W1", now).is_ok());
        assert!(lease.check_execution_allowed("W2", now).is_err());

        let after_expiry = now + Duration::seconds(11);
        assert!(lease.check_execution_allowed("W1", after_expiry).is_err());
    }

    #[test]
    fn tick_applies_heartbeat_grace_before_requeueing() {
        let repo = FakeRepository::new(2, 3);
        let manager = RepositoryLeaseManager::new(
            repo.clone(),
            LeaseConfig {
                lease_ttl: Duration::seconds(30),
                heartbeat_grace: Duration::seconds(7),
            },
        );
        let now = Utc::now();

        let outcome = manager.tick(now).expect("tick succeeds");

        assert_eq!(outcome.timed_out, 2);
        assert_eq!(outcome.expired_requeued, 3);

        let guard = repo.seen_cutoff.lock().expect("cutoff lock");
        let observed_cutoff = guard.expect("cutoff recorded");
        assert_eq!(observed_cutoff, now - Duration::seconds(7));
    }
}