1use crate::error::{ExecutionGuardError, ExecutionGuardErrorKind};
2use crate::{
3 ExecutionGuard, ExecutionGuardAcquire, ExecutionGuardRenewal, ExecutionLease, ExecutionSlot,
4};
5use chrono::SecondsFormat;
6use redis::{Client, ErrorKind, Script, ServerErrorKind, aio::ConnectionManager, cmd};
7use std::fmt::{self, Display, Formatter};
8use std::num::TryFromIntError;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
/// Key prefix used by [`ValkeyExecutionGuard::new`]; the job id and the scheduled
/// timestamp of the slot are appended to it to form the full lease key.
const DEFAULT_KEY_PREFIX: &str = "scheduler:valkey:execution-lease:";

/// Process-wide sequence number mixed into every lease token by `next_token`, so two
/// tokens generated within the same nanosecond in this process still differ.
static TOKEN_COUNTER: AtomicU64 = AtomicU64::new(1);
15
/// Timing parameters for leases stored by [`ValkeyExecutionGuard`].
///
/// The configuration is validated when the guard is constructed: both durations must be
/// non-zero and `renew_interval` must be strictly shorter than `ttl`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ValkeyLeaseConfig {
    /// Lifetime of a lease key; sent to the server in milliseconds on acquire and renew.
    pub ttl: Duration,
    /// Interval reported to callers through `ExecutionGuard::renew_interval`.
    pub renew_interval: Duration,
}
21
/// Execution guard that keeps one lease key per execution slot on a Valkey/Redis server.
#[derive(Debug, Clone)]
pub struct ValkeyExecutionGuard {
    /// Connection handle; cloned for every command issued by the guard.
    connection: ConnectionManager,
    /// Prefix prepended to every lease key built by this guard.
    key_prefix: String,
    /// Validated lease timing (TTL and renew interval).
    lease_config: ValkeyLeaseConfig,
}
28
29impl ValkeyExecutionGuard {
30 pub async fn new(
31 url: impl AsRef<str>,
32 lease_config: ValkeyLeaseConfig,
33 ) -> Result<Self, ExecutionGuardError> {
34 Self::with_prefix(url, DEFAULT_KEY_PREFIX, lease_config).await
35 }
36
37 pub async fn with_prefix(
38 url: impl AsRef<str>,
39 key_prefix: impl Into<String>,
40 lease_config: ValkeyLeaseConfig,
41 ) -> Result<Self, ExecutionGuardError> {
42 lease_config.validate()?;
43 let client = Client::open(url.as_ref()).map_err(|error| {
44 let kind = classify_redis_error(&error);
45 ExecutionGuardError::new(error, kind)
46 })?;
47 let connection = client.get_connection_manager().await.map_err(|error| {
48 let kind = classify_redis_error(&error);
49 ExecutionGuardError::new(error, kind)
50 })?;
51
52 Ok(Self {
53 connection,
54 key_prefix: key_prefix.into(),
55 lease_config,
56 })
57 }
58
59 fn lease_key(&self, slot: &ExecutionSlot) -> String {
60 lease_key(&self.key_prefix, slot)
61 }
62
63 fn ttl_millis(&self) -> Result<u64, ValkeyExecutionGuardError> {
64 u64::try_from(self.lease_config.ttl.as_millis())
65 .map_err(ValkeyExecutionGuardError::DurationOutOfRange)
66 }
67}
68
69impl ExecutionGuard for ValkeyExecutionGuard {
70 type Error = ValkeyExecutionGuardError;
71
72 async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
73 let lease_key = self.lease_key(&slot);
74 let token = next_token();
75 let ttl_millis = self.ttl_millis()?;
76 let mut connection = self.connection.clone();
77 let response: Option<String> = cmd("SET")
78 .arg(&lease_key)
79 .arg(&token)
80 .arg("NX")
81 .arg("PX")
82 .arg(ttl_millis)
83 .query_async(&mut connection)
84 .await
85 .map_err(ValkeyExecutionGuardError::Redis)?;
86
87 Ok(match response {
88 Some(_) => ExecutionGuardAcquire::Acquired(ExecutionLease::new(
89 slot.job_id,
90 slot.scheduled_at,
91 token,
92 lease_key,
93 )),
94 None => ExecutionGuardAcquire::Contended,
95 })
96 }
97
98 async fn renew(&self, lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
99 let ttl_millis = self.ttl_millis()?;
100 let mut connection = self.connection.clone();
101 let renewed: i32 = Script::new(
102 r"
103 if redis.call('GET', KEYS[1]) == ARGV[1] then
104 return redis.call('PEXPIRE', KEYS[1], ARGV[2])
105 end
106 return 0
107 ",
108 )
109 .key(&lease.lease_key)
110 .arg(&lease.token)
111 .arg(ttl_millis)
112 .invoke_async(&mut connection)
113 .await
114 .map_err(ValkeyExecutionGuardError::Redis)?;
115
116 Ok(if renewed == 1 {
117 ExecutionGuardRenewal::Renewed
118 } else {
119 ExecutionGuardRenewal::Lost
120 })
121 }
122
123 async fn release(&self, lease: &ExecutionLease) -> Result<(), Self::Error> {
124 let mut connection = self.connection.clone();
125 let _: i32 = Script::new(
126 r"
127 if redis.call('GET', KEYS[1]) == ARGV[1] then
128 return redis.call('DEL', KEYS[1])
129 end
130 return 0
131 ",
132 )
133 .key(&lease.lease_key)
134 .arg(&lease.token)
135 .invoke_async(&mut connection)
136 .await
137 .map_err(ValkeyExecutionGuardError::Redis)?;
138
139 Ok(())
140 }
141
142 fn classify_error(error: &Self::Error) -> ExecutionGuardErrorKind
143 where
144 Self: Sized,
145 {
146 match error {
147 ValkeyExecutionGuardError::Config(_) | ValkeyExecutionGuardError::DurationOutOfRange(_) => {
148 ExecutionGuardErrorKind::Data
149 }
150 ValkeyExecutionGuardError::Redis(error) => classify_redis_error(error),
151 }
152 }
153
154 fn renew_interval(&self, _lease: &ExecutionLease) -> Option<Duration> {
155 Some(self.lease_config.renew_interval)
156 }
157}
158
/// Errors produced by the [`ValkeyExecutionGuard`] lease operations.
#[derive(Debug)]
pub enum ValkeyExecutionGuardError {
    /// A command or script invocation against the server failed.
    Redis(redis::RedisError),
    /// The lease configuration is invalid.
    // NOTE(review): no code visible in this file constructs this variant — confirm it is
    // still needed by callers elsewhere.
    Config(LeaseConfigError),
    /// The configured TTL in milliseconds does not fit in a `u64`.
    DurationOutOfRange(TryFromIntError),
}
165
166impl Display for ValkeyExecutionGuardError {
167 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
168 match self {
169 Self::Redis(error) => write!(f, "{error}"),
170 Self::Config(error) => write!(f, "{error}"),
171 Self::DurationOutOfRange(error) => write!(f, "{error}"),
172 }
173 }
174}
175
176impl std::error::Error for ValkeyExecutionGuardError {
177 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
178 match self {
179 Self::Redis(error) => Some(error),
180 Self::Config(error) => Some(error),
181 Self::DurationOutOfRange(error) => Some(error),
182 }
183 }
184}
185
/// Describes why a lease configuration was rejected.
#[derive(Debug)]
pub struct LeaseConfigError {
    /// Fixed, human-readable description of the violated rule.
    message: &'static str,
}

impl Display for LeaseConfigError {
    /// Writes the stored message verbatim; width and padding flags are not applied.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self { message } = self;
        f.write_str(message)
    }
}

// No underlying cause: the default `source()` (returning `None`) is kept.
impl std::error::Error for LeaseConfigError {}
198
199impl ValkeyLeaseConfig {
200 fn validate(self) -> Result<Self, ExecutionGuardError> {
201 if self.ttl.is_zero() {
202 return Err(ExecutionGuardError::new(
203 LeaseConfigError {
204 message: "execution guard ttl must be greater than zero",
205 },
206 ExecutionGuardErrorKind::Data,
207 ));
208 }
209
210 if self.renew_interval.is_zero() {
211 return Err(ExecutionGuardError::new(
212 LeaseConfigError {
213 message: "execution guard renew_interval must be greater than zero",
214 },
215 ExecutionGuardErrorKind::Data,
216 ));
217 }
218
219 if self.renew_interval >= self.ttl {
220 return Err(ExecutionGuardError::new(
221 LeaseConfigError {
222 message: "execution guard renew_interval must be less than ttl",
223 },
224 ExecutionGuardErrorKind::Data,
225 ));
226 }
227
228 if u64::try_from(self.ttl.as_millis()).is_err() {
229 return Err(ExecutionGuardError::new(
230 LeaseConfigError {
231 message: "execution guard ttl is too large to encode in milliseconds",
232 },
233 ExecutionGuardErrorKind::Data,
234 ));
235 }
236
237 Ok(self)
238 }
239}
240
241fn lease_key(prefix: &str, slot: &ExecutionSlot) -> String {
242 format!(
243 "{}{}:{}",
244 prefix,
245 slot.job_id,
246 slot.scheduled_at
247 .to_rfc3339_opts(SecondsFormat::Nanos, true)
248 )
249}
250
251fn next_token() -> String {
252 let now = SystemTime::now()
253 .duration_since(UNIX_EPOCH)
254 .unwrap_or_default()
255 .as_nanos();
256 let counter = TOKEN_COUNTER.fetch_add(1, Ordering::Relaxed);
257 format!("lease-{now}-{counter}")
258}
259
260fn classify_redis_error(error: &redis::RedisError) -> ExecutionGuardErrorKind {
261 if error.is_connection_dropped()
262 || error.is_connection_refusal()
263 || error.is_timeout()
264 || matches!(
265 error.kind(),
266 ErrorKind::Io
267 | ErrorKind::ClusterConnectionNotFound
268 | ErrorKind::Server(ServerErrorKind::BusyLoading)
269 | ErrorKind::Server(ServerErrorKind::ClusterDown)
270 | ErrorKind::Server(ServerErrorKind::MasterDown)
271 | ErrorKind::Server(ServerErrorKind::TryAgain)
272 )
273 {
274 ExecutionGuardErrorKind::Connection
275 } else {
276 ExecutionGuardErrorKind::Unknown
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::{DEFAULT_KEY_PREFIX, lease_key, next_token};
    use crate::ExecutionSlot;
    use chrono::{TimeZone, Utc};

    /// Two consecutive tokens must each contain some text.
    #[test]
    fn generated_tokens_are_not_empty() {
        for _ in 0..2 {
            assert!(!next_token().is_empty());
        }
    }

    /// The key is the default prefix, the job id, and the nanosecond-precision UTC
    /// timestamp of the slot.
    #[test]
    fn lease_key_uses_default_prefix_and_occurrence_time() {
        let scheduled_at = Utc.with_ymd_and_hms(2026, 4, 3, 1, 2, 3).unwrap();
        let slot = ExecutionSlot::new("job-1", scheduled_at);

        let expected = "scheduler:valkey:execution-lease:job-1:2026-04-03T01:02:03.000000000Z";
        assert_eq!(lease_key(DEFAULT_KEY_PREFIX, &slot), expected);
    }
}