1use crate::coordinated_store::{
2 CoordinatedClaim, CoordinatedLeaseConfig, CoordinatedPendingTrigger, CoordinatedRuntimeState,
3 CoordinatedStateStore,
4};
5use crate::error::{ExecutionGuardErrorKind, StoreErrorKind};
6use crate::execution_guard::{ExecutionGuardRenewal, ExecutionGuardScope, ExecutionLease};
7use crate::model::JobState;
8use crate::valkey_execution_support::{
9 next_token, now_millis, occurrence_index_key, occurrence_lease_key, resource_lock_key,
10};
11use crate::valkey_store::ValkeyStoreError;
12use chrono::SecondsFormat;
13use chrono::{DateTime, Utc};
14use redis::{AsyncCommands, Client, Script, aio::ConnectionManager, cmd};
15use std::collections::HashMap;
16use std::sync::atomic::AtomicU64;
17
/// Default prefix of the per-job state keys; the job id is appended to it.
const DEFAULT_STATE_KEY_PREFIX: &str = "scheduler:valkey:job-state:";
/// Prefix of the older state keys. It is only consulted for migration when the
/// store is configured with [`DEFAULT_STATE_KEY_PREFIX`].
const LEGACY_DEFAULT_STATE_KEY_PREFIX: &str = "scheduler:job-state:";
/// Default prefix handed to the lease / lock / index key helpers.
const DEFAULT_EXECUTION_KEY_PREFIX: &str = "scheduler:valkey:execution-lease:";

// Field names of the per-job state hash.

/// Revision counter used for the optimistic-concurrency checks in the scripts.
const FIELD_VERSION: &str = "version";
/// JSON-serialized `JobState`.
const FIELD_STATE: &str = "state";
/// Pause flag; written as `"1"`/`"0"`, readers also accept `"true"`.
const FIELD_PAUSED: &str = "paused";
/// RFC 3339 timestamp of the claimed occurrence.
const FIELD_INFLIGHT_SCHEDULED_AT: &str = "inflight_scheduled_at";
/// `catch_up` flag of the claimed trigger.
const FIELD_INFLIGHT_CATCH_UP: &str = "inflight_catch_up";
/// `trigger_count` of the claimed trigger.
const FIELD_INFLIGHT_TRIGGER_COUNT: &str = "inflight_trigger_count";
/// Resource id the claim was taken for.
const FIELD_INFLIGHT_RESOURCE_ID: &str = "inflight_resource_id";
/// Guard scope of the claim (`claim_trigger` always writes `"occurrence"`).
const FIELD_INFLIGHT_SCOPE: &str = "inflight_scope";
/// Token of the lease holder; its presence marks the job as having an inflight claim.
const FIELD_INFLIGHT_TOKEN: &str = "inflight_token";
/// Key of the lease entry that backs the inflight claim.
const FIELD_INFLIGHT_LEASE_KEY: &str = "inflight_lease_key";
/// Lease expiry computed as `now_millis() + ttl` (presumably epoch milliseconds;
/// `now_millis` is defined elsewhere — confirm).
const FIELD_INFLIGHT_LEASE_EXPIRES_AT: &str = "inflight_lease_expires_at";

/// Counter passed to `next_token` when generating lease tokens for this store.
static COORDINATED_TOKEN_COUNTER: AtomicU64 = AtomicU64::new(1);
35
/// Valkey-backed implementation of [`CoordinatedStateStore`].
///
/// Job state lives in one hash per job under `state_key_prefix`; lease, lock and
/// index keys are derived from `execution_key_prefix`.
#[derive(Debug, Clone)]
pub struct ValkeyCoordinatedStateStore {
    /// Connection handle; every operation works on its own clone of it.
    connection: ConnectionManager,
    /// Prefix prepended to the job id to form the state key.
    state_key_prefix: String,
    /// Prefix passed to the lease / lock / index key helpers.
    execution_key_prefix: String,
}
42
43impl ValkeyCoordinatedStateStore {
44 pub async fn new(url: impl AsRef<str>) -> Result<Self, redis::RedisError> {
45 Self::with_prefixes(url, DEFAULT_STATE_KEY_PREFIX, DEFAULT_EXECUTION_KEY_PREFIX).await
46 }
47
48 pub async fn with_prefixes(
49 url: impl AsRef<str>,
50 state_key_prefix: impl Into<String>,
51 execution_key_prefix: impl Into<String>,
52 ) -> Result<Self, redis::RedisError> {
53 let client = Client::open(url.as_ref())?;
54 let connection = client.get_connection_manager().await?;
55 Ok(Self {
56 connection,
57 state_key_prefix: state_key_prefix.into(),
58 execution_key_prefix: execution_key_prefix.into(),
59 })
60 }
61
62 fn state_key(&self, job_id: &str) -> String {
63 format!("{}{}", self.state_key_prefix, job_id)
64 }
65
66 fn legacy_state_key(&self, job_id: &str) -> Option<String> {
67 if self.state_key_prefix == DEFAULT_STATE_KEY_PREFIX {
68 Some(format!("{LEGACY_DEFAULT_STATE_KEY_PREFIX}{job_id}"))
69 } else {
70 None
71 }
72 }
73
74 fn resource_lock_key(&self, resource_id: &str) -> String {
75 resource_lock_key(&self.execution_key_prefix, resource_id)
76 }
77
78 fn occurrence_index_key(&self, resource_id: &str) -> String {
79 occurrence_index_key(&self.execution_key_prefix, resource_id)
80 }
81
82 fn occurrence_lease_key(&self, resource_id: &str, scheduled_at: DateTime<Utc>) -> String {
83 occurrence_lease_key(&self.execution_key_prefix, resource_id, scheduled_at)
84 }
85
86 async fn key_type(&self, key: &str) -> Result<String, ValkeyStoreError> {
87 let mut connection = self.connection.clone();
88 cmd("TYPE")
89 .arg(key)
90 .query_async(&mut connection)
91 .await
92 .map_err(ValkeyStoreError::from)
93 }
94
95 async fn load_hash(
96 &self,
97 key: &str,
98 ) -> Result<Option<CoordinatedRuntimeState>, ValkeyStoreError> {
99 let mut connection = self.connection.clone();
100 let fields: HashMap<String, String> = connection
101 .hgetall(key)
102 .await
103 .map_err(ValkeyStoreError::from)?;
104 if fields.is_empty() {
105 return Ok(None);
106 }
107
108 Ok(Some(parse_runtime_state(&fields)?))
109 }
110
111 async fn migrate_string_state(
112 &self,
113 key: &str,
114 payload: String,
115 ) -> Result<CoordinatedRuntimeState, ValkeyStoreError> {
116 let state: JobState = serde_json::from_str(&payload).map_err(ValkeyStoreError::from)?;
117 let runtime = CoordinatedRuntimeState {
118 state,
119 revision: 0,
120 paused: false,
121 };
122 self.write_runtime(key, &runtime).await?;
123 Ok(runtime)
124 }
125
126 async fn write_runtime(
127 &self,
128 key: &str,
129 runtime: &CoordinatedRuntimeState,
130 ) -> Result<(), ValkeyStoreError> {
131 let mut connection = self.connection.clone();
132 let payload = serde_json::to_string(&runtime.state).map_err(ValkeyStoreError::from)?;
133 let _: () = cmd("DEL")
134 .arg(key)
135 .query_async(&mut connection)
136 .await
137 .map_err(ValkeyStoreError::from)?;
138 let _: () = cmd("HSET")
139 .arg(key)
140 .arg(FIELD_VERSION)
141 .arg(runtime.revision)
142 .arg(FIELD_STATE)
143 .arg(payload)
144 .arg(FIELD_PAUSED)
145 .arg(if runtime.paused { "1" } else { "0" })
146 .query_async(&mut connection)
147 .await
148 .map_err(ValkeyStoreError::from)?;
149 Ok(())
150 }
151
152 async fn load_payload_state(&self, key: &str) -> Result<Option<String>, ValkeyStoreError> {
153 let mut connection = self.connection.clone();
154 connection.get(key).await.map_err(ValkeyStoreError::from)
155 }
156}
157
158impl CoordinatedStateStore for ValkeyCoordinatedStateStore {
159 type Error = ValkeyStoreError;
160
161 async fn load_or_initialize(
162 &self,
163 job_id: &str,
164 initial_state: JobState,
165 ) -> Result<CoordinatedRuntimeState, Self::Error> {
166 let key = self.state_key(job_id);
167 match self.key_type(&key).await?.as_str() {
168 "hash" => {
169 if let Some(runtime) = self.load_hash(&key).await? {
170 return Ok(runtime);
171 }
172 }
173 "string" => {
174 if let Some(payload) = self.load_payload_state(&key).await? {
175 return self.migrate_string_state(&key, payload).await;
176 }
177 }
178 "none" => {}
179 _ => {}
180 }
181
182 if let Some(legacy_key) = self.legacy_state_key(job_id) {
183 if self.key_type(&legacy_key).await?.as_str() == "string" {
184 if let Some(payload) = self.load_payload_state(&legacy_key).await? {
185 let runtime = self.migrate_string_state(&key, payload).await?;
186 let mut connection = self.connection.clone();
187 let _: () = cmd("DEL")
188 .arg(legacy_key)
189 .query_async(&mut connection)
190 .await
191 .map_err(ValkeyStoreError::from)?;
192 return Ok(runtime);
193 }
194 }
195 }
196
197 let runtime = CoordinatedRuntimeState {
198 state: initial_state,
199 revision: 0,
200 paused: false,
201 };
202 self.write_runtime(&key, &runtime).await?;
203 Ok(runtime)
204 }
205
206 async fn save_state(
207 &self,
208 job_id: &str,
209 revision: u64,
210 state: &JobState,
211 ) -> Result<bool, Self::Error> {
212 let key = self.state_key(job_id);
213 let payload = serde_json::to_string(state).map_err(ValkeyStoreError::from)?;
214 let mut connection = self.connection.clone();
215 let updated: i32 = Script::new(
216 r"
217 local version = tonumber(redis.call('HGET', KEYS[1], ARGV[1]) or '-1')
218 local inflight = redis.call('HGET', KEYS[1], ARGV[3])
219 if inflight then
220 return 0
221 end
222 if version ~= tonumber(ARGV[2]) then
223 return 0
224 end
225 redis.call('HSET', KEYS[1], ARGV[1], version + 1, ARGV[4], ARGV[5])
226 return 1
227 ",
228 )
229 .key(key)
230 .arg(FIELD_VERSION)
231 .arg(revision)
232 .arg(FIELD_INFLIGHT_TOKEN)
233 .arg(FIELD_STATE)
234 .arg(payload)
235 .invoke_async(&mut connection)
236 .await
237 .map_err(ValkeyStoreError::from)?;
238 Ok(updated == 1)
239 }
240
241 async fn reclaim_inflight(
242 &self,
243 job_id: &str,
244 resource_id: &str,
245 lease_config: CoordinatedLeaseConfig,
246 ) -> Result<Option<CoordinatedClaim>, Self::Error> {
247 let key = self.state_key(job_id);
248 let lease_key = self.occurrence_lease_key(resource_id, Utc::now());
249 let token = next_token(&COORDINATED_TOKEN_COUNTER, "coord");
250 let ttl_millis = u64::try_from(lease_config.ttl.as_millis()).unwrap_or(u64::MAX);
251 let now_millis = now_millis();
252 let expires_at_millis = now_millis.saturating_add(ttl_millis);
253 let mut connection = self.connection.clone();
254 let result: Option<Vec<String>> = Script::new(
255 r"
256 local scheduled_at = redis.call('HGET', KEYS[1], ARGV[1])
257 local catch_up = redis.call('HGET', KEYS[1], ARGV[2])
258 local trigger_count = redis.call('HGET', KEYS[1], ARGV[3])
259 local inflight_resource_id = redis.call('HGET', KEYS[1], ARGV[4])
260 local inflight_scope = redis.call('HGET', KEYS[1], ARGV[5])
261 local inflight_expires_at = tonumber(redis.call('HGET', KEYS[1], ARGV[6]) or '0')
262 local state_payload = redis.call('HGET', KEYS[1], ARGV[7])
263 local version = tonumber(redis.call('HGET', KEYS[1], ARGV[8]) or '0')
264 local paused = redis.call('HGET', KEYS[1], ARGV[9])
265
266 if not scheduled_at or not inflight_resource_id or not inflight_scope then
267 return nil
268 end
269 if paused == '1' or paused == 'true' then
270 return nil
271 end
272 if inflight_expires_at > tonumber(ARGV[9]) then
273 return nil
274 end
275 redis.call('ZREMRANGEBYSCORE', KEYS[4], '-inf', ARGV[9])
276 if redis.call('EXISTS', KEYS[2]) == 1 then
277 return nil
278 end
279 local new_lease_key = ARGV[10] .. scheduled_at
280 local ok = redis.call('SET', new_lease_key, ARGV[11], 'NX', 'PX', ARGV[12])
281 if not ok then
282 return nil
283 end
284 redis.call('ZADD', KEYS[4], ARGV[13], new_lease_key)
285 redis.call('HSET', KEYS[1],
286 ARGV[6], ARGV[13],
287 ARGV[14], ARGV[11],
288 ARGV[15], new_lease_key,
289 ARGV[8], version + 1
290 )
291 return { tostring(version + 1), state_payload, scheduled_at, catch_up, trigger_count, inflight_scope, new_lease_key, ARGV[11] }
292 ",
293 )
294 .key(key)
295 .key(self.resource_lock_key(resource_id))
296 .key(lease_key.clone())
297 .key(self.occurrence_index_key(resource_id))
298 .arg(FIELD_INFLIGHT_SCHEDULED_AT)
299 .arg(FIELD_INFLIGHT_CATCH_UP)
300 .arg(FIELD_INFLIGHT_TRIGGER_COUNT)
301 .arg(FIELD_INFLIGHT_RESOURCE_ID)
302 .arg(FIELD_INFLIGHT_SCOPE)
303 .arg(FIELD_INFLIGHT_LEASE_EXPIRES_AT)
304 .arg(FIELD_STATE)
305 .arg(FIELD_VERSION)
306 .arg(FIELD_PAUSED)
307 .arg(now_millis)
308 .arg(format!("{}{}:occurrence:", self.execution_key_prefix, resource_id))
309 .arg(&token)
310 .arg(ttl_millis)
311 .arg(expires_at_millis)
312 .arg(FIELD_INFLIGHT_TOKEN)
313 .arg(FIELD_INFLIGHT_LEASE_KEY)
314 .invoke_async(&mut connection)
315 .await
316 .map_err(ValkeyStoreError::from)?;
317
318 let Some(values) = result else {
319 return Ok(None);
320 };
321 if values.len() != 8 {
322 return Ok(None);
323 }
324 let revision = values[0].parse::<u64>().unwrap_or(0);
325 let state: JobState = serde_json::from_str(&values[1]).map_err(ValkeyStoreError::from)?;
326 let scheduled_at = DateTime::parse_from_rfc3339(&values[2])
327 .map_err(|error| {
328 ValkeyStoreError::Codec(serde_json::Error::io(std::io::Error::other(
329 error.to_string(),
330 )))
331 })?
332 .with_timezone(&Utc);
333 let catch_up = values[3].parse::<bool>().unwrap_or(false);
334 let trigger_count = values[4].parse::<u32>().unwrap_or(0);
335 let scope = parse_scope(&values[5]);
336 Ok(Some(CoordinatedClaim {
337 state: CoordinatedRuntimeState {
338 state,
339 revision,
340 paused: false,
341 },
342 trigger: CoordinatedPendingTrigger {
343 scheduled_at,
344 catch_up,
345 trigger_count,
346 },
347 lease: ExecutionLease::new(
348 job_id.to_string(),
349 resource_id.to_string(),
350 scope,
351 Some(scheduled_at),
352 values[7].clone(),
353 values[6].clone(),
354 ),
355 replayed: true,
356 }))
357 }
358
    /// Atomically claims `trigger` for `job_id` and records it as the inflight
    /// occurrence.
    ///
    /// The Lua script returns 0 (surfaced here as `Ok(None)`) when the job is
    /// paused, when an inflight token is already recorded, when the stored
    /// revision differs from `revision`, when the resource lock key exists, or
    /// when the occurrence lease key is already held. Otherwise it acquires the
    /// lease, adds it to the occurrence index, writes `next_state` together with
    /// all inflight fields, and returns the bumped revision.
    ///
    /// # Errors
    /// Serialization failures and script failures are converted into
    /// [`ValkeyStoreError`].
    async fn claim_trigger(
        &self,
        job_id: &str,
        resource_id: &str,
        revision: u64,
        trigger: CoordinatedPendingTrigger,
        next_state: &JobState,
        lease_config: CoordinatedLeaseConfig,
    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
        let key = self.state_key(job_id);
        let lease_key = self.occurrence_lease_key(resource_id, trigger.scheduled_at);
        let token = next_token(&COORDINATED_TOKEN_COUNTER, "coord");
        // A TTL that does not fit into u64 milliseconds saturates to u64::MAX.
        let ttl_millis = u64::try_from(lease_config.ttl.as_millis()).unwrap_or(u64::MAX);
        let now_millis = now_millis();
        let expires_at_millis = now_millis.saturating_add(ttl_millis);
        let next_state_payload =
            serde_json::to_string(next_state).map_err(ValkeyStoreError::from)?;
        let mut connection = self.connection.clone();
        // KEYS: 1 = state hash, 2 = resource lock, 3 = occurrence lease, 4 = occurrence index.
        // The ARGV positions are annotated on the `.arg(..)` chain below and must
        // stay in sync with the script.
        // NOTE(review): inside `if inflight then` both paths return 0, so an
        // existing inflight token rejects the claim regardless of lease expiry.
        let new_revision: i64 = Script::new(
            r"
            local version = tonumber(redis.call('HGET', KEYS[1], ARGV[1]) or '-1')
            local inflight = redis.call('HGET', KEYS[1], ARGV[2])
            local paused = redis.call('HGET', KEYS[1], ARGV[4])
            if paused == '1' or paused == 'true' then
                return 0
            end
            if inflight then
                local inflight_expires_at = tonumber(redis.call('HGET', KEYS[1], ARGV[3]) or '0')
                if inflight_expires_at > tonumber(ARGV[5]) then
                    return 0
                end
                return 0
            end
            if version ~= tonumber(ARGV[6]) then
                return 0
            end
            redis.call('ZREMRANGEBYSCORE', KEYS[4], '-inf', ARGV[5])
            if redis.call('EXISTS', KEYS[2]) == 1 then
                return 0
            end
            local ok = redis.call('SET', KEYS[3], ARGV[7], 'NX', 'PX', ARGV[8])
            if not ok then
                return 0
            end
            redis.call('ZADD', KEYS[4], ARGV[9], KEYS[3])
            redis.call('HSET', KEYS[1],
                ARGV[1], version + 1,
                ARGV[10], ARGV[11],
                ARGV[12], ARGV[13],
                ARGV[14], ARGV[15],
                ARGV[16], ARGV[17],
                ARGV[18], ARGV[19],
                ARGV[20], ARGV[21],
                ARGV[22], ARGV[7],
                ARGV[23], KEYS[3],
                ARGV[3], ARGV[9]
            )
            return version + 1
            ",
        )
        .key(key)
        .key(self.resource_lock_key(resource_id))
        .key(&lease_key)
        .key(self.occurrence_index_key(resource_id))
        .arg(FIELD_VERSION) // ARGV[1]
        .arg(FIELD_INFLIGHT_TOKEN) // ARGV[2]
        .arg(FIELD_INFLIGHT_LEASE_EXPIRES_AT) // ARGV[3]
        .arg(FIELD_PAUSED) // ARGV[4]
        .arg(now_millis) // ARGV[5]
        .arg(revision) // ARGV[6]
        .arg(&token) // ARGV[7]
        .arg(ttl_millis) // ARGV[8]
        .arg(expires_at_millis) // ARGV[9]
        .arg(FIELD_STATE) // ARGV[10]
        .arg(next_state_payload) // ARGV[11]
        .arg(FIELD_INFLIGHT_SCHEDULED_AT) // ARGV[12]
        .arg(
            // ARGV[13]: stored as RFC 3339 with nanosecond precision and a `Z` suffix.
            trigger
                .scheduled_at
                .to_rfc3339_opts(SecondsFormat::Nanos, true),
        )
        .arg(FIELD_INFLIGHT_CATCH_UP) // ARGV[14]
        .arg(trigger.catch_up) // ARGV[15]
        .arg(FIELD_INFLIGHT_TRIGGER_COUNT) // ARGV[16]
        .arg(trigger.trigger_count) // ARGV[17]
        .arg(FIELD_INFLIGHT_RESOURCE_ID) // ARGV[18]
        .arg(resource_id) // ARGV[19]
        .arg(FIELD_INFLIGHT_SCOPE) // ARGV[20]
        .arg("occurrence") // ARGV[21]
        .arg(FIELD_INFLIGHT_TOKEN) // ARGV[22]
        .arg(FIELD_INFLIGHT_LEASE_KEY) // ARGV[23]
        .invoke_async(&mut connection)
        .await
        .map_err(ValkeyStoreError::from)?;

        // 0 means the script declined; a successful claim returns the new revision (>= 1).
        if new_revision <= 0 {
            return Ok(None);
        }

        Ok(Some(CoordinatedClaim {
            state: CoordinatedRuntimeState {
                state: next_state.clone(),
                revision: new_revision as u64,
                // The script refuses to claim while the job is paused.
                paused: false,
            },
            trigger: trigger.clone(),
            lease: ExecutionLease::new(
                job_id.to_string(),
                resource_id.to_string(),
                ExecutionGuardScope::Occurrence,
                Some(trigger.scheduled_at),
                token,
                lease_key,
            ),
            replayed: false,
        }))
    }
476
477 async fn renew(
478 &self,
479 lease: &ExecutionLease,
480 lease_config: CoordinatedLeaseConfig,
481 ) -> Result<ExecutionGuardRenewal, Self::Error> {
482 let ttl_millis = u64::try_from(lease_config.ttl.as_millis()).unwrap_or(u64::MAX);
483 let expires_at_millis = now_millis().saturating_add(ttl_millis);
484 let mut connection = self.connection.clone();
485 let renewed: i32 = Script::new(
486 r"
487 if redis.call('GET', KEYS[1]) == ARGV[1] then
488 redis.call('PEXPIRE', KEYS[1], ARGV[2])
489 redis.call('ZADD', KEYS[2], ARGV[3], KEYS[1])
490 redis.call('HSET', KEYS[3], ARGV[4], ARGV[3])
491 return 1
492 end
493 redis.call('ZREM', KEYS[2], KEYS[1])
494 return 0
495 ",
496 )
497 .key(&lease.lease_key)
498 .key(self.occurrence_index_key(&lease.resource_id))
499 .key(self.state_key(&lease.job_id))
500 .arg(&lease.token)
501 .arg(ttl_millis)
502 .arg(expires_at_millis)
503 .arg(FIELD_INFLIGHT_LEASE_EXPIRES_AT)
504 .invoke_async(&mut connection)
505 .await
506 .map_err(ValkeyStoreError::from)?;
507 Ok(if renewed == 1 {
508 ExecutionGuardRenewal::Renewed
509 } else {
510 ExecutionGuardRenewal::Lost
511 })
512 }
513
514 async fn complete(
515 &self,
516 job_id: &str,
517 revision: u64,
518 lease: &ExecutionLease,
519 state: &JobState,
520 ) -> Result<bool, Self::Error> {
521 let key = self.state_key(job_id);
522 let payload = serde_json::to_string(state).map_err(ValkeyStoreError::from)?;
523 let mut connection = self.connection.clone();
524 let completed: i32 = Script::new(
525 r"
526 local version = tonumber(redis.call('HGET', KEYS[1], ARGV[1]) or '-1')
527 local token = redis.call('HGET', KEYS[1], ARGV[2])
528 if version ~= tonumber(ARGV[3]) then
529 return 0
530 end
531 if token ~= ARGV[4] then
532 return 0
533 end
534 redis.call('DEL', KEYS[2])
535 redis.call('ZREM', KEYS[3], KEYS[2])
536 redis.call('HSET', KEYS[1], ARGV[1], version + 1, ARGV[5], ARGV[6])
537 redis.call('HDEL', KEYS[1], ARGV[2], ARGV[7], ARGV[8], ARGV[9], ARGV[10], ARGV[11], ARGV[12])
538 return 1
539 ",
540 )
541 .key(key)
542 .key(&lease.lease_key)
543 .key(self.occurrence_index_key(&lease.resource_id))
544 .arg(FIELD_VERSION)
545 .arg(FIELD_INFLIGHT_TOKEN)
546 .arg(revision)
547 .arg(&lease.token)
548 .arg(FIELD_STATE)
549 .arg(payload)
550 .arg(FIELD_INFLIGHT_SCHEDULED_AT)
551 .arg(FIELD_INFLIGHT_CATCH_UP)
552 .arg(FIELD_INFLIGHT_TRIGGER_COUNT)
553 .arg(FIELD_INFLIGHT_RESOURCE_ID)
554 .arg(FIELD_INFLIGHT_SCOPE)
555 .arg(FIELD_INFLIGHT_LEASE_KEY)
556 .invoke_async(&mut connection)
557 .await
558 .map_err(ValkeyStoreError::from)?;
559 Ok(completed == 1)
560 }
561
562 async fn delete(&self, job_id: &str) -> Result<(), Self::Error> {
563 let key = self.state_key(job_id);
564 let mut connection = self.connection.clone();
565 let _: () = cmd("DEL")
566 .arg(key)
567 .query_async(&mut connection)
568 .await
569 .map_err(ValkeyStoreError::from)?;
570 Ok(())
571 }
572
573 async fn pause(&self, job_id: &str) -> Result<bool, Self::Error> {
574 let key = self.state_key(job_id);
575 let mut connection = self.connection.clone();
576 let changed: i32 = Script::new(
577 r"
578 local paused = redis.call('HGET', KEYS[1], ARGV[1])
579 if not paused then
580 return 0
581 end
582 if paused == '1' or paused == 'true' then
583 return 0
584 end
585 redis.call('HSET', KEYS[1], ARGV[1], '1')
586 return 1
587 ",
588 )
589 .key(key)
590 .arg(FIELD_PAUSED)
591 .invoke_async(&mut connection)
592 .await
593 .map_err(ValkeyStoreError::from)?;
594 Ok(changed == 1)
595 }
596
597 async fn resume(&self, job_id: &str) -> Result<bool, Self::Error> {
598 let key = self.state_key(job_id);
599 let mut connection = self.connection.clone();
600 let changed: i32 = Script::new(
601 r"
602 local paused = redis.call('HGET', KEYS[1], ARGV[1])
603 if not paused then
604 return 0
605 end
606 if paused == '0' or paused == 'false' then
607 return 0
608 end
609 redis.call('HSET', KEYS[1], ARGV[1], '0')
610 return 1
611 ",
612 )
613 .key(key)
614 .arg(FIELD_PAUSED)
615 .invoke_async(&mut connection)
616 .await
617 .map_err(ValkeyStoreError::from)?;
618 Ok(changed == 1)
619 }
620
621 fn classify_store_error(error: &Self::Error) -> StoreErrorKind
622 where
623 Self: Sized,
624 {
625 if matches!(error, ValkeyStoreError::Codec(_)) {
626 StoreErrorKind::Data
627 } else if error.is_connection_issue() {
628 StoreErrorKind::Connection
629 } else {
630 StoreErrorKind::Unknown
631 }
632 }
633
634 fn classify_guard_error(error: &Self::Error) -> ExecutionGuardErrorKind
635 where
636 Self: Sized,
637 {
638 if matches!(error, ValkeyStoreError::Codec(_)) {
639 ExecutionGuardErrorKind::Data
640 } else if error.is_connection_issue() {
641 ExecutionGuardErrorKind::Connection
642 } else {
643 ExecutionGuardErrorKind::Unknown
644 }
645 }
646}
647
648fn parse_runtime_state(
649 fields: &HashMap<String, String>,
650) -> Result<CoordinatedRuntimeState, ValkeyStoreError> {
651 let revision = fields
652 .get(FIELD_VERSION)
653 .and_then(|value| value.parse::<u64>().ok())
654 .unwrap_or(0);
655 let paused = fields
656 .get(FIELD_PAUSED)
657 .map(|value| value == "1" || value.eq_ignore_ascii_case("true"))
658 .unwrap_or(false);
659 let state = serde_json::from_str(fields.get(FIELD_STATE).map(String::as_str).unwrap_or("{}"))
660 .map_err(ValkeyStoreError::from)?;
661 Ok(CoordinatedRuntimeState {
662 state,
663 revision,
664 paused,
665 })
666}
667
668fn parse_scope(raw: &str) -> ExecutionGuardScope {
669 match raw {
670 "resource" => ExecutionGuardScope::Resource,
671 _ => ExecutionGuardScope::Occurrence,
672 }
673}