1use chrono::{DateTime, Duration, Utc};
9
10use oris_kernel::event::KernelError;
11
12use super::models::LeaseRecord;
13use super::repository::RuntimeRepository;
14
15#[derive(Clone, Debug)]
17pub struct WorkerLease {
18 record: LeaseRecord,
19}
20
21impl WorkerLease {
22 pub fn from_record(record: LeaseRecord) -> Self {
24 Self { record }
25 }
26
27 pub fn record(&self) -> &LeaseRecord {
29 &self.record
30 }
31
32 pub fn lease_id(&self) -> &str {
33 &self.record.lease_id
34 }
35
36 pub fn attempt_id(&self) -> &str {
37 &self.record.attempt_id
38 }
39
40 pub fn worker_id(&self) -> &str {
41 &self.record.worker_id
42 }
43
44 pub fn is_expired(&self, now: DateTime<Utc>) -> bool {
46 now >= self.record.lease_expires_at
47 }
48
49 pub fn verify_owner(&self, worker_id: &str) -> Result<(), KernelError> {
51 if self.record.worker_id != worker_id {
52 return Err(KernelError::Driver(format!(
53 "lease {} is owned by {}, not {}",
54 self.record.lease_id, self.record.worker_id, worker_id
55 )));
56 }
57 Ok(())
58 }
59
60 pub fn check_execution_allowed(
62 &self,
63 worker_id: &str,
64 now: DateTime<Utc>,
65 ) -> Result<(), KernelError> {
66 self.verify_owner(worker_id)?;
67 if self.is_expired(now) {
68 return Err(KernelError::Driver(format!(
69 "lease {} expired at {}",
70 self.record.lease_id, self.record.lease_expires_at
71 )));
72 }
73 Ok(())
74 }
75}
76
77#[derive(Clone, Debug)]
79pub struct LeaseConfig {
80 pub lease_ttl: Duration,
81 pub heartbeat_grace: Duration,
82}
83
84impl Default for LeaseConfig {
85 fn default() -> Self {
86 Self {
87 lease_ttl: Duration::seconds(30),
88 heartbeat_grace: Duration::seconds(5),
89 }
90 }
91}
92
/// Counters reported by one `LeaseManager::tick` pass.
///
/// `PartialEq`/`Eq` are derived so whole results can be compared directly
/// (e.g. `assert_eq!` in tests or change detection in callers).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct LeaseTickResult {
    /// For `RepositoryLeaseManager`, the value returned by
    /// `RuntimeRepository::transition_timed_out_attempts` (presumably the number
    /// of attempts moved to a timed-out state).
    pub timed_out: u64,
    /// For `RepositoryLeaseManager`, the value returned by
    /// `RuntimeRepository::expire_leases_and_requeue` (presumably the number of
    /// leases expired and requeued).
    pub expired_requeued: u64,
}
99
100pub trait LeaseManager: Send + Sync {
102 fn tick(&self, now: DateTime<Utc>) -> Result<LeaseTickResult, KernelError>;
103}
104
105pub struct RepositoryLeaseManager<R: RuntimeRepository> {
107 repository: R,
108 config: LeaseConfig,
109}
110
111impl<R: RuntimeRepository> RepositoryLeaseManager<R> {
112 pub fn new(repository: R, config: LeaseConfig) -> Self {
113 Self { repository, config }
114 }
115}
116
117impl<R: RuntimeRepository> LeaseManager for RepositoryLeaseManager<R> {
118 fn tick(&self, now: DateTime<Utc>) -> Result<LeaseTickResult, KernelError> {
119 let stale_before = now - self.config.heartbeat_grace;
120 let timed_out = self.repository.transition_timed_out_attempts(now)?;
121 let expired = self.repository.expire_leases_and_requeue(stale_before)?;
122 Ok(LeaseTickResult {
123 timed_out,
124 expired_requeued: expired,
125 })
126 }
127}
128
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use super::*;
    use oris_kernel::identity::{RunId, Seq};

    use super::super::models::{
        AttemptDispatchRecord, BountyRecord, DisputeRecord, LeaseRecord, OrganismRecord,
        RecipeRecord, SessionMessageRecord, SessionRecord, SwarmTaskRecord, WorkerRecord,
    };

    /// Builds lease `L1` for attempt `A1`, owned by worker `W1`.
    fn lease_record(lease_expires_at: DateTime<Utc>, heartbeat_at: DateTime<Utc>) -> LeaseRecord {
        LeaseRecord {
            lease_id: "L1".to_string(),
            attempt_id: "A1".to_string(),
            worker_id: "W1".to_string(),
            lease_expires_at,
            heartbeat_at,
            version: 1,
        }
    }

    /// In-memory `RuntimeRepository` double.
    ///
    /// The two lease-maintenance calls return fixed counts, and
    /// `expire_leases_and_requeue` records the cutoff it receives. Every other
    /// method is a no-op returning an empty result.
    #[derive(Clone)]
    struct FakeRepository {
        timed_out: u64,
        expired: u64,
        // Shared between clones so a test can inspect the cutoff after the
        // manager (which owns its own clone) has run.
        seen_cutoff: Arc<Mutex<Option<DateTime<Utc>>>>,
    }

    impl FakeRepository {
        fn new(timed_out: u64, expired: u64) -> Self {
            Self {
                timed_out,
                expired,
                seen_cutoff: Arc::new(Mutex::new(None)),
            }
        }
    }

    impl RuntimeRepository for FakeRepository {
        fn list_dispatchable_attempts(
            &self,
            _now: DateTime<Utc>,
            _limit: usize,
        ) -> Result<Vec<AttemptDispatchRecord>, KernelError> {
            Ok(Vec::new())
        }

        fn upsert_lease(
            &self,
            attempt_id: &str,
            worker_id: &str,
            lease_expires_at: DateTime<Utc>,
        ) -> Result<LeaseRecord, KernelError> {
            // Echo the caller's ids so returned records are consistent with the request.
            Ok(LeaseRecord {
                lease_id: "lease-test".to_string(),
                attempt_id: attempt_id.to_string(),
                worker_id: worker_id.to_string(),
                lease_expires_at,
                heartbeat_at: Utc::now(),
                version: 1,
            })
        }

        fn heartbeat_lease(
            &self,
            _lease_id: &str,
            _heartbeat_at: DateTime<Utc>,
            _lease_expires_at: DateTime<Utc>,
        ) -> Result<(), KernelError> {
            Ok(())
        }

        fn expire_leases_and_requeue(
            &self,
            stale_before: DateTime<Utc>,
        ) -> Result<u64, KernelError> {
            *self.seen_cutoff.lock().expect("cutoff lock") = Some(stale_before);
            Ok(self.expired)
        }

        fn transition_timed_out_attempts(&self, _now: DateTime<Utc>) -> Result<u64, KernelError> {
            Ok(self.timed_out)
        }

        fn latest_seq_for_run(&self, _run_id: &RunId) -> Result<Seq, KernelError> {
            Ok(0)
        }

        fn upsert_bounty(&self, _: &BountyRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_bounty(&self, _: &str) -> Result<Option<BountyRecord>, KernelError> {
            Ok(None)
        }
        fn list_bounties(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<BountyRecord>, KernelError> {
            Ok(vec![])
        }
        fn accept_bounty(&self, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn close_bounty(&self, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn upsert_swarm_decomposition(&self, _: &SwarmTaskRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_swarm_decomposition(
            &self,
            _: &str,
        ) -> Result<Option<SwarmTaskRecord>, KernelError> {
            Ok(None)
        }
        fn register_worker(&self, _: &WorkerRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_worker(&self, _: &str) -> Result<Option<WorkerRecord>, KernelError> {
            Ok(None)
        }
        fn list_workers(
            &self,
            _: Option<&str>,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<WorkerRecord>, KernelError> {
            Ok(vec![])
        }
        fn heartbeat_worker(&self, _: &str, _: i64) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_recipe(&self, _: &RecipeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_recipe(&self, _: &str) -> Result<Option<RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn fork_recipe(
            &self,
            _: &str,
            _: &str,
            _: &str,
        ) -> Result<Option<RecipeRecord>, KernelError> {
            Ok(None)
        }
        fn list_recipes(
            &self,
            _: Option<&str>,
            _: usize,
        ) -> Result<Vec<RecipeRecord>, KernelError> {
            Ok(vec![])
        }
        fn express_organism(&self, _: &OrganismRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_organism(&self, _: &str) -> Result<Option<OrganismRecord>, KernelError> {
            Ok(None)
        }
        fn update_organism(&self, _: &str, _: i32, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
        fn create_session(&self, _: &SessionRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session(&self, _: &str) -> Result<Option<SessionRecord>, KernelError> {
            Ok(None)
        }
        fn add_session_message(&self, _: &SessionMessageRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_session_history(
            &self,
            _: &str,
            _: usize,
        ) -> Result<Vec<SessionMessageRecord>, KernelError> {
            Ok(vec![])
        }
        fn open_dispute(&self, _: &DisputeRecord) -> Result<(), KernelError> {
            Ok(())
        }
        fn get_dispute(&self, _: &str) -> Result<Option<DisputeRecord>, KernelError> {
            Ok(None)
        }
        fn get_disputes_for_bounty(&self, _: &str) -> Result<Vec<DisputeRecord>, KernelError> {
            Ok(vec![])
        }
        fn resolve_dispute(&self, _: &str, _: &str, _: &str) -> Result<(), KernelError> {
            Ok(())
        }
    }

    #[test]
    fn worker_lease_verify_owner_accepts_owner() {
        let now = Utc::now();
        let lease = WorkerLease::from_record(lease_record(now + Duration::seconds(60), now));
        assert!(lease.verify_owner("W1").is_ok());
        assert!(lease.verify_owner("W2").is_err());
    }

    #[test]
    fn worker_lease_is_expired() {
        let now = Utc::now();
        let expires_at = now - Duration::seconds(1);
        let lease =
            WorkerLease::from_record(lease_record(expires_at, now - Duration::seconds(2)));
        assert!(lease.is_expired(now));
        assert!(!lease.is_expired(now - Duration::seconds(2)));
        // The expiry instant itself already counts as expired.
        assert!(lease.is_expired(expires_at));
    }

    #[test]
    fn worker_lease_check_execution_allowed() {
        let now = Utc::now();
        let expires_at = now + Duration::seconds(10);
        let lease = WorkerLease::from_record(lease_record(expires_at, now));
        assert!(lease.check_execution_allowed("W1", now).is_ok());
        assert!(lease.check_execution_allowed("W2", now).is_err());
        // Execution stops at the expiry instant, not one tick after it.
        assert!(lease.check_execution_allowed("W1", expires_at).is_err());
        assert!(lease
            .check_execution_allowed("W1", now + Duration::seconds(11))
            .is_err());
    }

    #[test]
    fn worker_lease_ownership_is_checked_before_expiry() {
        let now = Utc::now();
        let lease = WorkerLease::from_record(lease_record(now - Duration::seconds(1), now));
        match lease.check_execution_allowed("W2", now) {
            Err(KernelError::Driver(message)) => assert!(message.contains("is owned by W1")),
            _ => panic!("expected an ownership error for a non-owner"),
        }
    }

    #[test]
    fn tick_applies_heartbeat_grace_before_requeueing() {
        let repo = FakeRepository::new(2, 3);
        let config = LeaseConfig {
            lease_ttl: Duration::seconds(30),
            heartbeat_grace: Duration::seconds(7),
        };
        let manager = RepositoryLeaseManager::new(repo.clone(), config);
        let now = Utc::now();

        let result = manager.tick(now).expect("tick succeeds");

        assert_eq!(result.timed_out, 2);
        assert_eq!(result.expired_requeued, 3);
        let seen_cutoff = repo
            .seen_cutoff
            .lock()
            .expect("cutoff lock")
            .expect("cutoff recorded");
        assert_eq!(seen_cutoff, now - Duration::seconds(7));
    }
}