Skip to main content

scheduler/
valkey_guard.rs

1use crate::error::{ExecutionGuardError, ExecutionGuardErrorKind};
2use crate::valkey_execution_support::{
3    lease_key, next_token, now_millis, occurrence_index_key, resource_lock_key,
4};
5use crate::{
6    ExecutionGuard, ExecutionGuardAcquire, ExecutionGuardRenewal, ExecutionGuardScope,
7    ExecutionLease, ExecutionSlot,
8};
9use redis::{Client, ErrorKind, Script, ServerErrorKind, aio::ConnectionManager};
10use std::fmt::{self, Display, Formatter};
11use std::num::TryFromIntError;
12use std::sync::atomic::AtomicU64;
13use std::time::Duration;
14
15const DEFAULT_KEY_PREFIX: &str = "scheduler:valkey:execution-lease:";
16
17static TOKEN_COUNTER: AtomicU64 = AtomicU64::new(1);
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20pub struct ValkeyLeaseConfig {
21    pub ttl: Duration,
22    pub renew_interval: Duration,
23}
24
25#[derive(Debug, Clone)]
26pub struct ValkeyExecutionGuard {
27    connection: ConnectionManager,
28    key_prefix: String,
29    lease_config: ValkeyLeaseConfig,
30}
31
32impl ValkeyExecutionGuard {
33    pub async fn new(
34        url: impl AsRef<str>,
35        lease_config: ValkeyLeaseConfig,
36    ) -> Result<Self, ExecutionGuardError> {
37        Self::with_prefix(url, DEFAULT_KEY_PREFIX, lease_config).await
38    }
39
40    pub async fn with_prefix(
41        url: impl AsRef<str>,
42        key_prefix: impl Into<String>,
43        lease_config: ValkeyLeaseConfig,
44    ) -> Result<Self, ExecutionGuardError> {
45        lease_config.validate()?;
46        let client = Client::open(url.as_ref()).map_err(|error| {
47            let kind = classify_redis_error(&error);
48            ExecutionGuardError::new(error, kind)
49        })?;
50        let connection = client.get_connection_manager().await.map_err(|error| {
51            let kind = classify_redis_error(&error);
52            ExecutionGuardError::new(error, kind)
53        })?;
54
55        Ok(Self {
56            connection,
57            key_prefix: key_prefix.into(),
58            lease_config,
59        })
60    }
61
62    fn lease_key(&self, slot: &ExecutionSlot) -> String {
63        lease_key(&self.key_prefix, slot)
64    }
65
66    fn resource_lock_key(&self, resource_id: &str) -> String {
67        resource_lock_key(&self.key_prefix, resource_id)
68    }
69
70    fn occurrence_index_key(&self, resource_id: &str) -> String {
71        occurrence_index_key(&self.key_prefix, resource_id)
72    }
73
74    fn ttl_millis(&self) -> Result<u64, ValkeyExecutionGuardError> {
75        u64::try_from(self.lease_config.ttl.as_millis())
76            .map_err(ValkeyExecutionGuardError::DurationOutOfRange)
77    }
78}
79
80impl ExecutionGuard for ValkeyExecutionGuard {
81    type Error = ValkeyExecutionGuardError;
82
83    async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
84        let lease_key = self.lease_key(&slot);
85        let token = next_token(&TOKEN_COUNTER, "lease");
86        let ttl_millis = self.ttl_millis()?;
87        let now_millis = now_millis();
88        let expires_at_millis = now_millis.saturating_add(ttl_millis);
89        let mut connection = self.connection.clone();
90        let acquired = match slot.scope {
91            ExecutionGuardScope::Occurrence => {
92                let resource_lock_key = self.resource_lock_key(&slot.resource_id);
93                let occurrence_index_key = self.occurrence_index_key(&slot.resource_id);
94                let acquired: i32 = Script::new(
95                    r"
96                    redis.call('ZREMRANGEBYSCORE', KEYS[3], '-inf', ARGV[1])
97                    if redis.call('EXISTS', KEYS[1]) == 1 then
98                        return 0
99                    end
100                    local ok = redis.call('SET', KEYS[2], ARGV[2], 'NX', 'PX', ARGV[3])
101                    if not ok then
102                        return 0
103                    end
104                    redis.call('ZADD', KEYS[3], ARGV[4], KEYS[2])
105                    return 1
106                    ",
107                )
108                .key(resource_lock_key)
109                .key(&lease_key)
110                .key(occurrence_index_key)
111                .arg(now_millis)
112                .arg(&token)
113                .arg(ttl_millis)
114                .arg(expires_at_millis)
115                .invoke_async(&mut connection)
116                .await
117                .map_err(ValkeyExecutionGuardError::Redis)?;
118                acquired == 1
119            }
120            ExecutionGuardScope::Resource => {
121                let resource_lock_key = self.resource_lock_key(&slot.resource_id);
122                let occurrence_index_key = self.occurrence_index_key(&slot.resource_id);
123                let acquired: i32 = Script::new(
124                    r"
125                    redis.call('ZREMRANGEBYSCORE', KEYS[2], '-inf', ARGV[1])
126                    if redis.call('EXISTS', KEYS[1]) == 1 then
127                        return 0
128                    end
129                    if redis.call('ZCARD', KEYS[2]) > 0 then
130                        return 0
131                    end
132                    local ok = redis.call('SET', KEYS[1], ARGV[2], 'NX', 'PX', ARGV[3])
133                    if not ok then
134                        return 0
135                    end
136                    return 1
137                    ",
138                )
139                .key(&resource_lock_key)
140                .key(occurrence_index_key)
141                .arg(now_millis)
142                .arg(&token)
143                .arg(ttl_millis)
144                .invoke_async(&mut connection)
145                .await
146                .map_err(ValkeyExecutionGuardError::Redis)?;
147                acquired == 1
148            }
149        };
150
151        Ok(if acquired {
152            ExecutionGuardAcquire::Acquired(ExecutionLease::new(
153                slot.job_id,
154                slot.resource_id,
155                slot.scope,
156                slot.scheduled_at,
157                token,
158                lease_key,
159            ))
160        } else {
161            ExecutionGuardAcquire::Contended
162        })
163    }
164
165    async fn renew(&self, lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
166        let ttl_millis = self.ttl_millis()?;
167        let expires_at_millis = now_millis().saturating_add(ttl_millis);
168        let mut connection = self.connection.clone();
169        let renewed: i32 = match lease.scope {
170            ExecutionGuardScope::Occurrence => Script::new(
171                r"
172                if redis.call('GET', KEYS[1]) == ARGV[1] then
173                    redis.call('PEXPIRE', KEYS[1], ARGV[2])
174                    redis.call('ZADD', KEYS[2], ARGV[3], KEYS[1])
175                    return 1
176                end
177                redis.call('ZREM', KEYS[2], KEYS[1])
178                return 0
179                ",
180            )
181            .key(&lease.lease_key)
182            .key(self.occurrence_index_key(&lease.resource_id))
183            .arg(&lease.token)
184            .arg(ttl_millis)
185            .arg(expires_at_millis)
186            .invoke_async(&mut connection)
187            .await
188            .map_err(ValkeyExecutionGuardError::Redis)?,
189            ExecutionGuardScope::Resource => Script::new(
190                r"
191                if redis.call('GET', KEYS[1]) == ARGV[1] then
192                    return redis.call('PEXPIRE', KEYS[1], ARGV[2])
193                end
194                return 0
195                ",
196            )
197            .key(&lease.lease_key)
198            .arg(&lease.token)
199            .arg(ttl_millis)
200            .invoke_async(&mut connection)
201            .await
202            .map_err(ValkeyExecutionGuardError::Redis)?,
203        };
204
205        Ok(if renewed == 1 {
206            ExecutionGuardRenewal::Renewed
207        } else {
208            ExecutionGuardRenewal::Lost
209        })
210    }
211
212    async fn release(&self, lease: &ExecutionLease) -> Result<(), Self::Error> {
213        let mut connection = self.connection.clone();
214        let _: i32 = match lease.scope {
215            ExecutionGuardScope::Occurrence => Script::new(
216                r"
217                if redis.call('GET', KEYS[1]) == ARGV[1] then
218                    redis.call('DEL', KEYS[1])
219                    redis.call('ZREM', KEYS[2], KEYS[1])
220                    return 1
221                end
222                redis.call('ZREM', KEYS[2], KEYS[1])
223                return 0
224                ",
225            )
226            .key(&lease.lease_key)
227            .key(self.occurrence_index_key(&lease.resource_id))
228            .arg(&lease.token)
229            .invoke_async(&mut connection)
230            .await
231            .map_err(ValkeyExecutionGuardError::Redis)?,
232            ExecutionGuardScope::Resource => Script::new(
233                r"
234                if redis.call('GET', KEYS[1]) == ARGV[1] then
235                    return redis.call('DEL', KEYS[1])
236                end
237                return 0
238                ",
239            )
240            .key(&lease.lease_key)
241            .arg(&lease.token)
242            .invoke_async(&mut connection)
243            .await
244            .map_err(ValkeyExecutionGuardError::Redis)?,
245        };
246
247        Ok(())
248    }
249
250    fn classify_error(error: &Self::Error) -> ExecutionGuardErrorKind
251    where
252        Self: Sized,
253    {
254        match error {
255            ValkeyExecutionGuardError::Config(_)
256            | ValkeyExecutionGuardError::DurationOutOfRange(_) => ExecutionGuardErrorKind::Data,
257            ValkeyExecutionGuardError::Redis(error) => classify_redis_error(error),
258        }
259    }
260
261    fn renew_interval(&self, _lease: &ExecutionLease) -> Option<Duration> {
262        Some(self.lease_config.renew_interval)
263    }
264}
265
266#[derive(Debug)]
267pub enum ValkeyExecutionGuardError {
268    Redis(redis::RedisError),
269    Config(LeaseConfigError),
270    DurationOutOfRange(TryFromIntError),
271}
272
273impl Display for ValkeyExecutionGuardError {
274    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
275        match self {
276            Self::Redis(error) => write!(f, "{error}"),
277            Self::Config(error) => write!(f, "{error}"),
278            Self::DurationOutOfRange(error) => write!(f, "{error}"),
279        }
280    }
281}
282
283impl std::error::Error for ValkeyExecutionGuardError {
284    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
285        match self {
286            Self::Redis(error) => Some(error),
287            Self::Config(error) => Some(error),
288            Self::DurationOutOfRange(error) => Some(error),
289        }
290    }
291}
292
/// Reason a [`ValkeyLeaseConfig`] was rejected during validation.
#[derive(Debug)]
pub struct LeaseConfigError {
    /// Static, human-readable description of the violated rule.
    message: &'static str,
}

impl Display for LeaseConfigError {
    /// Writes the stored message verbatim.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
        let Self { message } = self;
        formatter.write_str(message)
    }
}

/// Leaf error: it has no underlying source.
impl std::error::Error for LeaseConfigError {}
305
306impl ValkeyLeaseConfig {
307    fn validate(self) -> Result<Self, ExecutionGuardError> {
308        if self.ttl.is_zero() {
309            return Err(ExecutionGuardError::new(
310                LeaseConfigError {
311                    message: "execution guard ttl must be greater than zero",
312                },
313                ExecutionGuardErrorKind::Data,
314            ));
315        }
316
317        if self.renew_interval.is_zero() {
318            return Err(ExecutionGuardError::new(
319                LeaseConfigError {
320                    message: "execution guard renew_interval must be greater than zero",
321                },
322                ExecutionGuardErrorKind::Data,
323            ));
324        }
325
326        if self.renew_interval >= self.ttl {
327            return Err(ExecutionGuardError::new(
328                LeaseConfigError {
329                    message: "execution guard renew_interval must be less than ttl",
330                },
331                ExecutionGuardErrorKind::Data,
332            ));
333        }
334
335        if u64::try_from(self.ttl.as_millis()).is_err() {
336            return Err(ExecutionGuardError::new(
337                LeaseConfigError {
338                    message: "execution guard ttl is too large to encode in milliseconds",
339                },
340                ExecutionGuardErrorKind::Data,
341            ));
342        }
343
344        Ok(self)
345    }
346}
347
348fn classify_redis_error(error: &redis::RedisError) -> ExecutionGuardErrorKind {
349    if error.is_connection_dropped()
350        || error.is_connection_refusal()
351        || error.is_timeout()
352        || matches!(
353            error.kind(),
354            ErrorKind::Io
355                | ErrorKind::ClusterConnectionNotFound
356                | ErrorKind::Server(ServerErrorKind::BusyLoading)
357                | ErrorKind::Server(ServerErrorKind::ClusterDown)
358                | ErrorKind::Server(ServerErrorKind::MasterDown)
359                | ErrorKind::Server(ServerErrorKind::TryAgain)
360        )
361    {
362        ExecutionGuardErrorKind::Connection
363    } else {
364        ExecutionGuardErrorKind::Unknown
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::{DEFAULT_KEY_PREFIX, TOKEN_COUNTER, lease_key, next_token};
    use crate::{ExecutionGuardScope, ExecutionSlot};
    use chrono::{TimeZone, Utc};

    #[test]
    fn generated_tokens_are_not_empty() {
        // Mint two tokens back to back; each one must be non-empty.
        for _ in 0..2 {
            let token = next_token(&TOKEN_COUNTER, "lease");
            assert_ne!(token, "");
        }
    }

    #[test]
    fn lease_key_uses_default_prefix_and_occurrence_time() {
        let scheduled_at = Utc.with_ymd_and_hms(2026, 4, 3, 1, 2, 3).unwrap();
        let slot = ExecutionSlot::for_occurrence("job-1", "resource-1", scheduled_at);

        let key = lease_key(DEFAULT_KEY_PREFIX, &slot);

        assert_eq!(
            key,
            "scheduler:valkey:execution-lease:resource-1:occurrence:2026-04-03T01:02:03.000000000Z"
        );
    }

    #[test]
    fn resource_scope_lease_key_uses_resource_lock() {
        let slot = ExecutionSlot::for_resource("job-1", "resource-1");
        assert_eq!(slot.scope, ExecutionGuardScope::Resource);

        let key = lease_key(DEFAULT_KEY_PREFIX, &slot);

        assert_eq!(key, "scheduler:valkey:execution-lease:resource-1:resource");
    }
}