1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use ferriskey::{Client, ClientBuilder, Value};
6use tokio::sync::Mutex as AsyncMutex;
7use tokio::task::JoinSet;
8use ff_core::contracts::{
9 AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
10 CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
11 CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
12 CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
13 ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
14 DeliverSignalArgs, DeliverSignalResult, ExecutionInfo,
15 ListExecutionsPage, PendingWaitpointInfo, ReplayExecutionResult,
16 ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
17 RevokeLeaseResult,
18 RotateWaitpointHmacSecretArgs,
19 StageDependencyEdgeArgs, StageDependencyEdgeResult,
20};
21use ff_core::keys::{
22 self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
23 IndexKeys, QuotaKeyContext,
24};
25use ff_core::partition::{
26 budget_partition, execution_partition, flow_partition, quota_partition, Partition,
27 PartitionConfig, PartitionFamily,
28};
29use ff_core::state::{PublicState, StateVector};
30use ff_core::types::*;
31use ff_engine::Engine;
32use ff_script::retry::is_retryable_kind;
33
34use crate::config::ServerConfig;
35
// NOTE(review): cap on already-terminal flow members handled/reported per
// operation — the use site is outside this chunk; confirm exact semantics there.
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;
41
42pub(crate) use ff_script::functions::budget::MAX_BUDGET_DIMENSIONS;
52
53fn validate_create_budget_dimensions(
63 dimensions: &[String],
64 hard_limits: &[u64],
65 soft_limits: &[u64],
66) -> Result<(), ServerError> {
67 let dim_count = dimensions.len();
68 if dim_count > MAX_BUDGET_DIMENSIONS {
69 return Err(ServerError::InvalidInput(format!(
70 "too_many_dimensions: limit={}, got={}",
71 MAX_BUDGET_DIMENSIONS, dim_count
72 )));
73 }
74 if hard_limits.len() != dim_count {
75 return Err(ServerError::InvalidInput(format!(
76 "dimension_limit_array_mismatch: dimensions={} hard_limits={}",
77 dim_count,
78 hard_limits.len()
79 )));
80 }
81 if soft_limits.len() != dim_count {
82 return Err(ServerError::InvalidInput(format!(
83 "dimension_limit_array_mismatch: dimensions={} soft_limits={}",
84 dim_count,
85 soft_limits.len()
86 )));
87 }
88 Ok(())
89}
90
91fn validate_report_usage_dimensions(
97 dimensions: &[String],
98 deltas: &[u64],
99) -> Result<(), ServerError> {
100 let dim_count = dimensions.len();
101 if dim_count > MAX_BUDGET_DIMENSIONS {
102 return Err(ServerError::InvalidInput(format!(
103 "too_many_dimensions: limit={}, got={}",
104 MAX_BUDGET_DIMENSIONS, dim_count
105 )));
106 }
107 if deltas.len() != dim_count {
108 return Err(ServerError::InvalidInput(format!(
109 "dimension_delta_array_mismatch: dimensions={} deltas={}",
110 dim_count,
111 deltas.len()
112 )));
113 }
114 Ok(())
115}
116
/// FlowFabric server: owns the Valkey connections, the background engine,
/// the scheduler, and the concurrency primitives shared by request handlers.
pub struct Server {
    // Primary Valkey client used for scripts, reads, and pipelines.
    client: Client,
    // Dedicated connection for blocking tail reads, kept separate from the
    // primary client (see the tail-connection setup in `start_with_metrics`).
    tail_client: Client,
    // Bounds concurrent stream operations; sized from
    // `config.max_concurrent_stream_ops` at startup.
    stream_semaphore: Arc<tokio::sync::Semaphore>,
    // Serializes blocking XREAD tails (read + tail ops share
    // `stream_semaphore`; tails additionally serialize through this lock).
    xread_block_lock: Arc<tokio::sync::Mutex<()>>,
    // Single-permit semaphore: admin rotate operations run one at a time
    // (constructed with `Semaphore::new(1)`).
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    // Background engine started in `start_with_metrics` with a
    // completion-event subscription.
    engine: Engine,
    // Full server configuration, including the partition config.
    config: ServerConfig,
    scheduler: Arc<ff_scheduler::Scheduler>,
    // Detached background tasks owned by the server.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
    // Shared metrics registry (also handed to engine and scheduler).
    metrics: Arc<ff_observability::Metrics>,
}
213
/// Top-level error type for all `Server` operations.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Backend (Valkey) error with no additional call-site context.
    #[error("backend: {0}")]
    Backend(#[from] ff_core::BackendError),
    /// Backend error annotated with the operation that failed.
    #[error("backend ({context}): {source}")]
    BackendContext {
        #[source]
        source: ff_core::BackendError,
        context: String,
    },
    /// Configuration parsing/validation failure.
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    /// Failure loading the Lua function library into Valkey.
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    /// A key's computed partition did not match the expected partition.
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    /// The referenced entity (execution, budget, …) does not exist.
    #[error("not found: {0}")]
    NotFound(String),
    /// Caller-supplied arguments failed validation.
    #[error("invalid input: {0}")]
    InvalidInput(String),
    /// The operation ran but reported failure.
    #[error("operation failed: {0}")]
    OperationFailed(String),
    /// A server-side script returned a malformed or unexpected reply.
    #[error("script: {0}")]
    Script(String),
    /// A concurrency limit (semaphore) rejected the call; retryable.
    #[error("too many concurrent {0} calls (max: {1})")]
    ConcurrencyLimitExceeded(&'static str, u32),
    /// The connected Valkey server is older than the required version.
    #[error(
        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
    )]
    ValkeyVersionTooLow {
        detected: String,
        required: String,
    },
}
269
impl From<ferriskey::Error> for ServerError {
    /// Converts a raw driver error into a context-free `Backend` error.
    /// Prefer `backend_context` at call sites that can name the operation.
    fn from(err: ferriskey::Error) -> Self {
        Self::Backend(ff_backend_valkey::backend_error_from_ferriskey(&err))
    }
}
279
280pub(crate) fn backend_context(
283 err: ferriskey::Error,
284 context: impl Into<String>,
285) -> ServerError {
286 ServerError::BackendContext {
287 source: ff_backend_valkey::backend_error_from_ferriskey(&err),
288 context: context.into(),
289 }
290}
291
impl ServerError {
    /// Returns the backend error-kind classification, if this error
    /// originated from the backend — directly, with context, or through a
    /// library-load failure whose driver kind can be classified.
    pub fn backend_kind(&self) -> Option<ff_core::BackendErrorKind> {
        match self {
            Self::Backend(be) | Self::BackendContext { source: be, .. } => Some(be.kind()),
            Self::LibraryLoad(e) => e
                .valkey_kind()
                .map(ff_backend_valkey::classify_ferriskey_kind),
            // Validation / config / not-found errors carry no backend kind.
            _ => None,
        }
    }

    /// Whether the failed operation is safe to retry.
    ///
    /// Backend and library-load errors defer to the backend's own
    /// retryability classification; a concurrency-limit rejection is
    /// transient and therefore retryable; everything else is permanent.
    pub fn is_retryable(&self) -> bool {
        match self {
            Self::Backend(be) | Self::BackendContext { source: be, .. } => {
                be.kind().is_retryable()
            }
            Self::LibraryLoad(load_err) => load_err
                .valkey_kind()
                .map(is_retryable_kind)
                .unwrap_or(false),
            Self::Config(_)
            | Self::PartitionMismatch(_)
            | Self::NotFound(_)
            | Self::InvalidInput(_)
            | Self::OperationFailed(_)
            | Self::Script(_) => false,
            Self::ConcurrencyLimitExceeded(_, _) => true,
            Self::ValkeyVersionTooLow { .. } => false,
        }
    }
}
340
341impl Server {
    /// Starts the server with a fresh, default metrics registry.
    /// Convenience wrapper over [`Self::start_with_metrics`].
    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
        Self::start_with_metrics(config, Arc::new(ff_observability::Metrics::new())).await
    }
352
    /// Connects to Valkey and brings up the full server with the supplied
    /// metrics registry.
    ///
    /// Startup sequence (order matters):
    /// 1. connect the primary client and verify it with `PING`;
    /// 2. verify the Valkey server version;
    /// 3. validate (or create) the stored partition configuration;
    /// 4. initialize the waitpoint HMAC secret;
    /// 5. load the flowfabric Lua library (unless `skip_library_load`);
    /// 6. start the engine with a completion-event subscription;
    /// 7. open and PING a dedicated tail connection;
    /// 8. construct concurrency primitives and the scheduler.
    ///
    /// # Errors
    /// Returns `ServerError` if any connection, version check, partition
    /// validation, secret initialization, or library-load step fails.
    pub async fn start_with_metrics(
        config: ServerConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Self, ServerError> {
        tracing::info!(
            host = %config.host, port = config.port,
            tls = config.tls, cluster = config.cluster,
            "connecting to Valkey"
        );
        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect"))?;

        // Round-trip check: the connection must answer PING with "PONG".
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING"))?;
        if pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "unexpected PING response: {pong}"
            )));
        }
        tracing::info!("Valkey connection established");

        verify_valkey_version(&client).await?;

        validate_or_create_partition_config(&client, &config.partition_config).await?;

        initialize_waitpoint_hmac_secret(
            &client,
            &config.partition_config,
            &config.waitpoint_hmac_secret,
        )
        .await?;

        // Server-side Lua functions back every fcall below; skipping the load
        // is an operator opt-out (e.g. pre-loaded libraries).
        if !config.skip_library_load {
            tracing::info!("loading flowfabric Lua library");
            ff_script::loader::ensure_library(&client)
                .await
                .map_err(ServerError::LibraryLoad)?;
        } else {
            tracing::info!("skipping library load (skip_library_load=true)");
        }

        // Copy the per-scanner intervals out of the server config into the
        // engine's own config struct.
        let engine_cfg = ff_engine::EngineConfig {
            partition_config: config.partition_config,
            lanes: config.lanes.clone(),
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
            index_reconciler_interval: config.engine_config.index_reconciler_interval,
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
            unblock_interval: config.engine_config.unblock_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            flow_projector_interval: config.engine_config.flow_projector_interval,
            execution_deadline_interval: config.engine_config.execution_deadline_interval,
            cancel_reconciler_interval: config.engine_config.cancel_reconciler_interval,
            scanner_filter: config.engine_config.scanner_filter.clone(),
        };
        let mut valkey_conn = ff_core::backend::ValkeyConnection::new(
            config.host.clone(),
            config.port,
        );
        valkey_conn.tls = config.tls;
        valkey_conn.cluster = config.cluster;
        let completion_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
            client.clone(),
            config.partition_config,
            valkey_conn,
        );
        // Fully-qualified syntax: subscribe via the CompletionBackend trait.
        let completion_stream = <ff_backend_valkey::ValkeyBackend as ff_core::completion_backend::CompletionBackend>::subscribe_completions(&completion_backend)
            .await
            .map_err(|e| ServerError::OperationFailed(format!(
                "subscribe_completions: {e}"
            )))?;

        let engine = Engine::start_with_completions(
            engine_cfg,
            client.clone(),
            metrics.clone(),
            completion_stream,
        );

        // Separate connection for blocking tail reads, with its own PING check.
        tracing::info!("opening dedicated tail connection");
        let mut tail_builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            tail_builder = tail_builder.tls();
        }
        if config.cluster {
            tail_builder = tail_builder.cluster();
        }
        let tail_client = tail_builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect (tail)"))?;
        let tail_pong: String = tail_client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING (tail)"))?;
        if tail_pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "tail client unexpected PING response: {tail_pong}"
            )));
        }

        let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
            config.max_concurrent_stream_ops as usize,
        ));
        let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
        tracing::info!(
            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
            "stream-op client ready (read + tail share the semaphore; \
            tails additionally serialize via xread_block_lock)"
        );

        // Loudly warn operators when admin/read endpoints run unauthenticated.
        if config.api_token.is_none() {
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
                rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
                FF_API_TOKEN for any deployment reachable from untrusted \
                networks."
            );
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
                returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
                and GET /v1/executions/{{id}}/result returns raw completion payload \
                bytes (may contain PII). Both are UNAUTHENTICATED in this \
                configuration."
            );
        }

        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            listen_addr = %config.listen_addr,
            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
            config.partition_config.num_flow_partitions,
            config.partition_config.num_budget_partitions,
            config.partition_config.num_quota_partitions,
        );

        let scheduler = Arc::new(ff_scheduler::Scheduler::with_metrics(
            client.clone(),
            config.partition_config,
            metrics.clone(),
        ));

        Ok(Self {
            client,
            tail_client,
            stream_semaphore,
            xread_block_lock,
            // Single permit: admin rotate operations are serialized.
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine,
            config,
            scheduler,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
            metrics,
        })
    }
599
    /// Shared metrics registry used by the server, engine, and scheduler.
    pub fn metrics(&self) -> &Arc<ff_observability::Metrics> {
        &self.metrics
    }
604
    /// Primary Valkey client.
    pub fn client(&self) -> &Client {
        &self.client
    }
609
    /// Invokes a server-side Lua function on the primary client.
    /// Delegates to the free function `fcall_with_reload_on_client` —
    /// NOTE(review): the "reload" behavior (presumably re-loading the Lua
    /// library on a missing-function error) lives there; not visible in
    /// this chunk.
    async fn fcall_with_reload(
        &self,
        function: &str,
        keys: &[&str],
        args: &[&str],
    ) -> Result<Value, ServerError> {
        fcall_with_reload_on_client(&self.client, function, keys, args).await
    }
624
    /// Full server configuration.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }
629
    /// Partition configuration (flow/budget/quota partition counts).
    pub fn partition_config(&self) -> &PartitionConfig {
        &self.config.partition_config
    }
634
    /// Creates a new execution by invoking the `ff_create_execution` Lua
    /// function.
    ///
    /// The KEYS and ARGV layouts below are a positional contract with the
    /// Lua script — do not reorder entries without changing the script.
    ///
    /// # Errors
    /// Backend/script errors from the fcall, or parse failures from the reply.
    pub async fn create_execution(
        &self,
        args: &CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, ServerError> {
        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
        let idx = IndexKeys::new(&partition);

        let lane = &args.lane_id;
        let tag = partition.hash_tag();
        // Empty/absent idempotency key maps to the no-op key so the KEYS
        // slot is still populated.
        let idem_key = match &args.idempotency_key {
            Some(k) if !k.is_empty() => {
                keys::idempotency_key(&tag, args.namespace.as_str(), k)
            }
            _ => ctx.noop(),
        };

        let delay_str = args
            .delay_until
            .map(|d| d.0.to_string())
            .unwrap_or_default();
        let is_delayed = !delay_str.is_empty();

        // Delayed executions are enqueued on the lane's delayed zset,
        // otherwise on the eligible zset.
        let scheduling_zset = if is_delayed {
            idx.lane_delayed(lane)
        } else {
            idx.lane_eligible(lane)
        };

        let fcall_keys: Vec<String> = vec![
            ctx.core(),
            ctx.payload(),
            ctx.policy(),
            ctx.tags(),
            scheduling_zset,
            idem_key,
            idx.execution_deadline(),
            idx.all_executions(),
        ];

        let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());

        // Positional ARGV for the script; optional fields are passed as
        // empty strings. The "86400000" literal is the idempotency-record
        // TTL in milliseconds (24h), sent only when an idempotency key is set.
        let fcall_args: Vec<String> = vec![
            args.execution_id.to_string(),
            args.namespace.to_string(),
            args.lane_id.to_string(),
            args.execution_kind.clone(),
            args.priority.to_string(),
            args.creator_identity.clone(),
            args.policy.as_ref()
                .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
                .unwrap_or_else(|| "{}".to_owned()),
            String::from_utf8_lossy(&args.input_payload).into_owned(),
            delay_str,
            args.idempotency_key.as_ref()
                .map(|_| "86400000".to_string())
                .unwrap_or_default(),
            tags_json,
            args.execution_deadline_at
                .map(|d| d.to_string())
                .unwrap_or_default(),
            args.partition_id.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
            .await?;

        parse_create_result(&raw, &args.execution_id)
    }
722
723 pub async fn cancel_execution(
725 &self,
726 args: &CancelExecutionArgs,
727 ) -> Result<CancelExecutionResult, ServerError> {
728 let raw = self
729 .fcall_cancel_execution_with_reload(args)
730 .await?;
731 parse_cancel_result(&raw, &args.execution_id)
732 }
733
    /// Builds the KEYS/ARGV for `ff_cancel_execution` (via the free function
    /// `build_cancel_execution_fcall`, which may itself query the backend)
    /// and invokes the script.
    async fn fcall_cancel_execution_with_reload(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<Value, ServerError> {
        let (keys, argv) = build_cancel_execution_fcall(
            &self.client,
            &self.config.partition_config,
            args,
        )
        .await?;
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
        self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
    }
751
752 pub async fn get_execution_state(
757 &self,
758 execution_id: &ExecutionId,
759 ) -> Result<PublicState, ServerError> {
760 let partition = execution_partition(execution_id, &self.config.partition_config);
761 let ctx = ExecKeyContext::new(&partition, execution_id);
762
763 let state_str: Option<String> = self
764 .client
765 .hget(&ctx.core(), "public_state")
766 .await
767 .map_err(|e| crate::server::backend_context(e, "HGET public_state"))?;
768
769 match state_str {
770 Some(s) => {
771 let quoted = format!("\"{s}\"");
772 serde_json::from_str("ed).map_err(|e| {
773 ServerError::Script(format!(
774 "invalid public_state '{s}' for {execution_id}: {e}"
775 ))
776 })
777 }
778 None => Err(ServerError::NotFound(format!(
779 "execution not found: {execution_id}"
780 ))),
781 }
782 }
783
    /// Fetches the raw completion payload bytes for an execution, if present.
    ///
    /// Returns `Ok(None)` when no result key exists (not yet completed or
    /// already trimmed). Note: does not distinguish "execution unknown" from
    /// "no result yet" — both yield `None`.
    pub async fn get_execution_result(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Option<Vec<u8>>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        // Raw GET (bytes, not a string): the payload may be arbitrary binary.
        let payload: Option<Vec<u8>> = self
            .client
            .cmd("GET")
            .arg(ctx.result())
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "GET exec result"))?;
        Ok(payload)
    }
835
    /// Lists an execution's waitpoints that are still `pending` or `active`,
    /// including their HMAC tokens and required signal names.
    ///
    /// Read path:
    /// 1. EXISTS check on the exec core hash (404 semantics);
    /// 2. SSCAN the waitpoint-id set (cursor loop; ids are sorted + deduped
    ///    because SSCAN may return duplicates across iterations);
    /// 3. pipeline pass 1: HMGET each waitpoint hash + HGET its condition's
    ///    `total_matchers`, filtering to live, token-bearing entries;
    /// 4. pipeline pass 2: HMGET `matcher:{i}:name` fields for kept entries
    ///    that have matchers.
    ///
    /// # Errors
    /// `NotFound` if the execution core hash is absent; backend errors from
    /// any of the reads.
    pub async fn list_pending_waitpoints(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        let core_exists: bool = self
            .client
            .cmd("EXISTS")
            .arg(ctx.core())
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "EXISTS exec_core (pending waitpoints)"))?;
        if !core_exists {
            return Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            )));
        }

        // Batch size hint for SSCAN; the cursor loop handles any set size.
        const WAITPOINTS_SSCAN_COUNT: usize = 100;
        let waitpoints_key = ctx.waitpoints();
        let mut wp_ids_raw: Vec<String> = Vec::new();
        let mut cursor: String = "0".to_owned();
        loop {
            let reply: (String, Vec<String>) = self
                .client
                .cmd("SSCAN")
                .arg(&waitpoints_key)
                .arg(&cursor)
                .arg("COUNT")
                .arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
                .execute()
                .await
                .map_err(|e| crate::server::backend_context(e, "SSCAN waitpoints"))?;
            cursor = reply.0;
            wp_ids_raw.extend(reply.1);
            if cursor == "0" {
                break;
            }
        }

        // SSCAN may return duplicates; sort + dedup for a stable, unique list.
        wp_ids_raw.sort_unstable();
        wp_ids_raw.dedup();

        if wp_ids_raw.is_empty() {
            return Ok(Vec::new());
        }

        // Unparseable ids are logged and skipped rather than failing the call.
        let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
        for raw in &wp_ids_raw {
            match WaitpointId::parse(raw) {
                Ok(id) => wp_ids.push(id),
                Err(e) => tracing::warn!(
                    raw_id = %raw,
                    error = %e,
                    execution_id = %execution_id,
                    "list_pending_waitpoints: skipping unparseable waitpoint_id"
                ),
            }
        }
        if wp_ids.is_empty() {
            return Ok(Vec::new());
        }

        // HMGET field order; the index positions are used via `get(i)` below.
        const WP_FIELDS: [&str; 6] = [
            "state",
            "waitpoint_key",
            "waitpoint_token",
            "created_at",
            "activated_at",
            "expires_at",
        ];

        // Pass 1: one HMGET (waitpoint hash) + one HGET (condition
        // total_matchers) per id, in a single pipeline round trip.
        let mut pass1 = self.client.pipeline();
        let mut wp_slots = Vec::with_capacity(wp_ids.len());
        let mut cond_slots = Vec::with_capacity(wp_ids.len());
        for wp_id in &wp_ids {
            let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint(wp_id));
            for f in WP_FIELDS {
                cmd = cmd.arg(f);
            }
            wp_slots.push(cmd.finish());

            cond_slots.push(
                pass1
                    .cmd::<Option<String>>("HGET")
                    .arg(ctx.waitpoint_condition(wp_id))
                    .arg("total_matchers")
                    .finish(),
            );
        }
        pass1
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "pipeline HMGET waitpoints + HGET total_matchers"))?;

        // Waitpoints that survive pass-1 filtering, carried into pass 2.
        struct Kept {
            wp_id: WaitpointId,
            wp_fields: Vec<Option<String>>,
            total_matchers: usize,
        }
        let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
        for ((wp_id, wp_slot), cond_slot) in wp_ids
            .iter()
            .zip(wp_slots)
            .zip(cond_slots)
        {
            let wp_fields: Vec<Option<String>> =
                wp_slot.value().map_err(|e| crate::server::backend_context(e, format!("pipeline slot HMGET waitpoint {wp_id}")))?;

            // All-None means the waitpoint hash is gone (raced deletion);
            // consume the paired slot and skip.
            if wp_fields.iter().all(Option::is_none) {
                let _ = cond_slot.value();
                continue;
            }
            let state_ref = wp_fields
                .first()
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            // Only pending/active waitpoints are reported.
            if state_ref != "pending" && state_ref != "active" {
                let _ = cond_slot.value();
                continue;
            }
            let token_ref = wp_fields
                .get(2)
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if token_ref.is_empty() {
                let _ = cond_slot.value();
                tracing::warn!(
                    waitpoint_id = %wp_id,
                    execution_id = %execution_id,
                    waitpoint_hash_key = %ctx.waitpoint(wp_id),
                    state = %state_ref,
                    "list_pending_waitpoints: waitpoint hash present but waitpoint_token \
                    field is empty — likely storage corruption (half-populated write, \
                    operator edit, or interrupted script). Skipping this entry in the \
                    response. HGETALL the waitpoint_hash_key to inspect."
                );
                continue;
            }

            // Missing/unparseable total_matchers is treated as 0 matchers.
            let total_matchers = cond_slot
                .value()
                .map_err(|e| crate::server::backend_context(e, format!("pipeline slot HGET total_matchers {wp_id}")))?
                .and_then(|s| s.parse::<usize>().ok())
                .unwrap_or(0);

            kept.push(Kept {
                wp_id: wp_id.clone(),
                wp_fields,
                total_matchers,
            });
        }

        if kept.is_empty() {
            return Ok(Vec::new());
        }

        // Pass 2: fetch matcher names only for kept entries that have any;
        // `matcher_slots` stays index-aligned with `kept` via None fillers.
        let mut pass2 = self.client.pipeline();
        let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
        let mut pass2_needed = false;
        for k in &kept {
            if k.total_matchers == 0 {
                matcher_slots.push(None);
                continue;
            }
            pass2_needed = true;
            let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
            for i in 0..k.total_matchers {
                cmd = cmd.arg(format!("matcher:{i}:name"));
            }
            matcher_slots.push(Some(cmd.finish()));
        }
        if pass2_needed {
            pass2.execute().await.map_err(|e| crate::server::backend_context(e, "pipeline HMGET wp_condition matchers"))?;
        }

        // Empty timestamp fields map to None.
        let parse_ts = |raw: &str| -> Option<TimestampMs> {
            if raw.is_empty() {
                None
            } else {
                raw.parse::<i64>().ok().map(TimestampMs)
            }
        };

        let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
        for (k, slot) in kept.into_iter().zip(matcher_slots) {
            // Indexes follow WP_FIELDS order.
            let get = |i: usize| -> &str {
                k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
            };

            let required_signal_names: Vec<String> = match slot {
                None => Vec::new(),
                Some(s) => {
                    let vals: Vec<Option<String>> =
                        s.value().map_err(|e| crate::server::backend_context(e, format!(
                            "pipeline slot HMGET wp_condition matchers {}",
                            k.wp_id
                        )))?;
                    vals.into_iter()
                        .flatten()
                        .filter(|name| !name.is_empty())
                        .collect()
                }
            };

            out.push(PendingWaitpointInfo {
                waitpoint_id: k.wp_id,
                waitpoint_key: get(1).to_owned(),
                state: get(0).to_owned(),
                waitpoint_token: WaitpointToken(get(2).to_owned()),
                required_signal_names,
                created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
                activated_at: parse_ts(get(4)),
                expires_at: parse_ts(get(5)),
            });
        }

        Ok(out)
    }
1142
    /// Creates a budget by invoking the `ff_create_budget` Lua function.
    ///
    /// ARGV layout (positional contract with the script): 9 scalar fields,
    /// then `dim_count` dimension names, then `dim_count` hard limits, then
    /// `dim_count` soft limits.
    ///
    /// # Errors
    /// `InvalidInput` when the dimension/limit arrays fail validation;
    /// backend/script/parse errors otherwise.
    pub async fn create_budget(
        &self,
        args: &CreateBudgetArgs,
    ) -> Result<CreateBudgetResult, ServerError> {
        validate_create_budget_dimensions(
            &args.dimensions,
            &args.hard_limits,
            &args.soft_limits,
        )?;
        let partition = budget_partition(&args.budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
        let resets_key = keys::budget_resets_key(bctx.hash_tag());
        let policies_index = keys::budget_policies_index(bctx.hash_tag());

        let fcall_keys: Vec<String> = vec![
            bctx.definition(),
            bctx.limits(),
            bctx.usage(),
            resets_key,
            policies_index,
        ];

        let dim_count = args.dimensions.len();
        // 9 scalar args + 3 parallel arrays of dim_count entries each.
        let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
        fcall_args.push(args.budget_id.to_string());
        fcall_args.push(args.scope_type.clone());
        fcall_args.push(args.scope_id.clone());
        fcall_args.push(args.enforcement_mode.clone());
        fcall_args.push(args.on_hard_limit.clone());
        fcall_args.push(args.on_soft_limit.clone());
        fcall_args.push(args.reset_interval_ms.to_string());
        fcall_args.push(args.now.to_string());
        fcall_args.push(dim_count.to_string());
        for dim in &args.dimensions {
            fcall_args.push(dim.clone());
        }
        for hard in &args.hard_limits {
            fcall_args.push(hard.to_string());
        }
        for soft in &args.soft_limits {
            fcall_args.push(soft.to_string());
        }

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
            .await?;

        parse_budget_create_result(&raw, &args.budget_id)
    }
1204
    /// Creates a quota policy by invoking the `ff_create_quota_policy` Lua
    /// function. KEYS/ARGV order is a positional contract with the script.
    pub async fn create_quota_policy(
        &self,
        args: &CreateQuotaPolicyArgs,
    ) -> Result<CreateQuotaPolicyResult, ServerError> {
        let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
        let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);

        let fcall_keys: Vec<String> = vec![
            qctx.definition(),
            qctx.window("requests_per_window"),
            qctx.concurrency(),
            qctx.admitted_set(),
            keys::quota_policies_index(qctx.hash_tag()),
        ];

        let fcall_args: Vec<String> = vec![
            args.quota_policy_id.to_string(),
            args.window_seconds.to_string(),
            args.max_requests_per_window.to_string(),
            args.max_concurrent.to_string(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
            .await?;

        parse_quota_create_result(&raw, &args.quota_policy_id)
    }
1242
    /// Reads a budget's definition, usage, and limit hashes and assembles a
    /// `BudgetStatus` snapshot.
    ///
    /// Note: the three HGETALLs are separate round trips, so the snapshot is
    /// not atomic with respect to concurrent usage updates.
    ///
    /// # Errors
    /// `NotFound` when the definition hash is empty/absent; backend errors
    /// otherwise.
    pub async fn get_budget_status(
        &self,
        budget_id: &BudgetId,
    ) -> Result<BudgetStatus, ServerError> {
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);

        let def: HashMap<String, String> = self
            .client
            .hgetall(&bctx.definition())
            .await
            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_def"))?;

        // HGETALL on a missing key returns an empty map — treat as not found.
        if def.is_empty() {
            return Err(ServerError::NotFound(format!(
                "budget not found: {budget_id}"
            )));
        }

        let usage_raw: HashMap<String, String> = self
            .client
            .hgetall(&bctx.usage())
            .await
            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_usage"))?;
        // Drop the "_init" sentinel field; unparseable counters fall back to 0.
        let usage: HashMap<String, u64> = usage_raw
            .into_iter()
            .filter(|(k, _)| k != "_init")
            .map(|(k, v)| (k, v.parse().unwrap_or(0)))
            .collect();

        let limits_raw: HashMap<String, String> = self
            .client
            .hgetall(&bctx.limits())
            .await
            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_limits"))?;
        // Limit fields are stored as "hard:<dim>" / "soft:<dim>".
        let mut hard_limits = HashMap::new();
        let mut soft_limits = HashMap::new();
        for (k, v) in &limits_raw {
            if let Some(dim) = k.strip_prefix("hard:") {
                hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
            } else if let Some(dim) = k.strip_prefix("soft:") {
                soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
            }
        }

        // Empty-string hash fields become None in the status.
        let non_empty = |s: Option<&String>| -> Option<String> {
            s.filter(|v| !v.is_empty()).cloned()
        };

        Ok(BudgetStatus {
            budget_id: budget_id.to_string(),
            scope_type: def.get("scope_type").cloned().unwrap_or_default(),
            scope_id: def.get("scope_id").cloned().unwrap_or_default(),
            enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
            usage,
            hard_limits,
            soft_limits,
            breach_count: def
                .get("breach_count")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            soft_breach_count: def
                .get("soft_breach_count")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            last_breach_at: non_empty(def.get("last_breach_at")),
            last_breach_dim: non_empty(def.get("last_breach_dim")),
            next_reset_at: non_empty(def.get("next_reset_at")),
            created_at: non_empty(def.get("created_at")),
        })
    }
1318
    /// Reports usage deltas against a budget via the
    /// `ff_report_usage_and_check` Lua function.
    ///
    /// ARGV layout (positional contract with the script): dim_count, the
    /// dimension names, the parallel deltas, `now`, then the (possibly
    /// empty) dedup key.
    ///
    /// # Errors
    /// `InvalidInput` when the dimension/delta arrays fail validation;
    /// backend/script/parse errors otherwise.
    pub async fn report_usage(
        &self,
        budget_id: &BudgetId,
        args: &ReportUsageArgs,
    ) -> Result<ReportUsageResult, ServerError> {
        validate_report_usage_dimensions(&args.dimensions, &args.deltas)?;
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);

        let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];

        let dim_count = args.dimensions.len();
        let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
        fcall_args.push(dim_count.to_string());
        for dim in &args.dimensions {
            fcall_args.push(dim.clone());
        }
        for delta in &args.deltas {
            fcall_args.push(delta.to_string());
        }
        fcall_args.push(args.now.to_string());
        // Empty/absent dedup key is passed as "" (no deduplication).
        let dedup_key_val = args
            .dedup_key
            .as_ref()
            .filter(|k| !k.is_empty())
            .map(|k| usage_dedup_key(bctx.hash_tag(), k))
            .unwrap_or_default();
        fcall_args.push(dedup_key_val);

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
            .await?;

        parse_report_usage_result(&raw)
    }
1361
    /// Resets a budget's usage counters via the `ff_reset_budget` Lua
    /// function, using the server's current wall clock as `now`.
    pub async fn reset_budget(
        &self,
        budget_id: &BudgetId,
    ) -> Result<ResetBudgetResult, ServerError> {
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);
        let resets_key = keys::budget_resets_key(bctx.hash_tag());

        let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];

        let now = TimestampMs::now();
        let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
            .await?;

        parse_reset_budget_result(&raw)
    }
1387
    /// Creates a flow by invoking the `ff_create_flow` Lua function.
    /// KEYS/ARGV order is a positional contract with the script.
    pub async fn create_flow(
        &self,
        args: &CreateFlowArgs,
    ) -> Result<CreateFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];

        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.flow_kind.clone(),
            args.namespace.to_string(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
            .await?;

        parse_create_flow_result(&raw, &args.flow_id)
    }
1419
1420 pub async fn add_execution_to_flow(
1458 &self,
1459 args: &AddExecutionToFlowArgs,
1460 ) -> Result<AddExecutionToFlowResult, ServerError> {
1461 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1462 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1463 let fidx = FlowIndexKeys::new(&partition);
1464
1465 let exec_partition =
1469 execution_partition(&args.execution_id, &self.config.partition_config);
1470 let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);
1471
1472 if exec_partition.index != partition.index {
1481 return Err(ServerError::PartitionMismatch(format!(
1482 "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
1483 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
1484 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
1485 exec_p = exec_partition.index,
1486 flow_p = partition.index,
1487 )));
1488 }
1489
1490 let fcall_keys: Vec<String> = vec![
1492 fctx.core(),
1493 fctx.members(),
1494 fidx.flow_index(),
1495 ectx.core(),
1496 ];
1497
1498 let fcall_args: Vec<String> = vec![
1500 args.flow_id.to_string(),
1501 args.execution_id.to_string(),
1502 args.now.to_string(),
1503 ];
1504
1505 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1506 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1507
1508 let raw: Value = self
1509 .fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
1510 .await?;
1511
1512 parse_add_execution_to_flow_result(&raw)
1513 }
1514
    /// Cancels a flow without waiting for member executions to be cancelled
    /// (member cancels are dispatched on a background task).
    pub async fn cancel_flow(
        &self,
        args: &CancelFlowArgs,
    ) -> Result<CancelFlowResult, ServerError> {
        self.cancel_flow_inner(args, false).await
    }
1562
    /// Cancels a flow and waits until every member execution's cancel has
    /// been attempted, reporting per-member failures in the result.
    pub async fn cancel_flow_wait(
        &self,
        args: &CancelFlowArgs,
    ) -> Result<CancelFlowResult, ServerError> {
        self.cancel_flow_inner(args, true).await
    }
1572
    /// Shared implementation behind [`Self::cancel_flow`] and
    /// [`Self::cancel_flow_wait`].
    ///
    /// Steps:
    /// 1. Run `ff_cancel_flow`, which marks the flow cancelled and records
    ///    the pending member cancels plus a reconciler backlog entry.
    /// 2. If the flow was already terminal, rebuild an idempotent
    ///    `Cancelled` response from the stored flow metadata.
    /// 3. For the `cancel_all` policy, cancel each member execution —
    ///    inline when `wait` is true, otherwise on a background task.
    ///
    /// Members whose cancel is not acked stay in the pending-cancels /
    /// backlog keys so the reconciler can retry them after the grace period.
    async fn cancel_flow_inner(
        &self,
        args: &CancelFlowArgs,
        wait: bool,
    ) -> Result<CancelFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        // Grace period passed to the script; presumably the delay before the
        // reconciler may take over members this server fails to ack —
        // TODO confirm against the ff_cancel_flow script.
        const CANCEL_RECONCILER_GRACE_MS: u64 = 30_000;

        let fcall_keys: Vec<String> = vec![
            fctx.core(),
            fctx.members(),
            fidx.flow_index(),
            fctx.pending_cancels(),
            fidx.cancel_backlog(),
        ];

        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.reason.clone(),
            args.cancellation_policy.clone(),
            args.now.to_string(),
            CANCEL_RECONCILER_GRACE_MS.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
            .await?;

        let (policy, members) = match parse_cancel_flow_raw(&raw)? {
            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
                (policy, member_execution_ids)
            }
            ParsedCancelFlow::AlreadyTerminal => {
                // Idempotent replay: the flow finished earlier, so rebuild
                // the original Cancelled response from what is still stored.
                let flow_meta: Vec<Option<String>> = self
                    .client
                    .cmd("HMGET")
                    .arg(fctx.core())
                    .arg("cancellation_policy")
                    .arg("cancel_reason")
                    .execute()
                    .await
                    .map_err(|e| crate::server::backend_context(e, "HMGET flow_core cancellation_policy,cancel_reason"))?;
                // Empty stored fields are treated as absent.
                let stored_policy = flow_meta
                    .first()
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let stored_reason = flow_meta
                    .get(1)
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let all_members: Vec<String> = self
                    .client
                    .cmd("SMEMBERS")
                    .arg(fctx.members())
                    .execute()
                    .await
                    .map_err(|e| crate::server::backend_context(e, "SMEMBERS flow members (already terminal)"))?;
                let total_members = all_members.len();
                // Cap the echoed member list to bound the response size.
                let stored_members: Vec<String> = all_members
                    .into_iter()
                    .take(ALREADY_TERMINAL_MEMBER_CAP)
                    .collect();
                tracing::debug!(
                    flow_id = %args.flow_id,
                    stored_policy = stored_policy.as_deref().unwrap_or(""),
                    stored_reason = stored_reason.as_deref().unwrap_or(""),
                    total_members,
                    returned_members = stored_members.len(),
                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
                );
                return Ok(CancelFlowResult::Cancelled {
                    // Fall back to the requested policy when none was stored.
                    cancellation_policy: stored_policy
                        .unwrap_or_else(|| args.cancellation_policy.clone()),
                    member_execution_ids: stored_members,
                });
            }
        };
        // Only the cancel_all policy fans out to member executions.
        let needs_dispatch = policy == "cancel_all" && !members.is_empty();

        if !needs_dispatch {
            return Ok(CancelFlowResult::Cancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
            });
        }

        let pending_cancels_key = fctx.pending_cancels();
        let cancel_backlog_key = fidx.cancel_backlog();

        if wait {
            // Synchronous path: cancel members one by one, collecting the
            // ids whose cancel failed.
            let mut failed: Vec<String> = Vec::new();
            for eid_str in &members {
                match cancel_member_execution(
                    &self.client,
                    &self.config.partition_config,
                    eid_str,
                    &args.reason,
                    args.now,
                )
                .await
                {
                    Ok(()) => {
                        ack_cancel_member(
                            &self.client,
                            &pending_cancels_key,
                            &cancel_backlog_key,
                            eid_str,
                            &args.flow_id.to_string(),
                        )
                        .await;
                    }
                    Err(e) => {
                        // A member that is already terminal counts as done:
                        // ack it so the reconciler stops retrying it.
                        if is_terminal_ack_error(&e) {
                            ack_cancel_member(
                                &self.client,
                                &pending_cancels_key,
                                &cancel_backlog_key,
                                eid_str,
                                &args.flow_id.to_string(),
                            )
                            .await;
                            continue;
                        }
                        tracing::warn!(
                            execution_id = %eid_str,
                            error = %e,
                            "cancel_flow(wait): individual execution cancel failed \
                            (transport/contract fault; reconciler will retry if transient)"
                        );
                        failed.push(eid_str.clone());
                    }
                }
            }
            if failed.is_empty() {
                return Ok(CancelFlowResult::Cancelled {
                    cancellation_policy: policy,
                    member_execution_ids: members,
                });
            }
            return Ok(CancelFlowResult::PartiallyCancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
                failed_member_execution_ids: failed,
            });
        }

        // Asynchronous path: dispatch member cancels on a background task
        // and return CancellationScheduled immediately.
        let client = self.client.clone();
        let partition_config = self.config.partition_config;
        let reason = args.reason.clone();
        let now = args.now;
        let dispatch_members = members.clone();
        let flow_id = args.flow_id.clone();
        let mut guard = self.background_tasks.lock().await;

        // Reap already-finished tasks so the JoinSet does not accumulate
        // completed entries indefinitely.
        while let Some(joined) = guard.try_join_next() {
            if let Err(e) = joined {
                tracing::warn!(
                    error = %e,
                    "cancel_flow: background dispatch task panicked or was aborted"
                );
            }
        }

        let pending_key_owned = pending_cancels_key.clone();
        let backlog_key_owned = cancel_backlog_key.clone();
        let flow_id_str = args.flow_id.to_string();

        guard.spawn(async move {
            use futures::stream::StreamExt;
            // Bound on concurrently in-flight member cancels.
            const CONCURRENCY: usize = 16;

            let member_count = dispatch_members.len();
            let flow_id_for_log = flow_id.clone();
            futures::stream::iter(dispatch_members)
                .map(|eid_str| {
                    // Per-member clones so each future is 'static.
                    let client = client.clone();
                    let reason = reason.clone();
                    let flow_id = flow_id.clone();
                    let pending = pending_key_owned.clone();
                    let backlog = backlog_key_owned.clone();
                    let flow_id_str = flow_id_str.clone();
                    async move {
                        match cancel_member_execution(
                            &client,
                            &partition_config,
                            &eid_str,
                            &reason,
                            now,
                        )
                        .await
                        {
                            Ok(()) => {
                                ack_cancel_member(
                                    &client,
                                    &pending,
                                    &backlog,
                                    &eid_str,
                                    &flow_id_str,
                                )
                                .await;
                            }
                            Err(e) => {
                                // Already-terminal members still get acked.
                                if is_terminal_ack_error(&e) {
                                    ack_cancel_member(
                                        &client,
                                        &pending,
                                        &backlog,
                                        &eid_str,
                                        &flow_id_str,
                                    )
                                    .await;
                                } else {
                                    tracing::warn!(
                                        flow_id = %flow_id,
                                        execution_id = %eid_str,
                                        error = %e,
                                        "cancel_flow(async): individual execution cancel failed \
                                        (transport/contract fault; reconciler will retry if transient)"
                                    );
                                }
                            }
                        }
                    }
                })
                .buffer_unordered(CONCURRENCY)
                .for_each(|()| async {})
                .await;

            tracing::debug!(
                flow_id = %flow_id_for_log,
                member_count,
                concurrency = CONCURRENCY,
                "cancel_flow: background member dispatch complete"
            );
        });
        drop(guard);

        // Saturate rather than fail on absurd member counts.
        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
        Ok(CancelFlowResult::CancellationScheduled {
            cancellation_policy: policy,
            member_count,
            member_execution_ids: members,
        })
    }
1878
1879 pub async fn stage_dependency_edge(
1884 &self,
1885 args: &StageDependencyEdgeArgs,
1886 ) -> Result<StageDependencyEdgeResult, ServerError> {
1887 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1888 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1889
1890 let fcall_keys: Vec<String> = vec![
1892 fctx.core(),
1893 fctx.members(),
1894 fctx.edge(&args.edge_id),
1895 fctx.outgoing(&args.upstream_execution_id),
1896 fctx.incoming(&args.downstream_execution_id),
1897 fctx.grant(&args.edge_id.to_string()),
1898 ];
1899
1900 let fcall_args: Vec<String> = vec![
1903 args.flow_id.to_string(),
1904 args.edge_id.to_string(),
1905 args.upstream_execution_id.to_string(),
1906 args.downstream_execution_id.to_string(),
1907 args.dependency_kind.clone(),
1908 args.data_passing_ref.clone().unwrap_or_default(),
1909 args.expected_graph_revision.to_string(),
1910 args.now.to_string(),
1911 ];
1912
1913 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1914 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1915
1916 let raw: Value = self
1917 .fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
1918 .await?;
1919
1920 parse_stage_dependency_edge_result(&raw)
1921 }
1922
1923 pub async fn apply_dependency_to_child(
1928 &self,
1929 args: &ApplyDependencyToChildArgs,
1930 ) -> Result<ApplyDependencyToChildResult, ServerError> {
1931 let partition = execution_partition(
1932 &args.downstream_execution_id,
1933 &self.config.partition_config,
1934 );
1935 let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
1936 let idx = IndexKeys::new(&partition);
1937
1938 let lane_str: Option<String> = self
1940 .client
1941 .hget(&ctx.core(), "lane_id")
1942 .await
1943 .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
1944 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1945
1946 let fcall_keys: Vec<String> = vec![
1949 ctx.core(),
1950 ctx.deps_meta(),
1951 ctx.deps_unresolved(),
1952 ctx.dep_edge(&args.edge_id),
1953 idx.lane_eligible(&lane),
1954 idx.lane_blocked_dependencies(&lane),
1955 ctx.deps_all_edges(),
1956 ];
1957
1958 let fcall_args: Vec<String> = vec![
1961 args.flow_id.to_string(),
1962 args.edge_id.to_string(),
1963 args.upstream_execution_id.to_string(),
1964 args.graph_revision.to_string(),
1965 args.dependency_kind.clone(),
1966 args.data_passing_ref.clone().unwrap_or_default(),
1967 args.now.to_string(),
1968 ];
1969
1970 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1971 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1972
1973 let raw: Value = self
1974 .fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
1975 .await?;
1976
1977 parse_apply_dependency_result(&raw)
1978 }
1979
1980 pub async fn deliver_signal(
1987 &self,
1988 args: &DeliverSignalArgs,
1989 ) -> Result<DeliverSignalResult, ServerError> {
1990 let partition = execution_partition(&args.execution_id, &self.config.partition_config);
1991 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
1992 let idx = IndexKeys::new(&partition);
1993
1994 let lane_str: Option<String> = self
1996 .client
1997 .hget(&ctx.core(), "lane_id")
1998 .await
1999 .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
2000 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2001
2002 let wp_id = &args.waitpoint_id;
2003 let sig_id = &args.signal_id;
2004 let idem_key = args
2005 .idempotency_key
2006 .as_ref()
2007 .filter(|k| !k.is_empty())
2008 .map(|k| ctx.signal_dedup(wp_id, k))
2009 .unwrap_or_else(|| ctx.noop());
2010
2011 let fcall_keys: Vec<String> = vec![
2017 ctx.core(), ctx.waitpoint_condition(wp_id), ctx.waitpoint_signals(wp_id), ctx.exec_signals(), ctx.signal(sig_id), ctx.signal_payload(sig_id), idem_key, ctx.waitpoint(wp_id), ctx.suspension_current(), idx.lane_eligible(&lane), idx.lane_suspended(&lane), idx.lane_delayed(&lane), idx.suspension_timeout(), idx.waitpoint_hmac_secrets(), ];
2032
2033 let fcall_args: Vec<String> = vec![
2040 args.signal_id.to_string(), args.execution_id.to_string(), args.waitpoint_id.to_string(), args.signal_name.clone(), args.signal_category.clone(), args.source_type.clone(), args.source_identity.clone(), args.payload.as_ref()
2048 .map(|p| String::from_utf8_lossy(p).into_owned())
2049 .unwrap_or_default(), args.payload_encoding
2051 .clone()
2052 .unwrap_or_else(|| "json".to_owned()), args.idempotency_key
2054 .clone()
2055 .unwrap_or_default(), args.correlation_id
2057 .clone()
2058 .unwrap_or_default(), args.target_scope.clone(), args.created_at
2061 .map(|ts| ts.to_string())
2062 .unwrap_or_else(|| args.now.to_string()), args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), args.resume_delay_ms.unwrap_or(0).to_string(), args.signal_maxlen.unwrap_or(1000).to_string(), args.max_signals_per_execution
2067 .unwrap_or(10_000)
2068 .to_string(), args.waitpoint_token.as_str().to_owned(), ];
2073
2074 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2075 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2076
2077 let raw: Value = self
2078 .fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
2079 .await?;
2080
2081 parse_deliver_signal_result(&raw, &args.signal_id)
2082 }
2083
2084 pub async fn change_priority(
2088 &self,
2089 execution_id: &ExecutionId,
2090 new_priority: i32,
2091 ) -> Result<ChangePriorityResult, ServerError> {
2092 let partition = execution_partition(execution_id, &self.config.partition_config);
2093 let ctx = ExecKeyContext::new(&partition, execution_id);
2094 let idx = IndexKeys::new(&partition);
2095
2096 let lane_str: Option<String> = self
2098 .client
2099 .hget(&ctx.core(), "lane_id")
2100 .await
2101 .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
2102 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2103
2104 let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];
2106
2107 let fcall_args: Vec<String> = vec![
2109 execution_id.to_string(),
2110 new_priority.to_string(),
2111 ];
2112
2113 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2114 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2115
2116 let raw: Value = self
2117 .fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
2118 .await?;
2119
2120 parse_change_priority_result(&raw, execution_id)
2121 }
2122
2123 pub async fn claim_for_worker(
2137 &self,
2138 lane: &LaneId,
2139 worker_id: &WorkerId,
2140 worker_instance_id: &WorkerInstanceId,
2141 worker_capabilities: &std::collections::BTreeSet<String>,
2142 grant_ttl_ms: u64,
2143 ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
2144 self.scheduler
2145 .claim_for_worker(
2146 lane,
2147 worker_id,
2148 worker_instance_id,
2149 worker_capabilities,
2150 grant_ttl_ms,
2151 )
2152 .await
2153 .map_err(|e| match e {
2154 ff_scheduler::SchedulerError::Valkey(inner) => ServerError::from(inner),
2155 ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
2156 crate::server::backend_context(source, context)
2157 }
2158 ff_scheduler::SchedulerError::Config(msg) => ServerError::InvalidInput(msg),
2159 })
2160 }
2161
2162 pub async fn revoke_lease(
2164 &self,
2165 execution_id: &ExecutionId,
2166 ) -> Result<RevokeLeaseResult, ServerError> {
2167 let partition = execution_partition(execution_id, &self.config.partition_config);
2168 let ctx = ExecKeyContext::new(&partition, execution_id);
2169 let idx = IndexKeys::new(&partition);
2170
2171 let wiid_str: Option<String> = self
2173 .client
2174 .hget(&ctx.core(), "current_worker_instance_id")
2175 .await
2176 .map_err(|e| crate::server::backend_context(e, "HGET worker_instance_id"))?;
2177 let wiid = match wiid_str {
2178 Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
2179 _ => {
2180 return Err(ServerError::NotFound(format!(
2181 "no active lease for execution {execution_id} (no current_worker_instance_id)"
2182 )));
2183 }
2184 };
2185
2186 let fcall_keys: Vec<String> = vec![
2188 ctx.core(),
2189 ctx.lease_current(),
2190 ctx.lease_history(),
2191 idx.lease_expiry(),
2192 idx.worker_leases(&wiid),
2193 ];
2194
2195 let fcall_args: Vec<String> = vec![
2197 execution_id.to_string(),
2198 String::new(), "operator_revoke".to_owned(),
2200 ];
2201
2202 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2203 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2204
2205 let raw: Value = self
2206 .fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
2207 .await?;
2208
2209 parse_revoke_lease_result(&raw)
2210 }
2211
2212 pub async fn get_execution(
2214 &self,
2215 execution_id: &ExecutionId,
2216 ) -> Result<ExecutionInfo, ServerError> {
2217 let partition = execution_partition(execution_id, &self.config.partition_config);
2218 let ctx = ExecKeyContext::new(&partition, execution_id);
2219
2220 let fields: HashMap<String, String> = self
2221 .client
2222 .hgetall(&ctx.core())
2223 .await
2224 .map_err(|e| crate::server::backend_context(e, "HGETALL exec_core"))?;
2225
2226 if fields.is_empty() {
2227 return Err(ServerError::NotFound(format!(
2228 "execution not found: {execution_id}"
2229 )));
2230 }
2231
2232 let parse_enum = |field: &str| -> String {
2233 fields.get(field).cloned().unwrap_or_default()
2234 };
2235 fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
2236 let quoted = format!("\"{raw}\"");
2237 serde_json::from_str("ed).map_err(|e| {
2238 ServerError::Script(format!("invalid {field} '{raw}': {e}"))
2239 })
2240 }
2241
2242 let lp_str = parse_enum("lifecycle_phase");
2243 let os_str = parse_enum("ownership_state");
2244 let es_str = parse_enum("eligibility_state");
2245 let br_str = parse_enum("blocking_reason");
2246 let to_str = parse_enum("terminal_outcome");
2247 let as_str = parse_enum("attempt_state");
2248 let ps_str = parse_enum("public_state");
2249
2250 let state_vector = StateVector {
2251 lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
2252 ownership_state: deserialize("ownership_state", &os_str)?,
2253 eligibility_state: deserialize("eligibility_state", &es_str)?,
2254 blocking_reason: deserialize("blocking_reason", &br_str)?,
2255 terminal_outcome: deserialize("terminal_outcome", &to_str)?,
2256 attempt_state: deserialize("attempt_state", &as_str)?,
2257 public_state: deserialize("public_state", &ps_str)?,
2258 };
2259
2260 let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();
2267
2268 let started_at_opt = fields
2275 .get("started_at")
2276 .filter(|s| !s.is_empty())
2277 .cloned();
2278 let completed_at_opt = fields
2279 .get("completed_at")
2280 .filter(|s| !s.is_empty())
2281 .cloned();
2282
2283 Ok(ExecutionInfo {
2284 execution_id: execution_id.clone(),
2285 namespace: parse_enum("namespace"),
2286 lane_id: parse_enum("lane_id"),
2287 priority: fields
2288 .get("priority")
2289 .and_then(|v| v.parse().ok())
2290 .unwrap_or(0),
2291 execution_kind: parse_enum("execution_kind"),
2292 state_vector,
2293 public_state: deserialize("public_state", &ps_str)?,
2294 created_at: parse_enum("created_at"),
2295 started_at: started_at_opt,
2296 completed_at: completed_at_opt,
2297 current_attempt_index: fields
2298 .get("current_attempt_index")
2299 .and_then(|v| v.parse().ok())
2300 .unwrap_or(0),
2301 flow_id: flow_id_val,
2302 blocking_detail: parse_enum("blocking_detail"),
2303 })
2304 }
2305
2306 pub async fn list_executions_page(
2316 &self,
2317 partition_id: u16,
2318 cursor: Option<ExecutionId>,
2319 limit: usize,
2320 ) -> Result<ListExecutionsPage, ServerError> {
2321 if limit == 0 {
2322 return Ok(ListExecutionsPage::new(Vec::new(), None));
2323 }
2324 let partition = ff_core::partition::Partition {
2325 family: ff_core::partition::PartitionFamily::Execution,
2326 index: partition_id,
2327 };
2328 let idx = IndexKeys::new(&partition);
2329 let all_key = idx.all_executions();
2330
2331 let raw_members: Vec<String> = self
2332 .client
2333 .cmd("SMEMBERS")
2334 .arg(&all_key)
2335 .execute()
2336 .await
2337 .map_err(|e| crate::server::backend_context(e, format!("SMEMBERS {all_key}")))?;
2338
2339 if raw_members.is_empty() {
2340 return Ok(ListExecutionsPage::new(Vec::new(), None));
2341 }
2342
2343 let mut parsed: Vec<ExecutionId> = Vec::with_capacity(raw_members.len());
2344 for raw in &raw_members {
2345 match ExecutionId::parse(raw) {
2346 Ok(id) => parsed.push(id),
2347 Err(e) => {
2348 tracing::warn!(
2349 raw_id = %raw,
2350 error = %e,
2351 set = %all_key,
2352 "list_executions_page: SMEMBERS member failed to parse as ExecutionId \
2353 (data corruption?)"
2354 );
2355 }
2356 }
2357 }
2358 parsed.sort_by(|a, b| a.as_str().cmp(b.as_str()));
2359
2360 let filtered: Vec<ExecutionId> = if let Some(c) = cursor.as_ref() {
2361 let cs = c.as_str();
2362 parsed.into_iter().filter(|e| e.as_str() > cs).collect()
2363 } else {
2364 parsed
2365 };
2366
2367 let effective_limit = limit.min(1000);
2368 let has_more = filtered.len() > effective_limit;
2369 let page: Vec<ExecutionId> = filtered.into_iter().take(effective_limit).collect();
2370 let next_cursor = if has_more { page.last().cloned() } else { None };
2371 Ok(ListExecutionsPage::new(page, next_cursor))
2372 }
2373
    /// Replays a terminal execution via `ff_replay_execution`.
    ///
    /// Pre-reads the lane, flow membership and terminal outcome because the
    /// script's key layout depends on them: when the execution was `skipped`
    /// while a flow member, the call additionally passes the dependency-edge
    /// keys so the edges can be re-evaluated.
    pub async fn replay_execution(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ReplayExecutionResult, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let idx = IndexKeys::new(&partition);

        // Pre-read the dynamic fields that decide the key layout below.
        let dyn_fields: Vec<Option<String>> = self
            .client
            .cmd("HMGET")
            .arg(ctx.core())
            .arg("lane_id")
            .arg("flow_id")
            .arg("terminal_outcome")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "HMGET replay pre-read"))?;
        // Missing lane falls back to the default lane.
        let lane = LaneId::new(
            dyn_fields
                .first()
                .and_then(|v| v.as_ref())
                .cloned()
                .unwrap_or_else(|| "default".to_owned()),
        );
        let flow_id_str = dyn_fields
            .get(1)
            .and_then(|v| v.as_ref())
            .cloned()
            .unwrap_or_default();
        let terminal_outcome = dyn_fields
            .get(2)
            .and_then(|v| v.as_ref())
            .cloned()
            .unwrap_or_default();

        let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();

        // Base keys; extended below for the skipped-flow-member variant.
        let mut fcall_keys: Vec<String> = vec![
            ctx.core(),
            idx.lane_terminal(&lane),
            idx.lane_eligible(&lane),
            ctx.lease_history(),
        ];

        let now = TimestampMs::now();
        let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];

        if is_skipped_flow_member {
            let flow_id = FlowId::parse(&flow_id_str)
                .map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
            let flow_part =
                flow_partition(&flow_id, &self.config.partition_config);
            let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
            // Incoming dependency edges of this execution within the flow.
            let edge_ids: Vec<String> = self
                .client
                .cmd("SMEMBERS")
                .arg(flow_ctx.incoming(execution_id))
                .execute()
                .await
                .map_err(|e| crate::server::backend_context(e, "SMEMBERS replay edges"))?;

            fcall_keys.push(idx.lane_blocked_dependencies(&lane));
            fcall_keys.push(ctx.deps_meta());
            fcall_keys.push(ctx.deps_unresolved());
            for eid_str in &edge_ids {
                // NOTE(review): on parse failure this falls back to a
                // freshly minted EdgeId, yielding a key for a nonexistent
                // edge — looks like a best-effort guard rather than an
                // error path; confirm intent.
                let edge_id = EdgeId::parse(eid_str)
                    .unwrap_or_else(|_| EdgeId::new());
                fcall_keys.push(ctx.dep_edge(&edge_id));
                fcall_args.push(eid_str.clone());
            }
        }

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
            .await?;

        parse_replay_result(&raw)
    }
2477
2478 pub async fn read_attempt_stream(
2490 &self,
2491 execution_id: &ExecutionId,
2492 attempt_index: AttemptIndex,
2493 from_id: &str,
2494 to_id: &str,
2495 count_limit: u64,
2496 ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2497 use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
2498
2499 if count_limit == 0 {
2500 return Err(ServerError::InvalidInput(
2501 "count_limit must be >= 1".to_owned(),
2502 ));
2503 }
2504
2505 let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2510 Ok(p) => p,
2511 Err(tokio::sync::TryAcquireError::NoPermits) => {
2512 return Err(ServerError::ConcurrencyLimitExceeded(
2513 "stream_ops",
2514 self.config.max_concurrent_stream_ops,
2515 ));
2516 }
2517 Err(tokio::sync::TryAcquireError::Closed) => {
2518 return Err(ServerError::OperationFailed(
2519 "stream semaphore closed (server shutting down)".into(),
2520 ));
2521 }
2522 };
2523
2524 let args = ReadFramesArgs {
2525 execution_id: execution_id.clone(),
2526 attempt_index,
2527 from_id: from_id.to_owned(),
2528 to_id: to_id.to_owned(),
2529 count_limit,
2530 };
2531
2532 let partition = execution_partition(execution_id, &self.config.partition_config);
2533 let ctx = ExecKeyContext::new(&partition, execution_id);
2534 let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
2535
2536 let result = ff_script::functions::stream::ff_read_attempt_stream(
2540 &self.tail_client, &keys, &args,
2541 )
2542 .await
2543 .map_err(script_error_to_server);
2544
2545 drop(permit);
2546
2547 match result? {
2548 ReadFramesResult::Frames(f) => Ok(f),
2549 }
2550 }
2551
2552 pub async fn tail_attempt_stream(
2570 &self,
2571 execution_id: &ExecutionId,
2572 attempt_index: AttemptIndex,
2573 last_id: &str,
2574 block_ms: u64,
2575 count_limit: u64,
2576 ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2577 if count_limit == 0 {
2578 return Err(ServerError::InvalidInput(
2579 "count_limit must be >= 1".to_owned(),
2580 ));
2581 }
2582
2583 let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2599 Ok(p) => p,
2600 Err(tokio::sync::TryAcquireError::NoPermits) => {
2601 return Err(ServerError::ConcurrencyLimitExceeded(
2602 "stream_ops",
2603 self.config.max_concurrent_stream_ops,
2604 ));
2605 }
2606 Err(tokio::sync::TryAcquireError::Closed) => {
2607 return Err(ServerError::OperationFailed(
2608 "stream semaphore closed (server shutting down)".into(),
2609 ));
2610 }
2611 };
2612
2613 let partition = execution_partition(execution_id, &self.config.partition_config);
2614 let ctx = ExecKeyContext::new(&partition, execution_id);
2615 let stream_key = ctx.stream(attempt_index);
2616 let stream_meta_key = ctx.stream_meta(attempt_index);
2617
2618 let _xread_guard = self.xread_block_lock.lock().await;
2626
2627 let result = ff_script::stream_tail::xread_block(
2628 &self.tail_client,
2629 &stream_key,
2630 &stream_meta_key,
2631 last_id,
2632 block_ms,
2633 count_limit,
2634 )
2635 .await
2636 .map_err(script_error_to_server);
2637
2638 drop(_xread_guard);
2639 drop(permit);
2640 result
2641 }
2642
    /// Gracefully shuts the server down.
    ///
    /// Order matters: first the stream semaphore is closed so no new
    /// read/tail operations start, then background tasks are drained (with
    /// a timeout, after which they are aborted), and finally the engine is
    /// shut down.
    pub async fn shutdown(self) {
        tracing::info!("shutting down FlowFabric server");

        self.stream_semaphore.close();
        tracing::info!(
            "stream semaphore closed; no new read/tail attempts will be accepted"
        );

        // Drain outstanding background tasks, but do not wait forever.
        let drain_timeout = Duration::from_secs(15);
        let background = self.background_tasks.clone();
        let drain = async move {
            let mut guard = background.lock().await;
            while guard.join_next().await.is_some() {}
        };
        match tokio::time::timeout(drain_timeout, drain).await {
            Ok(()) => {}
            Err(_) => {
                tracing::warn!(
                    timeout_s = drain_timeout.as_secs(),
                    "shutdown: background tasks did not finish in time, aborting"
                );
                // Hard-abort whatever is still running past the deadline.
                self.background_tasks.lock().await.abort_all();
            }
        }

        self.engine.shutdown().await;
        tracing::info!("FlowFabric server shutdown complete");
    }
2700}
2701
/// Minimum Valkey server version (major component) accepted at startup.
const REQUIRED_VALKEY_MAJOR: u32 = 7;
/// Minimum Valkey server version (minor component) accepted at startup.
const REQUIRED_VALKEY_MINOR: u32 = 2;

/// Total wall-clock budget for retrying the startup version check before
/// giving up with the last observed error.
const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
2715
/// Verifies the connected Valkey server meets the minimum required version,
/// retrying transient failures with exponential backoff until
/// [`VERSION_CHECK_RETRY_BUDGET`] is exhausted.
///
/// A version that is detected but too low is also retried within the budget
/// (the server may be mid-upgrade); a non-retryable backend error fails
/// immediately.
async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
    let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
    let mut backoff = Duration::from_millis(200);
    loop {
        // Each iteration classifies the probe into: success (return),
        // retryable failure, or terminal failure.
        let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
            match query_valkey_version(client).await {
                // Tuple comparison: (major, minor) lexicographic >= required.
                Ok((detected_major, detected_minor))
                    if (detected_major, detected_minor)
                        >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR) =>
                {
                    tracing::info!(
                        detected_major,
                        detected_minor,
                        required_major = REQUIRED_VALKEY_MAJOR,
                        required_minor = REQUIRED_VALKEY_MINOR,
                        "Valkey version accepted"
                    );
                    return Ok(());
                }
                // Version readable but too low: retry within the budget.
                Ok((detected_major, detected_minor)) => (
                    true,
                    ServerError::ValkeyVersionTooLow {
                        detected: format!("{detected_major}.{detected_minor}"),
                        required: format!("{REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"),
                    },
                    format!(
                        "detected={detected_major}.{detected_minor} < required={REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"
                    ),
                ),
                Err(e) => {
                    // Unknown backend kinds default to retryable.
                    let retryable = e
                        .backend_kind()
                        .map(|k| k.is_retryable())
                        .unwrap_or(true);
                    let detail = e.to_string();
                    (retryable, e, detail)
                }
            };

        if !should_retry {
            return Err(err_for_budget_exhaust);
        }
        if tokio::time::Instant::now() >= deadline {
            return Err(err_for_budget_exhaust);
        }
        tracing::warn!(
            backoff_ms = backoff.as_millis() as u64,
            detail = %log_detail,
            "valkey version check transient failure; retrying"
        );
        tokio::time::sleep(backoff).await;
        // Exponential backoff capped at 5s.
        backoff = (backoff * 2).min(Duration::from_secs(5));
    }
}
2804
2805async fn query_valkey_version(client: &Client) -> Result<(u32, u32), ServerError> {
2818 let raw: Value = client
2819 .cmd("INFO")
2820 .arg("server")
2821 .execute()
2822 .await
2823 .map_err(|e| crate::server::backend_context(e, "INFO server"))?;
2824 let bodies = extract_info_bodies(&raw)?;
2825 let mut min_version: Option<(u32, u32)> = None;
2831 for body in &bodies {
2832 let version = parse_valkey_version(body)?;
2833 min_version = Some(match min_version {
2834 None => version,
2835 Some(existing) => existing.min(version),
2836 });
2837 }
2838 min_version.ok_or_else(|| {
2839 ServerError::OperationFailed(
2840 "valkey version check: cluster INFO returned no node bodies".into(),
2841 )
2842 })
2843}
2844
2845fn extract_info_bodies(raw: &Value) -> Result<Vec<String>, ServerError> {
2851 match raw {
2852 Value::BulkString(bytes) => Ok(vec![String::from_utf8_lossy(bytes).into_owned()]),
2853 Value::VerbatimString { text, .. } => Ok(vec![text.clone()]),
2854 Value::SimpleString(s) => Ok(vec![s.clone()]),
2855 Value::Map(entries) => {
2856 if entries.is_empty() {
2857 return Err(ServerError::OperationFailed(
2858 "valkey version check: cluster INFO returned empty map".into(),
2859 ));
2860 }
2861 let mut out = Vec::with_capacity(entries.len());
2862 for (_, body) in entries {
2863 out.extend(extract_info_bodies(body)?);
2864 }
2865 Ok(out)
2866 }
2867 other => Err(ServerError::OperationFailed(format!(
2868 "valkey version check: unexpected INFO shape: {other:?}"
2869 ))),
2870 }
2871}
2872
2873fn parse_valkey_version(info: &str) -> Result<(u32, u32), ServerError> {
2888 let extract_major_minor = |line: &str| -> Result<(u32, u32), ServerError> {
2889 let trimmed = line.trim();
2890 let mut parts = trimmed.split('.');
2891 let major_str = parts.next().unwrap_or("").trim();
2892 if major_str.is_empty() {
2893 return Err(ServerError::OperationFailed(format!(
2894 "valkey version check: empty version field in '{trimmed}'"
2895 )));
2896 }
2897 let major = major_str.parse::<u32>().map_err(|_| {
2898 ServerError::OperationFailed(format!(
2899 "valkey version check: non-numeric major in '{trimmed}'"
2900 ))
2901 })?;
2902 let minor_str = parts.next().unwrap_or("").trim();
2906 if minor_str.is_empty() {
2907 return Err(ServerError::OperationFailed(format!(
2908 "valkey version check: missing minor component in '{trimmed}'"
2909 )));
2910 }
2911 let minor = minor_str.parse::<u32>().map_err(|_| {
2912 ServerError::OperationFailed(format!(
2913 "valkey version check: non-numeric minor in '{trimmed}'"
2914 ))
2915 })?;
2916 Ok((major, minor))
2917 };
2918 if let Some(valkey_line) = info
2920 .lines()
2921 .find_map(|line| line.strip_prefix("valkey_version:"))
2922 {
2923 return extract_major_minor(valkey_line);
2924 }
2925 let server_is_valkey = info
2930 .lines()
2931 .map(str::trim)
2932 .any(|line| line.eq_ignore_ascii_case("server_name:valkey"));
2933 if !server_is_valkey {
2934 return Err(ServerError::OperationFailed(
2935 "valkey version check: INFO missing valkey_version and server_name:valkey marker \
2936 (unsupported backend — FlowFabric requires Valkey >= 7.2; Redis is not supported)"
2937 .into(),
2938 ));
2939 }
2940 if let Some(redis_line) = info
2944 .lines()
2945 .find_map(|line| line.strip_prefix("redis_version:"))
2946 {
2947 return extract_major_minor(redis_line);
2948 }
2949 Err(ServerError::OperationFailed(
2950 "valkey version check: INFO has server_name:valkey but no redis_version or valkey_version field"
2951 .into(),
2952 ))
2953}
2954
2955async fn validate_or_create_partition_config(
2962 client: &Client,
2963 config: &PartitionConfig,
2964) -> Result<(), ServerError> {
2965 let key = keys::global_config_partitions();
2966
2967 let existing: HashMap<String, String> = client
2968 .hgetall(&key)
2969 .await
2970 .map_err(|e| crate::server::backend_context(e, format!("HGETALL {key}")))?;
2971
2972 if existing.is_empty() {
2973 tracing::info!("first boot: creating {key}");
2975 client
2976 .hset(&key, "num_flow_partitions", &config.num_flow_partitions.to_string())
2977 .await
2978 .map_err(|e| crate::server::backend_context(e, "HSET num_flow_partitions"))?;
2979 client
2980 .hset(&key, "num_budget_partitions", &config.num_budget_partitions.to_string())
2981 .await
2982 .map_err(|e| crate::server::backend_context(e, "HSET num_budget_partitions"))?;
2983 client
2984 .hset(&key, "num_quota_partitions", &config.num_quota_partitions.to_string())
2985 .await
2986 .map_err(|e| crate::server::backend_context(e, "HSET num_quota_partitions"))?;
2987 return Ok(());
2988 }
2989
2990 let check = |field: &str, expected: u16| -> Result<(), ServerError> {
2992 let stored: u16 = existing
2993 .get(field)
2994 .and_then(|v| v.parse().ok())
2995 .unwrap_or(0);
2996 if stored != expected {
2997 return Err(ServerError::PartitionMismatch(format!(
2998 "{field}: stored={stored}, config={expected}. \
2999 Partition counts are fixed at deployment time. \
3000 Either fix your config or migrate the data."
3001 )));
3002 }
3003 Ok(())
3004 };
3005
3006 check("num_flow_partitions", config.num_flow_partitions)?;
3007 check("num_budget_partitions", config.num_budget_partitions)?;
3008 check("num_quota_partitions", config.num_quota_partitions)?;
3009
3010 tracing::info!("partition config validated against stored {key}");
3011 Ok(())
3012}
3013
3014const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";
3020
/// Per-partition result of the boot-time waitpoint HMAC secret install probe.
enum PartitionBootOutcome {
    /// Stored secret for the current kid equals the provided secret.
    Match,
    /// Stored secret for the current kid differs from the provided secret.
    Mismatch,
    /// `current_kid` was present but `secret:<kid>` was missing (torn write
    /// from a prior crash); the secret was re-written.
    Repaired,
    /// No prior secret existed; the initial kid and secret were installed.
    Installed,
}
3033
3034const BOOT_INIT_CONCURRENCY: usize = 16;
3039
/// Installs (or verifies) the waitpoint HMAC secret on a single partition.
///
/// Probe order matters:
/// 1. Read `current_kid`; if present, the partition was initialized before.
/// 2. If `secret:<kid>` is missing despite `current_kid` being set, a prior
///    boot crashed between writes — repair by re-writing the secret.
/// 3. Otherwise compare stored vs provided secret → `Match` / `Mismatch`.
/// 4. If `current_kid` is absent, install kid and secret in one multi-field
///    HSET so the write cannot tear.
async fn init_one_partition(
    client: &Client,
    partition: Partition,
    secret_hex: &str,
) -> Result<PartitionBootOutcome, ServerError> {
    let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();

    // Step 1: probe for an existing kid.
    let stored_kid: Option<String> = client
        .cmd("HGET")
        .arg(&key)
        .arg("current_kid")
        .execute()
        .await
        .map_err(|e| crate::server::backend_context(e, format!("HGET {key} current_kid (init probe)")))?;

    if let Some(stored_kid) = stored_kid {
        let field = format!("secret:{stored_kid}");
        let stored_secret: Option<String> = client
            .hget(&key, &field)
            .await
            .map_err(|e| crate::server::backend_context(e, format!("HGET {key} secret:<kid> (init check)")))?;
        if stored_secret.is_none() {
            // Step 2: torn write — kid recorded but secret missing; repair.
            client
                .hset(&key, &field, secret_hex)
                .await
                .map_err(|e| crate::server::backend_context(e, format!("HSET {key} secret:<kid> (repair torn write)")))?;
            return Ok(PartitionBootOutcome::Repaired);
        }
        // Step 3: compare stored secret against the provided one.
        if stored_secret.as_deref() != Some(secret_hex) {
            return Ok(PartitionBootOutcome::Mismatch);
        }
        return Ok(PartitionBootOutcome::Match);
    }

    // Step 4: fresh partition — kid and secret land in a single HSET.
    let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
    let _: i64 = client
        .cmd("HSET")
        .arg(&key)
        .arg("current_kid")
        .arg(WAITPOINT_HMAC_INITIAL_KID)
        .arg(&secret_field)
        .arg(secret_hex)
        .execute()
        .await
        .map_err(|e| crate::server::backend_context(e, format!("HSET {key} (init waitpoint HMAC atomic)")))?;
    Ok(PartitionBootOutcome::Installed)
}
3105
3106async fn initialize_waitpoint_hmac_secret(
3118 client: &Client,
3119 partition_config: &PartitionConfig,
3120 secret_hex: &str,
3121) -> Result<(), ServerError> {
3122 use futures::stream::{FuturesUnordered, StreamExt};
3123
3124 let n = partition_config.num_flow_partitions;
3125 tracing::info!(
3126 partitions = n,
3127 concurrency = BOOT_INIT_CONCURRENCY,
3128 "installing waitpoint HMAC secret across {n} execution partitions"
3129 );
3130
3131 let mut mismatch_count: u16 = 0;
3132 let mut repaired_count: u16 = 0;
3133 let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3134 let mut next_index: u16 = 0;
3135
3136 loop {
3137 while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3138 let partition = Partition {
3139 family: PartitionFamily::Execution,
3140 index: next_index,
3141 };
3142 let client = client.clone();
3143 let secret_hex = secret_hex.to_owned();
3144 pending.push(async move {
3145 init_one_partition(&client, partition, &secret_hex).await
3146 });
3147 next_index += 1;
3148 }
3149 match pending.next().await {
3150 Some(res) => match res? {
3151 PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
3152 PartitionBootOutcome::Mismatch => mismatch_count += 1,
3153 PartitionBootOutcome::Repaired => repaired_count += 1,
3154 },
3155 None => break,
3156 }
3157 }
3158
3159 if repaired_count > 0 {
3160 tracing::warn!(
3161 repaired_partitions = repaired_count,
3162 total_partitions = n,
3163 "repaired {repaired_count} partitions with torn waitpoint HMAC writes \
3164 (current_kid present but secret:<kid> missing, likely crash during prior boot)"
3165 );
3166 }
3167
3168 if mismatch_count > 0 {
3169 tracing::warn!(
3170 mismatched_partitions = mismatch_count,
3171 total_partitions = n,
3172 "stored/env secret mismatch on {mismatch_count} partitions — \
3173 env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
3174 run POST /v1/admin/rotate-waitpoint-secret to sync"
3175 );
3176 }
3177
3178 tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
3179 Ok(())
3180}
3181
/// Summary of an admin-triggered waitpoint HMAC secret rotation.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RotateWaitpointSecretResult {
    /// Number of partitions that accepted the new secret.
    pub rotated: u16,
    /// Partition indices where rotation failed; these keep their previous
    /// secret until the rotation is retried.
    pub failed: Vec<u16>,
    /// Key id the new secret was installed under.
    pub new_kid: String,
}
3195
impl Server {
    /// Rotates the waitpoint HMAC secret to `new_kid`/`new_secret_hex` across
    /// all execution partitions with bounded concurrency.
    ///
    /// Per-partition failures are collected into the result rather than
    /// aborting the rotation, so operators see a partial-rotation report and
    /// can retry the failed partitions.
    ///
    /// # Errors
    /// - Invalid `new_kid` (empty, or contains ':') or `new_secret_hex`
    ///   (empty, odd length, or non-hex).
    /// - Another rotation already holds the admin semaphore, or the
    ///   semaphore is closed (server shutting down).
    pub async fn rotate_waitpoint_secret(
        &self,
        new_kid: &str,
        new_secret_hex: &str,
    ) -> Result<RotateWaitpointSecretResult, ServerError> {
        // ':' is rejected because it is the separator in the `secret:<kid>`
        // hash field name.
        if new_kid.is_empty() || new_kid.contains(':') {
            return Err(ServerError::OperationFailed(
                "new_kid must be non-empty and must not contain ':'".into(),
            ));
        }
        if new_secret_hex.is_empty()
            || !new_secret_hex.len().is_multiple_of(2)
            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
        {
            return Err(ServerError::OperationFailed(
                "new_secret_hex must be a non-empty even-length hex string".into(),
            ));
        }

        // Non-blocking acquire: a concurrent rotation is rejected, not queued.
        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                return Err(ServerError::OperationFailed(
                    "admin rotate semaphore closed (server shutting down)".into(),
                ));
            }
        };

        let n = self.config.partition_config.num_flow_partitions;
        let grace_ms = self.config.waitpoint_hmac_grace_ms;

        use futures::stream::{FuturesUnordered, StreamExt};

        let mut rotated = 0u16;
        let mut failed = Vec::new();
        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
        let mut next_index: u16 = 0;

        // Sliding-window fan-out: keep at most BOOT_INIT_CONCURRENCY
        // per-partition rotations in flight at once.
        loop {
            while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
                let partition = Partition {
                    family: PartitionFamily::Execution,
                    index: next_index,
                };
                let idx = next_index;
                let new_kid_owned = new_kid.to_owned();
                let new_secret_owned = new_secret_hex.to_owned();
                let partition_owned = partition;
                let fut = async move {
                    let outcome = self
                        .rotate_single_partition(
                            &partition_owned,
                            &new_kid_owned,
                            &new_secret_owned,
                            grace_ms,
                        )
                        .await;
                    (idx, partition_owned, outcome)
                };
                pending.push(fut);
                next_index += 1;
            }
            match pending.next().await {
                Some((idx, partition, outcome)) => match outcome {
                    Ok(()) => {
                        rotated += 1;
                        tracing::debug!(
                            partition = %partition,
                            new_kid = %new_kid,
                            "waitpoint_hmac_rotated"
                        );
                    }
                    Err(e) => {
                        // Failures are recorded, not propagated: the caller
                        // gets the full list of partitions needing retry.
                        tracing::error!(
                            target: "audit",
                            partition = %partition,
                            err = %e,
                            "waitpoint_hmac_rotation_failed"
                        );
                        failed.push(idx);
                    }
                },
                None => break,
            }
        }

        tracing::info!(
            target: "audit",
            new_kid = %new_kid,
            total_partitions = n,
            rotated,
            failed_count = failed.len(),
            "waitpoint_hmac_rotation_complete"
        );

        Ok(RotateWaitpointSecretResult {
            rotated,
            failed,
            new_kid: new_kid.to_owned(),
        })
    }

    /// Runs the Lua rotation function on one partition, mapping script-layer
    /// errors into `ServerError` with partition context.
    async fn rotate_single_partition(
        &self,
        partition: &Partition,
        new_kid: &str,
        new_secret_hex: &str,
        grace_ms: u64,
    ) -> Result<(), ServerError> {
        let idx = IndexKeys::new(partition);
        let args = RotateWaitpointHmacSecretArgs {
            new_kid: new_kid.to_owned(),
            new_secret_hex: new_secret_hex.to_owned(),
            grace_ms,
        };
        let outcome = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            &self.client,
            &idx,
            &args,
        )
        .await
        .map_err(|e| match e {
            ff_script::ScriptError::RotationConflict(kid) => {
                ServerError::OperationFailed(format!(
                    "rotation conflict: kid {kid} already installed with a \
                     different secret. Either use a fresh kid or restore the \
                     original secret for this kid before retrying."
                ))
            }
            ff_script::ScriptError::Valkey(v) => crate::server::backend_context(
                v,
                format!("FCALL ff_rotate_waitpoint_hmac_secret partition={partition}"),
            ),
            other => ServerError::OperationFailed(format!(
                "rotation failed on partition {partition}: {other}"
            )),
        })?;
        // The script's detailed outcome is not needed here; success is enough.
        let _ = outcome;
        Ok(())
    }
}
3396
3397fn parse_create_result(
3400 raw: &Value,
3401 execution_id: &ExecutionId,
3402) -> Result<CreateExecutionResult, ServerError> {
3403 let arr = match raw {
3404 Value::Array(arr) => arr,
3405 _ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
3406 };
3407
3408 let status = match arr.first() {
3409 Some(Ok(Value::Int(n))) => *n,
3410 _ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
3411 };
3412
3413 if status == 1 {
3414 let sub = arr
3416 .get(1)
3417 .and_then(|v| match v {
3418 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3419 Ok(Value::SimpleString(s)) => Some(s.clone()),
3420 _ => None,
3421 })
3422 .unwrap_or_default();
3423
3424 if sub == "DUPLICATE" {
3425 Ok(CreateExecutionResult::Duplicate {
3426 execution_id: execution_id.clone(),
3427 })
3428 } else {
3429 Ok(CreateExecutionResult::Created {
3430 execution_id: execution_id.clone(),
3431 public_state: PublicState::Waiting,
3432 })
3433 }
3434 } else {
3435 let error_code = arr
3436 .get(1)
3437 .and_then(|v| match v {
3438 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3439 Ok(Value::SimpleString(s)) => Some(s.clone()),
3440 _ => None,
3441 })
3442 .unwrap_or_else(|| "unknown".to_owned());
3443 Err(ServerError::OperationFailed(format!(
3444 "ff_create_execution failed: {error_code}"
3445 )))
3446 }
3447}
3448
3449fn parse_cancel_result(
3450 raw: &Value,
3451 execution_id: &ExecutionId,
3452) -> Result<CancelExecutionResult, ServerError> {
3453 let arr = match raw {
3454 Value::Array(arr) => arr,
3455 _ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
3456 };
3457
3458 let status = match arr.first() {
3459 Some(Ok(Value::Int(n))) => *n,
3460 _ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
3461 };
3462
3463 if status == 1 {
3464 Ok(CancelExecutionResult::Cancelled {
3465 execution_id: execution_id.clone(),
3466 public_state: PublicState::Cancelled,
3467 })
3468 } else {
3469 let error_code = arr
3470 .get(1)
3471 .and_then(|v| match v {
3472 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3473 Ok(Value::SimpleString(s)) => Some(s.clone()),
3474 _ => None,
3475 })
3476 .unwrap_or_else(|| "unknown".to_owned());
3477 Err(ServerError::OperationFailed(format!(
3478 "ff_cancel_execution failed: {error_code}"
3479 )))
3480 }
3481}
3482
3483fn parse_budget_create_result(
3484 raw: &Value,
3485 budget_id: &BudgetId,
3486) -> Result<CreateBudgetResult, ServerError> {
3487 let arr = match raw {
3488 Value::Array(arr) => arr,
3489 _ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
3490 };
3491
3492 let status = match arr.first() {
3493 Some(Ok(Value::Int(n))) => *n,
3494 _ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
3495 };
3496
3497 if status == 1 {
3498 let sub = arr
3499 .get(1)
3500 .and_then(|v| match v {
3501 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3502 Ok(Value::SimpleString(s)) => Some(s.clone()),
3503 _ => None,
3504 })
3505 .unwrap_or_default();
3506
3507 if sub == "ALREADY_SATISFIED" {
3508 Ok(CreateBudgetResult::AlreadySatisfied {
3509 budget_id: budget_id.clone(),
3510 })
3511 } else {
3512 Ok(CreateBudgetResult::Created {
3513 budget_id: budget_id.clone(),
3514 })
3515 }
3516 } else {
3517 let error_code = arr
3518 .get(1)
3519 .and_then(|v| match v {
3520 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3521 Ok(Value::SimpleString(s)) => Some(s.clone()),
3522 _ => None,
3523 })
3524 .unwrap_or_else(|| "unknown".to_owned());
3525 Err(ServerError::OperationFailed(format!(
3526 "ff_create_budget failed: {error_code}"
3527 )))
3528 }
3529}
3530
3531fn parse_quota_create_result(
3532 raw: &Value,
3533 quota_policy_id: &QuotaPolicyId,
3534) -> Result<CreateQuotaPolicyResult, ServerError> {
3535 let arr = match raw {
3536 Value::Array(arr) => arr,
3537 _ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
3538 };
3539
3540 let status = match arr.first() {
3541 Some(Ok(Value::Int(n))) => *n,
3542 _ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
3543 };
3544
3545 if status == 1 {
3546 let sub = arr
3547 .get(1)
3548 .and_then(|v| match v {
3549 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3550 Ok(Value::SimpleString(s)) => Some(s.clone()),
3551 _ => None,
3552 })
3553 .unwrap_or_default();
3554
3555 if sub == "ALREADY_SATISFIED" {
3556 Ok(CreateQuotaPolicyResult::AlreadySatisfied {
3557 quota_policy_id: quota_policy_id.clone(),
3558 })
3559 } else {
3560 Ok(CreateQuotaPolicyResult::Created {
3561 quota_policy_id: quota_policy_id.clone(),
3562 })
3563 }
3564 } else {
3565 let error_code = arr
3566 .get(1)
3567 .and_then(|v| match v {
3568 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3569 Ok(Value::SimpleString(s)) => Some(s.clone()),
3570 _ => None,
3571 })
3572 .unwrap_or_else(|| "unknown".to_owned());
3573 Err(ServerError::OperationFailed(format!(
3574 "ff_create_quota_policy failed: {error_code}"
3575 )))
3576 }
3577}
3578
3579fn parse_create_flow_result(
3582 raw: &Value,
3583 flow_id: &FlowId,
3584) -> Result<CreateFlowResult, ServerError> {
3585 let arr = match raw {
3586 Value::Array(arr) => arr,
3587 _ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
3588 };
3589 let status = match arr.first() {
3590 Some(Ok(Value::Int(n))) => *n,
3591 _ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
3592 };
3593 if status == 1 {
3594 let sub = fcall_field_str(arr, 1);
3595 if sub == "ALREADY_SATISFIED" {
3596 Ok(CreateFlowResult::AlreadySatisfied {
3597 flow_id: flow_id.clone(),
3598 })
3599 } else {
3600 Ok(CreateFlowResult::Created {
3601 flow_id: flow_id.clone(),
3602 })
3603 }
3604 } else {
3605 let error_code = fcall_field_str(arr, 1);
3606 Err(ServerError::OperationFailed(format!(
3607 "ff_create_flow failed: {error_code}"
3608 )))
3609 }
3610}
3611
3612fn parse_add_execution_to_flow_result(
3613 raw: &Value,
3614) -> Result<AddExecutionToFlowResult, ServerError> {
3615 let arr = match raw {
3616 Value::Array(arr) => arr,
3617 _ => {
3618 return Err(ServerError::Script(
3619 "ff_add_execution_to_flow: expected Array".into(),
3620 ))
3621 }
3622 };
3623 let status = match arr.first() {
3624 Some(Ok(Value::Int(n))) => *n,
3625 _ => {
3626 return Err(ServerError::Script(
3627 "ff_add_execution_to_flow: bad status code".into(),
3628 ))
3629 }
3630 };
3631 if status == 1 {
3632 let sub = fcall_field_str(arr, 1);
3633 let eid_str = fcall_field_str(arr, 2);
3634 let nc_str = fcall_field_str(arr, 3);
3635 let eid = ExecutionId::parse(&eid_str)
3636 .map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
3637 let nc: u32 = nc_str.parse().unwrap_or(0);
3638 if sub == "ALREADY_SATISFIED" {
3639 Ok(AddExecutionToFlowResult::AlreadyMember {
3640 execution_id: eid,
3641 node_count: nc,
3642 })
3643 } else {
3644 Ok(AddExecutionToFlowResult::Added {
3645 execution_id: eid,
3646 new_node_count: nc,
3647 })
3648 }
3649 } else {
3650 let error_code = fcall_field_str(arr, 1);
3651 Err(ServerError::OperationFailed(format!(
3652 "ff_add_execution_to_flow failed: {error_code}"
3653 )))
3654 }
3655}
3656
/// Decoded outcome of an `ff_cancel_flow` FCALL reply.
enum ParsedCancelFlow {
    /// Flow was cancelled; carries the cancellation policy string and the
    /// member execution ids the caller must fan cancellation out to.
    Cancelled {
        policy: String,
        member_execution_ids: Vec<String>,
    },
    /// Flow was already in a terminal state; nothing to do.
    AlreadyTerminal,
}
3669
3670fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
3676 let arr = match raw {
3677 Value::Array(arr) => arr,
3678 _ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
3679 };
3680 let status = match arr.first() {
3681 Some(Ok(Value::Int(n))) => *n,
3682 _ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
3683 };
3684 if status != 1 {
3685 let error_code = fcall_field_str(arr, 1);
3686 if error_code == "flow_already_terminal" {
3687 return Ok(ParsedCancelFlow::AlreadyTerminal);
3688 }
3689 return Err(ServerError::OperationFailed(format!(
3690 "ff_cancel_flow failed: {error_code}"
3691 )));
3692 }
3693 let policy = fcall_field_str(arr, 2);
3695 let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
3698 for i in 3..arr.len() {
3699 members.push(fcall_field_str(arr, i));
3700 }
3701 Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
3702}
3703
3704fn parse_stage_dependency_edge_result(
3705 raw: &Value,
3706) -> Result<StageDependencyEdgeResult, ServerError> {
3707 let arr = match raw {
3708 Value::Array(arr) => arr,
3709 _ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
3710 };
3711 let status = match arr.first() {
3712 Some(Ok(Value::Int(n))) => *n,
3713 _ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
3714 };
3715 if status == 1 {
3716 let edge_id_str = fcall_field_str(arr, 2);
3717 let rev_str = fcall_field_str(arr, 3);
3718 let edge_id = EdgeId::parse(&edge_id_str)
3719 .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
3720 let rev: u64 = rev_str.parse().unwrap_or(0);
3721 Ok(StageDependencyEdgeResult::Staged {
3722 edge_id,
3723 new_graph_revision: rev,
3724 })
3725 } else {
3726 let error_code = fcall_field_str(arr, 1);
3727 Err(ServerError::OperationFailed(format!(
3728 "ff_stage_dependency_edge failed: {error_code}"
3729 )))
3730 }
3731}
3732
3733fn parse_apply_dependency_result(
3734 raw: &Value,
3735) -> Result<ApplyDependencyToChildResult, ServerError> {
3736 let arr = match raw {
3737 Value::Array(arr) => arr,
3738 _ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
3739 };
3740 let status = match arr.first() {
3741 Some(Ok(Value::Int(n))) => *n,
3742 _ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
3743 };
3744 if status == 1 {
3745 let sub = fcall_field_str(arr, 1);
3746 if sub == "ALREADY_APPLIED" || sub == "already_applied" {
3747 Ok(ApplyDependencyToChildResult::AlreadyApplied)
3748 } else {
3749 let count_str = fcall_field_str(arr, 2);
3751 let count: u32 = count_str.parse().unwrap_or(0);
3752 Ok(ApplyDependencyToChildResult::Applied {
3753 unsatisfied_count: count,
3754 })
3755 }
3756 } else {
3757 let error_code = fcall_field_str(arr, 1);
3758 Err(ServerError::OperationFailed(format!(
3759 "ff_apply_dependency_to_child failed: {error_code}"
3760 )))
3761 }
3762}
3763
3764fn parse_deliver_signal_result(
3765 raw: &Value,
3766 signal_id: &SignalId,
3767) -> Result<DeliverSignalResult, ServerError> {
3768 let arr = match raw {
3769 Value::Array(arr) => arr,
3770 _ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
3771 };
3772 let status = match arr.first() {
3773 Some(Ok(Value::Int(n))) => *n,
3774 _ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
3775 };
3776 if status == 1 {
3777 let sub = fcall_field_str(arr, 1);
3778 if sub == "DUPLICATE" {
3779 let existing_str = fcall_field_str(arr, 2);
3781 let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
3782 Ok(DeliverSignalResult::Duplicate {
3783 existing_signal_id: existing_id,
3784 })
3785 } else {
3786 let effect = fcall_field_str(arr, 3);
3788 Ok(DeliverSignalResult::Accepted {
3789 signal_id: signal_id.clone(),
3790 effect,
3791 })
3792 }
3793 } else {
3794 let error_code = fcall_field_str(arr, 1);
3795 Err(ServerError::OperationFailed(format!(
3796 "ff_deliver_signal failed: {error_code}"
3797 )))
3798 }
3799}
3800
3801fn parse_change_priority_result(
3802 raw: &Value,
3803 execution_id: &ExecutionId,
3804) -> Result<ChangePriorityResult, ServerError> {
3805 let arr = match raw {
3806 Value::Array(arr) => arr,
3807 _ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
3808 };
3809 let status = match arr.first() {
3810 Some(Ok(Value::Int(n))) => *n,
3811 _ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
3812 };
3813 if status == 1 {
3814 Ok(ChangePriorityResult::Changed {
3815 execution_id: execution_id.clone(),
3816 })
3817 } else {
3818 let error_code = fcall_field_str(arr, 1);
3819 Err(ServerError::OperationFailed(format!(
3820 "ff_change_priority failed: {error_code}"
3821 )))
3822 }
3823}
3824
3825fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
3826 let arr = match raw {
3827 Value::Array(arr) => arr,
3828 _ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
3829 };
3830 let status = match arr.first() {
3831 Some(Ok(Value::Int(n))) => *n,
3832 _ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
3833 };
3834 if status == 1 {
3835 let unsatisfied = fcall_field_str(arr, 2);
3837 let ps = if unsatisfied == "0" {
3838 PublicState::Waiting
3839 } else {
3840 PublicState::WaitingChildren
3841 };
3842 Ok(ReplayExecutionResult::Replayed { public_state: ps })
3843 } else {
3844 let error_code = fcall_field_str(arr, 1);
3845 Err(ServerError::OperationFailed(format!(
3846 "ff_replay_execution failed: {error_code}"
3847 )))
3848 }
3849}
3850
/// Maps a script-layer error into a `ServerError`.
///
/// Valkey transport failures are wrapped with backend context; every other
/// script error is flattened into `ServerError::Script` via its string form.
fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
    match e {
        ff_script::error::ScriptError::Valkey(valkey_err) => {
            crate::server::backend_context(valkey_err, "stream FCALL transport")
        }
        other => ServerError::Script(other.to_string()),
    }
}
3869
3870fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
3871 match arr.get(index) {
3872 Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
3873 Some(Ok(Value::SimpleString(s))) => s.clone(),
3874 Some(Ok(Value::Int(n))) => n.to_string(),
3875 _ => String::new(),
3876 }
3877}
3878
3879fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
3883 let arr = match raw {
3884 Value::Array(arr) => arr,
3885 _ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
3886 };
3887 let status_code = match arr.first() {
3888 Some(Ok(Value::Int(n))) => *n,
3889 _ => {
3890 return Err(ServerError::Script(
3891 "ff_report_usage_and_check: expected Int status code".into(),
3892 ))
3893 }
3894 };
3895 if status_code != 1 {
3896 let error_code = fcall_field_str(arr, 1);
3897 return Err(ServerError::OperationFailed(format!(
3898 "ff_report_usage_and_check failed: {error_code}"
3899 )));
3900 }
3901 let sub_status = fcall_field_str(arr, 1);
3902 match sub_status.as_str() {
3903 "OK" => Ok(ReportUsageResult::Ok),
3904 "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
3905 "SOFT_BREACH" => {
3906 let dim = fcall_field_str(arr, 2);
3907 let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3908 let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3909 Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
3910 }
3911 "HARD_BREACH" => {
3912 let dim = fcall_field_str(arr, 2);
3913 let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3914 let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3915 Ok(ReportUsageResult::HardBreach {
3916 dimension: dim,
3917 current_usage: current,
3918 hard_limit: limit,
3919 })
3920 }
3921 _ => Err(ServerError::OperationFailed(format!(
3922 "ff_report_usage_and_check: unknown sub-status: {sub_status}"
3923 ))),
3924 }
3925}
3926
3927fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
3928 let arr = match raw {
3929 Value::Array(arr) => arr,
3930 _ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
3931 };
3932 let status = match arr.first() {
3933 Some(Ok(Value::Int(n))) => *n,
3934 _ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
3935 };
3936 if status == 1 {
3937 let sub = fcall_field_str(arr, 1);
3938 if sub == "ALREADY_SATISFIED" {
3939 let reason = fcall_field_str(arr, 2);
3940 Ok(RevokeLeaseResult::AlreadySatisfied { reason })
3941 } else {
3942 let lid = fcall_field_str(arr, 2);
3943 let epoch = fcall_field_str(arr, 3);
3944 Ok(RevokeLeaseResult::Revoked {
3945 lease_id: lid,
3946 lease_epoch: epoch,
3947 })
3948 }
3949 } else {
3950 let error_code = fcall_field_str(arr, 1);
3951 Err(ServerError::OperationFailed(format!(
3952 "ff_revoke_lease failed: {error_code}"
3953 )))
3954 }
3955}
3956
3957fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
3963 if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
3964 return true;
3965 }
3966 e.detail()
3967 .map(|d| {
3968 d.contains("Function not loaded")
3969 || d.contains("No matching function")
3970 || d.contains("function not found")
3971 })
3972 .unwrap_or(false)
3973 || e.to_string().contains("Function not loaded")
3974}
3975
3976async fn fcall_with_reload_on_client(
3979 client: &Client,
3980 function: &str,
3981 keys: &[&str],
3982 args: &[&str],
3983) -> Result<Value, ServerError> {
3984 match client.fcall(function, keys, args).await {
3985 Ok(v) => Ok(v),
3986 Err(e) if is_function_not_loaded(&e) => {
3987 tracing::warn!(function, "Lua library not found on server, reloading");
3988 ff_script::loader::ensure_library(client)
3989 .await
3990 .map_err(ServerError::LibraryLoad)?;
3991 client
3992 .fcall(function, keys, args)
3993 .await
3994 .map_err(ServerError::from)
3995 }
3996 Err(e) => Err(ServerError::from(e)),
3997 }
3998}
3999
4000async fn build_cancel_execution_fcall(
4004 client: &Client,
4005 partition_config: &PartitionConfig,
4006 args: &CancelExecutionArgs,
4007) -> Result<(Vec<String>, Vec<String>), ServerError> {
4008 let partition = execution_partition(&args.execution_id, partition_config);
4009 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
4010 let idx = IndexKeys::new(&partition);
4011
4012 let lane_str: Option<String> = client
4013 .hget(&ctx.core(), "lane_id")
4014 .await
4015 .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
4016 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
4017
4018 let dyn_fields: Vec<Option<String>> = client
4019 .cmd("HMGET")
4020 .arg(ctx.core())
4021 .arg("current_attempt_index")
4022 .arg("current_waitpoint_id")
4023 .arg("current_worker_instance_id")
4024 .execute()
4025 .await
4026 .map_err(|e| crate::server::backend_context(e, "HMGET cancel pre-read"))?;
4027
4028 let att_idx_val = dyn_fields.first()
4029 .and_then(|v| v.as_ref())
4030 .and_then(|s| s.parse::<u32>().ok())
4031 .unwrap_or(0);
4032 let att_idx = AttemptIndex::new(att_idx_val);
4033 let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4034 let wp_id = if wp_id_str.is_empty() {
4035 WaitpointId::new()
4036 } else {
4037 WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
4038 };
4039 let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4040 let wiid = WorkerInstanceId::new(&wiid_str);
4041
4042 let keys: Vec<String> = vec![
4043 ctx.core(), ctx.attempt_hash(att_idx), ctx.stream_meta(att_idx), ctx.lease_current(), ctx.lease_history(), idx.lease_expiry(), idx.worker_leases(&wiid), ctx.suspension_current(), ctx.waitpoint(&wp_id), ctx.waitpoint_condition(&wp_id), idx.suspension_timeout(), idx.lane_terminal(&lane), idx.attempt_timeout(), idx.execution_deadline(), idx.lane_eligible(&lane), idx.lane_delayed(&lane), idx.lane_blocked_dependencies(&lane), idx.lane_blocked_budget(&lane), idx.lane_blocked_quota(&lane), idx.lane_blocked_route(&lane), idx.lane_blocked_operator(&lane), ];
4065 let argv: Vec<String> = vec![
4066 args.execution_id.to_string(),
4067 args.reason.clone(),
4068 args.source.to_string(),
4069 args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
4070 args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
4071 ];
4072 Ok((keys, argv))
4073}
4074
/// Backoff schedule (milliseconds) used by `cancel_member_execution` when a
/// cancel attempt fails with a retryable backend error. The array length also
/// bounds the total number of attempts (one attempt per entry).
const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];
4079
4080fn extract_backend_kind(e: &ServerError) -> Option<ff_core::BackendErrorKind> {
4091 e.backend_kind()
4092}
4093
4094async fn ack_cancel_member(
4111 client: &Client,
4112 pending_cancels_key: &str,
4113 cancel_backlog_key: &str,
4114 eid_str: &str,
4115 flow_id: &str,
4116) {
4117 let keys = [pending_cancels_key, cancel_backlog_key];
4118 let args_v = [eid_str, flow_id];
4119 let fut: Result<Value, _> =
4120 client.fcall("ff_ack_cancel_member", &keys, &args_v).await;
4121 if let Err(e) = fut {
4122 tracing::warn!(
4123 flow_id = %flow_id,
4124 execution_id = %eid_str,
4125 error = %e,
4126 "ff_ack_cancel_member failed; reconciler will drain on next pass"
4127 );
4128 }
4129}
4130
4131fn is_terminal_ack_error(err: &ServerError) -> bool {
4140 match err {
4141 ServerError::OperationFailed(msg) => {
4142 msg.contains("execution_not_active") || msg.contains("execution_not_found")
4143 }
4144 _ => false,
4145 }
4146}
4147
4148async fn cancel_member_execution(
4149 client: &Client,
4150 partition_config: &PartitionConfig,
4151 eid_str: &str,
4152 reason: &str,
4153 now: TimestampMs,
4154) -> Result<(), ServerError> {
4155 let execution_id = ExecutionId::parse(eid_str)
4156 .map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
4157 let args = CancelExecutionArgs {
4158 execution_id: execution_id.clone(),
4159 reason: reason.to_owned(),
4160 source: CancelSource::OperatorOverride,
4161 lease_id: None,
4162 lease_epoch: None,
4163 attempt_id: None,
4164 now,
4165 };
4166
4167 let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
4168 for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
4169 let is_last = attempt_idx + 1 == attempts;
4170 match try_cancel_member_once(client, partition_config, &args).await {
4171 Ok(()) => return Ok(()),
4172 Err(e) => {
4173 let retryable = extract_backend_kind(&e)
4177 .map(|k| k.is_retryable())
4178 .unwrap_or(false);
4179 if !retryable || is_last {
4180 return Err(e);
4181 }
4182 tracing::debug!(
4183 execution_id = %execution_id,
4184 attempt = attempt_idx + 1,
4185 delay_ms = *delay_ms,
4186 error = %e,
4187 "cancel_member_execution: transient error, retrying"
4188 );
4189 tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
4190 }
4191 }
4192 }
4193 Err(ServerError::OperationFailed(format!(
4197 "cancel_member_execution: retries exhausted for {execution_id}"
4198 )))
4199}
4200
4201async fn try_cancel_member_once(
4204 client: &Client,
4205 partition_config: &PartitionConfig,
4206 args: &CancelExecutionArgs,
4207) -> Result<(), ServerError> {
4208 let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
4209 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
4210 let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
4211 let raw =
4212 fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
4213 parse_cancel_result(&raw, &args.execution_id).map(|_| ())
4214}
4215
4216fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
4217 let arr = match raw {
4218 Value::Array(arr) => arr,
4219 _ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
4220 };
4221 let status = match arr.first() {
4222 Some(Ok(Value::Int(n))) => *n,
4223 _ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
4224 };
4225 if status == 1 {
4226 let next_str = fcall_field_str(arr, 2);
4227 let next_ms: i64 = next_str.parse().unwrap_or(0);
4228 Ok(ResetBudgetResult::Reset {
4229 next_reset_at: TimestampMs::from_millis(next_ms),
4230 })
4231 } else {
4232 let error_code = fcall_field_str(arr, 1);
4233 Err(ServerError::OperationFailed(format!(
4234 "ff_reset_budget failed: {error_code}"
4235 )))
4236 }
4237}
4238
// -----------------------------------------------------------------------------
// Unit tests for the pure helpers in this file: budget dimension validation,
// ServerError retryability / backend-kind classification, and the Valkey
// version gate (INFO-body extraction plus version parsing).
// -----------------------------------------------------------------------------
#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    // Builds a synthetic ferriskey error of the given kind so tests can probe
    // the retryability table without a live backend connection.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    // --- validate_create_budget_dimensions ----------------------------------

    #[test]
    fn create_budget_rejects_over_cap_dimension_count() {
        // One past the cap must fail, and the message must name both the cap
        // and the offending count.
        let n = MAX_BUDGET_DIMENSIONS + 1;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let hard = vec![1u64; n];
        let soft = vec![0u64; n];
        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
                assert!(msg.contains(&format!("got={n}")), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn create_budget_accepts_exactly_cap_dimensions() {
        // The cap is inclusive: exactly MAX_BUDGET_DIMENSIONS is valid.
        let n = MAX_BUDGET_DIMENSIONS;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let hard = vec![1u64; n];
        let soft = vec![0u64; n];
        assert!(validate_create_budget_dimensions(&dims, &hard, &soft).is_ok());
    }

    #[test]
    fn create_budget_rejects_hard_limit_length_mismatch() {
        // hard_limits shorter than dimensions → mismatch error naming both lengths.
        let dims = vec!["a".to_string(), "b".to_string()];
        let hard = vec![1u64]; let soft = vec![0u64, 0u64];
        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
                assert!(msg.contains("hard_limits=1"), "got: {msg}");
                assert!(msg.contains("dimensions=2"), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn create_budget_rejects_soft_limit_length_mismatch() {
        // soft_limits longer than dimensions is rejected the same way.
        let dims = vec!["a".to_string(), "b".to_string()];
        let hard = vec![1u64, 2u64];
        let soft = vec![0u64, 0u64, 0u64]; let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
                assert!(msg.contains("soft_limits=3"), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    // --- validate_report_usage_dimensions -----------------------------------

    #[test]
    fn report_usage_rejects_over_cap_dimension_count() {
        // Usage reports share the same MAX_BUDGET_DIMENSIONS cap.
        let n = MAX_BUDGET_DIMENSIONS + 1;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let deltas = vec![1u64; n];
        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn report_usage_accepts_exactly_cap_dimensions() {
        let n = MAX_BUDGET_DIMENSIONS;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let deltas = vec![1u64; n];
        assert!(validate_report_usage_dimensions(&dims, &deltas).is_ok());
    }

    #[test]
    fn report_usage_rejects_delta_length_mismatch() {
        // deltas must be exactly as long as dimensions.
        let dims = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let deltas = vec![1u64, 2u64]; let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("dimension_delta_array_mismatch"), "got: {msg}");
                assert!(msg.contains("dimensions=3"), "got: {msg}");
                assert!(msg.contains("deltas=2"), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn report_usage_accepts_empty_dimensions() {
        // Zero dimensions is a valid (no-op) usage report.
        assert!(validate_report_usage_dimensions(&[], &[]).is_ok());
    }

    // --- ServerError retryability / backend-kind classification -------------

    #[test]
    fn is_retryable_backend_variant_uses_kind_table() {
        // Transport / transient cluster conditions are retryable...
        assert!(ServerError::from(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::Moved)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::Ask)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());

        // ...while auth, missing-script, and read-only errors are not.
        assert!(!ServerError::from(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
        assert!(!ServerError::from(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!ServerError::from(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
    }

    #[test]
    fn is_retryable_backend_context_uses_kind_table() {
        // Wrapping in backend_context must not change the classification.
        let err = crate::server::backend_context(mk_fk_err(ErrorKind::IoError), "HGET test");
        assert!(err.is_retryable());

        let err =
            crate::server::backend_context(mk_fk_err(ErrorKind::AuthenticationFailed), "auth");
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_library_load_delegates_to_inner_kind() {
        // LibraryLoad(Valkey(..)) delegates to the wrapped backend error kind.
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::IoError),
        ));
        assert!(err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::AuthenticationFailed),
        ));
        assert!(!err.is_retryable());

        // A version mismatch carries no backend error and is never retryable.
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_business_logic_variants_are_false() {
        // Pure business-logic failures never get the retry treatment.
        assert!(!ServerError::NotFound("x".into()).is_retryable());
        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
        assert!(!ServerError::Script("x".into()).is_retryable());
        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
    }

    #[test]
    fn backend_kind_delegates_through_library_load() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::ClusterDown),
        ));
        assert_eq!(err.backend_kind(), Some(ff_core::BackendErrorKind::Cluster));

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert_eq!(err.backend_kind(), None);
    }

    // --- parse_valkey_version ------------------------------------------------

    #[test]
    fn parse_valkey_version_prefers_valkey_version_over_redis_version() {
        // When both fields are present, valkey_version wins.
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n\
server_mode:cluster\r\n\
os:Linux\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
    }

    #[test]
    fn parse_valkey_version_real_valkey_8_cluster_body() {
        // NOTE(review): the name says "valkey_8" but the fixture reports
        // valkey_version:9.0.3 — presumably recaptured from a newer node;
        // confirm and consider renaming.
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
server_name:valkey\r\n\
valkey_version:9.0.3\r\n\
valkey_release_stage:ga\r\n\
redis_git_sha1:00000000\r\n\
server_mode:cluster\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
    }

    #[test]
    fn parse_valkey_version_falls_back_to_redis_version_on_valkey_7() {
        // Valkey 7 has no valkey_version field; server_name:valkey marks the
        // backend and redis_version supplies the number.
        let info = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nfoo:bar\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
    }

    #[test]
    fn parse_valkey_version_rejects_redis_backend() {
        // No valkey markers at all → genuine Redis, which is rejected outright.
        let info = "\
# Server\r\n\
redis_version:7.4.0\r\n\
redis_mode:standalone\r\n\
os:Linux\r\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        let msg = err.to_string();
        assert!(
            msg.contains("Redis is not supported") && msg.contains("server_name:valkey"),
            "expected Redis-rejection message, got: {msg}"
        );
    }

    #[test]
    fn parse_valkey_version_accepts_valkey_7_marker_case_insensitively() {
        // The server_name marker match must be case-insensitive.
        let info = "redis_version:7.2.0\r\nSERVER_NAME:Valkey\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
    }

    #[test]
    fn parse_valkey_version_errors_when_no_version_field() {
        let info = "# Server\r\nfoo:bar\r\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(
            err.to_string().contains("missing"),
            "expected 'missing' in message, got: {err}"
        );
    }

    #[test]
    fn parse_valkey_version_errors_on_non_numeric_major() {
        let info = "valkey_version:invalid.x.y\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("non-numeric major"));
    }

    #[test]
    fn parse_valkey_version_errors_on_non_numeric_minor() {
        let info = "valkey_version:7.x.0\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("non-numeric minor"));
    }

    #[test]
    fn parse_valkey_version_errors_on_missing_minor() {
        // A bare major with no dot-separated minor is malformed.
        let info = "valkey_version:7\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("missing minor"));
    }

    // --- extract_info_bodies -------------------------------------------------

    #[test]
    fn extract_info_bodies_unwraps_cluster_map_all_entries() {
        // A cluster-mode INFO reply is a node-address → body map; every
        // entry's body must be returned, in map order.
        let body_a = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
        let body_b = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:8.0.0\r\n";
        let map = Value::Map(vec![
            (
                Value::SimpleString("127.0.0.1:7000".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_a.to_string(),
                },
            ),
            (
                Value::SimpleString("127.0.0.1:7001".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_b.to_string(),
                },
            ),
        ]);
        let bodies = extract_info_bodies(&map).unwrap();
        assert_eq!(bodies.len(), 2);
        assert_eq!(bodies[0], body_a);
        assert_eq!(bodies[1], body_b);
    }

    #[test]
    fn extract_info_bodies_handles_simple_string() {
        // Standalone mode returns the body directly as a simple string.
        let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
        let v = Value::SimpleString(body_text.to_string());
        let bodies = extract_info_bodies(&v).unwrap();
        assert_eq!(bodies, vec![body_text.to_string()]);
    }

    #[test]
    fn extract_info_bodies_rejects_empty_cluster_map() {
        // An empty map means no node answered; that must be an error, not Ok(vec![]).
        let map = Value::Map(vec![]);
        let err = extract_info_bodies(&map).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("empty map"));
    }

    // --- cluster-wide minimum-version gating ---------------------------------

    #[test]
    fn parse_valkey_version_min_across_cluster_map_picks_lowest() {
        // node2 has no valkey_version, so it falls back to redis_version
        // 7.1.0 — the lowest of the three and below the (7,2) floor.
        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
        let body_node2 = "# Server\r\nredis_version:7.1.0\r\nserver_name:valkey\r\n";
        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
        let map = Value::Map(vec![
            (
                Value::SimpleString("node1:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node1.to_string(),
                },
            ),
            (
                Value::SimpleString("node2:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node2.to_string(),
                },
            ),
            (
                Value::SimpleString("node3:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node3.to_string(),
                },
            ),
        ]);

        let bodies = extract_info_bodies(&map).unwrap();
        let min = bodies
            .iter()
            .map(|b| parse_valkey_version(b).unwrap())
            .min()
            .unwrap();

        assert_eq!(min, (7, 1), "min across cluster must be the lowest node");
        assert!(
            min < (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
            "mixed-version cluster with 7.1.0 node must fail the (7,2) gate"
        );
    }

    #[test]
    fn parse_valkey_version_all_nodes_at_or_above_floor_accepts() {
        // All nodes ≥ the (7,2) floor, so the min (7.2.0 on node2) passes.
        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
        let body_node2 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:9.0.3\r\n";
        let map = Value::Map(vec![
            (
                Value::SimpleString("node1:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node1.to_string(),
                },
            ),
            (
                Value::SimpleString("node2:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node2.to_string(),
                },
            ),
            (
                Value::SimpleString("node3:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node3.to_string(),
                },
            ),
        ]);

        let bodies = extract_info_bodies(&map).unwrap();
        let min = bodies
            .iter()
            .map(|b| parse_valkey_version(b).unwrap())
            .min()
            .unwrap();

        assert_eq!(min, (7, 2), "min across cluster is the lowest node (7.2)");
        assert!(
            min >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
            "all-above-floor cluster must pass the gate"
        );
    }

    // --- ServerError::ValkeyVersionTooLow ------------------------------------

    #[test]
    fn valkey_version_too_low_is_not_retryable() {
        // A too-old backend never fixes itself; no retry, no backend kind.
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7.0".into(),
            required: "7.2".into(),
        };
        assert!(!err.is_retryable());
        assert_eq!(err.backend_kind(), None);
    }

    #[test]
    fn valkey_version_too_low_error_message_includes_both_versions() {
        // The operator-facing message must carry both versions and the RFC pointer.
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7.0".into(),
            required: "7.2".into(),
        };
        let msg = err.to_string();
        assert!(msg.contains("7.0"), "detected version in message: {msg}");
        assert!(msg.contains("7.2"), "required version in message: {msg}");
        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
    }
}