1use crate::error::{ExecutionGuardError, ExecutionGuardErrorKind};
2use crate::valkey_execution_support::{
3 lease_key, next_token, now_millis, occurrence_index_key, resource_lock_key,
4};
5use crate::{
6 ExecutionGuard, ExecutionGuardAcquire, ExecutionGuardRenewal, ExecutionGuardScope,
7 ExecutionLease, ExecutionSlot,
8};
9use redis::{Client, ErrorKind, Script, ServerErrorKind, aio::ConnectionManager};
10use std::fmt::{self, Display, Formatter};
11use std::num::TryFromIntError;
12use std::sync::atomic::AtomicU64;
13use std::time::Duration;
14
/// Key namespace used by `ValkeyExecutionGuard::new` when no explicit prefix is supplied.
const DEFAULT_KEY_PREFIX: &str = "scheduler:valkey:execution-lease:";
// Lua sources embedded at compile time. Each one pairs a lease scope (occurrence / resource)
// with an operation (acquire / renew / release) in the `ExecutionGuard` implementation.
const ACQUIRE_OCCURRENCE_LUA: &str = include_str!("lua/valkey_guard/acquire_occurrence.lua");
const ACQUIRE_RESOURCE_LUA: &str = include_str!("lua/valkey_guard/acquire_resource.lua");
const RENEW_OCCURRENCE_LUA: &str = include_str!("lua/valkey_guard/renew_occurrence.lua");
const RENEW_RESOURCE_LUA: &str = include_str!("lua/valkey_guard/renew_resource.lua");
const RELEASE_OCCURRENCE_LUA: &str = include_str!("lua/valkey_guard/release_occurrence.lua");
const RELEASE_RESOURCE_LUA: &str = include_str!("lua/valkey_guard/release_resource.lua");

/// Process-wide counter handed to `next_token` whenever a lease token is minted.
static TOKEN_COUNTER: AtomicU64 = AtomicU64::new(1);
24
/// Timing parameters for Valkey-backed execution leases.
///
/// Validated when a `ValkeyExecutionGuard` is constructed. Among other checks, both durations
/// must be non-zero and `renew_interval` must be strictly shorter than `ttl`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ValkeyLeaseConfig {
    /// Lease lifetime. Converted to whole milliseconds (truncating) before being passed to the
    /// Lua scripts.
    pub ttl: Duration,
    /// Renewal cadence, reported to callers through `ExecutionGuard::renew_interval`.
    pub renew_interval: Duration,
}
30
/// `ExecutionGuard` implementation that keeps execution leases in Valkey, manipulated through
/// embedded Lua scripts.
#[derive(Debug, Clone)]
pub struct ValkeyExecutionGuard {
    /// Connection handle; each operation works on its own clone.
    connection: ConnectionManager,
    /// Namespace passed to the key-building helpers for every key this guard touches.
    key_prefix: String,
    /// Lease timings, validated by the constructor.
    lease_config: ValkeyLeaseConfig,
}
37
38impl ValkeyExecutionGuard {
39 pub async fn new(
40 url: impl AsRef<str>,
41 lease_config: ValkeyLeaseConfig,
42 ) -> Result<Self, ExecutionGuardError> {
43 Self::with_prefix(url, DEFAULT_KEY_PREFIX, lease_config).await
44 }
45
46 pub async fn with_prefix(
47 url: impl AsRef<str>,
48 key_prefix: impl Into<String>,
49 lease_config: ValkeyLeaseConfig,
50 ) -> Result<Self, ExecutionGuardError> {
51 lease_config.validate()?;
52 let client = Client::open(url.as_ref()).map_err(|error| {
53 let kind = classify_redis_error(&error);
54 ExecutionGuardError::new(error, kind)
55 })?;
56 let connection = client.get_connection_manager().await.map_err(|error| {
57 let kind = classify_redis_error(&error);
58 ExecutionGuardError::new(error, kind)
59 })?;
60
61 Ok(Self {
62 connection,
63 key_prefix: key_prefix.into(),
64 lease_config,
65 })
66 }
67
68 fn lease_key(&self, slot: &ExecutionSlot) -> String {
69 lease_key(&self.key_prefix, slot)
70 }
71
72 fn resource_lock_key(&self, resource_id: &str) -> String {
73 resource_lock_key(&self.key_prefix, resource_id)
74 }
75
76 fn occurrence_index_key(&self, resource_id: &str) -> String {
77 occurrence_index_key(&self.key_prefix, resource_id)
78 }
79
80 fn ttl_millis(&self) -> Result<u64, ValkeyExecutionGuardError> {
81 u64::try_from(self.lease_config.ttl.as_millis())
82 .map_err(ValkeyExecutionGuardError::DurationOutOfRange)
83 }
84}
85
86impl ExecutionGuard for ValkeyExecutionGuard {
87 type Error = ValkeyExecutionGuardError;
88
89 async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
90 let lease_key = self.lease_key(&slot);
91 let token = next_token(&TOKEN_COUNTER, "lease");
92 let ttl_millis = self.ttl_millis()?;
93 let now_millis = now_millis();
94 let expires_at_millis = now_millis.saturating_add(ttl_millis);
95 let mut connection = self.connection.clone();
96 let acquired = match slot.scope {
97 ExecutionGuardScope::Occurrence => {
98 let resource_lock_key = self.resource_lock_key(&slot.resource_id);
99 let occurrence_index_key = self.occurrence_index_key(&slot.resource_id);
100 let acquired: i32 = Script::new(ACQUIRE_OCCURRENCE_LUA)
101 .key(resource_lock_key)
102 .key(&lease_key)
103 .key(occurrence_index_key)
104 .arg(now_millis)
105 .arg(&token)
106 .arg(ttl_millis)
107 .arg(expires_at_millis)
108 .invoke_async(&mut connection)
109 .await
110 .map_err(ValkeyExecutionGuardError::Redis)?;
111 acquired == 1
112 }
113 ExecutionGuardScope::Resource => {
114 let resource_lock_key = self.resource_lock_key(&slot.resource_id);
115 let occurrence_index_key = self.occurrence_index_key(&slot.resource_id);
116 let acquired: i32 = Script::new(ACQUIRE_RESOURCE_LUA)
117 .key(&resource_lock_key)
118 .key(occurrence_index_key)
119 .arg(now_millis)
120 .arg(&token)
121 .arg(ttl_millis)
122 .invoke_async(&mut connection)
123 .await
124 .map_err(ValkeyExecutionGuardError::Redis)?;
125 acquired == 1
126 }
127 };
128
129 Ok(if acquired {
130 ExecutionGuardAcquire::Acquired(ExecutionLease::new(
131 slot.job_id,
132 slot.resource_id,
133 slot.scope,
134 slot.scheduled_at,
135 token,
136 lease_key,
137 ))
138 } else {
139 ExecutionGuardAcquire::Contended
140 })
141 }
142
143 async fn renew(&self, lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
144 let ttl_millis = self.ttl_millis()?;
145 let expires_at_millis = now_millis().saturating_add(ttl_millis);
146 let mut connection = self.connection.clone();
147 let renewed: i32 = match lease.scope {
148 ExecutionGuardScope::Occurrence => Script::new(RENEW_OCCURRENCE_LUA)
149 .key(&lease.lease_key)
150 .key(self.occurrence_index_key(&lease.resource_id))
151 .arg(&lease.token)
152 .arg(ttl_millis)
153 .arg(expires_at_millis)
154 .invoke_async(&mut connection)
155 .await
156 .map_err(ValkeyExecutionGuardError::Redis)?,
157 ExecutionGuardScope::Resource => Script::new(RENEW_RESOURCE_LUA)
158 .key(&lease.lease_key)
159 .arg(&lease.token)
160 .arg(ttl_millis)
161 .invoke_async(&mut connection)
162 .await
163 .map_err(ValkeyExecutionGuardError::Redis)?,
164 };
165
166 Ok(if renewed == 1 {
167 ExecutionGuardRenewal::Renewed
168 } else {
169 ExecutionGuardRenewal::Lost
170 })
171 }
172
173 async fn release(&self, lease: &ExecutionLease) -> Result<(), Self::Error> {
174 let mut connection = self.connection.clone();
175 let _: i32 = match lease.scope {
176 ExecutionGuardScope::Occurrence => Script::new(RELEASE_OCCURRENCE_LUA)
177 .key(&lease.lease_key)
178 .key(self.occurrence_index_key(&lease.resource_id))
179 .arg(&lease.token)
180 .invoke_async(&mut connection)
181 .await
182 .map_err(ValkeyExecutionGuardError::Redis)?,
183 ExecutionGuardScope::Resource => Script::new(RELEASE_RESOURCE_LUA)
184 .key(&lease.lease_key)
185 .arg(&lease.token)
186 .invoke_async(&mut connection)
187 .await
188 .map_err(ValkeyExecutionGuardError::Redis)?,
189 };
190
191 Ok(())
192 }
193
194 fn classify_error(error: &Self::Error) -> ExecutionGuardErrorKind
195 where
196 Self: Sized,
197 {
198 match error {
199 ValkeyExecutionGuardError::Config(_)
200 | ValkeyExecutionGuardError::DurationOutOfRange(_) => ExecutionGuardErrorKind::Data,
201 ValkeyExecutionGuardError::Redis(error) => classify_redis_error(error),
202 }
203 }
204
205 fn renew_interval(&self, _lease: &ExecutionLease) -> Option<Duration> {
206 Some(self.lease_config.renew_interval)
207 }
208}
209
210#[derive(Debug)]
211pub enum ValkeyExecutionGuardError {
212 Redis(redis::RedisError),
213 Config(LeaseConfigError),
214 DurationOutOfRange(TryFromIntError),
215}
216
217impl Display for ValkeyExecutionGuardError {
218 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
219 match self {
220 Self::Redis(error) => write!(f, "{error}"),
221 Self::Config(error) => write!(f, "{error}"),
222 Self::DurationOutOfRange(error) => write!(f, "{error}"),
223 }
224 }
225}
226
227impl std::error::Error for ValkeyExecutionGuardError {
228 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
229 match self {
230 Self::Redis(error) => Some(error),
231 Self::Config(error) => Some(error),
232 Self::DurationOutOfRange(error) => Some(error),
233 }
234 }
235}
236
/// Reason a lease configuration was rejected, carried as a static message.
#[derive(Debug)]
pub struct LeaseConfigError {
    /// Human-readable description of the violated constraint.
    message: &'static str,
}

impl Display for LeaseConfigError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Emitted verbatim; width/fill flags from the caller's format spec are not applied.
        write!(f, "{}", self.message)
    }
}

// Leaf error: there is no underlying cause, so the default `source()` (None) is kept.
impl std::error::Error for LeaseConfigError {}
249
250impl ValkeyLeaseConfig {
251 fn validate(self) -> Result<Self, ExecutionGuardError> {
252 if self.ttl.is_zero() {
253 return Err(ExecutionGuardError::new(
254 LeaseConfigError {
255 message: "execution guard ttl must be greater than zero",
256 },
257 ExecutionGuardErrorKind::Data,
258 ));
259 }
260
261 if self.renew_interval.is_zero() {
262 return Err(ExecutionGuardError::new(
263 LeaseConfigError {
264 message: "execution guard renew_interval must be greater than zero",
265 },
266 ExecutionGuardErrorKind::Data,
267 ));
268 }
269
270 if self.renew_interval >= self.ttl {
271 return Err(ExecutionGuardError::new(
272 LeaseConfigError {
273 message: "execution guard renew_interval must be less than ttl",
274 },
275 ExecutionGuardErrorKind::Data,
276 ));
277 }
278
279 if u64::try_from(self.ttl.as_millis()).is_err() {
280 return Err(ExecutionGuardError::new(
281 LeaseConfigError {
282 message: "execution guard ttl is too large to encode in milliseconds",
283 },
284 ExecutionGuardErrorKind::Data,
285 ));
286 }
287
288 Ok(self)
289 }
290}
291
292fn classify_redis_error(error: &redis::RedisError) -> ExecutionGuardErrorKind {
293 if error.is_connection_dropped()
294 || error.is_connection_refusal()
295 || error.is_timeout()
296 || matches!(
297 error.kind(),
298 ErrorKind::Io
299 | ErrorKind::ClusterConnectionNotFound
300 | ErrorKind::Server(ServerErrorKind::BusyLoading)
301 | ErrorKind::Server(ServerErrorKind::ClusterDown)
302 | ErrorKind::Server(ServerErrorKind::MasterDown)
303 | ErrorKind::Server(ServerErrorKind::TryAgain)
304 )
305 {
306 ExecutionGuardErrorKind::Connection
307 } else {
308 ExecutionGuardErrorKind::Unknown
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::{DEFAULT_KEY_PREFIX, ValkeyLeaseConfig, lease_key, next_token};
    use crate::{ExecutionGuardScope, ExecutionSlot};
    use chrono::{TimeZone, Utc};
    use std::time::Duration;

    /// Builds a config from millisecond values to keep the validation tests terse.
    fn config(ttl_ms: u64, renew_ms: u64) -> ValkeyLeaseConfig {
        ValkeyLeaseConfig {
            ttl: Duration::from_millis(ttl_ms),
            renew_interval: Duration::from_millis(renew_ms),
        }
    }

    #[test]
    fn generated_tokens_are_not_empty() {
        assert_ne!(next_token(&super::TOKEN_COUNTER, "lease"), "");
        assert_ne!(next_token(&super::TOKEN_COUNTER, "lease"), "");
    }

    // Lease ownership is decided by token equality, so two mints must never collide.
    #[test]
    fn generated_tokens_are_distinct() {
        let first = next_token(&super::TOKEN_COUNTER, "lease");
        let second = next_token(&super::TOKEN_COUNTER, "lease");
        assert_ne!(first, second);
    }

    #[test]
    fn lease_key_uses_default_prefix_and_occurrence_time() {
        let slot = ExecutionSlot::for_occurrence(
            "job-1",
            "resource-1",
            Utc.with_ymd_and_hms(2026, 4, 3, 1, 2, 3).unwrap(),
        );

        assert_eq!(
            lease_key(DEFAULT_KEY_PREFIX, &slot),
            "scheduler:valkey:execution-lease:resource-1:occurrence:2026-04-03T01:02:03.000000000Z"
        );
    }

    #[test]
    fn resource_scope_lease_key_uses_resource_lock() {
        let slot = ExecutionSlot::for_resource("job-1", "resource-1");

        assert_eq!(slot.scope, ExecutionGuardScope::Resource);
        assert_eq!(
            lease_key(DEFAULT_KEY_PREFIX, &slot),
            "scheduler:valkey:execution-lease:resource-1:resource"
        );
    }

    #[test]
    fn lease_config_accepts_renew_interval_shorter_than_ttl() {
        let valid = config(30_000, 10_000);
        assert_eq!(valid.validate().ok(), Some(valid));
    }

    #[test]
    fn lease_config_rejects_zero_durations() {
        assert!(config(0, 0).validate().is_err());
        assert!(config(30_000, 0).validate().is_err());
    }

    #[test]
    fn lease_config_rejects_renew_interval_not_shorter_than_ttl() {
        assert!(config(10_000, 10_000).validate().is_err());
        assert!(config(10_000, 20_000).validate().is_err());
    }

    #[test]
    fn lease_config_rejects_ttl_too_large_for_milliseconds() {
        let oversized = ValkeyLeaseConfig {
            ttl: Duration::MAX,
            renew_interval: Duration::from_secs(1),
        };
        assert!(oversized.validate().is_err());
    }
}