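//! `scheduler/valkey_guard.rs`: a Valkey-backed [`ExecutionGuard`] whose leases are
//! per-occurrence keys taken with `SET NX PX` and renewed/released only by the token holder.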

1use crate::error::{ExecutionGuardError, ExecutionGuardErrorKind};
2use crate::{
3    ExecutionGuard, ExecutionGuardAcquire, ExecutionGuardRenewal, ExecutionLease, ExecutionSlot,
4};
5use chrono::SecondsFormat;
6use redis::{Client, ErrorKind, Script, ServerErrorKind, aio::ConnectionManager, cmd};
7use std::fmt::{self, Display, Formatter};
8use std::num::TryFromIntError;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
/// Namespace prepended to every lease key when the caller does not supply one
/// (used by `ValkeyExecutionGuard::new`).
const DEFAULT_KEY_PREFIX: &str = "scheduler:valkey:execution-lease:";

/// Process-wide sequence number mixed into every lease token, so tokens minted
/// within the same clock tick still differ. The first token gets `1`.
static TOKEN_COUNTER: AtomicU64 = AtomicU64::new(1);
15
/// Timing parameters for Valkey execution leases.
///
/// `ValkeyExecutionGuard::with_prefix` validates these before connecting: both
/// durations must be non-zero and `renew_interval` must be strictly shorter
/// than `ttl`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ValkeyLeaseConfig {
    /// Lifetime of a lease key, sent to Valkey in whole milliseconds
    /// (`PX` when acquiring, `PEXPIRE` when renewing).
    pub ttl: Duration,
    /// Interval reported by `ExecutionGuard::renew_interval` for every lease.
    pub renew_interval: Duration,
}
21
22#[derive(Debug, Clone)]
23pub struct ValkeyExecutionGuard {
24    connection: ConnectionManager,
25    key_prefix: String,
26    lease_config: ValkeyLeaseConfig,
27}
28
29impl ValkeyExecutionGuard {
30    pub async fn new(
31        url: impl AsRef<str>,
32        lease_config: ValkeyLeaseConfig,
33    ) -> Result<Self, ExecutionGuardError> {
34        Self::with_prefix(url, DEFAULT_KEY_PREFIX, lease_config).await
35    }
36
37    pub async fn with_prefix(
38        url: impl AsRef<str>,
39        key_prefix: impl Into<String>,
40        lease_config: ValkeyLeaseConfig,
41    ) -> Result<Self, ExecutionGuardError> {
42        lease_config.validate()?;
43        let client = Client::open(url.as_ref()).map_err(|error| {
44            let kind = classify_redis_error(&error);
45            ExecutionGuardError::new(error, kind)
46        })?;
47        let connection = client.get_connection_manager().await.map_err(|error| {
48            let kind = classify_redis_error(&error);
49            ExecutionGuardError::new(error, kind)
50        })?;
51
52        Ok(Self {
53            connection,
54            key_prefix: key_prefix.into(),
55            lease_config,
56        })
57    }
58
59    fn lease_key(&self, slot: &ExecutionSlot) -> String {
60        lease_key(&self.key_prefix, slot)
61    }
62
63    fn ttl_millis(&self) -> Result<u64, ValkeyExecutionGuardError> {
64        u64::try_from(self.lease_config.ttl.as_millis())
65            .map_err(ValkeyExecutionGuardError::DurationOutOfRange)
66    }
67}
68
69impl ExecutionGuard for ValkeyExecutionGuard {
70    type Error = ValkeyExecutionGuardError;
71
72    async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
73        let lease_key = self.lease_key(&slot);
74        let token = next_token();
75        let ttl_millis = self.ttl_millis()?;
76        let mut connection = self.connection.clone();
77        let response: Option<String> = cmd("SET")
78            .arg(&lease_key)
79            .arg(&token)
80            .arg("NX")
81            .arg("PX")
82            .arg(ttl_millis)
83            .query_async(&mut connection)
84            .await
85            .map_err(ValkeyExecutionGuardError::Redis)?;
86
87        Ok(match response {
88            Some(_) => ExecutionGuardAcquire::Acquired(ExecutionLease::new(
89                slot.job_id,
90                slot.scheduled_at,
91                token,
92                lease_key,
93            )),
94            None => ExecutionGuardAcquire::Contended,
95        })
96    }
97
98    async fn renew(&self, lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
99        let ttl_millis = self.ttl_millis()?;
100        let mut connection = self.connection.clone();
101        let renewed: i32 = Script::new(
102            r"
103            if redis.call('GET', KEYS[1]) == ARGV[1] then
104                return redis.call('PEXPIRE', KEYS[1], ARGV[2])
105            end
106            return 0
107            ",
108        )
109        .key(&lease.lease_key)
110        .arg(&lease.token)
111        .arg(ttl_millis)
112        .invoke_async(&mut connection)
113        .await
114        .map_err(ValkeyExecutionGuardError::Redis)?;
115
116        Ok(if renewed == 1 {
117            ExecutionGuardRenewal::Renewed
118        } else {
119            ExecutionGuardRenewal::Lost
120        })
121    }
122
123    async fn release(&self, lease: &ExecutionLease) -> Result<(), Self::Error> {
124        let mut connection = self.connection.clone();
125        let _: i32 = Script::new(
126            r"
127            if redis.call('GET', KEYS[1]) == ARGV[1] then
128                return redis.call('DEL', KEYS[1])
129            end
130            return 0
131            ",
132        )
133        .key(&lease.lease_key)
134        .arg(&lease.token)
135        .invoke_async(&mut connection)
136        .await
137        .map_err(ValkeyExecutionGuardError::Redis)?;
138
139        Ok(())
140    }
141
142    fn classify_error(error: &Self::Error) -> ExecutionGuardErrorKind
143    where
144        Self: Sized,
145    {
146        match error {
147            ValkeyExecutionGuardError::Config(_) | ValkeyExecutionGuardError::DurationOutOfRange(_) => {
148                ExecutionGuardErrorKind::Data
149            }
150            ValkeyExecutionGuardError::Redis(error) => classify_redis_error(error),
151        }
152    }
153
154    fn renew_interval(&self, _lease: &ExecutionLease) -> Option<Duration> {
155        Some(self.lease_config.renew_interval)
156    }
157}
158
159#[derive(Debug)]
160pub enum ValkeyExecutionGuardError {
161    Redis(redis::RedisError),
162    Config(LeaseConfigError),
163    DurationOutOfRange(TryFromIntError),
164}
165
166impl Display for ValkeyExecutionGuardError {
167    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
168        match self {
169            Self::Redis(error) => write!(f, "{error}"),
170            Self::Config(error) => write!(f, "{error}"),
171            Self::DurationOutOfRange(error) => write!(f, "{error}"),
172        }
173    }
174}
175
176impl std::error::Error for ValkeyExecutionGuardError {
177    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
178        match self {
179            Self::Redis(error) => Some(error),
180            Self::Config(error) => Some(error),
181            Self::DurationOutOfRange(error) => Some(error),
182        }
183    }
184}
185
/// Reason a [`ValkeyLeaseConfig`] was rejected.
#[derive(Debug)]
pub struct LeaseConfigError {
    /// Fixed human-readable explanation; it is also the `Display` output.
    message: &'static str,
}

impl Display for LeaseConfigError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self { message } = self;
        write!(f, "{message}")
    }
}

/// Leaf error: it carries no underlying cause.
impl std::error::Error for LeaseConfigError {}
198
199impl ValkeyLeaseConfig {
200    fn validate(self) -> Result<Self, ExecutionGuardError> {
201        if self.ttl.is_zero() {
202            return Err(ExecutionGuardError::new(
203                LeaseConfigError {
204                    message: "execution guard ttl must be greater than zero",
205                },
206                ExecutionGuardErrorKind::Data,
207            ));
208        }
209
210        if self.renew_interval.is_zero() {
211            return Err(ExecutionGuardError::new(
212                LeaseConfigError {
213                    message: "execution guard renew_interval must be greater than zero",
214                },
215                ExecutionGuardErrorKind::Data,
216            ));
217        }
218
219        if self.renew_interval >= self.ttl {
220            return Err(ExecutionGuardError::new(
221                LeaseConfigError {
222                    message: "execution guard renew_interval must be less than ttl",
223                },
224                ExecutionGuardErrorKind::Data,
225            ));
226        }
227
228        if u64::try_from(self.ttl.as_millis()).is_err() {
229            return Err(ExecutionGuardError::new(
230                LeaseConfigError {
231                    message: "execution guard ttl is too large to encode in milliseconds",
232                },
233                ExecutionGuardErrorKind::Data,
234            ));
235        }
236
237        Ok(self)
238    }
239}
240
241fn lease_key(prefix: &str, slot: &ExecutionSlot) -> String {
242    format!(
243        "{}{}:{}",
244        prefix,
245        slot.job_id,
246        slot.scheduled_at
247            .to_rfc3339_opts(SecondsFormat::Nanos, true)
248    )
249}
250
251fn next_token() -> String {
252    let now = SystemTime::now()
253        .duration_since(UNIX_EPOCH)
254        .unwrap_or_default()
255        .as_nanos();
256    let counter = TOKEN_COUNTER.fetch_add(1, Ordering::Relaxed);
257    format!("lease-{now}-{counter}")
258}
259
260fn classify_redis_error(error: &redis::RedisError) -> ExecutionGuardErrorKind {
261    if error.is_connection_dropped()
262        || error.is_connection_refusal()
263        || error.is_timeout()
264        || matches!(
265            error.kind(),
266            ErrorKind::Io
267                | ErrorKind::ClusterConnectionNotFound
268                | ErrorKind::Server(ServerErrorKind::BusyLoading)
269                | ErrorKind::Server(ServerErrorKind::ClusterDown)
270                | ErrorKind::Server(ServerErrorKind::MasterDown)
271                | ErrorKind::Server(ServerErrorKind::TryAgain)
272        )
273    {
274        ExecutionGuardErrorKind::Connection
275    } else {
276        ExecutionGuardErrorKind::Unknown
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::{DEFAULT_KEY_PREFIX, lease_key, next_token};
    use crate::ExecutionSlot;
    use chrono::{TimeZone, Timelike, Utc};

    #[test]
    fn generated_tokens_are_not_empty() {
        assert_ne!(next_token(), "");
        assert_ne!(next_token(), "");
    }

    /// Renew/release compare the stored value against the token, so
    /// consecutive tokens must never collide.
    #[test]
    fn generated_tokens_are_unique() {
        let first = next_token();
        let second = next_token();
        assert_ne!(first, second);
    }

    #[test]
    fn lease_key_uses_default_prefix_and_occurrence_time() {
        let slot = ExecutionSlot::new(
            "job-1",
            Utc.with_ymd_and_hms(2026, 4, 3, 1, 2, 3).unwrap(),
        );

        assert_eq!(
            lease_key(DEFAULT_KEY_PREFIX, &slot),
            "scheduler:valkey:execution-lease:job-1:2026-04-03T01:02:03.000000000Z"
        );
    }

    /// Occurrences inside the same second must still map to distinct keys.
    #[test]
    fn lease_key_keeps_nanosecond_precision() {
        let scheduled_at = Utc
            .with_ymd_and_hms(2026, 4, 3, 1, 2, 3)
            .unwrap()
            .with_nanosecond(123_456_789)
            .unwrap();
        let slot = ExecutionSlot::new("job-1", scheduled_at);

        assert_eq!(
            lease_key(DEFAULT_KEY_PREFIX, &slot),
            "scheduler:valkey:execution-lease:job-1:2026-04-03T01:02:03.123456789Z"
        );
    }
}