1use crate::error::{ExecutionGuardError, ExecutionGuardErrorKind};
2use crate::valkey_execution_support::{
3 lease_key, next_token, now_millis, occurrence_index_key, resource_lock_key,
4};
5use crate::{
6 ExecutionGuard, ExecutionGuardAcquire, ExecutionGuardRenewal, ExecutionGuardScope,
7 ExecutionLease, ExecutionSlot,
8};
9use redis::{Client, ErrorKind, Script, ServerErrorKind, aio::ConnectionManager};
10use std::fmt::{self, Display, Formatter};
11use std::num::TryFromIntError;
12use std::sync::atomic::AtomicU64;
13use std::time::Duration;
14
/// Key prefix that [`ValkeyExecutionGuard::new`] passes to
/// [`ValkeyExecutionGuard::with_prefix`] when the caller does not choose one.
const DEFAULT_KEY_PREFIX: &str = "scheduler:valkey:execution-lease:";

/// Process-wide counter, starting at 1, handed to `next_token` whenever a lease token is
/// generated in `acquire`.
// NOTE(review): presumably `next_token` advances this counter so tokens minted by one process
// do not repeat — confirm in `valkey_execution_support`.
static TOKEN_COUNTER: AtomicU64 = AtomicU64::new(1);
18
/// Timing parameters for the leases handed out by [`ValkeyExecutionGuard`].
///
/// `validate` rejects a zero `ttl`, a zero `renew_interval`, a `renew_interval` that is not
/// strictly smaller than `ttl`, and a `ttl` whose millisecond count does not fit in a `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ValkeyLeaseConfig {
    /// Lifetime applied to a lease key when it is acquired and again on every renewal.
    pub ttl: Duration,
    /// Value reported to callers through `ExecutionGuard::renew_interval`.
    pub renew_interval: Duration,
}
24
/// Execution guard that keeps its lease state in a Valkey/Redis server.
#[derive(Debug, Clone)]
pub struct ValkeyExecutionGuard {
    /// Connection handle; every operation clones it before invoking its script.
    connection: ConnectionManager,
    /// Prefix passed to the key-building helpers for every key this guard touches.
    key_prefix: String,
    /// Lease timing, validated in `with_prefix` before the guard is constructed.
    lease_config: ValkeyLeaseConfig,
}
31
32impl ValkeyExecutionGuard {
33 pub async fn new(
34 url: impl AsRef<str>,
35 lease_config: ValkeyLeaseConfig,
36 ) -> Result<Self, ExecutionGuardError> {
37 Self::with_prefix(url, DEFAULT_KEY_PREFIX, lease_config).await
38 }
39
40 pub async fn with_prefix(
41 url: impl AsRef<str>,
42 key_prefix: impl Into<String>,
43 lease_config: ValkeyLeaseConfig,
44 ) -> Result<Self, ExecutionGuardError> {
45 lease_config.validate()?;
46 let client = Client::open(url.as_ref()).map_err(|error| {
47 let kind = classify_redis_error(&error);
48 ExecutionGuardError::new(error, kind)
49 })?;
50 let connection = client.get_connection_manager().await.map_err(|error| {
51 let kind = classify_redis_error(&error);
52 ExecutionGuardError::new(error, kind)
53 })?;
54
55 Ok(Self {
56 connection,
57 key_prefix: key_prefix.into(),
58 lease_config,
59 })
60 }
61
62 fn lease_key(&self, slot: &ExecutionSlot) -> String {
63 lease_key(&self.key_prefix, slot)
64 }
65
66 fn resource_lock_key(&self, resource_id: &str) -> String {
67 resource_lock_key(&self.key_prefix, resource_id)
68 }
69
70 fn occurrence_index_key(&self, resource_id: &str) -> String {
71 occurrence_index_key(&self.key_prefix, resource_id)
72 }
73
74 fn ttl_millis(&self) -> Result<u64, ValkeyExecutionGuardError> {
75 u64::try_from(self.lease_config.ttl.as_millis())
76 .map_err(ValkeyExecutionGuardError::DurationOutOfRange)
77 }
78}
79
80impl ExecutionGuard for ValkeyExecutionGuard {
81 type Error = ValkeyExecutionGuardError;
82
83 async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
84 let lease_key = self.lease_key(&slot);
85 let token = next_token(&TOKEN_COUNTER, "lease");
86 let ttl_millis = self.ttl_millis()?;
87 let now_millis = now_millis();
88 let expires_at_millis = now_millis.saturating_add(ttl_millis);
89 let mut connection = self.connection.clone();
90 let acquired = match slot.scope {
91 ExecutionGuardScope::Occurrence => {
92 let resource_lock_key = self.resource_lock_key(&slot.resource_id);
93 let occurrence_index_key = self.occurrence_index_key(&slot.resource_id);
94 let acquired: i32 = Script::new(
95 r"
96 redis.call('ZREMRANGEBYSCORE', KEYS[3], '-inf', ARGV[1])
97 if redis.call('EXISTS', KEYS[1]) == 1 then
98 return 0
99 end
100 local ok = redis.call('SET', KEYS[2], ARGV[2], 'NX', 'PX', ARGV[3])
101 if not ok then
102 return 0
103 end
104 redis.call('ZADD', KEYS[3], ARGV[4], KEYS[2])
105 return 1
106 ",
107 )
108 .key(resource_lock_key)
109 .key(&lease_key)
110 .key(occurrence_index_key)
111 .arg(now_millis)
112 .arg(&token)
113 .arg(ttl_millis)
114 .arg(expires_at_millis)
115 .invoke_async(&mut connection)
116 .await
117 .map_err(ValkeyExecutionGuardError::Redis)?;
118 acquired == 1
119 }
120 ExecutionGuardScope::Resource => {
121 let resource_lock_key = self.resource_lock_key(&slot.resource_id);
122 let occurrence_index_key = self.occurrence_index_key(&slot.resource_id);
123 let acquired: i32 = Script::new(
124 r"
125 redis.call('ZREMRANGEBYSCORE', KEYS[2], '-inf', ARGV[1])
126 if redis.call('EXISTS', KEYS[1]) == 1 then
127 return 0
128 end
129 if redis.call('ZCARD', KEYS[2]) > 0 then
130 return 0
131 end
132 local ok = redis.call('SET', KEYS[1], ARGV[2], 'NX', 'PX', ARGV[3])
133 if not ok then
134 return 0
135 end
136 return 1
137 ",
138 )
139 .key(&resource_lock_key)
140 .key(occurrence_index_key)
141 .arg(now_millis)
142 .arg(&token)
143 .arg(ttl_millis)
144 .invoke_async(&mut connection)
145 .await
146 .map_err(ValkeyExecutionGuardError::Redis)?;
147 acquired == 1
148 }
149 };
150
151 Ok(if acquired {
152 ExecutionGuardAcquire::Acquired(ExecutionLease::new(
153 slot.job_id,
154 slot.resource_id,
155 slot.scope,
156 slot.scheduled_at,
157 token,
158 lease_key,
159 ))
160 } else {
161 ExecutionGuardAcquire::Contended
162 })
163 }
164
165 async fn renew(&self, lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
166 let ttl_millis = self.ttl_millis()?;
167 let expires_at_millis = now_millis().saturating_add(ttl_millis);
168 let mut connection = self.connection.clone();
169 let renewed: i32 = match lease.scope {
170 ExecutionGuardScope::Occurrence => Script::new(
171 r"
172 if redis.call('GET', KEYS[1]) == ARGV[1] then
173 redis.call('PEXPIRE', KEYS[1], ARGV[2])
174 redis.call('ZADD', KEYS[2], ARGV[3], KEYS[1])
175 return 1
176 end
177 redis.call('ZREM', KEYS[2], KEYS[1])
178 return 0
179 ",
180 )
181 .key(&lease.lease_key)
182 .key(self.occurrence_index_key(&lease.resource_id))
183 .arg(&lease.token)
184 .arg(ttl_millis)
185 .arg(expires_at_millis)
186 .invoke_async(&mut connection)
187 .await
188 .map_err(ValkeyExecutionGuardError::Redis)?,
189 ExecutionGuardScope::Resource => Script::new(
190 r"
191 if redis.call('GET', KEYS[1]) == ARGV[1] then
192 return redis.call('PEXPIRE', KEYS[1], ARGV[2])
193 end
194 return 0
195 ",
196 )
197 .key(&lease.lease_key)
198 .arg(&lease.token)
199 .arg(ttl_millis)
200 .invoke_async(&mut connection)
201 .await
202 .map_err(ValkeyExecutionGuardError::Redis)?,
203 };
204
205 Ok(if renewed == 1 {
206 ExecutionGuardRenewal::Renewed
207 } else {
208 ExecutionGuardRenewal::Lost
209 })
210 }
211
212 async fn release(&self, lease: &ExecutionLease) -> Result<(), Self::Error> {
213 let mut connection = self.connection.clone();
214 let _: i32 = match lease.scope {
215 ExecutionGuardScope::Occurrence => Script::new(
216 r"
217 if redis.call('GET', KEYS[1]) == ARGV[1] then
218 redis.call('DEL', KEYS[1])
219 redis.call('ZREM', KEYS[2], KEYS[1])
220 return 1
221 end
222 redis.call('ZREM', KEYS[2], KEYS[1])
223 return 0
224 ",
225 )
226 .key(&lease.lease_key)
227 .key(self.occurrence_index_key(&lease.resource_id))
228 .arg(&lease.token)
229 .invoke_async(&mut connection)
230 .await
231 .map_err(ValkeyExecutionGuardError::Redis)?,
232 ExecutionGuardScope::Resource => Script::new(
233 r"
234 if redis.call('GET', KEYS[1]) == ARGV[1] then
235 return redis.call('DEL', KEYS[1])
236 end
237 return 0
238 ",
239 )
240 .key(&lease.lease_key)
241 .arg(&lease.token)
242 .invoke_async(&mut connection)
243 .await
244 .map_err(ValkeyExecutionGuardError::Redis)?,
245 };
246
247 Ok(())
248 }
249
250 fn classify_error(error: &Self::Error) -> ExecutionGuardErrorKind
251 where
252 Self: Sized,
253 {
254 match error {
255 ValkeyExecutionGuardError::Config(_)
256 | ValkeyExecutionGuardError::DurationOutOfRange(_) => ExecutionGuardErrorKind::Data,
257 ValkeyExecutionGuardError::Redis(error) => classify_redis_error(error),
258 }
259 }
260
261 fn renew_interval(&self, _lease: &ExecutionLease) -> Option<Duration> {
262 Some(self.lease_config.renew_interval)
263 }
264}
265
/// Error type reported by the [`ExecutionGuard`] implementation for [`ValkeyExecutionGuard`].
#[derive(Debug)]
pub enum ValkeyExecutionGuardError {
    /// A script invocation failed in the client library or on the server.
    Redis(redis::RedisError),
    /// The lease configuration was rejected.
    // NOTE(review): no code visible in this file constructs this variant; `validate` reports
    // its failures as `ExecutionGuardError` instead — confirm whether it is used elsewhere.
    Config(LeaseConfigError),
    /// The configured TTL, in milliseconds, does not fit in a `u64`.
    DurationOutOfRange(TryFromIntError),
}
272
273impl Display for ValkeyExecutionGuardError {
274 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
275 match self {
276 Self::Redis(error) => write!(f, "{error}"),
277 Self::Config(error) => write!(f, "{error}"),
278 Self::DurationOutOfRange(error) => write!(f, "{error}"),
279 }
280 }
281}
282
283impl std::error::Error for ValkeyExecutionGuardError {
284 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
285 match self {
286 Self::Redis(error) => Some(error),
287 Self::Config(error) => Some(error),
288 Self::DurationOutOfRange(error) => Some(error),
289 }
290 }
291}
292
/// Describes why a lease configuration was rejected.
#[derive(Debug)]
pub struct LeaseConfigError {
    /// Human-readable description of the violated rule.
    message: &'static str,
}

/// Displays the stored message verbatim.
impl Display for LeaseConfigError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.message)
    }
}

/// A leaf error: it has no underlying source.
impl std::error::Error for LeaseConfigError {}
305
306impl ValkeyLeaseConfig {
307 fn validate(self) -> Result<Self, ExecutionGuardError> {
308 if self.ttl.is_zero() {
309 return Err(ExecutionGuardError::new(
310 LeaseConfigError {
311 message: "execution guard ttl must be greater than zero",
312 },
313 ExecutionGuardErrorKind::Data,
314 ));
315 }
316
317 if self.renew_interval.is_zero() {
318 return Err(ExecutionGuardError::new(
319 LeaseConfigError {
320 message: "execution guard renew_interval must be greater than zero",
321 },
322 ExecutionGuardErrorKind::Data,
323 ));
324 }
325
326 if self.renew_interval >= self.ttl {
327 return Err(ExecutionGuardError::new(
328 LeaseConfigError {
329 message: "execution guard renew_interval must be less than ttl",
330 },
331 ExecutionGuardErrorKind::Data,
332 ));
333 }
334
335 if u64::try_from(self.ttl.as_millis()).is_err() {
336 return Err(ExecutionGuardError::new(
337 LeaseConfigError {
338 message: "execution guard ttl is too large to encode in milliseconds",
339 },
340 ExecutionGuardErrorKind::Data,
341 ));
342 }
343
344 Ok(self)
345 }
346}
347
348fn classify_redis_error(error: &redis::RedisError) -> ExecutionGuardErrorKind {
349 if error.is_connection_dropped()
350 || error.is_connection_refusal()
351 || error.is_timeout()
352 || matches!(
353 error.kind(),
354 ErrorKind::Io
355 | ErrorKind::ClusterConnectionNotFound
356 | ErrorKind::Server(ServerErrorKind::BusyLoading)
357 | ErrorKind::Server(ServerErrorKind::ClusterDown)
358 | ErrorKind::Server(ServerErrorKind::MasterDown)
359 | ErrorKind::Server(ServerErrorKind::TryAgain)
360 )
361 {
362 ExecutionGuardErrorKind::Connection
363 } else {
364 ExecutionGuardErrorKind::Unknown
365 }
366}
367
#[cfg(test)]
mod tests {
    use super::{DEFAULT_KEY_PREFIX, lease_key, next_token};
    use crate::{ExecutionGuardScope, ExecutionSlot};
    use chrono::{TimeZone, Utc};

    /// Two consecutive tokens drawn from the shared counter are both non-empty.
    #[test]
    fn generated_tokens_are_not_empty() {
        for _ in 0..2 {
            let token = next_token(&super::TOKEN_COUNTER, "lease");
            assert_ne!(token, "");
        }
    }

    /// An occurrence-scoped key embeds the resource id and the scheduled instant.
    #[test]
    fn lease_key_uses_default_prefix_and_occurrence_time() {
        let scheduled_at = Utc.with_ymd_and_hms(2026, 4, 3, 1, 2, 3).unwrap();
        let slot = ExecutionSlot::for_occurrence("job-1", "resource-1", scheduled_at);

        let key = lease_key(DEFAULT_KEY_PREFIX, &slot);

        assert_eq!(
            key,
            "scheduler:valkey:execution-lease:resource-1:occurrence:2026-04-03T01:02:03.000000000Z"
        );
    }

    /// A resource-scoped slot maps onto the per-resource key.
    #[test]
    fn resource_scope_lease_key_uses_resource_lock() {
        let slot = ExecutionSlot::for_resource("job-1", "resource-1");
        assert_eq!(slot.scope, ExecutionGuardScope::Resource);

        let key = lease_key(DEFAULT_KEY_PREFIX, &slot);

        assert_eq!(key, "scheduler:valkey:execution-lease:resource-1:resource");
    }
}