Skip to main content

scheduler/
valkey_guard.rs

1use crate::error::{ExecutionGuardError, ExecutionGuardErrorKind};
2use crate::valkey_execution_support::{
3    lease_key, next_token, now_millis, occurrence_index_key, resource_lock_key,
4};
5use crate::{
6    ExecutionGuard, ExecutionGuardAcquire, ExecutionGuardRenewal, ExecutionGuardScope,
7    ExecutionLease, ExecutionSlot,
8};
9use redis::{Client, ErrorKind, Script, ServerErrorKind, aio::ConnectionManager};
10use std::fmt::{self, Display, Formatter};
11use std::num::TryFromIntError;
12use std::sync::atomic::AtomicU64;
13use std::time::Duration;
14
15const DEFAULT_KEY_PREFIX: &str = "scheduler:valkey:execution-lease:";
16const ACQUIRE_OCCURRENCE_LUA: &str = include_str!("lua/valkey_guard/acquire_occurrence.lua");
17const ACQUIRE_RESOURCE_LUA: &str = include_str!("lua/valkey_guard/acquire_resource.lua");
18const RENEW_OCCURRENCE_LUA: &str = include_str!("lua/valkey_guard/renew_occurrence.lua");
19const RENEW_RESOURCE_LUA: &str = include_str!("lua/valkey_guard/renew_resource.lua");
20const RELEASE_OCCURRENCE_LUA: &str = include_str!("lua/valkey_guard/release_occurrence.lua");
21const RELEASE_RESOURCE_LUA: &str = include_str!("lua/valkey_guard/release_resource.lua");
22
23static TOKEN_COUNTER: AtomicU64 = AtomicU64::new(1);
24
/// Timing parameters for leases handed out by the Valkey execution guard.
///
/// Validation (run when a guard is constructed) requires both durations to be
/// non-zero, `renew_interval` to be strictly shorter than `ttl`, and `ttl` to
/// fit in a `u64` number of milliseconds.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ValkeyLeaseConfig {
    /// Lease time-to-live; converted to milliseconds before being sent to the scripts.
    pub ttl: Duration,
    /// Interval reported to callers through `ExecutionGuard::renew_interval`.
    pub renew_interval: Duration,
}
30
31#[derive(Debug, Clone)]
32pub struct ValkeyExecutionGuard {
33    connection: ConnectionManager,
34    key_prefix: String,
35    lease_config: ValkeyLeaseConfig,
36}
37
38impl ValkeyExecutionGuard {
39    pub async fn new(
40        url: impl AsRef<str>,
41        lease_config: ValkeyLeaseConfig,
42    ) -> Result<Self, ExecutionGuardError> {
43        Self::with_prefix(url, DEFAULT_KEY_PREFIX, lease_config).await
44    }
45
46    pub async fn with_prefix(
47        url: impl AsRef<str>,
48        key_prefix: impl Into<String>,
49        lease_config: ValkeyLeaseConfig,
50    ) -> Result<Self, ExecutionGuardError> {
51        lease_config.validate()?;
52        let client = Client::open(url.as_ref()).map_err(|error| {
53            let kind = classify_redis_error(&error);
54            ExecutionGuardError::new(error, kind)
55        })?;
56        let connection = client.get_connection_manager().await.map_err(|error| {
57            let kind = classify_redis_error(&error);
58            ExecutionGuardError::new(error, kind)
59        })?;
60
61        Ok(Self {
62            connection,
63            key_prefix: key_prefix.into(),
64            lease_config,
65        })
66    }
67
68    fn lease_key(&self, slot: &ExecutionSlot) -> String {
69        lease_key(&self.key_prefix, slot)
70    }
71
72    fn resource_lock_key(&self, resource_id: &str) -> String {
73        resource_lock_key(&self.key_prefix, resource_id)
74    }
75
76    fn occurrence_index_key(&self, resource_id: &str) -> String {
77        occurrence_index_key(&self.key_prefix, resource_id)
78    }
79
80    fn ttl_millis(&self) -> Result<u64, ValkeyExecutionGuardError> {
81        u64::try_from(self.lease_config.ttl.as_millis())
82            .map_err(ValkeyExecutionGuardError::DurationOutOfRange)
83    }
84}
85
86impl ExecutionGuard for ValkeyExecutionGuard {
87    type Error = ValkeyExecutionGuardError;
88
89    async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
90        let lease_key = self.lease_key(&slot);
91        let token = next_token(&TOKEN_COUNTER, "lease");
92        let ttl_millis = self.ttl_millis()?;
93        let now_millis = now_millis();
94        let expires_at_millis = now_millis.saturating_add(ttl_millis);
95        let mut connection = self.connection.clone();
96        let acquired = match slot.scope {
97            ExecutionGuardScope::Occurrence => {
98                let resource_lock_key = self.resource_lock_key(&slot.resource_id);
99                let occurrence_index_key = self.occurrence_index_key(&slot.resource_id);
100                let acquired: i32 = Script::new(ACQUIRE_OCCURRENCE_LUA)
101                    .key(resource_lock_key)
102                    .key(&lease_key)
103                    .key(occurrence_index_key)
104                    .arg(now_millis)
105                    .arg(&token)
106                    .arg(ttl_millis)
107                    .arg(expires_at_millis)
108                    .invoke_async(&mut connection)
109                    .await
110                    .map_err(ValkeyExecutionGuardError::Redis)?;
111                acquired == 1
112            }
113            ExecutionGuardScope::Resource => {
114                let resource_lock_key = self.resource_lock_key(&slot.resource_id);
115                let occurrence_index_key = self.occurrence_index_key(&slot.resource_id);
116                let acquired: i32 = Script::new(ACQUIRE_RESOURCE_LUA)
117                    .key(&resource_lock_key)
118                    .key(occurrence_index_key)
119                    .arg(now_millis)
120                    .arg(&token)
121                    .arg(ttl_millis)
122                    .invoke_async(&mut connection)
123                    .await
124                    .map_err(ValkeyExecutionGuardError::Redis)?;
125                acquired == 1
126            }
127        };
128
129        Ok(if acquired {
130            ExecutionGuardAcquire::Acquired(ExecutionLease::new(
131                slot.job_id,
132                slot.resource_id,
133                slot.scope,
134                slot.scheduled_at,
135                token,
136                lease_key,
137            ))
138        } else {
139            ExecutionGuardAcquire::Contended
140        })
141    }
142
143    async fn renew(&self, lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
144        let ttl_millis = self.ttl_millis()?;
145        let expires_at_millis = now_millis().saturating_add(ttl_millis);
146        let mut connection = self.connection.clone();
147        let renewed: i32 = match lease.scope {
148            ExecutionGuardScope::Occurrence => Script::new(RENEW_OCCURRENCE_LUA)
149                .key(&lease.lease_key)
150                .key(self.occurrence_index_key(&lease.resource_id))
151                .arg(&lease.token)
152                .arg(ttl_millis)
153                .arg(expires_at_millis)
154                .invoke_async(&mut connection)
155                .await
156                .map_err(ValkeyExecutionGuardError::Redis)?,
157            ExecutionGuardScope::Resource => Script::new(RENEW_RESOURCE_LUA)
158                .key(&lease.lease_key)
159                .arg(&lease.token)
160                .arg(ttl_millis)
161                .invoke_async(&mut connection)
162                .await
163                .map_err(ValkeyExecutionGuardError::Redis)?,
164        };
165
166        Ok(if renewed == 1 {
167            ExecutionGuardRenewal::Renewed
168        } else {
169            ExecutionGuardRenewal::Lost
170        })
171    }
172
173    async fn release(&self, lease: &ExecutionLease) -> Result<(), Self::Error> {
174        let mut connection = self.connection.clone();
175        let _: i32 = match lease.scope {
176            ExecutionGuardScope::Occurrence => Script::new(RELEASE_OCCURRENCE_LUA)
177                .key(&lease.lease_key)
178                .key(self.occurrence_index_key(&lease.resource_id))
179                .arg(&lease.token)
180                .invoke_async(&mut connection)
181                .await
182                .map_err(ValkeyExecutionGuardError::Redis)?,
183            ExecutionGuardScope::Resource => Script::new(RELEASE_RESOURCE_LUA)
184                .key(&lease.lease_key)
185                .arg(&lease.token)
186                .invoke_async(&mut connection)
187                .await
188                .map_err(ValkeyExecutionGuardError::Redis)?,
189        };
190
191        Ok(())
192    }
193
194    fn classify_error(error: &Self::Error) -> ExecutionGuardErrorKind
195    where
196        Self: Sized,
197    {
198        match error {
199            ValkeyExecutionGuardError::Config(_)
200            | ValkeyExecutionGuardError::DurationOutOfRange(_) => ExecutionGuardErrorKind::Data,
201            ValkeyExecutionGuardError::Redis(error) => classify_redis_error(error),
202        }
203    }
204
205    fn renew_interval(&self, _lease: &ExecutionLease) -> Option<Duration> {
206        Some(self.lease_config.renew_interval)
207    }
208}
209
210#[derive(Debug)]
211pub enum ValkeyExecutionGuardError {
212    Redis(redis::RedisError),
213    Config(LeaseConfigError),
214    DurationOutOfRange(TryFromIntError),
215}
216
217impl Display for ValkeyExecutionGuardError {
218    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
219        match self {
220            Self::Redis(error) => write!(f, "{error}"),
221            Self::Config(error) => write!(f, "{error}"),
222            Self::DurationOutOfRange(error) => write!(f, "{error}"),
223        }
224    }
225}
226
227impl std::error::Error for ValkeyExecutionGuardError {
228    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
229        match self {
230            Self::Redis(error) => Some(error),
231            Self::Config(error) => Some(error),
232            Self::DurationOutOfRange(error) => Some(error),
233        }
234    }
235}
236
/// Describes why a lease configuration was rejected.
#[derive(Debug)]
pub struct LeaseConfigError {
    /// Fixed, human-readable description of the violated rule.
    message: &'static str,
}

impl Display for LeaseConfigError {
    /// Writes the stored message verbatim, without applying formatter padding.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self { message } = self;
        f.write_str(message)
    }
}

// No underlying cause: the default `source()` (returning `None`) is kept.
impl std::error::Error for LeaseConfigError {}
249
250impl ValkeyLeaseConfig {
251    fn validate(self) -> Result<Self, ExecutionGuardError> {
252        if self.ttl.is_zero() {
253            return Err(ExecutionGuardError::new(
254                LeaseConfigError {
255                    message: "execution guard ttl must be greater than zero",
256                },
257                ExecutionGuardErrorKind::Data,
258            ));
259        }
260
261        if self.renew_interval.is_zero() {
262            return Err(ExecutionGuardError::new(
263                LeaseConfigError {
264                    message: "execution guard renew_interval must be greater than zero",
265                },
266                ExecutionGuardErrorKind::Data,
267            ));
268        }
269
270        if self.renew_interval >= self.ttl {
271            return Err(ExecutionGuardError::new(
272                LeaseConfigError {
273                    message: "execution guard renew_interval must be less than ttl",
274                },
275                ExecutionGuardErrorKind::Data,
276            ));
277        }
278
279        if u64::try_from(self.ttl.as_millis()).is_err() {
280            return Err(ExecutionGuardError::new(
281                LeaseConfigError {
282                    message: "execution guard ttl is too large to encode in milliseconds",
283                },
284                ExecutionGuardErrorKind::Data,
285            ));
286        }
287
288        Ok(self)
289    }
290}
291
292fn classify_redis_error(error: &redis::RedisError) -> ExecutionGuardErrorKind {
293    if error.is_connection_dropped()
294        || error.is_connection_refusal()
295        || error.is_timeout()
296        || matches!(
297            error.kind(),
298            ErrorKind::Io
299                | ErrorKind::ClusterConnectionNotFound
300                | ErrorKind::Server(ServerErrorKind::BusyLoading)
301                | ErrorKind::Server(ServerErrorKind::ClusterDown)
302                | ErrorKind::Server(ServerErrorKind::MasterDown)
303                | ErrorKind::Server(ServerErrorKind::TryAgain)
304        )
305    {
306        ExecutionGuardErrorKind::Connection
307    } else {
308        ExecutionGuardErrorKind::Unknown
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::{DEFAULT_KEY_PREFIX, lease_key, next_token};
    use crate::{ExecutionGuardScope, ExecutionSlot};
    use chrono::{TimeZone, Utc};

    /// Two consecutive tokens drawn from the shared counter are both non-empty.
    #[test]
    fn generated_tokens_are_not_empty() {
        for _ in 0..2 {
            let token = next_token(&super::TOKEN_COUNTER, "lease");
            assert_ne!(token, "");
        }
    }

    /// An occurrence-scoped slot produces a key ending in the scheduled timestamp.
    #[test]
    fn lease_key_uses_default_prefix_and_occurrence_time() {
        let scheduled_at = Utc.with_ymd_and_hms(2026, 4, 3, 1, 2, 3).unwrap();
        let slot = ExecutionSlot::for_occurrence("job-1", "resource-1", scheduled_at);

        let key = lease_key(DEFAULT_KEY_PREFIX, &slot);

        assert_eq!(
            key,
            "scheduler:valkey:execution-lease:resource-1:occurrence:2026-04-03T01:02:03.000000000Z"
        );
    }

    /// A resource-scoped slot produces the `:resource` key for that resource.
    #[test]
    fn resource_scope_lease_key_uses_resource_lock() {
        let slot = ExecutionSlot::for_resource("job-1", "resource-1");
        assert_eq!(slot.scope, ExecutionGuardScope::Resource);

        let key = lease_key(DEFAULT_KEY_PREFIX, &slot);

        assert_eq!(key, "scheduler:valkey:execution-lease:resource-1:resource");
    }
}