1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use ferriskey::{Client, ClientBuilder, Value};
6use tokio::sync::Mutex as AsyncMutex;
7use tokio::task::JoinSet;
8use ff_core::contracts::{
9 AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
10 CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
11 CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
12 CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
13 ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
14 DeliverSignalArgs, DeliverSignalResult, ExecutionInfo, ExecutionSummary,
15 ListExecutionsResult, PendingWaitpointInfo, ReplayExecutionResult,
16 ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
17 RevokeLeaseResult,
18 StageDependencyEdgeArgs, StageDependencyEdgeResult,
19};
20use ff_core::keys::{
21 self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
22 IndexKeys, QuotaKeyContext,
23};
24use ff_core::partition::{
25 budget_partition, execution_partition, flow_partition, quota_partition, Partition,
26 PartitionConfig, PartitionFamily,
27};
28use ff_core::state::{PublicState, StateVector};
29use ff_core::types::*;
30use ff_engine::Engine;
31use ff_script::retry::is_retryable_kind;
32
33use crate::config::ServerConfig;
34
/// Maximum number of member execution ids returned when `cancel_flow` hits a
/// flow that is already terminal (bounds the SMEMBERS-backed response size).
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;

/// FlowFabric API server: owns the Valkey connections, the background
/// [`Engine`], and the concurrency primitives shared by request handlers.
pub struct Server {
    /// Primary Valkey client for scripted commands and point reads.
    client: Client,
    /// Dedicated connection used for blocking tail reads so they do not
    /// share a pipeline with regular request traffic (see `start`).
    tail_client: Client,
    /// Caps concurrent stream operations; per the startup log, reads and
    /// tails share this semaphore.
    stream_semaphore: Arc<tokio::sync::Semaphore>,
    /// Serializes blocking XREAD tails on the shared tail connection.
    xread_block_lock: Arc<tokio::sync::Mutex<()>>,
    /// Single-permit semaphore (created with capacity 1 in `start`)
    /// serializing admin rotate operations.
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    /// Background engine (scanners, promoters, reconcilers).
    engine: Engine,
    /// Full server configuration (partitioning, lanes, intervals, listen addr).
    config: ServerConfig,
    /// Long-lived background tasks owned by the server.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
}
124
/// Errors produced by [`Server`] operations.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Raw Valkey client error without operation context.
    #[error("valkey: {0}")]
    Valkey(#[from] ferriskey::Error),
    /// Valkey client error annotated with the operation that failed.
    #[error("valkey ({context}): {source}")]
    ValkeyContext {
        #[source]
        source: ferriskey::Error,
        context: String,
    },
    /// Server configuration was invalid or could not be loaded.
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    /// Loading the flowfabric Lua library into Valkey failed.
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    /// Two ids that must be co-located resolved to different partitions.
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    /// The referenced entity (execution, budget, flow, ...) does not exist.
    #[error("not found: {0}")]
    NotFound(String),
    /// Caller-supplied arguments were rejected.
    #[error("invalid input: {0}")]
    InvalidInput(String),
    /// The operation ran but reported failure.
    #[error("operation failed: {0}")]
    OperationFailed(String),
    /// A server-side Lua script returned an unexpected/invalid reply.
    #[error("script: {0}")]
    Script(String),
    /// A concurrency semaphore was exhausted; retryable by design.
    #[error("too many concurrent {0} calls (max: {1})")]
    ConcurrencyLimitExceeded(&'static str, u32),
    /// The connected Valkey is older than the minimum supported version.
    #[error(
        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
    )]
    ValkeyVersionTooLow {
        detected: String,
        required: String,
    },
}
174
175impl ServerError {
176 pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
180 match self {
181 Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
182 Self::LibraryLoad(e) => e.valkey_kind(),
183 _ => None,
184 }
185 }
186
187 pub fn is_retryable(&self) -> bool {
193 match self {
194 Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
195 is_retryable_kind(e.kind())
196 }
197 Self::LibraryLoad(load_err) => load_err
198 .valkey_kind()
199 .map(is_retryable_kind)
200 .unwrap_or(false),
201 Self::Config(_)
202 | Self::PartitionMismatch(_)
203 | Self::NotFound(_)
204 | Self::InvalidInput(_)
205 | Self::OperationFailed(_)
206 | Self::Script(_) => false,
207 Self::ConcurrencyLimitExceeded(_, _) => true,
211 Self::ValkeyVersionTooLow { .. } => false,
213 }
214 }
215}
216
217impl Server {
    /// Connect to Valkey, validate the deployment, start the background
    /// engine, and return a ready [`Server`].
    ///
    /// Startup sequence (order matters):
    ///  1. build + PING the primary client;
    ///  2. verify the Valkey version;
    ///  3. validate or create the partition config;
    ///  4. initialize the waitpoint HMAC secret;
    ///  5. load the flowfabric Lua library (unless `skip_library_load`);
    ///  6. start the engine;
    ///  7. open + PING the dedicated tail connection;
    ///  8. set up stream-op concurrency primitives and warn on missing auth.
    ///
    /// # Errors
    /// Any connection, validation, or library-load failure aborts startup.
    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
        tracing::info!(
            host = %config.host, port = config.port,
            tls = config.tls, cluster = config.cluster,
            "connecting to Valkey"
        );
        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder
            .build()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "connect".into() })?;

        // Round-trip sanity check before any further validation.
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "PING".into() })?;
        if pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "unexpected PING response: {pong}"
            )));
        }
        tracing::info!("Valkey connection established");

        verify_valkey_version(&client).await?;

        validate_or_create_partition_config(&client, &config.partition_config).await?;

        initialize_waitpoint_hmac_secret(
            &client,
            &config.partition_config,
            &config.waitpoint_hmac_secret,
        )
        .await?;

        // The Lua library carries the ff_* server-side functions; loading
        // can be disabled when another process owns library management.
        if !config.skip_library_load {
            tracing::info!("loading flowfabric Lua library");
            ff_script::loader::ensure_library(&client)
                .await
                .map_err(ServerError::LibraryLoad)?;
        } else {
            tracing::info!("skipping library load (skip_library_load=true)");
        }

        // Engine config mirrors the server config's scanner intervals; the
        // completion listener gets its own connection parameters.
        let engine_cfg = ff_engine::EngineConfig {
            partition_config: config.partition_config,
            lanes: config.lanes.clone(),
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
            index_reconciler_interval: config.engine_config.index_reconciler_interval,
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
            unblock_interval: config.engine_config.unblock_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            completion_listener: Some(ff_engine::CompletionListenerConfig {
                addresses: vec![(config.host.clone(), config.port)],
                tls: config.tls,
                cluster: config.cluster,
            }),
            flow_projector_interval: config.engine_config.flow_projector_interval,
            execution_deadline_interval: config.engine_config.execution_deadline_interval,
        };
        let engine = Engine::start(engine_cfg, client.clone());

        // Separate connection for blocking tail reads so they cannot stall
        // the primary client's request pipeline.
        tracing::info!("opening dedicated tail connection");
        let mut tail_builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            tail_builder = tail_builder.tls();
        }
        if config.cluster {
            tail_builder = tail_builder.cluster();
        }
        let tail_client = tail_builder
            .build()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "connect (tail)".into(),
            })?;
        let tail_pong: String = tail_client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "PING (tail)".into(),
            })?;
        if tail_pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "tail client unexpected PING response: {tail_pong}"
            )));
        }

        let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
            config.max_concurrent_stream_ops as usize,
        ));
        let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
        tracing::info!(
            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
            "stream-op client ready (read + tail share the semaphore; \
             tails additionally serialize via xread_block_lock)"
        );

        // Security warnings: without an API token, admin endpoints and
        // token/payload-returning endpoints are unauthenticated.
        if config.api_token.is_none() {
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
                 FF_API_TOKEN for any deployment reachable from untrusted \
                 networks."
            );
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
                 and GET /v1/executions/{{id}}/result returns raw completion payload \
                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
                 configuration."
            );
        }

        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            listen_addr = %config.listen_addr,
            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
            config.partition_config.num_flow_partitions,
            config.partition_config.num_budget_partitions,
            config.partition_config.num_quota_partitions,
        );

        Ok(Self {
            client,
            tail_client,
            stream_semaphore,
            xread_block_lock,
            // Capacity 1: admin rotations run strictly one at a time.
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine,
            config,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
        })
    }
437
    /// Borrow the primary Valkey client.
    pub fn client(&self) -> &Client {
        &self.client
    }
442
    /// Invoke a flowfabric Lua library function on the primary client,
    /// delegating to the free function `fcall_with_reload_on_client`
    /// (which, per its name, reloads the library when needed — defined
    /// elsewhere in this file).
    async fn fcall_with_reload(
        &self,
        function: &str,
        keys: &[&str],
        args: &[&str],
    ) -> Result<Value, ServerError> {
        fcall_with_reload_on_client(&self.client, function, keys, args).await
    }
457
    /// Borrow the server configuration.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }
462
    /// Borrow the partition configuration used for key routing.
    pub fn partition_config(&self) -> &PartitionConfig {
        &self.config.partition_config
    }
467
468 pub async fn create_execution(
474 &self,
475 args: &CreateExecutionArgs,
476 ) -> Result<CreateExecutionResult, ServerError> {
477 let partition = execution_partition(&args.execution_id, &self.config.partition_config);
478 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
479 let idx = IndexKeys::new(&partition);
480
481 let lane = &args.lane_id;
482 let tag = partition.hash_tag();
483 let idem_key = match &args.idempotency_key {
484 Some(k) if !k.is_empty() => {
485 keys::idempotency_key(&tag, args.namespace.as_str(), k)
486 }
487 _ => ctx.noop(),
488 };
489
490 let delay_str = args
491 .delay_until
492 .map(|d| d.0.to_string())
493 .unwrap_or_default();
494 let is_delayed = !delay_str.is_empty();
495
496 let scheduling_zset = if is_delayed {
501 idx.lane_delayed(lane)
502 } else {
503 idx.lane_eligible(lane)
504 };
505
506 let fcall_keys: Vec<String> = vec![
507 ctx.core(), ctx.payload(), ctx.policy(), ctx.tags(), scheduling_zset, idem_key, idx.execution_deadline(), idx.all_executions(), ];
516
517 let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());
518
519 let fcall_args: Vec<String> = vec![
525 args.execution_id.to_string(), args.namespace.to_string(), args.lane_id.to_string(), args.execution_kind.clone(), args.priority.to_string(), args.creator_identity.clone(), args.policy.as_ref()
532 .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
533 .unwrap_or_else(|| "{}".to_owned()), String::from_utf8_lossy(&args.input_payload).into_owned(), delay_str, args.idempotency_key.as_ref()
537 .map(|_| "86400000".to_string())
538 .unwrap_or_default(), tags_json, args.execution_deadline_at
541 .map(|d| d.to_string())
542 .unwrap_or_default(), args.partition_id.to_string(), ];
545
546 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
547 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
548
549 let raw: Value = self
550 .fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
551 .await?;
552
553 parse_create_result(&raw, &args.execution_id)
554 }
555
556 pub async fn cancel_execution(
558 &self,
559 args: &CancelExecutionArgs,
560 ) -> Result<CancelExecutionResult, ServerError> {
561 let raw = self
562 .fcall_cancel_execution_with_reload(args)
563 .await?;
564 parse_cancel_result(&raw, &args.execution_id)
565 }
566
567 async fn fcall_cancel_execution_with_reload(
571 &self,
572 args: &CancelExecutionArgs,
573 ) -> Result<Value, ServerError> {
574 let (keys, argv) = build_cancel_execution_fcall(
575 &self.client,
576 &self.config.partition_config,
577 args,
578 )
579 .await?;
580 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
581 let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
582 self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
583 }
584
585 pub async fn get_execution_state(
590 &self,
591 execution_id: &ExecutionId,
592 ) -> Result<PublicState, ServerError> {
593 let partition = execution_partition(execution_id, &self.config.partition_config);
594 let ctx = ExecKeyContext::new(&partition, execution_id);
595
596 let state_str: Option<String> = self
597 .client
598 .hget(&ctx.core(), "public_state")
599 .await
600 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET public_state".into() })?;
601
602 match state_str {
603 Some(s) => {
604 let quoted = format!("\"{s}\"");
605 serde_json::from_str("ed).map_err(|e| {
606 ServerError::Script(format!(
607 "invalid public_state '{s}' for {execution_id}: {e}"
608 ))
609 })
610 }
611 None => Err(ServerError::NotFound(format!(
612 "execution not found: {execution_id}"
613 ))),
614 }
615 }
616
617 pub async fn get_execution_result(
644 &self,
645 execution_id: &ExecutionId,
646 ) -> Result<Option<Vec<u8>>, ServerError> {
647 let partition = execution_partition(execution_id, &self.config.partition_config);
648 let ctx = ExecKeyContext::new(&partition, execution_id);
649
650 let payload: Option<Vec<u8>> = self
660 .client
661 .cmd("GET")
662 .arg(ctx.result())
663 .execute()
664 .await
665 .map_err(|e| ServerError::ValkeyContext {
666 source: e,
667 context: "GET exec result".into(),
668 })?;
669 Ok(payload)
670 }
671
    /// List an execution's pending/active waitpoints, including their HMAC
    /// waitpoint tokens and required signal-matcher names.
    ///
    /// Read path (best-effort consistent, no locking):
    ///  1. EXISTS on the exec core hash (NotFound if absent);
    ///  2. SSCAN the waitpoint-id set to completion;
    ///  3. pipeline pass 1: HMGET each waitpoint hash + HGET the condition
    ///     hash's `total_matchers`;
    ///  4. pipeline pass 2: HMGET matcher names, only for waitpoints that
    ///     report any matchers.
    ///
    /// Entries that vanished mid-scan, are not pending/active, or have an
    /// empty token (treated as corruption) are skipped, not fatal.
    ///
    /// # Errors
    /// `NotFound` if the execution does not exist; `ValkeyContext` on any
    /// transport failure.
    pub async fn list_pending_waitpoints(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        // Distinguish "no waitpoints" (empty Vec) from "no such execution".
        let core_exists: bool = self
            .client
            .cmd("EXISTS")
            .arg(ctx.core())
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "EXISTS exec_core (pending waitpoints)".into(),
            })?;
        if !core_exists {
            return Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            )));
        }

        // Cursor-scan the waitpoint-id set. COUNT is only a hint to the
        // server; loop until the cursor returns to "0".
        const WAITPOINTS_SSCAN_COUNT: usize = 100;
        let waitpoints_key = ctx.waitpoints();
        let mut wp_ids_raw: Vec<String> = Vec::new();
        let mut cursor: String = "0".to_owned();
        loop {
            let reply: (String, Vec<String>) = self
                .client
                .cmd("SSCAN")
                .arg(&waitpoints_key)
                .arg(&cursor)
                .arg("COUNT")
                .arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
                .execute()
                .await
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: "SSCAN waitpoints".into(),
                })?;
            cursor = reply.0;
            wp_ids_raw.extend(reply.1);
            if cursor == "0" {
                break;
            }
        }

        // SSCAN may return duplicates across iterations; sorting first also
        // makes the response order deterministic.
        wp_ids_raw.sort_unstable();
        wp_ids_raw.dedup();

        if wp_ids_raw.is_empty() {
            return Ok(Vec::new());
        }

        // Unparseable ids are logged and skipped rather than failing the call.
        let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
        for raw in &wp_ids_raw {
            match WaitpointId::parse(raw) {
                Ok(id) => wp_ids.push(id),
                Err(e) => tracing::warn!(
                    raw_id = %raw,
                    error = %e,
                    execution_id = %execution_id,
                    "list_pending_waitpoints: skipping unparseable waitpoint_id"
                ),
            }
        }
        if wp_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Field order here fixes the indices used by `get(i)` below:
        // 0=state, 1=key, 2=token, 3=created, 4=activated, 5=expires.
        const WP_FIELDS: [&str; 6] = [
            "state",
            "waitpoint_key",
            "waitpoint_token",
            "created_at",
            "activated_at",
            "expires_at",
        ];

        // Pass 1: per waitpoint, one HMGET on the waitpoint hash and one
        // HGET of the condition hash's matcher count — single round-trip.
        let mut pass1 = self.client.pipeline();
        let mut wp_slots = Vec::with_capacity(wp_ids.len());
        let mut cond_slots = Vec::with_capacity(wp_ids.len());
        for wp_id in &wp_ids {
            let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint(wp_id));
            for f in WP_FIELDS {
                cmd = cmd.arg(f);
            }
            wp_slots.push(cmd.finish());

            cond_slots.push(
                pass1
                    .cmd::<Option<String>>("HGET")
                    .arg(ctx.waitpoint_condition(wp_id))
                    .arg("total_matchers")
                    .finish(),
            );
        }
        pass1
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "pipeline HMGET waitpoints + HGET total_matchers".into(),
            })?;

        // Waitpoints that survive the pass-1 filters, carried to pass 2.
        struct Kept {
            wp_id: WaitpointId,
            wp_fields: Vec<Option<String>>,
            total_matchers: usize,
        }
        let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
        for ((wp_id, wp_slot), cond_slot) in wp_ids
            .iter()
            .zip(wp_slots)
            .zip(cond_slots)
        {
            let wp_fields: Vec<Option<String>> =
                wp_slot.value().map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("pipeline slot HMGET waitpoint {wp_id}"),
                })?;

            // All-None means the hash vanished between SSCAN and HMGET;
            // discard the paired condition slot and skip.
            if wp_fields.iter().all(Option::is_none) {
                let _ = cond_slot.value();
                continue;
            }
            // Only pending/active waitpoints are reported.
            let state_ref = wp_fields
                .first()
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if state_ref != "pending" && state_ref != "active" {
                let _ = cond_slot.value();
                continue;
            }
            let token_ref = wp_fields
                .get(2)
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if token_ref.is_empty() {
                let _ = cond_slot.value();
                tracing::warn!(
                    waitpoint_id = %wp_id,
                    execution_id = %execution_id,
                    waitpoint_hash_key = %ctx.waitpoint(wp_id),
                    state = %state_ref,
                    "list_pending_waitpoints: waitpoint hash present but waitpoint_token \
                     field is empty — likely storage corruption (half-populated write, \
                     operator edit, or interrupted script). Skipping this entry in the \
                     response. HGETALL the waitpoint_hash_key to inspect."
                );
                continue;
            }

            // A missing/unparseable matcher count degrades to 0 (no pass-2
            // fetch for this waitpoint).
            let total_matchers = cond_slot
                .value()
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("pipeline slot HGET total_matchers {wp_id}"),
                })?
                .and_then(|s| s.parse::<usize>().ok())
                .unwrap_or(0);

            kept.push(Kept {
                wp_id: wp_id.clone(),
                wp_fields,
                total_matchers,
            });
        }

        if kept.is_empty() {
            return Ok(Vec::new());
        }

        // Pass 2: fetch matcher names only for waitpoints with matchers;
        // `matcher_slots` stays index-aligned with `kept` via None entries.
        let mut pass2 = self.client.pipeline();
        let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
        let mut pass2_needed = false;
        for k in &kept {
            if k.total_matchers == 0 {
                matcher_slots.push(None);
                continue;
            }
            pass2_needed = true;
            let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
            for i in 0..k.total_matchers {
                cmd = cmd.arg(format!("matcher:{i}:name"));
            }
            matcher_slots.push(Some(cmd.finish()));
        }
        if pass2_needed {
            pass2.execute().await.map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "pipeline HMGET wp_condition matchers".into(),
            })?;
        }

        // Empty timestamp fields mean "unset".
        let parse_ts = |raw: &str| -> Option<TimestampMs> {
            if raw.is_empty() {
                None
            } else {
                raw.parse::<i64>().ok().map(TimestampMs)
            }
        };

        // Assemble the response; `get(i)` indices follow WP_FIELDS order.
        let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
        for (k, slot) in kept.into_iter().zip(matcher_slots) {
            let get = |i: usize| -> &str {
                k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
            };

            let required_signal_names: Vec<String> = match slot {
                None => Vec::new(),
                Some(s) => {
                    let vals: Vec<Option<String>> =
                        s.value().map_err(|e| ServerError::ValkeyContext {
                            source: e,
                            context: format!(
                                "pipeline slot HMGET wp_condition matchers {}",
                                k.wp_id
                            ),
                        })?;
                    vals.into_iter()
                        .flatten()
                        .filter(|name| !name.is_empty())
                        .collect()
                }
            };

            out.push(PendingWaitpointInfo {
                waitpoint_id: k.wp_id,
                waitpoint_key: get(1).to_owned(),
                state: get(0).to_owned(),
                waitpoint_token: WaitpointToken(get(2).to_owned()),
                required_signal_names,
                created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
                activated_at: parse_ts(get(4)),
                expires_at: parse_ts(get(5)),
            });
        }

        Ok(out)
    }
999
1000 pub async fn create_budget(
1004 &self,
1005 args: &CreateBudgetArgs,
1006 ) -> Result<CreateBudgetResult, ServerError> {
1007 let partition = budget_partition(&args.budget_id, &self.config.partition_config);
1008 let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
1009 let resets_key = keys::budget_resets_key(bctx.hash_tag());
1010 let policies_index = keys::budget_policies_index(bctx.hash_tag());
1011
1012 let fcall_keys: Vec<String> = vec![
1015 bctx.definition(),
1016 bctx.limits(),
1017 bctx.usage(),
1018 resets_key,
1019 policies_index,
1020 ];
1021
1022 let dim_count = args.dimensions.len();
1026 let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
1027 fcall_args.push(args.budget_id.to_string());
1028 fcall_args.push(args.scope_type.clone());
1029 fcall_args.push(args.scope_id.clone());
1030 fcall_args.push(args.enforcement_mode.clone());
1031 fcall_args.push(args.on_hard_limit.clone());
1032 fcall_args.push(args.on_soft_limit.clone());
1033 fcall_args.push(args.reset_interval_ms.to_string());
1034 fcall_args.push(args.now.to_string());
1035 fcall_args.push(dim_count.to_string());
1036 for dim in &args.dimensions {
1037 fcall_args.push(dim.clone());
1038 }
1039 for hard in &args.hard_limits {
1040 fcall_args.push(hard.to_string());
1041 }
1042 for soft in &args.soft_limits {
1043 fcall_args.push(soft.to_string());
1044 }
1045
1046 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1047 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1048
1049 let raw: Value = self
1050 .fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
1051 .await?;
1052
1053 parse_budget_create_result(&raw, &args.budget_id)
1054 }
1055
1056 pub async fn create_quota_policy(
1058 &self,
1059 args: &CreateQuotaPolicyArgs,
1060 ) -> Result<CreateQuotaPolicyResult, ServerError> {
1061 let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
1062 let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);
1063
1064 let fcall_keys: Vec<String> = vec![
1067 qctx.definition(),
1068 qctx.window("requests_per_window"),
1069 qctx.concurrency(),
1070 qctx.admitted_set(),
1071 keys::quota_policies_index(qctx.hash_tag()),
1072 ];
1073
1074 let fcall_args: Vec<String> = vec![
1077 args.quota_policy_id.to_string(),
1078 args.window_seconds.to_string(),
1079 args.max_requests_per_window.to_string(),
1080 args.max_concurrent.to_string(),
1081 args.now.to_string(),
1082 ];
1083
1084 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1085 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1086
1087 let raw: Value = self
1088 .fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
1089 .await?;
1090
1091 parse_quota_create_result(&raw, &args.quota_policy_id)
1092 }
1093
1094 pub async fn get_budget_status(
1096 &self,
1097 budget_id: &BudgetId,
1098 ) -> Result<BudgetStatus, ServerError> {
1099 let partition = budget_partition(budget_id, &self.config.partition_config);
1100 let bctx = BudgetKeyContext::new(&partition, budget_id);
1101
1102 let def: HashMap<String, String> = self
1104 .client
1105 .hgetall(&bctx.definition())
1106 .await
1107 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_def".into() })?;
1108
1109 if def.is_empty() {
1110 return Err(ServerError::NotFound(format!(
1111 "budget not found: {budget_id}"
1112 )));
1113 }
1114
1115 let usage_raw: HashMap<String, String> = self
1117 .client
1118 .hgetall(&bctx.usage())
1119 .await
1120 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_usage".into() })?;
1121 let usage: HashMap<String, u64> = usage_raw
1122 .into_iter()
1123 .filter(|(k, _)| k != "_init")
1124 .map(|(k, v)| (k, v.parse().unwrap_or(0)))
1125 .collect();
1126
1127 let limits_raw: HashMap<String, String> = self
1129 .client
1130 .hgetall(&bctx.limits())
1131 .await
1132 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_limits".into() })?;
1133 let mut hard_limits = HashMap::new();
1134 let mut soft_limits = HashMap::new();
1135 for (k, v) in &limits_raw {
1136 if let Some(dim) = k.strip_prefix("hard:") {
1137 hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
1138 } else if let Some(dim) = k.strip_prefix("soft:") {
1139 soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
1140 }
1141 }
1142
1143 let non_empty = |s: Option<&String>| -> Option<String> {
1144 s.filter(|v| !v.is_empty()).cloned()
1145 };
1146
1147 Ok(BudgetStatus {
1148 budget_id: budget_id.to_string(),
1149 scope_type: def.get("scope_type").cloned().unwrap_or_default(),
1150 scope_id: def.get("scope_id").cloned().unwrap_or_default(),
1151 enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
1152 usage,
1153 hard_limits,
1154 soft_limits,
1155 breach_count: def
1156 .get("breach_count")
1157 .and_then(|v| v.parse().ok())
1158 .unwrap_or(0),
1159 soft_breach_count: def
1160 .get("soft_breach_count")
1161 .and_then(|v| v.parse().ok())
1162 .unwrap_or(0),
1163 last_breach_at: non_empty(def.get("last_breach_at")),
1164 last_breach_dim: non_empty(def.get("last_breach_dim")),
1165 next_reset_at: non_empty(def.get("next_reset_at")),
1166 created_at: non_empty(def.get("created_at")),
1167 })
1168 }
1169
1170 pub async fn report_usage(
1172 &self,
1173 budget_id: &BudgetId,
1174 args: &ReportUsageArgs,
1175 ) -> Result<ReportUsageResult, ServerError> {
1176 let partition = budget_partition(budget_id, &self.config.partition_config);
1177 let bctx = BudgetKeyContext::new(&partition, budget_id);
1178
1179 let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
1181
1182 let dim_count = args.dimensions.len();
1184 let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
1185 fcall_args.push(dim_count.to_string());
1186 for dim in &args.dimensions {
1187 fcall_args.push(dim.clone());
1188 }
1189 for delta in &args.deltas {
1190 fcall_args.push(delta.to_string());
1191 }
1192 fcall_args.push(args.now.to_string());
1193 let dedup_key_val = args
1194 .dedup_key
1195 .as_ref()
1196 .filter(|k| !k.is_empty())
1197 .map(|k| usage_dedup_key(bctx.hash_tag(), k))
1198 .unwrap_or_default();
1199 fcall_args.push(dedup_key_val);
1200
1201 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1202 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1203
1204 let raw: Value = self
1205 .fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
1206 .await?;
1207
1208 parse_report_usage_result(&raw)
1209 }
1210
1211 pub async fn reset_budget(
1213 &self,
1214 budget_id: &BudgetId,
1215 ) -> Result<ResetBudgetResult, ServerError> {
1216 let partition = budget_partition(budget_id, &self.config.partition_config);
1217 let bctx = BudgetKeyContext::new(&partition, budget_id);
1218 let resets_key = keys::budget_resets_key(bctx.hash_tag());
1219
1220 let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];
1222
1223 let now = TimestampMs::now();
1225 let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];
1226
1227 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1228 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1229
1230 let raw: Value = self
1231 .fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
1232 .await?;
1233
1234 parse_reset_budget_result(&raw)
1235 }
1236
1237 pub async fn create_flow(
1241 &self,
1242 args: &CreateFlowArgs,
1243 ) -> Result<CreateFlowResult, ServerError> {
1244 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1245 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1246 let fidx = FlowIndexKeys::new(&partition);
1247
1248 let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];
1250
1251 let fcall_args: Vec<String> = vec![
1253 args.flow_id.to_string(),
1254 args.flow_kind.clone(),
1255 args.namespace.to_string(),
1256 args.now.to_string(),
1257 ];
1258
1259 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1260 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1261
1262 let raw: Value = self
1263 .fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
1264 .await?;
1265
1266 parse_create_flow_result(&raw, &args.flow_id)
1267 }
1268
1269 pub async fn add_execution_to_flow(
1307 &self,
1308 args: &AddExecutionToFlowArgs,
1309 ) -> Result<AddExecutionToFlowResult, ServerError> {
1310 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1311 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1312 let fidx = FlowIndexKeys::new(&partition);
1313
1314 let exec_partition =
1318 execution_partition(&args.execution_id, &self.config.partition_config);
1319 let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);
1320
1321 if exec_partition.index != partition.index {
1330 return Err(ServerError::PartitionMismatch(format!(
1331 "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
1332 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
1333 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
1334 exec_p = exec_partition.index,
1335 flow_p = partition.index,
1336 )));
1337 }
1338
1339 let fcall_keys: Vec<String> = vec![
1341 fctx.core(),
1342 fctx.members(),
1343 fidx.flow_index(),
1344 ectx.core(),
1345 ];
1346
1347 let fcall_args: Vec<String> = vec![
1349 args.flow_id.to_string(),
1350 args.execution_id.to_string(),
1351 args.now.to_string(),
1352 ];
1353
1354 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1355 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1356
1357 let raw: Value = self
1358 .fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
1359 .await?;
1360
1361 parse_add_execution_to_flow_result(&raw)
1362 }
1363
    /// Cancel a flow without waiting (passes `wait = false` to
    /// `cancel_flow_inner`; see [`Self::cancel_flow_wait`] for the
    /// waiting variant).
    pub async fn cancel_flow(
        &self,
        args: &CancelFlowArgs,
    ) -> Result<CancelFlowResult, ServerError> {
        self.cancel_flow_inner(args, false).await
    }
1411
    /// Cancel a flow with `wait = true`: `cancel_flow_inner` additionally
    /// iterates the member executions before returning (processing per
    /// member happens past the visible portion of that function).
    pub async fn cancel_flow_wait(
        &self,
        args: &CancelFlowArgs,
    ) -> Result<CancelFlowResult, ServerError> {
        self.cancel_flow_inner(args, true).await
    }
1421
    /// Shared implementation behind `cancel_flow` and `cancel_flow_wait`.
    ///
    /// Invokes the `ff_cancel_flow` server-side function, then — when the
    /// resulting policy is `cancel_all` and the flow has members — cancels
    /// each member execution either inline (`wait == true`) or on a tracked
    /// background task (`wait == false`).
    async fn cancel_flow_inner(
        &self,
        args: &CancelFlowArgs,
        wait: bool,
    ) -> Result<CancelFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        // KEYS/ARGV order is part of the ff_cancel_flow contract.
        let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];

        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.reason.clone(),
            args.cancellation_policy.clone(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
            .await?;

        let (policy, members) = match parse_cancel_flow_raw(&raw)? {
            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
                (policy, member_execution_ids)
            }
            ParsedCancelFlow::AlreadyTerminal => {
                // Idempotent path: the flow is already terminal. Reconstruct
                // a Cancelled result from stored state instead of erroring,
                // so repeated cancel calls succeed.
                let flow_meta: Vec<Option<String>> = self
                    .client
                    .cmd("HMGET")
                    .arg(fctx.core())
                    .arg("cancellation_policy")
                    .arg("cancel_reason")
                    .execute()
                    .await
                    .map_err(|e| ServerError::ValkeyContext {
                        source: e,
                        context: "HMGET flow_core cancellation_policy,cancel_reason".into(),
                    })?;
                let stored_policy = flow_meta
                    .first()
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let stored_reason = flow_meta
                    .get(1)
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let all_members: Vec<String> = self
                    .client
                    .cmd("SMEMBERS")
                    .arg(fctx.members())
                    .execute()
                    .await
                    .map_err(|e| ServerError::ValkeyContext {
                        source: e,
                        context: "SMEMBERS flow members (already terminal)".into(),
                    })?;
                let total_members = all_members.len();
                // Cap the returned member list so a huge flow cannot bloat
                // the idempotent response payload.
                let stored_members: Vec<String> = all_members
                    .into_iter()
                    .take(ALREADY_TERMINAL_MEMBER_CAP)
                    .collect();
                tracing::debug!(
                    flow_id = %args.flow_id,
                    stored_policy = stored_policy.as_deref().unwrap_or(""),
                    stored_reason = stored_reason.as_deref().unwrap_or(""),
                    total_members,
                    returned_members = stored_members.len(),
                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
                );
                return Ok(CancelFlowResult::Cancelled {
                    // Fall back to the caller-supplied policy when none was
                    // stored on the flow.
                    cancellation_policy: stored_policy
                        .unwrap_or_else(|| args.cancellation_policy.clone()),
                    member_execution_ids: stored_members,
                });
            }
        };
        // Only the cancel_all policy fans cancellation out to members.
        let needs_dispatch = policy == "cancel_all" && !members.is_empty();

        if !needs_dispatch {
            return Ok(CancelFlowResult::Cancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
            });
        }

        if wait {
            // Synchronous path: cancel each member inline before returning.
            // Individual failures are logged (the member may already be
            // terminal) but do not fail the overall cancellation.
            for eid_str in &members {
                if let Err(e) = cancel_member_execution(
                    &self.client,
                    &self.config.partition_config,
                    eid_str,
                    &args.reason,
                    args.now,
                )
                .await
                {
                    tracing::warn!(
                        execution_id = %eid_str,
                        error = %e,
                        "cancel_flow(wait): individual execution cancel failed (may be terminal)"
                    );
                }
            }
            return Ok(CancelFlowResult::Cancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
            });
        }

        // Asynchronous path: dispatch member cancellation on a background
        // task tracked in background_tasks (drained during shutdown()).
        let client = self.client.clone();
        let partition_config = self.config.partition_config;
        let reason = args.reason.clone();
        let now = args.now;
        let dispatch_members = members.clone();
        let flow_id = args.flow_id.clone();
        let mut guard = self.background_tasks.lock().await;

        // Opportunistically reap finished tasks so the JoinSet does not
        // grow without bound across many cancel calls.
        while let Some(joined) = guard.try_join_next() {
            if let Err(e) = joined {
                tracing::warn!(
                    error = %e,
                    "cancel_flow: background dispatch task panicked or was aborted"
                );
            }
        }

        guard.spawn(async move {
            use futures::stream::StreamExt;
            // Bound concurrent member cancels to avoid flooding the backend.
            const CONCURRENCY: usize = 16;

            let member_count = dispatch_members.len();
            let flow_id_for_log = flow_id.clone();
            futures::stream::iter(dispatch_members)
                .map(|eid_str| {
                    let client = client.clone();
                    let reason = reason.clone();
                    let flow_id = flow_id.clone();
                    async move {
                        if let Err(e) = cancel_member_execution(
                            &client,
                            &partition_config,
                            &eid_str,
                            &reason,
                            now,
                        )
                        .await
                        {
                            tracing::warn!(
                                flow_id = %flow_id,
                                execution_id = %eid_str,
                                error = %e,
                                "cancel_flow(async): individual execution cancel failed (may be terminal)"
                            );
                        }
                    }
                })
                .buffer_unordered(CONCURRENCY)
                .for_each(|()| async {})
                .await;

            tracing::debug!(
                flow_id = %flow_id_for_log,
                member_count,
                concurrency = CONCURRENCY,
                "cancel_flow: background member dispatch complete"
            );
        });
        drop(guard);

        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
        Ok(CancelFlowResult::CancellationScheduled {
            cancellation_policy: policy,
            member_count,
            member_execution_ids: members,
        })
    }
1640
1641 pub async fn stage_dependency_edge(
1646 &self,
1647 args: &StageDependencyEdgeArgs,
1648 ) -> Result<StageDependencyEdgeResult, ServerError> {
1649 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1650 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1651
1652 let fcall_keys: Vec<String> = vec![
1654 fctx.core(),
1655 fctx.members(),
1656 fctx.edge(&args.edge_id),
1657 fctx.outgoing(&args.upstream_execution_id),
1658 fctx.incoming(&args.downstream_execution_id),
1659 fctx.grant(&args.edge_id.to_string()),
1660 ];
1661
1662 let fcall_args: Vec<String> = vec![
1665 args.flow_id.to_string(),
1666 args.edge_id.to_string(),
1667 args.upstream_execution_id.to_string(),
1668 args.downstream_execution_id.to_string(),
1669 args.dependency_kind.clone(),
1670 args.data_passing_ref.clone().unwrap_or_default(),
1671 args.expected_graph_revision.to_string(),
1672 args.now.to_string(),
1673 ];
1674
1675 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1676 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1677
1678 let raw: Value = self
1679 .fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
1680 .await?;
1681
1682 parse_stage_dependency_edge_result(&raw)
1683 }
1684
1685 pub async fn apply_dependency_to_child(
1690 &self,
1691 args: &ApplyDependencyToChildArgs,
1692 ) -> Result<ApplyDependencyToChildResult, ServerError> {
1693 let partition = execution_partition(
1694 &args.downstream_execution_id,
1695 &self.config.partition_config,
1696 );
1697 let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
1698 let idx = IndexKeys::new(&partition);
1699
1700 let lane_str: Option<String> = self
1702 .client
1703 .hget(&ctx.core(), "lane_id")
1704 .await
1705 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1706 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1707
1708 let fcall_keys: Vec<String> = vec![
1711 ctx.core(),
1712 ctx.deps_meta(),
1713 ctx.deps_unresolved(),
1714 ctx.dep_edge(&args.edge_id),
1715 idx.lane_eligible(&lane),
1716 idx.lane_blocked_dependencies(&lane),
1717 ctx.deps_all_edges(),
1718 ];
1719
1720 let fcall_args: Vec<String> = vec![
1723 args.flow_id.to_string(),
1724 args.edge_id.to_string(),
1725 args.upstream_execution_id.to_string(),
1726 args.graph_revision.to_string(),
1727 args.dependency_kind.clone(),
1728 args.data_passing_ref.clone().unwrap_or_default(),
1729 args.now.to_string(),
1730 ];
1731
1732 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1733 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1734
1735 let raw: Value = self
1736 .fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
1737 .await?;
1738
1739 parse_apply_dependency_result(&raw)
1740 }
1741
1742 pub async fn deliver_signal(
1749 &self,
1750 args: &DeliverSignalArgs,
1751 ) -> Result<DeliverSignalResult, ServerError> {
1752 let partition = execution_partition(&args.execution_id, &self.config.partition_config);
1753 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
1754 let idx = IndexKeys::new(&partition);
1755
1756 let lane_str: Option<String> = self
1758 .client
1759 .hget(&ctx.core(), "lane_id")
1760 .await
1761 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1762 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1763
1764 let wp_id = &args.waitpoint_id;
1765 let sig_id = &args.signal_id;
1766 let idem_key = args
1767 .idempotency_key
1768 .as_ref()
1769 .filter(|k| !k.is_empty())
1770 .map(|k| ctx.signal_dedup(wp_id, k))
1771 .unwrap_or_else(|| ctx.noop());
1772
1773 let fcall_keys: Vec<String> = vec![
1779 ctx.core(), ctx.waitpoint_condition(wp_id), ctx.waitpoint_signals(wp_id), ctx.exec_signals(), ctx.signal(sig_id), ctx.signal_payload(sig_id), idem_key, ctx.waitpoint(wp_id), ctx.suspension_current(), idx.lane_eligible(&lane), idx.lane_suspended(&lane), idx.lane_delayed(&lane), idx.suspension_timeout(), idx.waitpoint_hmac_secrets(), ];
1794
1795 let fcall_args: Vec<String> = vec![
1802 args.signal_id.to_string(), args.execution_id.to_string(), args.waitpoint_id.to_string(), args.signal_name.clone(), args.signal_category.clone(), args.source_type.clone(), args.source_identity.clone(), args.payload.as_ref()
1810 .map(|p| String::from_utf8_lossy(p).into_owned())
1811 .unwrap_or_default(), args.payload_encoding
1813 .clone()
1814 .unwrap_or_else(|| "json".to_owned()), args.idempotency_key
1816 .clone()
1817 .unwrap_or_default(), args.correlation_id
1819 .clone()
1820 .unwrap_or_default(), args.target_scope.clone(), args.created_at
1823 .map(|ts| ts.to_string())
1824 .unwrap_or_else(|| args.now.to_string()), args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), args.resume_delay_ms.unwrap_or(0).to_string(), args.signal_maxlen.unwrap_or(1000).to_string(), args.max_signals_per_execution
1829 .unwrap_or(10_000)
1830 .to_string(), args.waitpoint_token.as_str().to_owned(), ];
1835
1836 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1837 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1838
1839 let raw: Value = self
1840 .fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
1841 .await?;
1842
1843 parse_deliver_signal_result(&raw, &args.signal_id)
1844 }
1845
1846 pub async fn change_priority(
1850 &self,
1851 execution_id: &ExecutionId,
1852 new_priority: i32,
1853 ) -> Result<ChangePriorityResult, ServerError> {
1854 let partition = execution_partition(execution_id, &self.config.partition_config);
1855 let ctx = ExecKeyContext::new(&partition, execution_id);
1856 let idx = IndexKeys::new(&partition);
1857
1858 let lane_str: Option<String> = self
1860 .client
1861 .hget(&ctx.core(), "lane_id")
1862 .await
1863 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1864 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1865
1866 let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];
1868
1869 let fcall_args: Vec<String> = vec![
1871 execution_id.to_string(),
1872 new_priority.to_string(),
1873 ];
1874
1875 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1876 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1877
1878 let raw: Value = self
1879 .fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
1880 .await?;
1881
1882 parse_change_priority_result(&raw, execution_id)
1883 }
1884
1885 pub async fn claim_for_worker(
1899 &self,
1900 lane: &LaneId,
1901 worker_id: &WorkerId,
1902 worker_instance_id: &WorkerInstanceId,
1903 worker_capabilities: &std::collections::BTreeSet<String>,
1904 grant_ttl_ms: u64,
1905 ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
1906 let scheduler = ff_scheduler::Scheduler::new(
1907 self.client.clone(),
1908 self.config.partition_config,
1909 );
1910 scheduler
1911 .claim_for_worker(
1912 lane,
1913 worker_id,
1914 worker_instance_id,
1915 worker_capabilities,
1916 grant_ttl_ms,
1917 )
1918 .await
1919 .map_err(|e| match e {
1920 ff_scheduler::SchedulerError::Valkey(inner) => {
1921 ServerError::Valkey(inner)
1922 }
1923 ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
1924 ServerError::ValkeyContext { source, context }
1925 }
1926 ff_scheduler::SchedulerError::Config(msg) => {
1927 ServerError::InvalidInput(msg)
1928 }
1929 })
1930 }
1931
1932 pub async fn revoke_lease(
1934 &self,
1935 execution_id: &ExecutionId,
1936 ) -> Result<RevokeLeaseResult, ServerError> {
1937 let partition = execution_partition(execution_id, &self.config.partition_config);
1938 let ctx = ExecKeyContext::new(&partition, execution_id);
1939 let idx = IndexKeys::new(&partition);
1940
1941 let wiid_str: Option<String> = self
1943 .client
1944 .hget(&ctx.core(), "current_worker_instance_id")
1945 .await
1946 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET worker_instance_id".into() })?;
1947 let wiid = match wiid_str {
1948 Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
1949 _ => {
1950 return Err(ServerError::NotFound(format!(
1951 "no active lease for execution {execution_id} (no current_worker_instance_id)"
1952 )));
1953 }
1954 };
1955
1956 let fcall_keys: Vec<String> = vec![
1958 ctx.core(),
1959 ctx.lease_current(),
1960 ctx.lease_history(),
1961 idx.lease_expiry(),
1962 idx.worker_leases(&wiid),
1963 ];
1964
1965 let fcall_args: Vec<String> = vec![
1967 execution_id.to_string(),
1968 String::new(), "operator_revoke".to_owned(),
1970 ];
1971
1972 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1973 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1974
1975 let raw: Value = self
1976 .fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
1977 .await?;
1978
1979 parse_revoke_lease_result(&raw)
1980 }
1981
1982 pub async fn get_execution(
1984 &self,
1985 execution_id: &ExecutionId,
1986 ) -> Result<ExecutionInfo, ServerError> {
1987 let partition = execution_partition(execution_id, &self.config.partition_config);
1988 let ctx = ExecKeyContext::new(&partition, execution_id);
1989
1990 let fields: HashMap<String, String> = self
1991 .client
1992 .hgetall(&ctx.core())
1993 .await
1994 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL exec_core".into() })?;
1995
1996 if fields.is_empty() {
1997 return Err(ServerError::NotFound(format!(
1998 "execution not found: {execution_id}"
1999 )));
2000 }
2001
2002 let parse_enum = |field: &str| -> String {
2003 fields.get(field).cloned().unwrap_or_default()
2004 };
2005 fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
2006 let quoted = format!("\"{raw}\"");
2007 serde_json::from_str("ed).map_err(|e| {
2008 ServerError::Script(format!("invalid {field} '{raw}': {e}"))
2009 })
2010 }
2011
2012 let lp_str = parse_enum("lifecycle_phase");
2013 let os_str = parse_enum("ownership_state");
2014 let es_str = parse_enum("eligibility_state");
2015 let br_str = parse_enum("blocking_reason");
2016 let to_str = parse_enum("terminal_outcome");
2017 let as_str = parse_enum("attempt_state");
2018 let ps_str = parse_enum("public_state");
2019
2020 let state_vector = StateVector {
2021 lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
2022 ownership_state: deserialize("ownership_state", &os_str)?,
2023 eligibility_state: deserialize("eligibility_state", &es_str)?,
2024 blocking_reason: deserialize("blocking_reason", &br_str)?,
2025 terminal_outcome: deserialize("terminal_outcome", &to_str)?,
2026 attempt_state: deserialize("attempt_state", &as_str)?,
2027 public_state: deserialize("public_state", &ps_str)?,
2028 };
2029
2030 let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();
2037
2038 let started_at_opt = fields
2045 .get("started_at")
2046 .filter(|s| !s.is_empty())
2047 .cloned();
2048 let completed_at_opt = fields
2049 .get("completed_at")
2050 .filter(|s| !s.is_empty())
2051 .cloned();
2052
2053 Ok(ExecutionInfo {
2054 execution_id: execution_id.clone(),
2055 namespace: parse_enum("namespace"),
2056 lane_id: parse_enum("lane_id"),
2057 priority: fields
2058 .get("priority")
2059 .and_then(|v| v.parse().ok())
2060 .unwrap_or(0),
2061 execution_kind: parse_enum("execution_kind"),
2062 state_vector,
2063 public_state: deserialize("public_state", &ps_str)?,
2064 created_at: parse_enum("created_at"),
2065 started_at: started_at_opt,
2066 completed_at: completed_at_opt,
2067 current_attempt_index: fields
2068 .get("current_attempt_index")
2069 .and_then(|v| v.parse().ok())
2070 .unwrap_or(0),
2071 flow_id: flow_id_val,
2072 blocking_detail: parse_enum("blocking_detail"),
2073 })
2074 }
2075
    /// Page through executions in one partition's per-lane ZSET index.
    ///
    /// `state_filter` selects which index to read (eligible, delayed,
    /// terminal, suspended, or active); `offset`/`limit` page through it in
    /// score order. Summary fields are fetched with one pipelined HMGET per
    /// execution.
    pub async fn list_executions(
        &self,
        partition_id: u16,
        lane: &LaneId,
        state_filter: &str,
        offset: u64,
        limit: u64,
    ) -> Result<ListExecutionsResult, ServerError> {
        let partition = ff_core::partition::Partition {
            family: ff_core::partition::PartitionFamily::Execution,
            index: partition_id,
        };
        let idx = IndexKeys::new(&partition);

        let zset_key = match state_filter {
            "eligible" => idx.lane_eligible(lane),
            "delayed" => idx.lane_delayed(lane),
            "terminal" => idx.lane_terminal(lane),
            "suspended" => idx.lane_suspended(lane),
            "active" => idx.lane_active(lane),
            other => {
                return Err(ServerError::InvalidInput(format!(
                    "invalid state_filter: {other}. Use: eligible, delayed, terminal, suspended, active"
                )));
            }
        };

        // Full score range, paged by offset/limit.
        let eids: Vec<String> = self
            .client
            .cmd("ZRANGE")
            .arg(&zset_key)
            .arg("-inf")
            .arg("+inf")
            .arg("BYSCORE")
            .arg("LIMIT")
            .arg(offset)
            .arg(limit)
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("ZRANGE {zset_key}") })?;

        if eids.is_empty() {
            return Ok(ListExecutionsResult {
                executions: vec![],
                total_returned: 0,
            });
        }

        // Unparsable members are logged and skipped rather than failing the
        // whole listing.
        let mut parsed = Vec::with_capacity(eids.len());
        for eid_str in &eids {
            match ExecutionId::parse(eid_str) {
                Ok(id) => parsed.push(id),
                Err(e) => {
                    tracing::warn!(
                        raw_id = %eid_str,
                        error = %e,
                        zset = %zset_key,
                        "list_executions: ZSET member failed to parse as ExecutionId (data corruption?)"
                    );
                }
            }
        }

        if parsed.is_empty() {
            return Ok(ListExecutionsResult {
                executions: vec![],
                total_returned: 0,
            });
        }

        // One pipelined HMGET per execution; slots are resolved after the
        // single pipeline round trip.
        let mut pipe = self.client.pipeline();
        let mut slots = Vec::with_capacity(parsed.len());
        for eid in &parsed {
            let ep = execution_partition(eid, &self.config.partition_config);
            let ctx = ExecKeyContext::new(&ep, eid);
            let slot = pipe
                .cmd::<Vec<Option<String>>>("HMGET")
                .arg(ctx.core())
                .arg("namespace")
                .arg("lane_id")
                .arg("execution_kind")
                .arg("public_state")
                .arg("priority")
                .arg("created_at")
                .finish();
            slots.push(slot);
        }

        pipe.execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline HMGET".into() })?;

        let mut summaries = Vec::with_capacity(parsed.len());
        for (eid, slot) in parsed.into_iter().zip(slots) {
            let fields: Vec<Option<String>> = slot.value()
                .map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline slot".into() })?;

            // Positional accessor matching the HMGET field order above;
            // a missing field reads as "".
            let field = |i: usize| -> String {
                fields
                    .get(i)
                    .and_then(|v| v.as_ref())
                    .cloned()
                    .unwrap_or_default()
            };

            summaries.push(ExecutionSummary {
                execution_id: eid,
                namespace: field(0),
                lane_id: field(1),
                execution_kind: field(2),
                public_state: field(3),
                priority: field(4).parse().unwrap_or(0),
                created_at: field(5),
            });
        }

        let total = summaries.len();
        Ok(ListExecutionsResult {
            executions: summaries,
            total_returned: total,
        })
    }
2204
    /// Replay (re-run) a terminal execution via `ff_replay_execution`.
    ///
    /// For a flow member that terminated as `skipped`, the call additionally
    /// passes the execution's incoming dependency-edge keys so the script
    /// can act on its dependency bookkeeping.
    pub async fn replay_execution(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ReplayExecutionResult, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let idx = IndexKeys::new(&partition);

        // Pre-read lane, owning flow, and terminal outcome to decide which
        // key set to hand the script.
        let dyn_fields: Vec<Option<String>> = self
            .client
            .cmd("HMGET")
            .arg(ctx.core())
            .arg("lane_id")
            .arg("flow_id")
            .arg("terminal_outcome")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET replay pre-read".into() })?;
        let lane = LaneId::new(
            dyn_fields
                .first()
                .and_then(|v| v.as_ref())
                .cloned()
                .unwrap_or_else(|| "default".to_owned()),
        );
        let flow_id_str = dyn_fields
            .get(1)
            .and_then(|v| v.as_ref())
            .cloned()
            .unwrap_or_default();
        let terminal_outcome = dyn_fields
            .get(2)
            .and_then(|v| v.as_ref())
            .cloned()
            .unwrap_or_default();

        let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();

        // Base KEYS; extended below for the skipped-flow-member case.
        let mut fcall_keys: Vec<String> = vec![
            ctx.core(),
            idx.lane_terminal(&lane),
            idx.lane_eligible(&lane),
            ctx.lease_history(),
        ];

        let now = TimestampMs::now();
        let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];

        if is_skipped_flow_member {
            let flow_id = FlowId::parse(&flow_id_str)
                .map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
            let flow_part =
                flow_partition(&flow_id, &self.config.partition_config);
            let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
            // Edge ids feeding into this execution.
            let edge_ids: Vec<String> = self
                .client
                .cmd("SMEMBERS")
                .arg(flow_ctx.incoming(execution_id))
                .execute()
                .await
                .map_err(|e| ServerError::ValkeyContext { source: e, context: "SMEMBERS replay edges".into() })?;

            // Each edge contributes one extra KEY (the edge hash) and one
            // extra ARGV (the raw edge id).
            // NOTE(review): on EdgeId parse failure the fallback is a fresh
            // random EdgeId, so the pushed key would not correspond to the
            // raw id pushed into ARGV — presumably unreachable for
            // well-formed data; confirm against the set's producers.
            fcall_keys.push(idx.lane_blocked_dependencies(&lane)); fcall_keys.push(ctx.deps_meta()); fcall_keys.push(ctx.deps_unresolved()); for eid_str in &edge_ids {
                let edge_id = EdgeId::parse(eid_str)
                    .unwrap_or_else(|_| EdgeId::new());
                fcall_keys.push(ctx.dep_edge(&edge_id)); fcall_args.push(eid_str.clone()); }
        }

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
            .await?;

        parse_replay_result(&raw)
    }
2308
    /// Read a bounded range of frames from an attempt's output stream.
    ///
    /// Concurrency is capped by `stream_semaphore`; when no permit is
    /// immediately available the call fails fast with
    /// `ConcurrencyLimitExceeded` instead of queueing.
    pub async fn read_attempt_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        from_id: &str,
        to_id: &str,
        count_limit: u64,
    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
        use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};

        if count_limit == 0 {
            return Err(ServerError::InvalidInput(
                "count_limit must be >= 1".to_owned(),
            ));
        }

        // try_acquire (not acquire): never queue behind other readers.
        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded(
                    "stream_ops",
                    self.config.max_concurrent_stream_ops,
                ));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                // The semaphore is closed during shutdown() to reject new
                // stream operations.
                return Err(ServerError::OperationFailed(
                    "stream semaphore closed (server shutting down)".into(),
                ));
            }
        };

        let args = ReadFramesArgs {
            execution_id: execution_id.clone(),
            attempt_index,
            from_id: from_id.to_owned(),
            to_id: to_id.to_owned(),
            count_limit,
        };

        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };

        // Reads go through the dedicated tail client — presumably to keep
        // stream traffic off the primary command connection; confirm against
        // the Server construction code.
        let result = ff_script::functions::stream::ff_read_attempt_stream(
            &self.tail_client, &keys, &args,
        )
        .await
        .map_err(script_error_to_server);

        // Release the permit before propagating any error.
        drop(permit);

        match result? {
            ReadFramesResult::Frames(f) => Ok(f),
        }
    }
2382
    /// Tail an attempt's output stream, blocking up to `block_ms` for new
    /// frames after `last_id`.
    ///
    /// Subject to the same `stream_semaphore` concurrency cap as
    /// `read_attempt_stream`; additionally serializes blocking XREADs behind
    /// `xread_block_lock`.
    pub async fn tail_attempt_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        last_id: &str,
        block_ms: u64,
        count_limit: u64,
    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
        if count_limit == 0 {
            return Err(ServerError::InvalidInput(
                "count_limit must be >= 1".to_owned(),
            ));
        }

        // Fail fast when the stream-op concurrency cap is reached.
        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded(
                    "stream_ops",
                    self.config.max_concurrent_stream_ops,
                ));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                return Err(ServerError::OperationFailed(
                    "stream semaphore closed (server shutting down)".into(),
                ));
            }
        };

        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let stream_key = ctx.stream(attempt_index);
        let stream_meta_key = ctx.stream_meta(attempt_index);

        // Only one blocking XREAD at a time — presumably because a blocking
        // read monopolizes the shared tail connection; confirm against the
        // tail client's connection model.
        let _xread_guard = self.xread_block_lock.lock().await;

        let result = ff_script::stream_tail::xread_block(
            &self.tail_client,
            &stream_key,
            &stream_meta_key,
            last_id,
            block_ms,
            count_limit,
        )
        .await
        .map_err(script_error_to_server);

        // Release the XREAD lock and the permit before propagating.
        drop(_xread_guard);
        drop(permit);
        result
    }
2473
    /// Gracefully shut down the server.
    ///
    /// Closes the stream semaphore (rejecting new read/tail operations),
    /// drains in-flight background tasks for up to 15 seconds (aborting any
    /// stragglers), then shuts the engine down.
    pub async fn shutdown(self) {
        tracing::info!("shutting down FlowFabric server");

        // Closing the semaphore makes try_acquire_owned return Closed,
        // which the read/tail paths turn into an OperationFailed error.
        self.stream_semaphore.close();
        tracing::info!(
            "stream semaphore closed; no new read/tail attempts will be accepted"
        );

        // Bounded drain: wait for background dispatch tasks, but never hang
        // shutdown indefinitely.
        let drain_timeout = Duration::from_secs(15);
        let background = self.background_tasks.clone();
        let drain = async move {
            let mut guard = background.lock().await;
            while guard.join_next().await.is_some() {}
        };
        match tokio::time::timeout(drain_timeout, drain).await {
            Ok(()) => {}
            Err(_) => {
                tracing::warn!(
                    timeout_s = drain_timeout.as_secs(),
                    "shutdown: background tasks did not finish in time, aborting"
                );
                self.background_tasks.lock().await.abort_all();
            }
        }

        self.engine.shutdown().await;
        tracing::info!("FlowFabric server shutdown complete");
    }
2531}
2532
/// Minimum Valkey major version accepted by `verify_valkey_version`.
const REQUIRED_VALKEY_MAJOR: u32 = 8;

/// Total wall-clock budget for retrying the boot-time version check.
const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
2544
2545async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
2570 let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
2571 let mut backoff = Duration::from_millis(200);
2572 loop {
2573 let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
2574 match query_valkey_version(client).await {
2575 Ok(detected_major) if detected_major >= REQUIRED_VALKEY_MAJOR => {
2576 tracing::info!(
2577 detected_major,
2578 required = REQUIRED_VALKEY_MAJOR,
2579 "Valkey version accepted"
2580 );
2581 return Ok(());
2582 }
2583 Ok(detected_major) => (
2584 true,
2588 ServerError::ValkeyVersionTooLow {
2589 detected: detected_major.to_string(),
2590 required: format!("{REQUIRED_VALKEY_MAJOR}.0"),
2591 },
2592 format!("detected_major={detected_major} < required={REQUIRED_VALKEY_MAJOR}"),
2593 ),
2594 Err(e) => {
2595 let retryable = e
2600 .valkey_kind()
2601 .map(ff_script::retry::is_retryable_kind)
2602 .unwrap_or(true);
2606 let detail = e.to_string();
2607 (retryable, e, detail)
2608 }
2609 };
2610
2611 if !should_retry {
2612 return Err(err_for_budget_exhaust);
2613 }
2614 if tokio::time::Instant::now() >= deadline {
2615 return Err(err_for_budget_exhaust);
2616 }
2617 tracing::warn!(
2618 backoff_ms = backoff.as_millis() as u64,
2619 detail = %log_detail,
2620 "valkey version check transient failure; retrying"
2621 );
2622 tokio::time::sleep(backoff).await;
2623 backoff = (backoff * 2).min(Duration::from_secs(5));
2624 }
2625}
2626
2627async fn query_valkey_version(client: &Client) -> Result<u32, ServerError> {
2639 let raw: Value = client
2640 .cmd("INFO")
2641 .arg("server")
2642 .execute()
2643 .await
2644 .map_err(|e| ServerError::ValkeyContext {
2645 source: e,
2646 context: "INFO server".into(),
2647 })?;
2648 let info_body = extract_info_body(&raw)?;
2649 parse_valkey_major_version(&info_body)
2650}
2651
2652fn extract_info_body(raw: &Value) -> Result<String, ServerError> {
2659 match raw {
2660 Value::BulkString(bytes) => Ok(String::from_utf8_lossy(bytes).into_owned()),
2661 Value::VerbatimString { text, .. } => Ok(text.clone()),
2662 Value::SimpleString(s) => Ok(s.clone()),
2663 Value::Map(entries) => {
2664 let (_, body) = entries.first().ok_or_else(|| {
2668 ServerError::OperationFailed(
2669 "valkey version check: cluster INFO returned empty map".into(),
2670 )
2671 })?;
2672 extract_info_body(body)
2673 }
2674 other => Err(ServerError::OperationFailed(format!(
2675 "valkey version check: unexpected INFO shape: {other:?}"
2676 ))),
2677 }
2678}
2679
2680fn parse_valkey_major_version(info: &str) -> Result<u32, ServerError> {
2689 let extract_major = |line: &str| -> Result<u32, ServerError> {
2690 let trimmed = line.trim();
2691 let major_str = trimmed.split('.').next().unwrap_or("").trim();
2692 if major_str.is_empty() {
2693 return Err(ServerError::OperationFailed(format!(
2694 "valkey version check: empty version field in '{trimmed}'"
2695 )));
2696 }
2697 major_str.parse::<u32>().map_err(|_| {
2698 ServerError::OperationFailed(format!(
2699 "valkey version check: non-numeric major in '{trimmed}'"
2700 ))
2701 })
2702 };
2703 if let Some(valkey_line) = info
2705 .lines()
2706 .find_map(|line| line.strip_prefix("valkey_version:"))
2707 {
2708 return extract_major(valkey_line);
2709 }
2710 if let Some(redis_line) = info
2713 .lines()
2714 .find_map(|line| line.strip_prefix("redis_version:"))
2715 {
2716 return extract_major(redis_line);
2717 }
2718 Err(ServerError::OperationFailed(
2719 "valkey version check: INFO missing valkey_version and redis_version".into(),
2720 ))
2721}
2722
2723async fn validate_or_create_partition_config(
2730 client: &Client,
2731 config: &PartitionConfig,
2732) -> Result<(), ServerError> {
2733 let key = keys::global_config_partitions();
2734
2735 let existing: HashMap<String, String> = client
2736 .hgetall(&key)
2737 .await
2738 .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;
2739
2740 if existing.is_empty() {
2741 tracing::info!("first boot: creating {key}");
2743 client
2744 .hset(&key, "num_flow_partitions", &config.num_flow_partitions.to_string())
2745 .await
2746 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_flow_partitions".into() })?;
2747 client
2748 .hset(&key, "num_budget_partitions", &config.num_budget_partitions.to_string())
2749 .await
2750 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_budget_partitions".into() })?;
2751 client
2752 .hset(&key, "num_quota_partitions", &config.num_quota_partitions.to_string())
2753 .await
2754 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_quota_partitions".into() })?;
2755 return Ok(());
2756 }
2757
2758 let check = |field: &str, expected: u16| -> Result<(), ServerError> {
2760 let stored: u16 = existing
2761 .get(field)
2762 .and_then(|v| v.parse().ok())
2763 .unwrap_or(0);
2764 if stored != expected {
2765 return Err(ServerError::PartitionMismatch(format!(
2766 "{field}: stored={stored}, config={expected}. \
2767 Partition counts are fixed at deployment time. \
2768 Either fix your config or migrate the data."
2769 )));
2770 }
2771 Ok(())
2772 };
2773
2774 check("num_flow_partitions", config.num_flow_partitions)?;
2775 check("num_budget_partitions", config.num_budget_partitions)?;
2776 check("num_quota_partitions", config.num_quota_partitions)?;
2777
2778 tracing::info!("partition config validated against stored {key}");
2779 Ok(())
2780}
2781
/// Key id installed for the first waitpoint HMAC secret on a fresh partition.
const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";

/// Outcome of probing/initializing one partition's waitpoint HMAC secret
/// hash during boot (see `init_one_partition`).
enum PartitionBootOutcome {
    /// Stored secret matches the configured one.
    Match,
    /// Stored secret differs from the configured one.
    Mismatch,
    /// A torn write (current_kid present, secret field missing) was repaired.
    Repaired,
    /// No secret existed; the initial kid/secret pair was installed.
    Installed,
}

/// Max partitions initialized concurrently during boot.
const BOOT_INIT_CONCURRENCY: usize = 16;
2807
/// Probe one partition's waitpoint HMAC secret hash and reconcile it with
/// the configured secret, reporting what was found or done.
///
/// Three cases: no `current_kid` yet (fresh partition → install kid and
/// secret in one atomic HSET), kid present but its secret field missing
/// (torn write → repair by writing the configured secret), and kid + secret
/// present (compare against the configured secret).
async fn init_one_partition(
    client: &Client,
    partition: Partition,
    secret_hex: &str,
) -> Result<PartitionBootOutcome, ServerError> {
    let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();

    // Probe: which key id is current, if any?
    let stored_kid: Option<String> = client
        .cmd("HGET")
        .arg(&key)
        .arg("current_kid")
        .execute()
        .await
        .map_err(|e| ServerError::ValkeyContext {
            source: e,
            context: format!("HGET {key} current_kid (init probe)"),
        })?;

    if let Some(stored_kid) = stored_kid {
        let field = format!("secret:{stored_kid}");
        let stored_secret: Option<String> = client
            .hget(&key, &field)
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: format!("HGET {key} secret:<kid> (init check)"),
            })?;
        if stored_secret.is_none() {
            // Torn write: current_kid exists without its secret field.
            // Repair by installing the configured secret under that kid.
            client
                .hset(&key, &field, secret_hex)
                .await
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("HSET {key} secret:<kid> (repair torn write)"),
                })?;
            return Ok(PartitionBootOutcome::Repaired);
        }
        if stored_secret.as_deref() != Some(secret_hex) {
            return Ok(PartitionBootOutcome::Mismatch);
        }
        return Ok(PartitionBootOutcome::Match);
    }

    // Fresh partition: install current_kid and its secret in a single HSET
    // so a crash between writes cannot leave a kid without its secret.
    let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
    let _: i64 = client
        .cmd("HSET")
        .arg(&key)
        .arg("current_kid")
        .arg(WAITPOINT_HMAC_INITIAL_KID)
        .arg(&secret_field)
        .arg(secret_hex)
        .execute()
        .await
        .map_err(|e| ServerError::ValkeyContext {
            source: e,
            context: format!("HSET {key} (init waitpoint HMAC atomic)"),
        })?;
    Ok(PartitionBootOutcome::Installed)
}
2885
/// Fan out `init_one_partition` across all execution partitions at boot,
/// at most `BOOT_INIT_CONCURRENCY` in flight at once.
///
/// Tallies repaired (torn-write) and mismatched partitions and logs warnings
/// for both; the first transport error aborts the whole install via `?`.
///
/// NOTE(review): the partition count comes from `num_flow_partitions` while
/// the family used is `Execution` — presumably these counts are the same by
/// construction; confirm.
async fn initialize_waitpoint_hmac_secret(
    client: &Client,
    partition_config: &PartitionConfig,
    secret_hex: &str,
) -> Result<(), ServerError> {
    use futures::stream::{FuturesUnordered, StreamExt};

    let n = partition_config.num_flow_partitions;
    tracing::info!(
        partitions = n,
        concurrency = BOOT_INIT_CONCURRENCY,
        "installing waitpoint HMAC secret across {n} execution partitions"
    );

    let mut mismatch_count: u16 = 0;
    let mut repaired_count: u16 = 0;
    let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
    let mut next_index: u16 = 0;

    // Bounded-concurrency pump: top up the in-flight set, then await one
    // completion; exits when the set drains (all indices dispatched).
    loop {
        while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
            let partition = Partition {
                family: PartitionFamily::Execution,
                index: next_index,
            };
            // Clone per future so each task owns its inputs.
            let client = client.clone();
            let secret_hex = secret_hex.to_owned();
            pending.push(async move {
                init_one_partition(&client, partition, &secret_hex).await
            });
            next_index += 1;
        }
        match pending.next().await {
            // `res?` aborts the whole install on the first hard error.
            Some(res) => match res? {
                PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
                PartitionBootOutcome::Mismatch => mismatch_count += 1,
                PartitionBootOutcome::Repaired => repaired_count += 1,
            },
            None => break,
        }
    }

    if repaired_count > 0 {
        tracing::warn!(
            repaired_partitions = repaired_count,
            total_partitions = n,
            "repaired {repaired_count} partitions with torn waitpoint HMAC writes \
            (current_kid present but secret:<kid> missing, likely crash during prior boot)"
        );
    }

    if mismatch_count > 0 {
        tracing::warn!(
            mismatched_partitions = mismatch_count,
            total_partitions = n,
            "stored/env secret mismatch on {mismatch_count} partitions — \
            env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
            run POST /v1/admin/rotate-waitpoint-secret to sync"
        );
    }

    tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
    Ok(())
}
2961
/// Summary returned by `Server::rotate_waitpoint_secret` (serialized for the
/// admin rotation endpoint).
#[derive(Debug, Clone, serde::Serialize)]
pub struct RotateWaitpointSecretResult {
    /// Partitions rotated (including idempotent already-rotated matches).
    pub rotated: u16,
    /// Partition indices where rotation failed with a hard error.
    pub failed: Vec<u16>,
    // NOTE(review): `#[serde(default)]` only affects Deserialize, which is not
    // derived here — it is currently a no-op unless Deserialize is added later.
    #[serde(default)]
    /// Partition indices skipped because another rotation held the lock.
    pub in_progress: Vec<u16>,
    /// The kid that was installed as `current_kid`.
    pub new_kid: String,
}
2982
/// Outcome of rotating a single partition during an admin rotation sweep.
enum RotationStepOutcome {
    /// Rotation applied (or was already idempotently in place).
    Rotated,
    /// Another rotation holds this partition's lock; skipped this sweep.
    InProgress,
}
2990
impl Server {
    /// Rotate the waitpoint HMAC secret on every execution partition.
    ///
    /// Validates `new_kid` (non-empty, no ':', which is the field separator
    /// inside the secrets hash) and `new_secret_hex` (non-empty even-length
    /// hex), then fans out per-partition rotation with bounded concurrency.
    /// Only one rotation may run per process (semaphore single-flight).
    ///
    /// Returns a summary of rotated / failed / in-progress partitions; it
    /// does not fail as a whole when individual partitions fail.
    pub async fn rotate_waitpoint_secret(
        &self,
        new_kid: &str,
        new_secret_hex: &str,
    ) -> Result<RotateWaitpointSecretResult, ServerError> {
        if new_kid.is_empty() || new_kid.contains(':') {
            return Err(ServerError::OperationFailed(
                "new_kid must be non-empty and must not contain ':'".into(),
            ));
        }
        if new_secret_hex.is_empty()
            || !new_secret_hex.len().is_multiple_of(2)
            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
        {
            return Err(ServerError::OperationFailed(
                "new_secret_hex must be a non-empty even-length hex string".into(),
            ));
        }

        // Single-flight guard: reject concurrent admin rotations rather than
        // queue them. Permit is held (RAII) until this method returns.
        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                return Err(ServerError::OperationFailed(
                    "admin rotate semaphore closed (server shutting down)".into(),
                ));
            }
        };

        let n = self.config.partition_config.num_flow_partitions;
        let grace_ms = self.config.waitpoint_hmac_grace_ms;
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock before unix epoch")
            .as_millis() as i64;
        // The outgoing secret stays valid until now + grace window, so
        // in-flight waitpoint tokens keep verifying during the rollover.
        let previous_expires_at = now_ms + grace_ms as i64;

        use futures::stream::{FuturesUnordered, StreamExt};

        let mut rotated = 0u16;
        let mut failed = Vec::new();
        let mut in_progress: Vec<u16> = Vec::new();
        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
        let mut next_index: u16 = 0;

        // Bounded-concurrency pump (same shape as the boot-time installer):
        // top up to BOOT_INIT_CONCURRENCY futures, await one, repeat.
        loop {
            while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
                let partition = Partition {
                    family: PartitionFamily::Execution,
                    index: next_index,
                };
                let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();
                let idx = next_index;
                // Owned copies so each future is self-contained.
                let new_kid_owned = new_kid.to_owned();
                let new_secret_owned = new_secret_hex.to_owned();
                let fut = async move {
                    let outcome = self
                        .rotate_single_partition(
                            &key,
                            &new_kid_owned,
                            &new_secret_owned,
                            previous_expires_at,
                        )
                        .await;
                    (idx, partition, outcome)
                };
                pending.push(fut);
                next_index += 1;
            }
            match pending.next().await {
                Some((idx, partition, outcome)) => match outcome {
                    Ok(RotationStepOutcome::Rotated) => {
                        rotated += 1;
                        tracing::debug!(
                            partition = %partition,
                            new_kid = %new_kid,
                            "waitpoint_hmac_rotated"
                        );
                    }
                    Ok(RotationStepOutcome::InProgress) => {
                        // Another rotator holds this partition's lock; record
                        // the index so the caller can retry later.
                        in_progress.push(idx);
                        tracing::debug!(
                            partition = %partition,
                            new_kid = %new_kid,
                            "waitpoint_hmac_rotation_in_progress_skipped"
                        );
                    }
                    Err(e) => {
                        // Per-partition failure is recorded, not propagated —
                        // the sweep continues on the remaining partitions.
                        tracing::error!(
                            target: "audit",
                            partition = %partition,
                            err = %e,
                            "waitpoint_hmac_rotation_failed"
                        );
                        failed.push(idx);
                    }
                },
                None => break,
            }
        }

        tracing::info!(
            target: "audit",
            new_kid = %new_kid,
            total_partitions = n,
            rotated,
            failed_count = failed.len(),
            in_progress_count = in_progress.len(),
            "waitpoint_hmac_rotation_complete"
        );

        Ok(RotateWaitpointSecretResult {
            rotated,
            failed,
            in_progress,
            new_kid: new_kid.to_owned(),
        })
    }

    /// Rotate one partition under a short-lived `SET NX PX` advisory lock on
    /// a sibling key; returns `InProgress` without rotating when the lock is
    /// already held.
    ///
    /// NOTE(review): the lock value is the constant "1" and the DEL below is
    /// unconditional — if the 10s TTL expires mid-rotation, this could delete
    /// a lock re-acquired by another rotator. Confirm this race is acceptable
    /// (rotation itself is idempotent per `rotate_single_partition_locked`).
    async fn rotate_single_partition(
        &self,
        key: &str,
        new_kid: &str,
        new_secret_hex: &str,
        previous_expires_at: i64,
    ) -> Result<RotationStepOutcome, ServerError> {
        let lock_key = rotate_lock_key_for(key);
        const LOCK_TTL_MS: u64 = 10_000;
        // SET NX returns nil when the key already exists → lock not acquired.
        let acquired: Option<String> = self
            .client
            .cmd("SET")
            .arg(&lock_key)
            .arg("1")
            .arg("NX")
            .arg("PX")
            .arg(LOCK_TTL_MS.to_string().as_str())
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: format!("SET NX {lock_key}"),
            })?;
        if acquired.is_none() {
            return Ok(RotationStepOutcome::InProgress);
        }

        let result = self
            .rotate_single_partition_locked(key, new_kid, new_secret_hex, previous_expires_at)
            .await;
        // Best-effort unlock; errors ignored so the rotation result wins
        // (the PX TTL bounds how long a leaked lock can linger).
        let _: Option<i64> = self
            .client
            .cmd("DEL")
            .arg(&lock_key)
            .execute()
            .await
            .ok()
            .flatten();
        result.map(|()| RotationStepOutcome::Rotated)
    }

    /// Core rotation logic, called with the partition lock held.
    ///
    /// Steps: (1) idempotency check — if `new_kid` is already current, verify
    /// or repair its secret; (2) GC expired previous-kid fields; (3) one
    /// atomic HSET that demotes the current kid (with a grace expiry) and
    /// installs the new kid + secret.
    async fn rotate_single_partition_locked(
        &self,
        key: &str,
        new_kid: &str,
        new_secret_hex: &str,
        previous_expires_at: i64,
    ) -> Result<(), ServerError> {
        let current_kid: Option<String> = self
            .client
            .hget(key, "current_kid")
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: format!("HGET {key} current_kid"),
            })?;

        if current_kid.as_deref() == Some(new_kid) {
            // Retry of a rotation that already ran (fully or partially).
            let field = format!("secret:{new_kid}");
            let stored_secret: Option<String> = self
                .client
                .hget(key, &field)
                .await
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("HGET {key} secret:<kid> (idempotency check)"),
                })?;
            match stored_secret.as_deref() {
                // Fully applied with the same secret: idempotent success.
                Some(s) if s == new_secret_hex => {
                    return Ok(());
                }
                // Same kid, different secret: refuse to silently overwrite.
                Some(_) => {
                    return Err(ServerError::OperationFailed(format!(
                        "rotation conflict: kid {new_kid} already installed with a \
                        different secret. Either use a fresh kid or restore the \
                        original secret for this kid before retrying."
                    )));
                }
                // Torn write: kid promoted but secret missing — repair it.
                None => {
                    self.client
                        .hset(key, &field, new_secret_hex)
                        .await
                        .map_err(|e| ServerError::ValkeyContext {
                            source: e,
                            context: format!("HSET {key} secret:<kid> (torn-write repair)"),
                        })?;
                    return Ok(());
                }
            }
        }

        // Scan the whole hash for `expires_at:<kid>` entries whose grace
        // window has lapsed (or is unparseable → treated as expired).
        let full_hash: std::collections::HashMap<String, String> = self
            .client
            .hgetall(key)
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: format!("HGETALL {key} (orphan scan)"),
            })?;
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock before unix epoch")
            .as_millis() as i64;
        let mut expired_kids: Vec<String> = Vec::new();
        for (field, value) in &full_hash {
            if let Some(kid) = field.strip_prefix("expires_at:") {
                let exp = value.parse::<i64>().unwrap_or(0);
                if exp <= 0 || exp <= now_ms {
                    expired_kids.push(kid.to_owned());
                }
            }
        }
        if !expired_kids.is_empty() {
            // One HDEL removes both the expiry marker and the secret for
            // every lapsed kid.
            let mut hdel = self.client.cmd("HDEL").arg(key);
            for kid in &expired_kids {
                hdel = hdel.arg(format!("expires_at:{kid}")).arg(format!("secret:{kid}"));
            }
            let _: i64 = hdel.execute().await.map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: format!("HDEL {key} expired kids (orphan GC)"),
            })?;
        }

        // Build the promotion HSET. When a current kid exists it becomes
        // previous_kid and gets a grace expiry so outstanding tokens still
        // verify; then the new kid + secret become current — all in one
        // atomic command so no reader observes a half-rotated hash.
        let mut hset = self.client.cmd("HSET").arg(key);
        let prev_expires_str = previous_expires_at.to_string();
        let cur_owned;
        let outgoing_expires_field;
        if let Some(cur) = current_kid {
            cur_owned = cur;
            outgoing_expires_field = format!("expires_at:{cur_owned}");
            hset = hset
                .arg("previous_kid")
                .arg(cur_owned.as_str())
                .arg("previous_expires_at")
                .arg(prev_expires_str.as_str())
                .arg(outgoing_expires_field.as_str())
                .arg(prev_expires_str.as_str());
        }
        let secret_field = format!("secret:{new_kid}");
        let _: i64 = hset
            .arg("current_kid")
            .arg(new_kid)
            .arg(&secret_field)
            .arg(new_secret_hex)
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: format!("HSET {key} (rotate atomic)"),
            })?;
        Ok(())
    }
}
3377
/// Derive the per-partition rotation lock key from a secrets-hash key by
/// swapping the last `:`-separated segment for `rotate_lock` (or appending
/// it when the key contains no `:` at all), keeping the lock in the same
/// key namespace/slot prefix as the secrets hash.
fn rotate_lock_key_for(secrets_key: &str) -> String {
    let prefix = secrets_key
        .rsplit_once(':')
        .map_or(secrets_key, |(head, _tail)| head);
    format!("{prefix}:rotate_lock")
}
3387
3388fn parse_create_result(
3391 raw: &Value,
3392 execution_id: &ExecutionId,
3393) -> Result<CreateExecutionResult, ServerError> {
3394 let arr = match raw {
3395 Value::Array(arr) => arr,
3396 _ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
3397 };
3398
3399 let status = match arr.first() {
3400 Some(Ok(Value::Int(n))) => *n,
3401 _ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
3402 };
3403
3404 if status == 1 {
3405 let sub = arr
3407 .get(1)
3408 .and_then(|v| match v {
3409 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3410 Ok(Value::SimpleString(s)) => Some(s.clone()),
3411 _ => None,
3412 })
3413 .unwrap_or_default();
3414
3415 if sub == "DUPLICATE" {
3416 Ok(CreateExecutionResult::Duplicate {
3417 execution_id: execution_id.clone(),
3418 })
3419 } else {
3420 Ok(CreateExecutionResult::Created {
3421 execution_id: execution_id.clone(),
3422 public_state: PublicState::Waiting,
3423 })
3424 }
3425 } else {
3426 let error_code = arr
3427 .get(1)
3428 .and_then(|v| match v {
3429 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3430 Ok(Value::SimpleString(s)) => Some(s.clone()),
3431 _ => None,
3432 })
3433 .unwrap_or_else(|| "unknown".to_owned());
3434 Err(ServerError::OperationFailed(format!(
3435 "ff_create_execution failed: {error_code}"
3436 )))
3437 }
3438}
3439
3440fn parse_cancel_result(
3441 raw: &Value,
3442 execution_id: &ExecutionId,
3443) -> Result<CancelExecutionResult, ServerError> {
3444 let arr = match raw {
3445 Value::Array(arr) => arr,
3446 _ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
3447 };
3448
3449 let status = match arr.first() {
3450 Some(Ok(Value::Int(n))) => *n,
3451 _ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
3452 };
3453
3454 if status == 1 {
3455 Ok(CancelExecutionResult::Cancelled {
3456 execution_id: execution_id.clone(),
3457 public_state: PublicState::Cancelled,
3458 })
3459 } else {
3460 let error_code = arr
3461 .get(1)
3462 .and_then(|v| match v {
3463 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3464 Ok(Value::SimpleString(s)) => Some(s.clone()),
3465 _ => None,
3466 })
3467 .unwrap_or_else(|| "unknown".to_owned());
3468 Err(ServerError::OperationFailed(format!(
3469 "ff_cancel_execution failed: {error_code}"
3470 )))
3471 }
3472}
3473
/// Decode the raw FCALL reply from `ff_create_budget`.
///
/// Reply shape: `[status, sub]` — status 1 is success with sub either
/// "ALREADY_SATISFIED" (idempotent replay) or a created marker; any other
/// status carries an error code in sub (falling back to "unknown").
fn parse_budget_create_result(
    raw: &Value,
    budget_id: &BudgetId,
) -> Result<CreateBudgetResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
    };

    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
    };

    if status == 1 {
        // Sub-status: string field 1, empty when absent or non-string.
        let sub = arr
            .get(1)
            .and_then(|v| match v {
                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                Ok(Value::SimpleString(s)) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_default();

        if sub == "ALREADY_SATISFIED" {
            Ok(CreateBudgetResult::AlreadySatisfied {
                budget_id: budget_id.clone(),
            })
        } else {
            Ok(CreateBudgetResult::Created {
                budget_id: budget_id.clone(),
            })
        }
    } else {
        let error_code = arr
            .get(1)
            .and_then(|v| match v {
                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                Ok(Value::SimpleString(s)) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_else(|| "unknown".to_owned());
        Err(ServerError::OperationFailed(format!(
            "ff_create_budget failed: {error_code}"
        )))
    }
}
3521
/// Decode the raw FCALL reply from `ff_create_quota_policy`.
///
/// Same `[status, sub]` shape as the other create parsers: status 1 with sub
/// "ALREADY_SATISFIED" means idempotent replay, otherwise created; non-1
/// status carries an error code in sub (falling back to "unknown").
fn parse_quota_create_result(
    raw: &Value,
    quota_policy_id: &QuotaPolicyId,
) -> Result<CreateQuotaPolicyResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
    };

    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
    };

    if status == 1 {
        // Sub-status: string field 1, empty when absent or non-string.
        let sub = arr
            .get(1)
            .and_then(|v| match v {
                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                Ok(Value::SimpleString(s)) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_default();

        if sub == "ALREADY_SATISFIED" {
            Ok(CreateQuotaPolicyResult::AlreadySatisfied {
                quota_policy_id: quota_policy_id.clone(),
            })
        } else {
            Ok(CreateQuotaPolicyResult::Created {
                quota_policy_id: quota_policy_id.clone(),
            })
        }
    } else {
        let error_code = arr
            .get(1)
            .and_then(|v| match v {
                Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
                Ok(Value::SimpleString(s)) => Some(s.clone()),
                _ => None,
            })
            .unwrap_or_else(|| "unknown".to_owned());
        Err(ServerError::OperationFailed(format!(
            "ff_create_quota_policy failed: {error_code}"
        )))
    }
}
3569
3570fn parse_create_flow_result(
3573 raw: &Value,
3574 flow_id: &FlowId,
3575) -> Result<CreateFlowResult, ServerError> {
3576 let arr = match raw {
3577 Value::Array(arr) => arr,
3578 _ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
3579 };
3580 let status = match arr.first() {
3581 Some(Ok(Value::Int(n))) => *n,
3582 _ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
3583 };
3584 if status == 1 {
3585 let sub = fcall_field_str(arr, 1);
3586 if sub == "ALREADY_SATISFIED" {
3587 Ok(CreateFlowResult::AlreadySatisfied {
3588 flow_id: flow_id.clone(),
3589 })
3590 } else {
3591 Ok(CreateFlowResult::Created {
3592 flow_id: flow_id.clone(),
3593 })
3594 }
3595 } else {
3596 let error_code = fcall_field_str(arr, 1);
3597 Err(ServerError::OperationFailed(format!(
3598 "ff_create_flow failed: {error_code}"
3599 )))
3600 }
3601}
3602
/// Decode the raw FCALL reply from `ff_add_execution_to_flow`.
///
/// Success reply carries: field 1 sub-status, field 2 the execution id, and
/// field 3 the flow's node count (post-add for `Added`).
fn parse_add_execution_to_flow_result(
    raw: &Value,
) -> Result<AddExecutionToFlowResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => {
            return Err(ServerError::Script(
                "ff_add_execution_to_flow: expected Array".into(),
            ))
        }
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(ServerError::Script(
                "ff_add_execution_to_flow: bad status code".into(),
            ))
        }
    };
    if status == 1 {
        let sub = fcall_field_str(arr, 1);
        let eid_str = fcall_field_str(arr, 2);
        let nc_str = fcall_field_str(arr, 3);
        // A malformed execution id is a script contract violation, not an
        // operational failure.
        let eid = ExecutionId::parse(&eid_str)
            .map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
        // Unparseable node count degrades to 0 rather than failing the call.
        let nc: u32 = nc_str.parse().unwrap_or(0);
        if sub == "ALREADY_SATISFIED" {
            Ok(AddExecutionToFlowResult::AlreadyMember {
                execution_id: eid,
                node_count: nc,
            })
        } else {
            Ok(AddExecutionToFlowResult::Added {
                execution_id: eid,
                new_node_count: nc,
            })
        }
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_add_execution_to_flow failed: {error_code}"
        )))
    }
}
3647
/// Intermediate decode of the `ff_cancel_flow` reply, before member
/// executions are cancelled individually.
enum ParsedCancelFlow {
    /// Flow transitioned to cancelled; carries the cancellation policy and
    /// the ids of member executions that still need cancelling.
    Cancelled {
        policy: String,
        member_execution_ids: Vec<String>,
    },
    /// The flow was already in a terminal state (idempotent no-op).
    AlreadyTerminal,
}
3660
3661fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
3667 let arr = match raw {
3668 Value::Array(arr) => arr,
3669 _ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
3670 };
3671 let status = match arr.first() {
3672 Some(Ok(Value::Int(n))) => *n,
3673 _ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
3674 };
3675 if status != 1 {
3676 let error_code = fcall_field_str(arr, 1);
3677 if error_code == "flow_already_terminal" {
3678 return Ok(ParsedCancelFlow::AlreadyTerminal);
3679 }
3680 return Err(ServerError::OperationFailed(format!(
3681 "ff_cancel_flow failed: {error_code}"
3682 )));
3683 }
3684 let policy = fcall_field_str(arr, 2);
3686 let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
3689 for i in 3..arr.len() {
3690 members.push(fcall_field_str(arr, i));
3691 }
3692 Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
3693}
3694
/// Decode the raw FCALL reply from `ff_stage_dependency_edge`.
///
/// Success reply carries the staged edge id in field 2 and the new graph
/// revision in field 3 (an unparseable revision degrades to 0).
fn parse_stage_dependency_edge_result(
    raw: &Value,
) -> Result<StageDependencyEdgeResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
    };
    if status == 1 {
        let edge_id_str = fcall_field_str(arr, 2);
        let rev_str = fcall_field_str(arr, 3);
        // A malformed edge id is a script contract violation.
        let edge_id = EdgeId::parse(&edge_id_str)
            .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
        let rev: u64 = rev_str.parse().unwrap_or(0);
        Ok(StageDependencyEdgeResult::Staged {
            edge_id,
            new_graph_revision: rev,
        })
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_stage_dependency_edge failed: {error_code}"
        )))
    }
}
3723
3724fn parse_apply_dependency_result(
3725 raw: &Value,
3726) -> Result<ApplyDependencyToChildResult, ServerError> {
3727 let arr = match raw {
3728 Value::Array(arr) => arr,
3729 _ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
3730 };
3731 let status = match arr.first() {
3732 Some(Ok(Value::Int(n))) => *n,
3733 _ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
3734 };
3735 if status == 1 {
3736 let sub = fcall_field_str(arr, 1);
3737 if sub == "ALREADY_APPLIED" || sub == "already_applied" {
3738 Ok(ApplyDependencyToChildResult::AlreadyApplied)
3739 } else {
3740 let count_str = fcall_field_str(arr, 2);
3742 let count: u32 = count_str.parse().unwrap_or(0);
3743 Ok(ApplyDependencyToChildResult::Applied {
3744 unsatisfied_count: count,
3745 })
3746 }
3747 } else {
3748 let error_code = fcall_field_str(arr, 1);
3749 Err(ServerError::OperationFailed(format!(
3750 "ff_apply_dependency_to_child failed: {error_code}"
3751 )))
3752 }
3753}
3754
/// Decode the raw FCALL reply from `ff_deliver_signal`.
///
/// On "DUPLICATE", field 2 echoes the previously-delivered signal id (falls
/// back to the caller's id if unparseable). On acceptance, field 3 describes
/// the delivery effect.
fn parse_deliver_signal_result(
    raw: &Value,
    signal_id: &SignalId,
) -> Result<DeliverSignalResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
    };
    if status == 1 {
        let sub = fcall_field_str(arr, 1);
        if sub == "DUPLICATE" {
            let existing_str = fcall_field_str(arr, 2);
            // Best-effort parse; the caller's own id is a sane fallback.
            let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
            Ok(DeliverSignalResult::Duplicate {
                existing_signal_id: existing_id,
            })
        } else {
            // Field 3 is the effect string reported by the script.
            let effect = fcall_field_str(arr, 3);
            Ok(DeliverSignalResult::Accepted {
                signal_id: signal_id.clone(),
                effect,
            })
        }
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_deliver_signal failed: {error_code}"
        )))
    }
}
3791
3792fn parse_change_priority_result(
3793 raw: &Value,
3794 execution_id: &ExecutionId,
3795) -> Result<ChangePriorityResult, ServerError> {
3796 let arr = match raw {
3797 Value::Array(arr) => arr,
3798 _ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
3799 };
3800 let status = match arr.first() {
3801 Some(Ok(Value::Int(n))) => *n,
3802 _ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
3803 };
3804 if status == 1 {
3805 Ok(ChangePriorityResult::Changed {
3806 execution_id: execution_id.clone(),
3807 })
3808 } else {
3809 let error_code = fcall_field_str(arr, 1);
3810 Err(ServerError::OperationFailed(format!(
3811 "ff_change_priority failed: {error_code}"
3812 )))
3813 }
3814}
3815
3816fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
3817 let arr = match raw {
3818 Value::Array(arr) => arr,
3819 _ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
3820 };
3821 let status = match arr.first() {
3822 Some(Ok(Value::Int(n))) => *n,
3823 _ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
3824 };
3825 if status == 1 {
3826 let unsatisfied = fcall_field_str(arr, 2);
3828 let ps = if unsatisfied == "0" {
3829 PublicState::Waiting
3830 } else {
3831 PublicState::WaitingChildren
3832 };
3833 Ok(ReplayExecutionResult::Replayed { public_state: ps })
3834 } else {
3835 let error_code = fcall_field_str(arr, 1);
3836 Err(ServerError::OperationFailed(format!(
3837 "ff_replay_execution failed: {error_code}"
3838 )))
3839 }
3840}
3841
3842fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
3853 match e {
3854 ff_script::error::ScriptError::Valkey(valkey_err) => ServerError::ValkeyContext {
3855 source: valkey_err,
3856 context: "stream FCALL transport".into(),
3857 },
3858 other => ServerError::Script(other.to_string()),
3859 }
3860}
3861
3862fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
3863 match arr.get(index) {
3864 Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
3865 Some(Ok(Value::SimpleString(s))) => s.clone(),
3866 Some(Ok(Value::Int(n))) => n.to_string(),
3867 _ => String::new(),
3868 }
3869}
3870
/// Decode the raw FCALL reply from `ff_report_usage_and_check`.
///
/// Status 1 replies carry a sub-status in field 1 ("OK", "ALREADY_APPLIED",
/// "SOFT_BREACH", "HARD_BREACH"); breach variants add dimension (field 2),
/// current usage (field 3) and the breached limit (field 4). Unknown
/// sub-statuses are surfaced as operation failures.
fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(ServerError::Script(
                "ff_report_usage_and_check: expected Int status code".into(),
            ))
        }
    };
    if status_code != 1 {
        let error_code = fcall_field_str(arr, 1);
        return Err(ServerError::OperationFailed(format!(
            "ff_report_usage_and_check failed: {error_code}"
        )));
    }
    let sub_status = fcall_field_str(arr, 1);
    match sub_status.as_str() {
        "OK" => Ok(ReportUsageResult::Ok),
        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
        "SOFT_BREACH" => {
            // Unparseable numeric fields degrade to 0 rather than failing.
            let dim = fcall_field_str(arr, 2);
            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
        }
        "HARD_BREACH" => {
            let dim = fcall_field_str(arr, 2);
            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
            Ok(ReportUsageResult::HardBreach {
                dimension: dim,
                current_usage: current,
                hard_limit: limit,
            })
        }
        _ => Err(ServerError::OperationFailed(format!(
            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
        ))),
    }
}
3918
3919fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
3920 let arr = match raw {
3921 Value::Array(arr) => arr,
3922 _ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
3923 };
3924 let status = match arr.first() {
3925 Some(Ok(Value::Int(n))) => *n,
3926 _ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
3927 };
3928 if status == 1 {
3929 let sub = fcall_field_str(arr, 1);
3930 if sub == "ALREADY_SATISFIED" {
3931 let reason = fcall_field_str(arr, 2);
3932 Ok(RevokeLeaseResult::AlreadySatisfied { reason })
3933 } else {
3934 let lid = fcall_field_str(arr, 2);
3935 let epoch = fcall_field_str(arr, 3);
3936 Ok(RevokeLeaseResult::Revoked {
3937 lease_id: lid,
3938 lease_epoch: epoch,
3939 })
3940 }
3941 } else {
3942 let error_code = fcall_field_str(arr, 1);
3943 Err(ServerError::OperationFailed(format!(
3944 "ff_revoke_lease failed: {error_code}"
3945 )))
3946 }
3947}
3948
3949fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
3955 if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
3956 return true;
3957 }
3958 e.detail()
3959 .map(|d| {
3960 d.contains("Function not loaded")
3961 || d.contains("No matching function")
3962 || d.contains("function not found")
3963 })
3964 .unwrap_or(false)
3965 || e.to_string().contains("Function not loaded")
3966}
3967
/// Run an FCALL, transparently reloading the Lua library and retrying once
/// if the server reports the function/library is not loaded (e.g. after a
/// server restart or FLUSHALL).
///
/// Any other error — and any error on the retry — is returned as-is.
async fn fcall_with_reload_on_client(
    client: &Client,
    function: &str,
    keys: &[&str],
    args: &[&str],
) -> Result<Value, ServerError> {
    match client.fcall(function, keys, args).await {
        Ok(v) => Ok(v),
        Err(e) if is_function_not_loaded(&e) => {
            tracing::warn!(function, "Lua library not found on server, reloading");
            ff_script::loader::ensure_library(client)
                .await
                .map_err(ServerError::LibraryLoad)?;
            // Single retry after reload; a second failure propagates.
            client
                .fcall(function, keys, args)
                .await
                .map_err(ServerError::Valkey)
        }
        Err(e) => Err(ServerError::Valkey(e)),
    }
}
3991
3992async fn build_cancel_execution_fcall(
3996 client: &Client,
3997 partition_config: &PartitionConfig,
3998 args: &CancelExecutionArgs,
3999) -> Result<(Vec<String>, Vec<String>), ServerError> {
4000 let partition = execution_partition(&args.execution_id, partition_config);
4001 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
4002 let idx = IndexKeys::new(&partition);
4003
4004 let lane_str: Option<String> = client
4005 .hget(&ctx.core(), "lane_id")
4006 .await
4007 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
4008 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
4009
4010 let dyn_fields: Vec<Option<String>> = client
4011 .cmd("HMGET")
4012 .arg(ctx.core())
4013 .arg("current_attempt_index")
4014 .arg("current_waitpoint_id")
4015 .arg("current_worker_instance_id")
4016 .execute()
4017 .await
4018 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET cancel pre-read".into() })?;
4019
4020 let att_idx_val = dyn_fields.first()
4021 .and_then(|v| v.as_ref())
4022 .and_then(|s| s.parse::<u32>().ok())
4023 .unwrap_or(0);
4024 let att_idx = AttemptIndex::new(att_idx_val);
4025 let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4026 let wp_id = if wp_id_str.is_empty() {
4027 WaitpointId::new()
4028 } else {
4029 WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
4030 };
4031 let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4032 let wiid = WorkerInstanceId::new(&wiid_str);
4033
4034 let keys: Vec<String> = vec![
4035 ctx.core(), ctx.attempt_hash(att_idx), ctx.stream_meta(att_idx), ctx.lease_current(), ctx.lease_history(), idx.lease_expiry(), idx.worker_leases(&wiid), ctx.suspension_current(), ctx.waitpoint(&wp_id), ctx.waitpoint_condition(&wp_id), idx.suspension_timeout(), idx.lane_terminal(&lane), idx.attempt_timeout(), idx.execution_deadline(), idx.lane_eligible(&lane), idx.lane_delayed(&lane), idx.lane_blocked_dependencies(&lane), idx.lane_blocked_budget(&lane), idx.lane_blocked_quota(&lane), idx.lane_blocked_route(&lane), idx.lane_blocked_operator(&lane), ];
4057 let argv: Vec<String> = vec![
4058 args.execution_id.to_string(),
4059 args.reason.clone(),
4060 args.source.to_string(),
4061 args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
4062 args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
4063 ];
4064 Ok((keys, argv))
4065}
4066
/// Backoff schedule (milliseconds) between successive attempts when cancelling
/// a flow-member execution. The number of attempts made equals the table
/// length; entry `i` is the sleep applied after failed attempt `i`.
const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];
4071
4072fn extract_valkey_kind(e: &ServerError) -> Option<ferriskey::ErrorKind> {
4077 match e {
4078 ServerError::Valkey(err) | ServerError::ValkeyContext { source: err, .. } => {
4079 Some(err.kind())
4080 }
4081 ServerError::LibraryLoad(load_err) => load_err.valkey_kind(),
4082 _ => None,
4083 }
4084}
4085
4086async fn cancel_member_execution(
4097 client: &Client,
4098 partition_config: &PartitionConfig,
4099 eid_str: &str,
4100 reason: &str,
4101 now: TimestampMs,
4102) -> Result<(), ServerError> {
4103 let execution_id = ExecutionId::parse(eid_str)
4104 .map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
4105 let args = CancelExecutionArgs {
4106 execution_id: execution_id.clone(),
4107 reason: reason.to_owned(),
4108 source: CancelSource::OperatorOverride,
4109 lease_id: None,
4110 lease_epoch: None,
4111 attempt_id: None,
4112 now,
4113 };
4114
4115 let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
4116 for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
4117 let is_last = attempt_idx + 1 == attempts;
4118 match try_cancel_member_once(client, partition_config, &args).await {
4119 Ok(()) => return Ok(()),
4120 Err(e) => {
4121 let retryable = extract_valkey_kind(&e)
4125 .map(ff_script::retry::is_retryable_kind)
4126 .unwrap_or(false);
4127 if !retryable || is_last {
4128 return Err(e);
4129 }
4130 tracing::debug!(
4131 execution_id = %execution_id,
4132 attempt = attempt_idx + 1,
4133 delay_ms = *delay_ms,
4134 error = %e,
4135 "cancel_member_execution: transient error, retrying"
4136 );
4137 tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
4138 }
4139 }
4140 }
4141 Err(ServerError::OperationFailed(format!(
4145 "cancel_member_execution: retries exhausted for {execution_id}"
4146 )))
4147}
4148
4149async fn try_cancel_member_once(
4152 client: &Client,
4153 partition_config: &PartitionConfig,
4154 args: &CancelExecutionArgs,
4155) -> Result<(), ServerError> {
4156 let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
4157 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
4158 let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
4159 let raw =
4160 fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
4161 parse_cancel_result(&raw, &args.execution_id).map(|_| ())
4162}
4163
4164fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
4165 let arr = match raw {
4166 Value::Array(arr) => arr,
4167 _ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
4168 };
4169 let status = match arr.first() {
4170 Some(Ok(Value::Int(n))) => *n,
4171 _ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
4172 };
4173 if status == 1 {
4174 let next_str = fcall_field_str(arr, 2);
4175 let next_ms: i64 = next_str.parse().unwrap_or(0);
4176 Ok(ResetBudgetResult::Reset {
4177 next_reset_at: TimestampMs::from_millis(next_ms),
4178 })
4179 } else {
4180 let error_code = fcall_field_str(arr, 1);
4181 Err(ServerError::OperationFailed(format!(
4182 "ff_reset_budget failed: {error_code}"
4183 )))
4184 }
4185}
4186
// Unit tests for error classification (`is_retryable` / `valkey_kind`) and for
// the INFO-reply parsing helpers (`parse_valkey_major_version`,
// `extract_info_body`) defined earlier in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    // Builds a synthetic ferriskey error of the given kind for classification tests.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    // Plain `Valkey` variant: retryability is decided purely by ErrorKind.
    #[test]
    fn is_retryable_valkey_variant_uses_kind_table() {
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());

        // Redirections (Moved/Ask/ReadOnly) and auth/script errors are not
        // retried blindly at this layer.
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Ask)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
    }

    // Context-wrapped variant must classify by the wrapped source's kind,
    // ignoring the context string.
    #[test]
    fn is_retryable_valkey_context_uses_kind_table() {
        let err = ServerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::IoError),
            context: "HGET test".into(),
        };
        assert!(err.is_retryable());

        let err = ServerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::AuthenticationFailed),
            context: "auth".into(),
        };
        assert!(!err.is_retryable());
    }

    // LibraryLoad delegates to the loader error's inner Valkey kind;
    // non-Valkey load failures (VersionMismatch) are never retryable.
    #[test]
    fn is_retryable_library_load_delegates_to_inner_kind() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::IoError),
        ));
        assert!(err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::AuthenticationFailed),
        ));
        assert!(!err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert!(!err.is_retryable());
    }

    // Pure business-logic variants carry no Valkey kind and must never retry.
    #[test]
    fn is_retryable_business_logic_variants_are_false() {
        assert!(!ServerError::NotFound("x".into()).is_retryable());
        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
        assert!(!ServerError::Script("x".into()).is_retryable());
        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
    }

    // valkey_kind() surfaces the kind through the LibraryLoad wrapper, and
    // returns None when the load failure has no Valkey error inside.
    #[test]
    fn valkey_kind_delegates_through_library_load() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::ClusterDown),
        ));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert_eq!(err.valkey_kind(), None);
    }

    // When both version fields are present, valkey_version wins over the
    // compatibility redis_version field.
    #[test]
    fn parse_valkey_major_prefers_valkey_version_over_redis_version() {
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n\
server_mode:cluster\r\n\
os:Linux\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 9);
    }

    // A realistic cluster-mode INFO body with extra Valkey fields interleaved.
    #[test]
    fn parse_valkey_major_real_valkey_8_cluster_body() {
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
server_name:valkey\r\n\
valkey_version:9.0.3\r\n\
valkey_release_stage:ga\r\n\
redis_git_sha1:00000000\r\n\
server_mode:cluster\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 9);
    }

    // Without a valkey_version field, the parser falls back to redis_version.
    #[test]
    fn parse_valkey_major_falls_back_to_redis_version_on_valkey_7() {
        let info = "# Server\r\nredis_version:7.2.4\r\nfoo:bar\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 7);
    }

    // No version field at all -> OperationFailed mentioning "missing".
    #[test]
    fn parse_valkey_major_errors_when_no_version_field() {
        let info = "# Server\r\nfoo:bar\r\n";
        let err = parse_valkey_major_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(
            err.to_string().contains("missing"),
            "expected 'missing' in message, got: {err}"
        );
    }

    // A version string whose major part is not a number -> OperationFailed
    // mentioning "non-numeric major".
    #[test]
    fn parse_valkey_major_errors_on_non_numeric() {
        let info = "valkey_version:invalid.x.y\n";
        let err = parse_valkey_major_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("non-numeric major"));
    }

    // Cluster-mode INFO replies arrive as a node-address -> body map;
    // extract_info_body must unwrap one node's verbatim-string body.
    #[test]
    fn extract_info_body_unwraps_cluster_map() {
        let body_text = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n";
        let node_entry = (
            Value::SimpleString("127.0.0.1:7000".to_string()),
            Value::VerbatimString {
                format: ferriskey::value::VerbatimFormat::Text,
                text: body_text.to_string(),
            },
        );
        let map = Value::Map(vec![node_entry]);
        let body = extract_info_body(&map).unwrap();
        assert_eq!(body, body_text);
        assert_eq!(parse_valkey_major_version(&body).unwrap(), 9);
    }

    // Standalone-mode INFO replies are a plain string; passed through as-is.
    #[test]
    fn extract_info_body_handles_simple_string() {
        let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
        let v = Value::SimpleString(body_text.to_string());
        let body = extract_info_body(&v).unwrap();
        assert_eq!(body, body_text);
    }

    // A too-old server version is a permanent failure, not a Valkey error.
    #[test]
    fn valkey_version_too_low_is_not_retryable() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7".into(),
            required: "8.0".into(),
        };
        assert!(!err.is_retryable());
        assert_eq!(err.valkey_kind(), None);
    }

    // The Display message must name both versions and point at RFC-011.
    #[test]
    fn valkey_version_too_low_error_message_includes_both_versions() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7".into(),
            required: "8.0".into(),
        };
        let msg = err.to_string();
        assert!(msg.contains("7"), "detected version in message: {msg}");
        assert!(msg.contains("8.0"), "required version in message: {msg}");
        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
    }
}
4377}