Skip to main content

scheduler/
valkey_coordinated_store.rs

1use crate::coordinated_store::{
2    CoordinatedClaim, CoordinatedLeaseConfig, CoordinatedPendingTrigger, CoordinatedRuntimeState,
3    CoordinatedStateStore,
4};
5use crate::error::{ExecutionGuardErrorKind, StoreErrorKind};
6use crate::execution_guard::{ExecutionGuardRenewal, ExecutionGuardScope, ExecutionLease};
7use crate::model::JobState;
8use crate::valkey_execution_support::{
9    next_token, now_millis, occurrence_index_key, occurrence_lease_key, resource_lock_key,
10};
11use crate::valkey_store::ValkeyStoreError;
12use chrono::SecondsFormat;
13use chrono::{DateTime, Utc};
14use redis::{AsyncCommands, Client, Script, aio::ConnectionManager, cmd};
15use std::collections::HashMap;
16use std::sync::atomic::AtomicU64;
17
18const DEFAULT_STATE_KEY_PREFIX: &str = "scheduler:valkey:job-state:";
19const LEGACY_DEFAULT_STATE_KEY_PREFIX: &str = "scheduler:job-state:";
20const DEFAULT_EXECUTION_KEY_PREFIX: &str = "scheduler:valkey:execution-lease:";
21
22const FIELD_VERSION: &str = "version";
23const FIELD_STATE: &str = "state";
24const FIELD_PAUSED: &str = "paused";
25const FIELD_INFLIGHT_SCHEDULED_AT: &str = "inflight_scheduled_at";
26const FIELD_INFLIGHT_CATCH_UP: &str = "inflight_catch_up";
27const FIELD_INFLIGHT_TRIGGER_COUNT: &str = "inflight_trigger_count";
28const FIELD_INFLIGHT_RESOURCE_ID: &str = "inflight_resource_id";
29const FIELD_INFLIGHT_SCOPE: &str = "inflight_scope";
30const FIELD_INFLIGHT_TOKEN: &str = "inflight_token";
31const FIELD_INFLIGHT_LEASE_KEY: &str = "inflight_lease_key";
32const FIELD_INFLIGHT_LEASE_EXPIRES_AT: &str = "inflight_lease_expires_at";
33
34static COORDINATED_TOKEN_COUNTER: AtomicU64 = AtomicU64::new(1);
35
36#[derive(Debug, Clone)]
37pub struct ValkeyCoordinatedStateStore {
38    connection: ConnectionManager,
39    state_key_prefix: String,
40    execution_key_prefix: String,
41}
42
43impl ValkeyCoordinatedStateStore {
44    pub async fn new(url: impl AsRef<str>) -> Result<Self, redis::RedisError> {
45        Self::with_prefixes(url, DEFAULT_STATE_KEY_PREFIX, DEFAULT_EXECUTION_KEY_PREFIX).await
46    }
47
48    pub async fn with_prefixes(
49        url: impl AsRef<str>,
50        state_key_prefix: impl Into<String>,
51        execution_key_prefix: impl Into<String>,
52    ) -> Result<Self, redis::RedisError> {
53        let client = Client::open(url.as_ref())?;
54        let connection = client.get_connection_manager().await?;
55        Ok(Self {
56            connection,
57            state_key_prefix: state_key_prefix.into(),
58            execution_key_prefix: execution_key_prefix.into(),
59        })
60    }
61
62    fn state_key(&self, job_id: &str) -> String {
63        format!("{}{}", self.state_key_prefix, job_id)
64    }
65
66    fn legacy_state_key(&self, job_id: &str) -> Option<String> {
67        if self.state_key_prefix == DEFAULT_STATE_KEY_PREFIX {
68            Some(format!("{LEGACY_DEFAULT_STATE_KEY_PREFIX}{job_id}"))
69        } else {
70            None
71        }
72    }
73
74    fn resource_lock_key(&self, resource_id: &str) -> String {
75        resource_lock_key(&self.execution_key_prefix, resource_id)
76    }
77
78    fn occurrence_index_key(&self, resource_id: &str) -> String {
79        occurrence_index_key(&self.execution_key_prefix, resource_id)
80    }
81
82    fn occurrence_lease_key(&self, resource_id: &str, scheduled_at: DateTime<Utc>) -> String {
83        occurrence_lease_key(&self.execution_key_prefix, resource_id, scheduled_at)
84    }
85
86    async fn key_type(&self, key: &str) -> Result<String, ValkeyStoreError> {
87        let mut connection = self.connection.clone();
88        cmd("TYPE")
89            .arg(key)
90            .query_async(&mut connection)
91            .await
92            .map_err(ValkeyStoreError::from)
93    }
94
95    async fn load_hash(
96        &self,
97        key: &str,
98    ) -> Result<Option<CoordinatedRuntimeState>, ValkeyStoreError> {
99        let mut connection = self.connection.clone();
100        let fields: HashMap<String, String> = connection
101            .hgetall(key)
102            .await
103            .map_err(ValkeyStoreError::from)?;
104        if fields.is_empty() {
105            return Ok(None);
106        }
107
108        Ok(Some(parse_runtime_state(&fields)?))
109    }
110
111    async fn migrate_string_state(
112        &self,
113        key: &str,
114        payload: String,
115    ) -> Result<CoordinatedRuntimeState, ValkeyStoreError> {
116        let state: JobState = serde_json::from_str(&payload).map_err(ValkeyStoreError::from)?;
117        let runtime = CoordinatedRuntimeState {
118            state,
119            revision: 0,
120            paused: false,
121        };
122        self.write_runtime(key, &runtime).await?;
123        Ok(runtime)
124    }
125
126    async fn write_runtime(
127        &self,
128        key: &str,
129        runtime: &CoordinatedRuntimeState,
130    ) -> Result<(), ValkeyStoreError> {
131        let mut connection = self.connection.clone();
132        let payload = serde_json::to_string(&runtime.state).map_err(ValkeyStoreError::from)?;
133        let _: () = cmd("DEL")
134            .arg(key)
135            .query_async(&mut connection)
136            .await
137            .map_err(ValkeyStoreError::from)?;
138        let _: () = cmd("HSET")
139            .arg(key)
140            .arg(FIELD_VERSION)
141            .arg(runtime.revision)
142            .arg(FIELD_STATE)
143            .arg(payload)
144            .arg(FIELD_PAUSED)
145            .arg(if runtime.paused { "1" } else { "0" })
146            .query_async(&mut connection)
147            .await
148            .map_err(ValkeyStoreError::from)?;
149        Ok(())
150    }
151
152    async fn load_payload_state(&self, key: &str) -> Result<Option<String>, ValkeyStoreError> {
153        let mut connection = self.connection.clone();
154        connection.get(key).await.map_err(ValkeyStoreError::from)
155    }
156}
157
158impl CoordinatedStateStore for ValkeyCoordinatedStateStore {
159    type Error = ValkeyStoreError;
160
161    async fn load_or_initialize(
162        &self,
163        job_id: &str,
164        initial_state: JobState,
165    ) -> Result<CoordinatedRuntimeState, Self::Error> {
166        let key = self.state_key(job_id);
167        match self.key_type(&key).await?.as_str() {
168            "hash" => {
169                if let Some(runtime) = self.load_hash(&key).await? {
170                    return Ok(runtime);
171                }
172            }
173            "string" => {
174                if let Some(payload) = self.load_payload_state(&key).await? {
175                    return self.migrate_string_state(&key, payload).await;
176                }
177            }
178            "none" => {}
179            _ => {}
180        }
181
182        if let Some(legacy_key) = self.legacy_state_key(job_id) {
183            if self.key_type(&legacy_key).await?.as_str() == "string" {
184                if let Some(payload) = self.load_payload_state(&legacy_key).await? {
185                    let runtime = self.migrate_string_state(&key, payload).await?;
186                    let mut connection = self.connection.clone();
187                    let _: () = cmd("DEL")
188                        .arg(legacy_key)
189                        .query_async(&mut connection)
190                        .await
191                        .map_err(ValkeyStoreError::from)?;
192                    return Ok(runtime);
193                }
194            }
195        }
196
197        let runtime = CoordinatedRuntimeState {
198            state: initial_state,
199            revision: 0,
200            paused: false,
201        };
202        self.write_runtime(&key, &runtime).await?;
203        Ok(runtime)
204    }
205
206    async fn save_state(
207        &self,
208        job_id: &str,
209        revision: u64,
210        state: &JobState,
211    ) -> Result<bool, Self::Error> {
212        let key = self.state_key(job_id);
213        let payload = serde_json::to_string(state).map_err(ValkeyStoreError::from)?;
214        let mut connection = self.connection.clone();
215        let updated: i32 = Script::new(
216            r"
217            local version = tonumber(redis.call('HGET', KEYS[1], ARGV[1]) or '-1')
218            local inflight = redis.call('HGET', KEYS[1], ARGV[3])
219            if inflight then
220                return 0
221            end
222            if version ~= tonumber(ARGV[2]) then
223                return 0
224            end
225            redis.call('HSET', KEYS[1], ARGV[1], version + 1, ARGV[4], ARGV[5])
226            return 1
227            ",
228        )
229        .key(key)
230        .arg(FIELD_VERSION)
231        .arg(revision)
232        .arg(FIELD_INFLIGHT_TOKEN)
233        .arg(FIELD_STATE)
234        .arg(payload)
235        .invoke_async(&mut connection)
236        .await
237        .map_err(ValkeyStoreError::from)?;
238        Ok(updated == 1)
239    }
240
241    async fn reclaim_inflight(
242        &self,
243        job_id: &str,
244        resource_id: &str,
245        lease_config: CoordinatedLeaseConfig,
246    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
247        let key = self.state_key(job_id);
248        let lease_key = self.occurrence_lease_key(resource_id, Utc::now());
249        let token = next_token(&COORDINATED_TOKEN_COUNTER, "coord");
250        let ttl_millis = u64::try_from(lease_config.ttl.as_millis()).unwrap_or(u64::MAX);
251        let now_millis = now_millis();
252        let expires_at_millis = now_millis.saturating_add(ttl_millis);
253        let mut connection = self.connection.clone();
254        let result: Option<Vec<String>> = Script::new(
255            r"
256            local scheduled_at = redis.call('HGET', KEYS[1], ARGV[1])
257            local catch_up = redis.call('HGET', KEYS[1], ARGV[2])
258            local trigger_count = redis.call('HGET', KEYS[1], ARGV[3])
259            local inflight_resource_id = redis.call('HGET', KEYS[1], ARGV[4])
260            local inflight_scope = redis.call('HGET', KEYS[1], ARGV[5])
261            local inflight_expires_at = tonumber(redis.call('HGET', KEYS[1], ARGV[6]) or '0')
262            local state_payload = redis.call('HGET', KEYS[1], ARGV[7])
263            local version = tonumber(redis.call('HGET', KEYS[1], ARGV[8]) or '0')
264            local paused = redis.call('HGET', KEYS[1], ARGV[9])
265
266            if not scheduled_at or not inflight_resource_id or not inflight_scope then
267                return nil
268            end
269            if paused == '1' or paused == 'true' then
270                return nil
271            end
272            if inflight_expires_at > tonumber(ARGV[9]) then
273                return nil
274            end
275            redis.call('ZREMRANGEBYSCORE', KEYS[4], '-inf', ARGV[9])
276            if redis.call('EXISTS', KEYS[2]) == 1 then
277                return nil
278            end
279            local new_lease_key = ARGV[10] .. scheduled_at
280            local ok = redis.call('SET', new_lease_key, ARGV[11], 'NX', 'PX', ARGV[12])
281            if not ok then
282                return nil
283            end
284            redis.call('ZADD', KEYS[4], ARGV[13], new_lease_key)
285            redis.call('HSET', KEYS[1],
286                ARGV[6], ARGV[13],
287                ARGV[14], ARGV[11],
288                ARGV[15], new_lease_key,
289                ARGV[8], version + 1
290            )
291            return { tostring(version + 1), state_payload, scheduled_at, catch_up, trigger_count, inflight_scope, new_lease_key, ARGV[11] }
292            ",
293        )
294        .key(key)
295        .key(self.resource_lock_key(resource_id))
296        .key(lease_key.clone())
297        .key(self.occurrence_index_key(resource_id))
298        .arg(FIELD_INFLIGHT_SCHEDULED_AT)
299        .arg(FIELD_INFLIGHT_CATCH_UP)
300        .arg(FIELD_INFLIGHT_TRIGGER_COUNT)
301        .arg(FIELD_INFLIGHT_RESOURCE_ID)
302        .arg(FIELD_INFLIGHT_SCOPE)
303        .arg(FIELD_INFLIGHT_LEASE_EXPIRES_AT)
304        .arg(FIELD_STATE)
305        .arg(FIELD_VERSION)
306        .arg(FIELD_PAUSED)
307        .arg(now_millis)
308        .arg(format!("{}{}:occurrence:", self.execution_key_prefix, resource_id))
309        .arg(&token)
310        .arg(ttl_millis)
311        .arg(expires_at_millis)
312        .arg(FIELD_INFLIGHT_TOKEN)
313        .arg(FIELD_INFLIGHT_LEASE_KEY)
314        .invoke_async(&mut connection)
315        .await
316        .map_err(ValkeyStoreError::from)?;
317
318        let Some(values) = result else {
319            return Ok(None);
320        };
321        if values.len() != 8 {
322            return Ok(None);
323        }
324        let revision = values[0].parse::<u64>().unwrap_or(0);
325        let state: JobState = serde_json::from_str(&values[1]).map_err(ValkeyStoreError::from)?;
326        let scheduled_at = DateTime::parse_from_rfc3339(&values[2])
327            .map_err(|error| {
328                ValkeyStoreError::Codec(serde_json::Error::io(std::io::Error::other(
329                    error.to_string(),
330                )))
331            })?
332            .with_timezone(&Utc);
333        let catch_up = values[3].parse::<bool>().unwrap_or(false);
334        let trigger_count = values[4].parse::<u32>().unwrap_or(0);
335        let scope = parse_scope(&values[5]);
336        Ok(Some(CoordinatedClaim {
337            state: CoordinatedRuntimeState {
338                state,
339                revision,
340                paused: false,
341            },
342            trigger: CoordinatedPendingTrigger {
343                scheduled_at,
344                catch_up,
345                trigger_count,
346            },
347            lease: ExecutionLease::new(
348                job_id.to_string(),
349                resource_id.to_string(),
350                scope,
351                Some(scheduled_at),
352                values[7].clone(),
353                values[6].clone(),
354            ),
355            replayed: true,
356        }))
357    }
358
359    async fn claim_trigger(
360        &self,
361        job_id: &str,
362        resource_id: &str,
363        revision: u64,
364        trigger: CoordinatedPendingTrigger,
365        next_state: &JobState,
366        lease_config: CoordinatedLeaseConfig,
367    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
368        let key = self.state_key(job_id);
369        let lease_key = self.occurrence_lease_key(resource_id, trigger.scheduled_at);
370        let token = next_token(&COORDINATED_TOKEN_COUNTER, "coord");
371        let ttl_millis = u64::try_from(lease_config.ttl.as_millis()).unwrap_or(u64::MAX);
372        let now_millis = now_millis();
373        let expires_at_millis = now_millis.saturating_add(ttl_millis);
374        let next_state_payload =
375            serde_json::to_string(next_state).map_err(ValkeyStoreError::from)?;
376        let mut connection = self.connection.clone();
377        let new_revision: i64 = Script::new(
378            r"
379            local version = tonumber(redis.call('HGET', KEYS[1], ARGV[1]) or '-1')
380            local inflight = redis.call('HGET', KEYS[1], ARGV[2])
381            local paused = redis.call('HGET', KEYS[1], ARGV[4])
382            if paused == '1' or paused == 'true' then
383                return 0
384            end
385            if inflight then
386                local inflight_expires_at = tonumber(redis.call('HGET', KEYS[1], ARGV[3]) or '0')
387                if inflight_expires_at > tonumber(ARGV[5]) then
388                    return 0
389                end
390                return 0
391            end
392            if version ~= tonumber(ARGV[6]) then
393                return 0
394            end
395            redis.call('ZREMRANGEBYSCORE', KEYS[4], '-inf', ARGV[5])
396            if redis.call('EXISTS', KEYS[2]) == 1 then
397                return 0
398            end
399            local ok = redis.call('SET', KEYS[3], ARGV[7], 'NX', 'PX', ARGV[8])
400            if not ok then
401                return 0
402            end
403            redis.call('ZADD', KEYS[4], ARGV[9], KEYS[3])
404            redis.call('HSET', KEYS[1],
405                ARGV[1], version + 1,
406                ARGV[10], ARGV[11],
407                ARGV[12], ARGV[13],
408                ARGV[14], ARGV[15],
409                ARGV[16], ARGV[17],
410                ARGV[18], ARGV[19],
411                ARGV[20], ARGV[21],
412                ARGV[22], ARGV[7],
413                ARGV[23], KEYS[3],
414                ARGV[3], ARGV[9]
415            )
416            return version + 1
417            ",
418        )
419        .key(key)
420        .key(self.resource_lock_key(resource_id))
421        .key(&lease_key)
422        .key(self.occurrence_index_key(resource_id))
423        .arg(FIELD_VERSION)
424        .arg(FIELD_INFLIGHT_TOKEN)
425        .arg(FIELD_INFLIGHT_LEASE_EXPIRES_AT)
426        .arg(FIELD_PAUSED)
427        .arg(now_millis)
428        .arg(revision)
429        .arg(&token)
430        .arg(ttl_millis)
431        .arg(expires_at_millis)
432        .arg(FIELD_STATE)
433        .arg(next_state_payload)
434        .arg(FIELD_INFLIGHT_SCHEDULED_AT)
435        .arg(
436            trigger
437                .scheduled_at
438                .to_rfc3339_opts(SecondsFormat::Nanos, true),
439        )
440        .arg(FIELD_INFLIGHT_CATCH_UP)
441        .arg(trigger.catch_up)
442        .arg(FIELD_INFLIGHT_TRIGGER_COUNT)
443        .arg(trigger.trigger_count)
444        .arg(FIELD_INFLIGHT_RESOURCE_ID)
445        .arg(resource_id)
446        .arg(FIELD_INFLIGHT_SCOPE)
447        .arg("occurrence")
448        .arg(FIELD_INFLIGHT_TOKEN)
449        .arg(FIELD_INFLIGHT_LEASE_KEY)
450        .invoke_async(&mut connection)
451        .await
452        .map_err(ValkeyStoreError::from)?;
453
454        if new_revision <= 0 {
455            return Ok(None);
456        }
457
458        Ok(Some(CoordinatedClaim {
459            state: CoordinatedRuntimeState {
460                state: next_state.clone(),
461                revision: new_revision as u64,
462                paused: false,
463            },
464            trigger: trigger.clone(),
465            lease: ExecutionLease::new(
466                job_id.to_string(),
467                resource_id.to_string(),
468                ExecutionGuardScope::Occurrence,
469                Some(trigger.scheduled_at),
470                token,
471                lease_key,
472            ),
473            replayed: false,
474        }))
475    }
476
477    async fn renew(
478        &self,
479        lease: &ExecutionLease,
480        lease_config: CoordinatedLeaseConfig,
481    ) -> Result<ExecutionGuardRenewal, Self::Error> {
482        let ttl_millis = u64::try_from(lease_config.ttl.as_millis()).unwrap_or(u64::MAX);
483        let expires_at_millis = now_millis().saturating_add(ttl_millis);
484        let mut connection = self.connection.clone();
485        let renewed: i32 = Script::new(
486            r"
487            if redis.call('GET', KEYS[1]) == ARGV[1] then
488                redis.call('PEXPIRE', KEYS[1], ARGV[2])
489                redis.call('ZADD', KEYS[2], ARGV[3], KEYS[1])
490                redis.call('HSET', KEYS[3], ARGV[4], ARGV[3])
491                return 1
492            end
493            redis.call('ZREM', KEYS[2], KEYS[1])
494            return 0
495            ",
496        )
497        .key(&lease.lease_key)
498        .key(self.occurrence_index_key(&lease.resource_id))
499        .key(self.state_key(&lease.job_id))
500        .arg(&lease.token)
501        .arg(ttl_millis)
502        .arg(expires_at_millis)
503        .arg(FIELD_INFLIGHT_LEASE_EXPIRES_AT)
504        .invoke_async(&mut connection)
505        .await
506        .map_err(ValkeyStoreError::from)?;
507        Ok(if renewed == 1 {
508            ExecutionGuardRenewal::Renewed
509        } else {
510            ExecutionGuardRenewal::Lost
511        })
512    }
513
514    async fn complete(
515        &self,
516        job_id: &str,
517        revision: u64,
518        lease: &ExecutionLease,
519        state: &JobState,
520    ) -> Result<bool, Self::Error> {
521        let key = self.state_key(job_id);
522        let payload = serde_json::to_string(state).map_err(ValkeyStoreError::from)?;
523        let mut connection = self.connection.clone();
524        let completed: i32 = Script::new(
525            r"
526            local version = tonumber(redis.call('HGET', KEYS[1], ARGV[1]) or '-1')
527            local token = redis.call('HGET', KEYS[1], ARGV[2])
528            if version ~= tonumber(ARGV[3]) then
529                return 0
530            end
531            if token ~= ARGV[4] then
532                return 0
533            end
534            redis.call('DEL', KEYS[2])
535            redis.call('ZREM', KEYS[3], KEYS[2])
536            redis.call('HSET', KEYS[1], ARGV[1], version + 1, ARGV[5], ARGV[6])
537            redis.call('HDEL', KEYS[1], ARGV[2], ARGV[7], ARGV[8], ARGV[9], ARGV[10], ARGV[11], ARGV[12])
538            return 1
539            ",
540        )
541        .key(key)
542        .key(&lease.lease_key)
543        .key(self.occurrence_index_key(&lease.resource_id))
544        .arg(FIELD_VERSION)
545        .arg(FIELD_INFLIGHT_TOKEN)
546        .arg(revision)
547        .arg(&lease.token)
548        .arg(FIELD_STATE)
549        .arg(payload)
550        .arg(FIELD_INFLIGHT_SCHEDULED_AT)
551        .arg(FIELD_INFLIGHT_CATCH_UP)
552        .arg(FIELD_INFLIGHT_TRIGGER_COUNT)
553        .arg(FIELD_INFLIGHT_RESOURCE_ID)
554        .arg(FIELD_INFLIGHT_SCOPE)
555        .arg(FIELD_INFLIGHT_LEASE_KEY)
556        .invoke_async(&mut connection)
557        .await
558        .map_err(ValkeyStoreError::from)?;
559        Ok(completed == 1)
560    }
561
562    async fn delete(&self, job_id: &str) -> Result<(), Self::Error> {
563        let key = self.state_key(job_id);
564        let mut connection = self.connection.clone();
565        let _: () = cmd("DEL")
566            .arg(key)
567            .query_async(&mut connection)
568            .await
569            .map_err(ValkeyStoreError::from)?;
570        Ok(())
571    }
572
573    async fn pause(&self, job_id: &str) -> Result<bool, Self::Error> {
574        let key = self.state_key(job_id);
575        let mut connection = self.connection.clone();
576        let changed: i32 = Script::new(
577            r"
578            local paused = redis.call('HGET', KEYS[1], ARGV[1])
579            if not paused then
580                return 0
581            end
582            if paused == '1' or paused == 'true' then
583                return 0
584            end
585            redis.call('HSET', KEYS[1], ARGV[1], '1')
586            return 1
587            ",
588        )
589        .key(key)
590        .arg(FIELD_PAUSED)
591        .invoke_async(&mut connection)
592        .await
593        .map_err(ValkeyStoreError::from)?;
594        Ok(changed == 1)
595    }
596
597    async fn resume(&self, job_id: &str) -> Result<bool, Self::Error> {
598        let key = self.state_key(job_id);
599        let mut connection = self.connection.clone();
600        let changed: i32 = Script::new(
601            r"
602            local paused = redis.call('HGET', KEYS[1], ARGV[1])
603            if not paused then
604                return 0
605            end
606            if paused == '0' or paused == 'false' then
607                return 0
608            end
609            redis.call('HSET', KEYS[1], ARGV[1], '0')
610            return 1
611            ",
612        )
613        .key(key)
614        .arg(FIELD_PAUSED)
615        .invoke_async(&mut connection)
616        .await
617        .map_err(ValkeyStoreError::from)?;
618        Ok(changed == 1)
619    }
620
621    fn classify_store_error(error: &Self::Error) -> StoreErrorKind
622    where
623        Self: Sized,
624    {
625        if matches!(error, ValkeyStoreError::Codec(_)) {
626            StoreErrorKind::Data
627        } else if error.is_connection_issue() {
628            StoreErrorKind::Connection
629        } else {
630            StoreErrorKind::Unknown
631        }
632    }
633
634    fn classify_guard_error(error: &Self::Error) -> ExecutionGuardErrorKind
635    where
636        Self: Sized,
637    {
638        if matches!(error, ValkeyStoreError::Codec(_)) {
639            ExecutionGuardErrorKind::Data
640        } else if error.is_connection_issue() {
641            ExecutionGuardErrorKind::Connection
642        } else {
643            ExecutionGuardErrorKind::Unknown
644        }
645    }
646}
647
648fn parse_runtime_state(
649    fields: &HashMap<String, String>,
650) -> Result<CoordinatedRuntimeState, ValkeyStoreError> {
651    let revision = fields
652        .get(FIELD_VERSION)
653        .and_then(|value| value.parse::<u64>().ok())
654        .unwrap_or(0);
655    let paused = fields
656        .get(FIELD_PAUSED)
657        .map(|value| value == "1" || value.eq_ignore_ascii_case("true"))
658        .unwrap_or(false);
659    let state = serde_json::from_str(fields.get(FIELD_STATE).map(String::as_str).unwrap_or("{}"))
660        .map_err(ValkeyStoreError::from)?;
661    Ok(CoordinatedRuntimeState {
662        state,
663        revision,
664        paused,
665    })
666}
667
668fn parse_scope(raw: &str) -> ExecutionGuardScope {
669    match raw {
670        "resource" => ExecutionGuardScope::Resource,
671        _ => ExecutionGuardScope::Occurrence,
672    }
673}