1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use ferriskey::{Client, ClientBuilder, Value};
6use tokio::sync::Mutex as AsyncMutex;
7use tokio::task::JoinSet;
8use ff_core::contracts::{
9 AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
10 CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
11 CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
12 CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
13 ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
14 DeliverSignalArgs, DeliverSignalResult, ExecutionInfo, ExecutionSummary,
15 ListExecutionsResult, PendingWaitpointInfo, ReplayExecutionResult,
16 ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
17 RevokeLeaseResult,
18 RotateWaitpointHmacSecretArgs,
19 StageDependencyEdgeArgs, StageDependencyEdgeResult,
20};
21use ff_core::keys::{
22 self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
23 IndexKeys, QuotaKeyContext,
24};
25use ff_core::partition::{
26 budget_partition, execution_partition, flow_partition, quota_partition, Partition,
27 PartitionConfig, PartitionFamily,
28};
29use ff_core::state::{PublicState, StateVector};
30use ff_core::types::*;
31use ff_engine::Engine;
32use ff_script::retry::is_retryable_kind;
33
34use crate::config::ServerConfig;
35
// Upper bound on already-terminal members reported/processed per call.
// NOTE(review): no usage visible in this chunk — confirm against the
// flow-cancellation paths further down the file.
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;
41
42pub(crate) use ff_script::functions::budget::MAX_BUDGET_DIMENSIONS;
52
53fn validate_create_budget_dimensions(
63 dimensions: &[String],
64 hard_limits: &[u64],
65 soft_limits: &[u64],
66) -> Result<(), ServerError> {
67 let dim_count = dimensions.len();
68 if dim_count > MAX_BUDGET_DIMENSIONS {
69 return Err(ServerError::InvalidInput(format!(
70 "too_many_dimensions: limit={}, got={}",
71 MAX_BUDGET_DIMENSIONS, dim_count
72 )));
73 }
74 if hard_limits.len() != dim_count {
75 return Err(ServerError::InvalidInput(format!(
76 "dimension_limit_array_mismatch: dimensions={} hard_limits={}",
77 dim_count,
78 hard_limits.len()
79 )));
80 }
81 if soft_limits.len() != dim_count {
82 return Err(ServerError::InvalidInput(format!(
83 "dimension_limit_array_mismatch: dimensions={} soft_limits={}",
84 dim_count,
85 soft_limits.len()
86 )));
87 }
88 Ok(())
89}
90
91fn validate_report_usage_dimensions(
97 dimensions: &[String],
98 deltas: &[u64],
99) -> Result<(), ServerError> {
100 let dim_count = dimensions.len();
101 if dim_count > MAX_BUDGET_DIMENSIONS {
102 return Err(ServerError::InvalidInput(format!(
103 "too_many_dimensions: limit={}, got={}",
104 MAX_BUDGET_DIMENSIONS, dim_count
105 )));
106 }
107 if deltas.len() != dim_count {
108 return Err(ServerError::InvalidInput(format!(
109 "dimension_delta_array_mismatch: dimensions={} deltas={}",
110 dim_count,
111 deltas.len()
112 )));
113 }
114 Ok(())
115}
116
/// FlowFabric server: owns the Valkey clients, background engine,
/// scheduler, and the concurrency controls shared by request handlers.
pub struct Server {
    // Primary Valkey client used for request-path commands and FCALLs.
    client: Client,
    // Dedicated connection for blocking tail reads so they cannot starve
    // the request path (see the "dedicated tail connection" setup in
    // `start_with_metrics`).
    tail_client: Client,
    // Caps concurrent stream operations; sized from
    // `config.max_concurrent_stream_ops`. Shared by reads and tails.
    stream_semaphore: Arc<tokio::sync::Semaphore>,
    // Serializes blocking XREADs on the tail connection (tails additionally
    // serialize via this lock, per the startup log message).
    xread_block_lock: Arc<tokio::sync::Mutex<()>>,
    // Single-permit semaphore: at most one admin rotate operation at a time.
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    // Background engine (scanners/promoters) started during startup.
    engine: Engine,
    // Full server configuration, including the partition layout.
    config: ServerConfig,
    // Scheduler sharing the primary client, partition config, and metrics.
    scheduler: Arc<ff_scheduler::Scheduler>,
    // Collects detached background tasks spawned by the server.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
    // Metrics registry shared with the engine and scheduler.
    metrics: Arc<ff_observability::Metrics>,
}
213
/// Errors surfaced by [`Server`] operations.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Backend failure with no additional call-site context.
    #[error("backend: {0}")]
    Backend(#[from] ff_core::BackendError),
    /// Backend failure annotated with the operation that produced it
    /// (constructed via [`backend_context`]).
    #[error("backend ({context}): {source}")]
    BackendContext {
        #[source]
        source: ff_core::BackendError,
        context: String,
    },
    /// Invalid or unusable server configuration.
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    /// The Lua function library could not be loaded into Valkey.
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    /// Keys that must be co-located hash to different partitions.
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    /// The referenced entity does not exist.
    #[error("not found: {0}")]
    NotFound(String),
    /// Caller-supplied arguments failed validation.
    #[error("invalid input: {0}")]
    InvalidInput(String),
    /// The operation ran but did not produce the expected outcome.
    #[error("operation failed: {0}")]
    OperationFailed(String),
    /// A server-side Lua script returned an unexpected or malformed reply.
    #[error("script: {0}")]
    Script(String),
    /// A concurrency limiter rejected the call; retryable by design.
    #[error("too many concurrent {0} calls (max: {1})")]
    ConcurrencyLimitExceeded(&'static str, u32),
    /// The connected Valkey server is older than the supported minimum.
    #[error(
        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
    )]
    ValkeyVersionTooLow {
        detected: String,
        required: String,
    },
}
269
// Classifies a raw ferriskey error into the context-free `Backend` variant
// so `?` works directly on client calls. Prefer [`backend_context`] when the
// call site can name the operation that failed.
impl From<ferriskey::Error> for ServerError {
    fn from(err: ferriskey::Error) -> Self {
        Self::Backend(ff_backend_valkey::backend_error_from_ferriskey(&err))
    }
}
279
280pub(crate) fn backend_context(
283 err: ferriskey::Error,
284 context: impl Into<String>,
285) -> ServerError {
286 ServerError::BackendContext {
287 source: ff_backend_valkey::backend_error_from_ferriskey(&err),
288 context: context.into(),
289 }
290}
291
292impl ServerError {
293 pub fn backend_kind(&self) -> Option<ff_core::BackendErrorKind> {
301 match self {
302 Self::Backend(be) | Self::BackendContext { source: be, .. } => Some(be.kind()),
303 Self::LibraryLoad(e) => e
304 .valkey_kind()
305 .map(ff_backend_valkey::classify_ferriskey_kind),
306 _ => None,
307 }
308 }
309
310 pub fn is_retryable(&self) -> bool {
317 match self {
318 Self::Backend(be) | Self::BackendContext { source: be, .. } => {
319 be.kind().is_retryable()
320 }
321 Self::LibraryLoad(load_err) => load_err
322 .valkey_kind()
323 .map(is_retryable_kind)
324 .unwrap_or(false),
325 Self::Config(_)
326 | Self::PartitionMismatch(_)
327 | Self::NotFound(_)
328 | Self::InvalidInput(_)
329 | Self::OperationFailed(_)
330 | Self::Script(_) => false,
331 Self::ConcurrencyLimitExceeded(_, _) => true,
335 Self::ValkeyVersionTooLow { .. } => false,
337 }
338 }
339}
340
341impl Server {
    /// Starts the server with a freshly created metrics registry.
    ///
    /// Convenience wrapper around [`Self::start_with_metrics`].
    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
        Self::start_with_metrics(config, Arc::new(ff_observability::Metrics::new())).await
    }
352
    /// Connects to Valkey, performs startup validation, and returns a
    /// fully wired [`Server`].
    ///
    /// Startup phases, in order: connect + PING, version check, partition
    /// config validation, waitpoint HMAC secret init, Lua library load,
    /// engine start with a completion subscription, dedicated tail
    /// connection, concurrency controls, and scheduler construction.
    ///
    /// # Errors
    /// Fails on connection/PING problems, an unsupported Valkey version, a
    /// partition-config mismatch, or a Lua library load failure.
    pub async fn start_with_metrics(
        config: ServerConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Self, ServerError> {
        tracing::info!(
            host = %config.host, port = config.port,
            tls = config.tls, cluster = config.cluster,
            "connecting to Valkey"
        );
        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect"))?;

        // Round-trip sanity check before doing any setup work.
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING"))?;
        if pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "unexpected PING response: {pong}"
            )));
        }
        tracing::info!("Valkey connection established");

        verify_valkey_version(&client).await?;

        // Persist the partition layout on first boot; reject a mismatch on
        // subsequent boots.
        validate_or_create_partition_config(&client, &config.partition_config).await?;

        initialize_waitpoint_hmac_secret(
            &client,
            &config.partition_config,
            &config.waitpoint_hmac_secret,
        )
        .await?;

        // Load the server-side Lua function library unless explicitly
        // skipped (e.g. tests that preload it).
        if !config.skip_library_load {
            tracing::info!("loading flowfabric Lua library");
            ff_script::loader::ensure_library(&client)
                .await
                .map_err(ServerError::LibraryLoad)?;
        } else {
            tracing::info!("skipping library load (skip_library_load=true)");
        }

        // Copy per-scanner intervals from the server config into the
        // engine's own config type.
        let engine_cfg = ff_engine::EngineConfig {
            partition_config: config.partition_config,
            lanes: config.lanes.clone(),
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
            index_reconciler_interval: config.engine_config.index_reconciler_interval,
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
            unblock_interval: config.engine_config.unblock_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            flow_projector_interval: config.engine_config.flow_projector_interval,
            execution_deadline_interval: config.engine_config.execution_deadline_interval,
            cancel_reconciler_interval: config.engine_config.cancel_reconciler_interval,
            scanner_filter: config.engine_config.scanner_filter.clone(),
        };
        // The completion backend needs its own connection description so it
        // can (re)connect independently of the shared client.
        let mut valkey_conn = ff_core::backend::ValkeyConnection::new(
            config.host.clone(),
            config.port,
        );
        valkey_conn.tls = config.tls;
        valkey_conn.cluster = config.cluster;
        let completion_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
            client.clone(),
            config.partition_config,
            valkey_conn,
        );
        // Fully-qualified call disambiguates the trait method.
        let completion_stream = <ff_backend_valkey::ValkeyBackend as ff_core::completion_backend::CompletionBackend>::subscribe_completions(&completion_backend)
            .await
            .map_err(|e| ServerError::OperationFailed(format!(
                "subscribe_completions: {e}"
            )))?;

        let engine = Engine::start_with_completions(
            engine_cfg,
            client.clone(),
            metrics.clone(),
            completion_stream,
        );

        // Second connection dedicated to blocking tail reads so they cannot
        // block request-path traffic on the primary client.
        tracing::info!("opening dedicated tail connection");
        let mut tail_builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            tail_builder = tail_builder.tls();
        }
        if config.cluster {
            tail_builder = tail_builder.cluster();
        }
        let tail_client = tail_builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect (tail)"))?;
        let tail_pong: String = tail_client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING (tail)"))?;
        if tail_pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "tail client unexpected PING response: {tail_pong}"
            )));
        }

        let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
            config.max_concurrent_stream_ops as usize,
        ));
        let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
        tracing::info!(
            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
            "stream-op client ready (read + tail share the semaphore; \
             tails additionally serialize via xread_block_lock)"
        );

        // Loudly warn when no API token is configured: admin endpoints and
        // token/payload-bearing reads are then unauthenticated.
        if config.api_token.is_none() {
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
                 FF_API_TOKEN for any deployment reachable from untrusted \
                 networks."
            );
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
                 and GET /v1/executions/{{id}}/result returns raw completion payload \
                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
                 configuration."
            );
        }

        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            listen_addr = %config.listen_addr,
            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
            config.partition_config.num_flow_partitions,
            config.partition_config.num_budget_partitions,
            config.partition_config.num_quota_partitions,
        );

        let scheduler = Arc::new(ff_scheduler::Scheduler::with_metrics(
            client.clone(),
            config.partition_config,
            metrics.clone(),
        ));

        Ok(Self {
            client,
            tail_client,
            stream_semaphore,
            xread_block_lock,
            // Single permit: one admin rotate at a time.
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine,
            config,
            scheduler,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
            metrics,
        })
    }
599
    /// Shared metrics registry (also handed to the engine and scheduler).
    pub fn metrics(&self) -> &Arc<ff_observability::Metrics> {
        &self.metrics
    }
604
    /// Primary Valkey client (request path; not the tail connection).
    pub fn client(&self) -> &Client {
        &self.client
    }
609
    /// Invokes a Lua library `function` via FCALL on the primary client.
    ///
    /// Delegates to the free function `fcall_with_reload_on_client`; per its
    /// name it presumably reloads the library and retries on a missing
    /// function — confirm against its definition (not in this chunk).
    async fn fcall_with_reload(
        &self,
        function: &str,
        keys: &[&str],
        args: &[&str],
    ) -> Result<Value, ServerError> {
        fcall_with_reload_on_client(&self.client, function, keys, args).await
    }
624
    /// Full server configuration this instance was started with.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }
629
    /// Partition layout (flow/budget/quota/execution partition counts).
    pub fn partition_config(&self) -> &PartitionConfig {
        &self.config.partition_config
    }
634
    /// Creates a new execution via the `ff_create_execution` Lua function.
    ///
    /// Builds the KEYS and ARGV arrays in the exact order the Lua function
    /// expects — do not reorder either list without updating the script.
    pub async fn create_execution(
        &self,
        args: &CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, ServerError> {
        let partition = execution_partition(&args.execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, &args.execution_id);
        let idx = IndexKeys::new(&partition);

        let lane = &args.lane_id;
        let tag = partition.hash_tag();
        // A present, non-empty idempotency key maps to a namespaced dedup
        // key; otherwise a no-op key keeps the KEYS arity fixed.
        let idem_key = match &args.idempotency_key {
            Some(k) if !k.is_empty() => {
                keys::idempotency_key(&tag, args.namespace.as_str(), k)
            }
            _ => ctx.noop(),
        };

        // Empty string signals "no delay" to the Lua side.
        let delay_str = args
            .delay_until
            .map(|d| d.0.to_string())
            .unwrap_or_default();
        let is_delayed = !delay_str.is_empty();

        // Delayed executions go to the lane's delayed zset; immediate ones
        // straight to the eligible zset.
        let scheduling_zset = if is_delayed {
            idx.lane_delayed(lane)
        } else {
            idx.lane_eligible(lane)
        };

        let fcall_keys: Vec<String> = vec![
            ctx.core(),
            ctx.payload(),
            ctx.policy(),
            ctx.tags(),
            scheduling_zset,
            idem_key,
            idx.execution_deadline(),
            idx.all_executions(),
        ];

        let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());

        let fcall_args: Vec<String> = vec![
            args.execution_id.to_string(),
            args.namespace.to_string(),
            args.lane_id.to_string(),
            args.execution_kind.clone(),
            args.priority.to_string(),
            args.creator_identity.clone(),
            // Policy serialized as JSON; "{}" on absence or serialization
            // failure.
            args.policy.as_ref()
                .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
                .unwrap_or_else(|| "{}".to_owned()),
            // NOTE(review): lossy UTF-8 conversion — non-UTF-8 payload bytes
            // are replaced before reaching the script; confirm payloads are
            // expected to be UTF-8.
            String::from_utf8_lossy(&args.input_payload).into_owned(),
            delay_str,
            // Fixed idempotency TTL (86400000 ms = 24h) whenever a key was
            // supplied; empty otherwise.
            args.idempotency_key.as_ref()
                .map(|_| "86400000".to_string())
                .unwrap_or_default(),
            tags_json,
            args.execution_deadline_at
                .map(|d| d.to_string())
                .unwrap_or_default(),
            args.partition_id.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
            .await?;

        parse_create_result(&raw, &args.execution_id)
    }
722
723 pub async fn cancel_execution(
725 &self,
726 args: &CancelExecutionArgs,
727 ) -> Result<CancelExecutionResult, ServerError> {
728 let raw = self
729 .fcall_cancel_execution_with_reload(args)
730 .await?;
731 parse_cancel_result(&raw, &args.execution_id)
732 }
733
    /// Builds the KEYS/ARGV for `ff_cancel_execution` (via the async
    /// `build_cancel_execution_fcall` helper) and invokes the function.
    async fn fcall_cancel_execution_with_reload(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<Value, ServerError> {
        let (keys, argv) = build_cancel_execution_fcall(
            &self.client,
            &self.config.partition_config,
            args,
        )
        .await?;
        let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
        self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
    }
751
752 pub async fn get_execution_state(
757 &self,
758 execution_id: &ExecutionId,
759 ) -> Result<PublicState, ServerError> {
760 let partition = execution_partition(execution_id, &self.config.partition_config);
761 let ctx = ExecKeyContext::new(&partition, execution_id);
762
763 let state_str: Option<String> = self
764 .client
765 .hget(&ctx.core(), "public_state")
766 .await
767 .map_err(|e| crate::server::backend_context(e, "HGET public_state"))?;
768
769 match state_str {
770 Some(s) => {
771 let quoted = format!("\"{s}\"");
772 serde_json::from_str("ed).map_err(|e| {
773 ServerError::Script(format!(
774 "invalid public_state '{s}' for {execution_id}: {e}"
775 ))
776 })
777 }
778 None => Err(ServerError::NotFound(format!(
779 "execution not found: {execution_id}"
780 ))),
781 }
782 }
783
    /// Fetches the raw completion payload bytes for an execution.
    ///
    /// Returns `Ok(None)` when the result key does not exist; this does not
    /// distinguish "execution unknown" from "no result yet".
    pub async fn get_execution_result(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Option<Vec<u8>>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        // Plain GET: the result is stored as an opaque byte string, so it is
        // read as raw bytes rather than UTF-8.
        let payload: Option<Vec<u8>> = self
            .client
            .cmd("GET")
            .arg(ctx.result())
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "GET exec result"))?;
        Ok(payload)
    }
835
    /// Lists an execution's pending/active waitpoints, including their HMAC
    /// tokens and (when a condition exists) the required signal names.
    ///
    /// Flow: verify the execution exists → SSCAN its waitpoint-id set →
    /// pipeline pass 1 (waitpoint fields + condition matcher count) →
    /// filter to pending/active entries with a token → pipeline pass 2
    /// (matcher names) → assemble the response.
    ///
    /// # Errors
    /// [`ServerError::NotFound`] when the execution's core hash is absent;
    /// backend errors from any of the reads.
    pub async fn list_pending_waitpoints(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        // Distinguish "unknown execution" (error) from "no waitpoints"
        // (empty result) before scanning.
        let core_exists: bool = self
            .client
            .cmd("EXISTS")
            .arg(ctx.core())
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "EXISTS exec_core (pending waitpoints)"))?;
        if !core_exists {
            return Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            )));
        }

        // SSCAN (cursor loop) instead of SMEMBERS so a large waitpoint set
        // cannot block the server.
        const WAITPOINTS_SSCAN_COUNT: usize = 100;
        let waitpoints_key = ctx.waitpoints();
        let mut wp_ids_raw: Vec<String> = Vec::new();
        let mut cursor: String = "0".to_owned();
        loop {
            let reply: (String, Vec<String>) = self
                .client
                .cmd("SSCAN")
                .arg(&waitpoints_key)
                .arg(&cursor)
                .arg("COUNT")
                .arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
                .execute()
                .await
                .map_err(|e| crate::server::backend_context(e, "SSCAN waitpoints"))?;
            cursor = reply.0;
            wp_ids_raw.extend(reply.1);
            // Cursor "0" marks the end of the scan.
            if cursor == "0" {
                break;
            }
        }

        // SSCAN may return duplicates across iterations; sort + dedup also
        // makes the response ordering deterministic.
        wp_ids_raw.sort_unstable();
        wp_ids_raw.dedup();

        if wp_ids_raw.is_empty() {
            return Ok(Vec::new());
        }

        // Unparseable ids are logged and skipped rather than failing the
        // whole listing.
        let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
        for raw in &wp_ids_raw {
            match WaitpointId::parse(raw) {
                Ok(id) => wp_ids.push(id),
                Err(e) => tracing::warn!(
                    raw_id = %raw,
                    error = %e,
                    execution_id = %execution_id,
                    "list_pending_waitpoints: skipping unparseable waitpoint_id"
                ),
            }
        }
        if wp_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Field order here fixes the index positions used by `get(i)` in the
        // assembly loop below — keep them in sync.
        const WP_FIELDS: [&str; 6] = [
            "state",
            "waitpoint_key",
            "waitpoint_token",
            "created_at",
            "activated_at",
            "expires_at",
        ];

        // Pass 1: one HMGET (waitpoint fields) + one HGET (matcher count)
        // per waitpoint, batched in a single pipeline round-trip.
        let mut pass1 = self.client.pipeline();
        let mut wp_slots = Vec::with_capacity(wp_ids.len());
        let mut cond_slots = Vec::with_capacity(wp_ids.len());
        for wp_id in &wp_ids {
            let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint(wp_id));
            for f in WP_FIELDS {
                cmd = cmd.arg(f);
            }
            wp_slots.push(cmd.finish());

            cond_slots.push(
                pass1
                    .cmd::<Option<String>>("HGET")
                    .arg(ctx.waitpoint_condition(wp_id))
                    .arg("total_matchers")
                    .finish(),
            );
        }
        pass1
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "pipeline HMGET waitpoints + HGET total_matchers"))?;

        // Waitpoints that survive the pass-1 filters, with their fields and
        // the number of condition matchers to fetch in pass 2.
        struct Kept {
            wp_id: WaitpointId,
            wp_fields: Vec<Option<String>>,
            total_matchers: usize,
        }
        let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
        for ((wp_id, wp_slot), cond_slot) in wp_ids
            .iter()
            .zip(wp_slots)
            .zip(cond_slots)
        {
            let wp_fields: Vec<Option<String>> =
                wp_slot.value().map_err(|e| crate::server::backend_context(e, format!("pipeline slot HMGET waitpoint {wp_id}")))?;

            // All-None: the waitpoint hash no longer exists (e.g. expired
            // between SSCAN and the pipeline). Consume the paired slot so
            // the slot/iteration bookkeeping stays aligned.
            if wp_fields.iter().all(Option::is_none) {
                let _ = cond_slot.value();
                continue;
            }
            let state_ref = wp_fields
                .first()
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            // Only pending/active waitpoints are reported.
            if state_ref != "pending" && state_ref != "active" {
                let _ = cond_slot.value();
                continue;
            }
            let token_ref = wp_fields
                .get(2)
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if token_ref.is_empty() {
                let _ = cond_slot.value();
                tracing::warn!(
                    waitpoint_id = %wp_id,
                    execution_id = %execution_id,
                    waitpoint_hash_key = %ctx.waitpoint(wp_id),
                    state = %state_ref,
                    "list_pending_waitpoints: waitpoint hash present but waitpoint_token \
                     field is empty — likely storage corruption (half-populated write, \
                     operator edit, or interrupted script). Skipping this entry in the \
                     response. HGETALL the waitpoint_hash_key to inspect."
                );
                continue;
            }

            // Missing or unparseable matcher counts degrade to 0 (no
            // required signal names will be reported).
            let total_matchers = cond_slot
                .value()
                .map_err(|e| crate::server::backend_context(e, format!("pipeline slot HGET total_matchers {wp_id}")))?
                .and_then(|s| s.parse::<usize>().ok())
                .unwrap_or(0);

            kept.push(Kept {
                wp_id: wp_id.clone(),
                wp_fields,
                total_matchers,
            });
        }

        if kept.is_empty() {
            return Ok(Vec::new());
        }

        // Pass 2: fetch matcher names, only for waitpoints with matchers.
        // `matcher_slots` stays index-aligned with `kept` (None = skipped).
        let mut pass2 = self.client.pipeline();
        let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
        let mut pass2_needed = false;
        for k in &kept {
            if k.total_matchers == 0 {
                matcher_slots.push(None);
                continue;
            }
            pass2_needed = true;
            let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
            for i in 0..k.total_matchers {
                cmd = cmd.arg(format!("matcher:{i}:name"));
            }
            matcher_slots.push(Some(cmd.finish()));
        }
        if pass2_needed {
            pass2.execute().await.map_err(|e| crate::server::backend_context(e, "pipeline HMGET wp_condition matchers"))?;
        }

        // Empty timestamp fields map to None.
        let parse_ts = |raw: &str| -> Option<TimestampMs> {
            if raw.is_empty() {
                None
            } else {
                raw.parse::<i64>().ok().map(TimestampMs)
            }
        };

        let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
        for (k, slot) in kept.into_iter().zip(matcher_slots) {
            // Indexed accessor into WP_FIELDS order; "" for absent fields.
            let get = |i: usize| -> &str {
                k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
            };

            let required_signal_names: Vec<String> = match slot {
                None => Vec::new(),
                Some(s) => {
                    let vals: Vec<Option<String>> =
                        s.value().map_err(|e| crate::server::backend_context(e, format!(
                            "pipeline slot HMGET wp_condition matchers {}",
                            k.wp_id
                        )))?;
                    vals.into_iter()
                        .flatten()
                        .filter(|name| !name.is_empty())
                        .collect()
                }
            };

            out.push(PendingWaitpointInfo {
                waitpoint_id: k.wp_id,
                waitpoint_key: get(1).to_owned(),
                state: get(0).to_owned(),
                waitpoint_token: WaitpointToken(get(2).to_owned()),
                required_signal_names,
                created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
                activated_at: parse_ts(get(4)),
                expires_at: parse_ts(get(5)),
            });
        }

        Ok(out)
    }
1142
    /// Creates a budget via the `ff_create_budget` Lua function.
    ///
    /// ARGV layout: 9 scalar fields, then the dimension count, then the
    /// dimension names, then all hard limits, then all soft limits — the
    /// Lua side slices by `dim_count`, so ordering is a contract.
    pub async fn create_budget(
        &self,
        args: &CreateBudgetArgs,
    ) -> Result<CreateBudgetResult, ServerError> {
        // Reject malformed dimension/limit arrays before touching the store.
        validate_create_budget_dimensions(
            &args.dimensions,
            &args.hard_limits,
            &args.soft_limits,
        )?;
        let partition = budget_partition(&args.budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
        let resets_key = keys::budget_resets_key(bctx.hash_tag());
        let policies_index = keys::budget_policies_index(bctx.hash_tag());

        let fcall_keys: Vec<String> = vec![
            bctx.definition(),
            bctx.limits(),
            bctx.usage(),
            resets_key,
            policies_index,
        ];

        let dim_count = args.dimensions.len();
        // 9 scalar args + name/hard/soft per dimension.
        let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
        fcall_args.push(args.budget_id.to_string());
        fcall_args.push(args.scope_type.clone());
        fcall_args.push(args.scope_id.clone());
        fcall_args.push(args.enforcement_mode.clone());
        fcall_args.push(args.on_hard_limit.clone());
        fcall_args.push(args.on_soft_limit.clone());
        fcall_args.push(args.reset_interval_ms.to_string());
        fcall_args.push(args.now.to_string());
        fcall_args.push(dim_count.to_string());
        for dim in &args.dimensions {
            fcall_args.push(dim.clone());
        }
        for hard in &args.hard_limits {
            fcall_args.push(hard.to_string());
        }
        for soft in &args.soft_limits {
            fcall_args.push(soft.to_string());
        }

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
            .await?;

        parse_budget_create_result(&raw, &args.budget_id)
    }
1204
    /// Creates a quota policy via the `ff_create_quota_policy` Lua function.
    pub async fn create_quota_policy(
        &self,
        args: &CreateQuotaPolicyArgs,
    ) -> Result<CreateQuotaPolicyResult, ServerError> {
        let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
        let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);

        // KEYS order matches the Lua function's signature.
        let fcall_keys: Vec<String> = vec![
            qctx.definition(),
            qctx.window("requests_per_window"),
            qctx.concurrency(),
            qctx.admitted_set(),
            keys::quota_policies_index(qctx.hash_tag()),
        ];

        let fcall_args: Vec<String> = vec![
            args.quota_policy_id.to_string(),
            args.window_seconds.to_string(),
            args.max_requests_per_window.to_string(),
            args.max_concurrent.to_string(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
            .await?;

        parse_quota_create_result(&raw, &args.quota_policy_id)
    }
1242
    /// Reads a budget's definition, usage, and limits hashes and assembles
    /// a [`BudgetStatus`].
    ///
    /// Reads are three separate HGETALLs (not a transaction), so the
    /// snapshot can be mildly inconsistent under concurrent writes.
    ///
    /// # Errors
    /// [`ServerError::NotFound`] when the definition hash is empty/absent.
    pub async fn get_budget_status(
        &self,
        budget_id: &BudgetId,
    ) -> Result<BudgetStatus, ServerError> {
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);

        let def: HashMap<String, String> = self
            .client
            .hgetall(&bctx.definition())
            .await
            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_def"))?;

        if def.is_empty() {
            return Err(ServerError::NotFound(format!(
                "budget not found: {budget_id}"
            )));
        }

        let usage_raw: HashMap<String, String> = self
            .client
            .hgetall(&bctx.usage())
            .await
            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_usage"))?;
        // Drop the "_init" placeholder field; unparseable counters become 0.
        let usage: HashMap<String, u64> = usage_raw
            .into_iter()
            .filter(|(k, _)| k != "_init")
            .map(|(k, v)| (k, v.parse().unwrap_or(0)))
            .collect();

        let limits_raw: HashMap<String, String> = self
            .client
            .hgetall(&bctx.limits())
            .await
            .map_err(|e| crate::server::backend_context(e, "HGETALL budget_limits"))?;
        // Limits are stored flat with "hard:<dim>" / "soft:<dim>" field names;
        // split them into the two maps.
        let mut hard_limits = HashMap::new();
        let mut soft_limits = HashMap::new();
        for (k, v) in &limits_raw {
            if let Some(dim) = k.strip_prefix("hard:") {
                hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
            } else if let Some(dim) = k.strip_prefix("soft:") {
                soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
            }
        }

        // Treat empty-string hash fields as "absent".
        let non_empty = |s: Option<&String>| -> Option<String> {
            s.filter(|v| !v.is_empty()).cloned()
        };

        Ok(BudgetStatus {
            budget_id: budget_id.to_string(),
            scope_type: def.get("scope_type").cloned().unwrap_or_default(),
            scope_id: def.get("scope_id").cloned().unwrap_or_default(),
            enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
            usage,
            hard_limits,
            soft_limits,
            breach_count: def
                .get("breach_count")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            soft_breach_count: def
                .get("soft_breach_count")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            last_breach_at: non_empty(def.get("last_breach_at")),
            last_breach_dim: non_empty(def.get("last_breach_dim")),
            next_reset_at: non_empty(def.get("next_reset_at")),
            created_at: non_empty(def.get("created_at")),
        })
    }
1318
    /// Reports usage deltas against a budget via `ff_report_usage_and_check`.
    ///
    /// ARGV layout: dimension count, dimension names, deltas (same order),
    /// `now`, then the dedup key (empty string = no dedup) — the Lua side
    /// slices by the leading count, so ordering is a contract.
    pub async fn report_usage(
        &self,
        budget_id: &BudgetId,
        args: &ReportUsageArgs,
    ) -> Result<ReportUsageResult, ServerError> {
        validate_report_usage_dimensions(&args.dimensions, &args.deltas)?;
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);

        let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];

        let dim_count = args.dimensions.len();
        let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
        fcall_args.push(dim_count.to_string());
        for dim in &args.dimensions {
            fcall_args.push(dim.clone());
        }
        for delta in &args.deltas {
            fcall_args.push(delta.to_string());
        }
        fcall_args.push(args.now.to_string());
        // Non-empty dedup keys are namespaced into the budget's hash-tag;
        // absent/empty keys become "" (the script treats that as no dedup).
        let dedup_key_val = args
            .dedup_key
            .as_ref()
            .filter(|k| !k.is_empty())
            .map(|k| usage_dedup_key(bctx.hash_tag(), k))
            .unwrap_or_default();
        fcall_args.push(dedup_key_val);

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
            .await?;

        parse_report_usage_result(&raw)
    }
1361
1362 pub async fn reset_budget(
1364 &self,
1365 budget_id: &BudgetId,
1366 ) -> Result<ResetBudgetResult, ServerError> {
1367 let partition = budget_partition(budget_id, &self.config.partition_config);
1368 let bctx = BudgetKeyContext::new(&partition, budget_id);
1369 let resets_key = keys::budget_resets_key(bctx.hash_tag());
1370
1371 let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];
1373
1374 let now = TimestampMs::now();
1376 let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];
1377
1378 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1379 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1380
1381 let raw: Value = self
1382 .fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
1383 .await?;
1384
1385 parse_reset_budget_result(&raw)
1386 }
1387
    /// Creates a flow via the `ff_create_flow` Lua function.
    pub async fn create_flow(
        &self,
        args: &CreateFlowArgs,
    ) -> Result<CreateFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        // KEYS order matches the Lua function's signature.
        let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];

        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.flow_kind.clone(),
            args.namespace.to_string(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
            .await?;

        parse_create_flow_result(&raw, &args.flow_id)
    }
1419
1420 pub async fn add_execution_to_flow(
1458 &self,
1459 args: &AddExecutionToFlowArgs,
1460 ) -> Result<AddExecutionToFlowResult, ServerError> {
1461 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1462 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1463 let fidx = FlowIndexKeys::new(&partition);
1464
1465 let exec_partition =
1469 execution_partition(&args.execution_id, &self.config.partition_config);
1470 let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);
1471
1472 if exec_partition.index != partition.index {
1481 return Err(ServerError::PartitionMismatch(format!(
1482 "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
1483 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
1484 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
1485 exec_p = exec_partition.index,
1486 flow_p = partition.index,
1487 )));
1488 }
1489
1490 let fcall_keys: Vec<String> = vec![
1492 fctx.core(),
1493 fctx.members(),
1494 fidx.flow_index(),
1495 ectx.core(),
1496 ];
1497
1498 let fcall_args: Vec<String> = vec![
1500 args.flow_id.to_string(),
1501 args.execution_id.to_string(),
1502 args.now.to_string(),
1503 ];
1504
1505 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1506 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1507
1508 let raw: Value = self
1509 .fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
1510 .await?;
1511
1512 parse_add_execution_to_flow_result(&raw)
1513 }
1514
    /// Cancel a flow without waiting for member-execution cancellation.
    ///
    /// Fire-and-forget variant: when member dispatch is needed it runs on a
    /// background task and this call returns immediately (see
    /// `cancel_flow_inner` with `wait == false`).
    pub async fn cancel_flow(
        &self,
        args: &CancelFlowArgs,
    ) -> Result<CancelFlowResult, ServerError> {
        self.cancel_flow_inner(args, false).await
    }
1562
    /// Cancel a flow and synchronously cancel each member execution before
    /// returning.
    ///
    /// May return `PartiallyCancelled` (with the failed ids) when some member
    /// cancellations fail with a non-terminal error (see `cancel_flow_inner`
    /// with `wait == true`).
    pub async fn cancel_flow_wait(
        &self,
        args: &CancelFlowArgs,
    ) -> Result<CancelFlowResult, ServerError> {
        self.cancel_flow_inner(args, true).await
    }
1572
    /// Shared implementation for `cancel_flow` / `cancel_flow_wait`.
    ///
    /// Flow-level cancellation happens atomically in `ff_cancel_flow`; member
    /// executions are then cancelled individually, either inline (`wait`) or
    /// on a background task. Each successful (or already-terminal) member
    /// cancel is acked off the pending-cancels set / backlog so the
    /// reconciler only retries genuinely failed members.
    async fn cancel_flow_inner(
        &self,
        args: &CancelFlowArgs,
        wait: bool,
    ) -> Result<CancelFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        // Grace period passed to the script; presumably how long the
        // reconciler waits before re-driving un-acked member cancels —
        // confirm against the ff_cancel_flow script.
        const CANCEL_RECONCILER_GRACE_MS: u64 = 30_000;

        let fcall_keys: Vec<String> = vec![
            fctx.core(),
            fctx.members(),
            fidx.flow_index(),
            fctx.pending_cancels(),
            fidx.cancel_backlog(),
        ];

        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.reason.clone(),
            args.cancellation_policy.clone(),
            args.now.to_string(),
            CANCEL_RECONCILER_GRACE_MS.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
            .await?;

        let (policy, members) = match parse_cancel_flow_raw(&raw)? {
            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
                (policy, member_execution_ids)
            }
            // Idempotent path: the flow was already terminal. Reconstruct a
            // `Cancelled` response from stored state instead of erroring.
            ParsedCancelFlow::AlreadyTerminal => {
                let flow_meta: Vec<Option<String>> = self
                    .client
                    .cmd("HMGET")
                    .arg(fctx.core())
                    .arg("cancellation_policy")
                    .arg("cancel_reason")
                    .execute()
                    .await
                    .map_err(|e| crate::server::backend_context(e, "HMGET flow_core cancellation_policy,cancel_reason"))?;
                // Empty strings are treated the same as missing fields.
                let stored_policy = flow_meta
                    .first()
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let stored_reason = flow_meta
                    .get(1)
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let all_members: Vec<String> = self
                    .client
                    .cmd("SMEMBERS")
                    .arg(fctx.members())
                    .execute()
                    .await
                    .map_err(|e| crate::server::backend_context(e, "SMEMBERS flow members (already terminal)"))?;
                let total_members = all_members.len();
                // Cap the echoed member list to bound response size.
                let stored_members: Vec<String> = all_members
                    .into_iter()
                    .take(ALREADY_TERMINAL_MEMBER_CAP)
                    .collect();
                tracing::debug!(
                    flow_id = %args.flow_id,
                    stored_policy = stored_policy.as_deref().unwrap_or(""),
                    stored_reason = stored_reason.as_deref().unwrap_or(""),
                    total_members,
                    returned_members = stored_members.len(),
                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
                );
                return Ok(CancelFlowResult::Cancelled {
                    cancellation_policy: stored_policy
                        .unwrap_or_else(|| args.cancellation_policy.clone()),
                    member_execution_ids: stored_members,
                });
            }
        };
        // Only the "cancel_all" policy fans cancellation out to members.
        let needs_dispatch = policy == "cancel_all" && !members.is_empty();

        if !needs_dispatch {
            return Ok(CancelFlowResult::Cancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
            });
        }

        let pending_cancels_key = fctx.pending_cancels();
        let cancel_backlog_key = fidx.cancel_backlog();

        // Synchronous path: cancel members one-by-one, acking successes and
        // terminal-state "failures", collecting real failures for the caller.
        if wait {
            let mut failed: Vec<String> = Vec::new();
            for eid_str in &members {
                match cancel_member_execution(
                    &self.client,
                    &self.config.partition_config,
                    eid_str,
                    &args.reason,
                    args.now,
                )
                .await
                {
                    Ok(()) => {
                        ack_cancel_member(
                            &self.client,
                            &pending_cancels_key,
                            &cancel_backlog_key,
                            eid_str,
                            &args.flow_id.to_string(),
                        )
                        .await;
                    }
                    Err(e) => {
                        // A member that is already terminal counts as done:
                        // ack it so the reconciler does not retry.
                        if is_terminal_ack_error(&e) {
                            ack_cancel_member(
                                &self.client,
                                &pending_cancels_key,
                                &cancel_backlog_key,
                                eid_str,
                                &args.flow_id.to_string(),
                            )
                            .await;
                            continue;
                        }
                        tracing::warn!(
                            execution_id = %eid_str,
                            error = %e,
                            "cancel_flow(wait): individual execution cancel failed \
                            (transport/contract fault; reconciler will retry if transient)"
                        );
                        failed.push(eid_str.clone());
                    }
                }
            }
            if failed.is_empty() {
                return Ok(CancelFlowResult::Cancelled {
                    cancellation_policy: policy,
                    member_execution_ids: members,
                });
            }
            return Ok(CancelFlowResult::PartiallyCancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
                failed_member_execution_ids: failed,
            });
        }

        // Async path: clone everything the background task needs ('static).
        let client = self.client.clone();
        let partition_config = self.config.partition_config;
        let reason = args.reason.clone();
        let now = args.now;
        let dispatch_members = members.clone();
        let flow_id = args.flow_id.clone();
        let mut guard = self.background_tasks.lock().await;

        // Opportunistically reap finished background tasks so the JoinSet
        // does not grow without bound across many cancel_flow calls.
        while let Some(joined) = guard.try_join_next() {
            if let Err(e) = joined {
                tracing::warn!(
                    error = %e,
                    "cancel_flow: background dispatch task panicked or was aborted"
                );
            }
        }

        let pending_key_owned = pending_cancels_key.clone();
        let backlog_key_owned = cancel_backlog_key.clone();
        let flow_id_str = args.flow_id.to_string();

        guard.spawn(async move {
            use futures::stream::StreamExt;
            // Bounded fan-out: at most this many member cancels in flight.
            const CONCURRENCY: usize = 16;

            let member_count = dispatch_members.len();
            let flow_id_for_log = flow_id.clone();
            futures::stream::iter(dispatch_members)
                .map(|eid_str| {
                    let client = client.clone();
                    let reason = reason.clone();
                    let flow_id = flow_id.clone();
                    let pending = pending_key_owned.clone();
                    let backlog = backlog_key_owned.clone();
                    let flow_id_str = flow_id_str.clone();
                    async move {
                        match cancel_member_execution(
                            &client,
                            &partition_config,
                            &eid_str,
                            &reason,
                            now,
                        )
                        .await
                        {
                            Ok(()) => {
                                ack_cancel_member(
                                    &client,
                                    &pending,
                                    &backlog,
                                    &eid_str,
                                    &flow_id_str,
                                )
                                .await;
                            }
                            Err(e) => {
                                // Same terminal-state ack rule as the wait path.
                                if is_terminal_ack_error(&e) {
                                    ack_cancel_member(
                                        &client,
                                        &pending,
                                        &backlog,
                                        &eid_str,
                                        &flow_id_str,
                                    )
                                    .await;
                                } else {
                                    tracing::warn!(
                                        flow_id = %flow_id,
                                        execution_id = %eid_str,
                                        error = %e,
                                        "cancel_flow(async): individual execution cancel failed \
                                        (transport/contract fault; reconciler will retry if transient)"
                                    );
                                }
                            }
                        }
                    }
                })
                .buffer_unordered(CONCURRENCY)
                .for_each(|()| async {})
                .await;

            tracing::debug!(
                flow_id = %flow_id_for_log,
                member_count,
                concurrency = CONCURRENCY,
                "cancel_flow: background member dispatch complete"
            );
        });
        drop(guard);

        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
        Ok(CancelFlowResult::CancellationScheduled {
            cancellation_policy: policy,
            member_count,
            member_execution_ids: members,
        })
    }
1878
1879 pub async fn stage_dependency_edge(
1884 &self,
1885 args: &StageDependencyEdgeArgs,
1886 ) -> Result<StageDependencyEdgeResult, ServerError> {
1887 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1888 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1889
1890 let fcall_keys: Vec<String> = vec![
1892 fctx.core(),
1893 fctx.members(),
1894 fctx.edge(&args.edge_id),
1895 fctx.outgoing(&args.upstream_execution_id),
1896 fctx.incoming(&args.downstream_execution_id),
1897 fctx.grant(&args.edge_id.to_string()),
1898 ];
1899
1900 let fcall_args: Vec<String> = vec![
1903 args.flow_id.to_string(),
1904 args.edge_id.to_string(),
1905 args.upstream_execution_id.to_string(),
1906 args.downstream_execution_id.to_string(),
1907 args.dependency_kind.clone(),
1908 args.data_passing_ref.clone().unwrap_or_default(),
1909 args.expected_graph_revision.to_string(),
1910 args.now.to_string(),
1911 ];
1912
1913 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1914 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1915
1916 let raw: Value = self
1917 .fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
1918 .await?;
1919
1920 parse_stage_dependency_edge_result(&raw)
1921 }
1922
1923 pub async fn apply_dependency_to_child(
1928 &self,
1929 args: &ApplyDependencyToChildArgs,
1930 ) -> Result<ApplyDependencyToChildResult, ServerError> {
1931 let partition = execution_partition(
1932 &args.downstream_execution_id,
1933 &self.config.partition_config,
1934 );
1935 let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
1936 let idx = IndexKeys::new(&partition);
1937
1938 let lane_str: Option<String> = self
1940 .client
1941 .hget(&ctx.core(), "lane_id")
1942 .await
1943 .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
1944 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1945
1946 let fcall_keys: Vec<String> = vec![
1949 ctx.core(),
1950 ctx.deps_meta(),
1951 ctx.deps_unresolved(),
1952 ctx.dep_edge(&args.edge_id),
1953 idx.lane_eligible(&lane),
1954 idx.lane_blocked_dependencies(&lane),
1955 ctx.deps_all_edges(),
1956 ];
1957
1958 let fcall_args: Vec<String> = vec![
1961 args.flow_id.to_string(),
1962 args.edge_id.to_string(),
1963 args.upstream_execution_id.to_string(),
1964 args.graph_revision.to_string(),
1965 args.dependency_kind.clone(),
1966 args.data_passing_ref.clone().unwrap_or_default(),
1967 args.now.to_string(),
1968 ];
1969
1970 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1971 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1972
1973 let raw: Value = self
1974 .fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
1975 .await?;
1976
1977 parse_apply_dependency_result(&raw)
1978 }
1979
1980 pub async fn deliver_signal(
1987 &self,
1988 args: &DeliverSignalArgs,
1989 ) -> Result<DeliverSignalResult, ServerError> {
1990 let partition = execution_partition(&args.execution_id, &self.config.partition_config);
1991 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
1992 let idx = IndexKeys::new(&partition);
1993
1994 let lane_str: Option<String> = self
1996 .client
1997 .hget(&ctx.core(), "lane_id")
1998 .await
1999 .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
2000 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2001
2002 let wp_id = &args.waitpoint_id;
2003 let sig_id = &args.signal_id;
2004 let idem_key = args
2005 .idempotency_key
2006 .as_ref()
2007 .filter(|k| !k.is_empty())
2008 .map(|k| ctx.signal_dedup(wp_id, k))
2009 .unwrap_or_else(|| ctx.noop());
2010
2011 let fcall_keys: Vec<String> = vec![
2017 ctx.core(), ctx.waitpoint_condition(wp_id), ctx.waitpoint_signals(wp_id), ctx.exec_signals(), ctx.signal(sig_id), ctx.signal_payload(sig_id), idem_key, ctx.waitpoint(wp_id), ctx.suspension_current(), idx.lane_eligible(&lane), idx.lane_suspended(&lane), idx.lane_delayed(&lane), idx.suspension_timeout(), idx.waitpoint_hmac_secrets(), ];
2032
2033 let fcall_args: Vec<String> = vec![
2040 args.signal_id.to_string(), args.execution_id.to_string(), args.waitpoint_id.to_string(), args.signal_name.clone(), args.signal_category.clone(), args.source_type.clone(), args.source_identity.clone(), args.payload.as_ref()
2048 .map(|p| String::from_utf8_lossy(p).into_owned())
2049 .unwrap_or_default(), args.payload_encoding
2051 .clone()
2052 .unwrap_or_else(|| "json".to_owned()), args.idempotency_key
2054 .clone()
2055 .unwrap_or_default(), args.correlation_id
2057 .clone()
2058 .unwrap_or_default(), args.target_scope.clone(), args.created_at
2061 .map(|ts| ts.to_string())
2062 .unwrap_or_else(|| args.now.to_string()), args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), args.resume_delay_ms.unwrap_or(0).to_string(), args.signal_maxlen.unwrap_or(1000).to_string(), args.max_signals_per_execution
2067 .unwrap_or(10_000)
2068 .to_string(), args.waitpoint_token.as_str().to_owned(), ];
2073
2074 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2075 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2076
2077 let raw: Value = self
2078 .fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
2079 .await?;
2080
2081 parse_deliver_signal_result(&raw, &args.signal_id)
2082 }
2083
2084 pub async fn change_priority(
2088 &self,
2089 execution_id: &ExecutionId,
2090 new_priority: i32,
2091 ) -> Result<ChangePriorityResult, ServerError> {
2092 let partition = execution_partition(execution_id, &self.config.partition_config);
2093 let ctx = ExecKeyContext::new(&partition, execution_id);
2094 let idx = IndexKeys::new(&partition);
2095
2096 let lane_str: Option<String> = self
2098 .client
2099 .hget(&ctx.core(), "lane_id")
2100 .await
2101 .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
2102 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2103
2104 let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];
2106
2107 let fcall_args: Vec<String> = vec![
2109 execution_id.to_string(),
2110 new_priority.to_string(),
2111 ];
2112
2113 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2114 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2115
2116 let raw: Value = self
2117 .fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
2118 .await?;
2119
2120 parse_change_priority_result(&raw, execution_id)
2121 }
2122
2123 pub async fn claim_for_worker(
2137 &self,
2138 lane: &LaneId,
2139 worker_id: &WorkerId,
2140 worker_instance_id: &WorkerInstanceId,
2141 worker_capabilities: &std::collections::BTreeSet<String>,
2142 grant_ttl_ms: u64,
2143 ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
2144 self.scheduler
2145 .claim_for_worker(
2146 lane,
2147 worker_id,
2148 worker_instance_id,
2149 worker_capabilities,
2150 grant_ttl_ms,
2151 )
2152 .await
2153 .map_err(|e| match e {
2154 ff_scheduler::SchedulerError::Valkey(inner) => ServerError::from(inner),
2155 ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
2156 crate::server::backend_context(source, context)
2157 }
2158 ff_scheduler::SchedulerError::Config(msg) => ServerError::InvalidInput(msg),
2159 })
2160 }
2161
2162 pub async fn revoke_lease(
2164 &self,
2165 execution_id: &ExecutionId,
2166 ) -> Result<RevokeLeaseResult, ServerError> {
2167 let partition = execution_partition(execution_id, &self.config.partition_config);
2168 let ctx = ExecKeyContext::new(&partition, execution_id);
2169 let idx = IndexKeys::new(&partition);
2170
2171 let wiid_str: Option<String> = self
2173 .client
2174 .hget(&ctx.core(), "current_worker_instance_id")
2175 .await
2176 .map_err(|e| crate::server::backend_context(e, "HGET worker_instance_id"))?;
2177 let wiid = match wiid_str {
2178 Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
2179 _ => {
2180 return Err(ServerError::NotFound(format!(
2181 "no active lease for execution {execution_id} (no current_worker_instance_id)"
2182 )));
2183 }
2184 };
2185
2186 let fcall_keys: Vec<String> = vec![
2188 ctx.core(),
2189 ctx.lease_current(),
2190 ctx.lease_history(),
2191 idx.lease_expiry(),
2192 idx.worker_leases(&wiid),
2193 ];
2194
2195 let fcall_args: Vec<String> = vec![
2197 execution_id.to_string(),
2198 String::new(), "operator_revoke".to_owned(),
2200 ];
2201
2202 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2203 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2204
2205 let raw: Value = self
2206 .fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
2207 .await?;
2208
2209 parse_revoke_lease_result(&raw)
2210 }
2211
2212 pub async fn get_execution(
2214 &self,
2215 execution_id: &ExecutionId,
2216 ) -> Result<ExecutionInfo, ServerError> {
2217 let partition = execution_partition(execution_id, &self.config.partition_config);
2218 let ctx = ExecKeyContext::new(&partition, execution_id);
2219
2220 let fields: HashMap<String, String> = self
2221 .client
2222 .hgetall(&ctx.core())
2223 .await
2224 .map_err(|e| crate::server::backend_context(e, "HGETALL exec_core"))?;
2225
2226 if fields.is_empty() {
2227 return Err(ServerError::NotFound(format!(
2228 "execution not found: {execution_id}"
2229 )));
2230 }
2231
2232 let parse_enum = |field: &str| -> String {
2233 fields.get(field).cloned().unwrap_or_default()
2234 };
2235 fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
2236 let quoted = format!("\"{raw}\"");
2237 serde_json::from_str("ed).map_err(|e| {
2238 ServerError::Script(format!("invalid {field} '{raw}': {e}"))
2239 })
2240 }
2241
2242 let lp_str = parse_enum("lifecycle_phase");
2243 let os_str = parse_enum("ownership_state");
2244 let es_str = parse_enum("eligibility_state");
2245 let br_str = parse_enum("blocking_reason");
2246 let to_str = parse_enum("terminal_outcome");
2247 let as_str = parse_enum("attempt_state");
2248 let ps_str = parse_enum("public_state");
2249
2250 let state_vector = StateVector {
2251 lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
2252 ownership_state: deserialize("ownership_state", &os_str)?,
2253 eligibility_state: deserialize("eligibility_state", &es_str)?,
2254 blocking_reason: deserialize("blocking_reason", &br_str)?,
2255 terminal_outcome: deserialize("terminal_outcome", &to_str)?,
2256 attempt_state: deserialize("attempt_state", &as_str)?,
2257 public_state: deserialize("public_state", &ps_str)?,
2258 };
2259
2260 let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();
2267
2268 let started_at_opt = fields
2275 .get("started_at")
2276 .filter(|s| !s.is_empty())
2277 .cloned();
2278 let completed_at_opt = fields
2279 .get("completed_at")
2280 .filter(|s| !s.is_empty())
2281 .cloned();
2282
2283 Ok(ExecutionInfo {
2284 execution_id: execution_id.clone(),
2285 namespace: parse_enum("namespace"),
2286 lane_id: parse_enum("lane_id"),
2287 priority: fields
2288 .get("priority")
2289 .and_then(|v| v.parse().ok())
2290 .unwrap_or(0),
2291 execution_kind: parse_enum("execution_kind"),
2292 state_vector,
2293 public_state: deserialize("public_state", &ps_str)?,
2294 created_at: parse_enum("created_at"),
2295 started_at: started_at_opt,
2296 completed_at: completed_at_opt,
2297 current_attempt_index: fields
2298 .get("current_attempt_index")
2299 .and_then(|v| v.parse().ok())
2300 .unwrap_or(0),
2301 flow_id: flow_id_val,
2302 blocking_detail: parse_enum("blocking_detail"),
2303 })
2304 }
2305
    /// List executions on one partition, filtered by lane-scoped state bucket.
    ///
    /// `state_filter` selects one of the lane's index ZSETs (eligible,
    /// delayed, terminal, suspended, active). Paging is done with
    /// `ZRANGE ... BYSCORE LIMIT offset limit`, then each id's summary
    /// fields are hydrated with a pipelined HMGET.
    pub async fn list_executions(
        &self,
        partition_id: u16,
        lane: &LaneId,
        state_filter: &str,
        offset: u64,
        limit: u64,
    ) -> Result<ListExecutionsResult, ServerError> {
        let partition = ff_core::partition::Partition {
            family: ff_core::partition::PartitionFamily::Execution,
            index: partition_id,
        };
        let idx = IndexKeys::new(&partition);

        // Map the filter name onto its lane-scoped ZSET.
        let zset_key = match state_filter {
            "eligible" => idx.lane_eligible(lane),
            "delayed" => idx.lane_delayed(lane),
            "terminal" => idx.lane_terminal(lane),
            "suspended" => idx.lane_suspended(lane),
            "active" => idx.lane_active(lane),
            other => {
                return Err(ServerError::InvalidInput(format!(
                    "invalid state_filter: {other}. Use: eligible, delayed, terminal, suspended, active"
                )));
            }
        };

        // One score-ordered page of execution ids.
        let eids: Vec<String> = self
            .client
            .cmd("ZRANGE")
            .arg(&zset_key)
            .arg("-inf")
            .arg("+inf")
            .arg("BYSCORE")
            .arg("LIMIT")
            .arg(offset)
            .arg(limit)
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, format!("ZRANGE {zset_key}")))?;

        if eids.is_empty() {
            return Ok(ListExecutionsResult {
                executions: vec![],
                total_returned: 0,
            });
        }

        // Drop (with a warning) members that do not parse as ExecutionIds
        // rather than failing the whole listing.
        let mut parsed = Vec::with_capacity(eids.len());
        for eid_str in &eids {
            match ExecutionId::parse(eid_str) {
                Ok(id) => parsed.push(id),
                Err(e) => {
                    tracing::warn!(
                        raw_id = %eid_str,
                        error = %e,
                        zset = %zset_key,
                        "list_executions: ZSET member failed to parse as ExecutionId (data corruption?)"
                    );
                }
            }
        }

        if parsed.is_empty() {
            return Ok(ListExecutionsResult {
                executions: vec![],
                total_returned: 0,
            });
        }

        // Queue one HMGET per id on a pipeline; `slots` holds the deferred
        // replies in the same order as `parsed`.
        let mut pipe = self.client.pipeline();
        let mut slots = Vec::with_capacity(parsed.len());
        for eid in &parsed {
            let ep = execution_partition(eid, &self.config.partition_config);
            let ctx = ExecKeyContext::new(&ep, eid);
            let slot = pipe
                .cmd::<Vec<Option<String>>>("HMGET")
                .arg(ctx.core())
                .arg("namespace")
                .arg("lane_id")
                .arg("execution_kind")
                .arg("public_state")
                .arg("priority")
                .arg("created_at")
                .finish();
            slots.push(slot);
        }

        pipe.execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "pipeline HMGET"))?;

        let mut summaries = Vec::with_capacity(parsed.len());
        for (eid, slot) in parsed.into_iter().zip(slots) {
            let fields: Vec<Option<String>> = slot.value()
                .map_err(|e| crate::server::backend_context(e, "pipeline slot"))?;

            // Positional accessor into the HMGET reply; absent fields → "".
            let field = |i: usize| -> String {
                fields
                    .get(i)
                    .and_then(|v| v.as_ref())
                    .cloned()
                    .unwrap_or_default()
            };

            summaries.push(ExecutionSummary {
                execution_id: eid,
                namespace: field(0),
                lane_id: field(1),
                execution_kind: field(2),
                public_state: field(3),
                priority: field(4).parse().unwrap_or(0),
                created_at: field(5),
            });
        }

        let total = summaries.len();
        Ok(ListExecutionsResult {
            executions: summaries,
            total_returned: total,
        })
    }
2434
2435 pub async fn replay_execution(
2440 &self,
2441 execution_id: &ExecutionId,
2442 ) -> Result<ReplayExecutionResult, ServerError> {
2443 let partition = execution_partition(execution_id, &self.config.partition_config);
2444 let ctx = ExecKeyContext::new(&partition, execution_id);
2445 let idx = IndexKeys::new(&partition);
2446
2447 let dyn_fields: Vec<Option<String>> = self
2459 .client
2460 .cmd("HMGET")
2461 .arg(ctx.core())
2462 .arg("lane_id")
2463 .arg("flow_id")
2464 .arg("terminal_outcome")
2465 .execute()
2466 .await
2467 .map_err(|e| crate::server::backend_context(e, "HMGET replay pre-read"))?;
2468 let lane = LaneId::new(
2469 dyn_fields
2470 .first()
2471 .and_then(|v| v.as_ref())
2472 .cloned()
2473 .unwrap_or_else(|| "default".to_owned()),
2474 );
2475 let flow_id_str = dyn_fields
2476 .get(1)
2477 .and_then(|v| v.as_ref())
2478 .cloned()
2479 .unwrap_or_default();
2480 let terminal_outcome = dyn_fields
2481 .get(2)
2482 .and_then(|v| v.as_ref())
2483 .cloned()
2484 .unwrap_or_default();
2485
2486 let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();
2487
2488 let mut fcall_keys: Vec<String> = vec![
2490 ctx.core(),
2491 idx.lane_terminal(&lane),
2492 idx.lane_eligible(&lane),
2493 ctx.lease_history(),
2494 ];
2495
2496 let now = TimestampMs::now();
2498 let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];
2499
2500 if is_skipped_flow_member {
2501 let flow_id = FlowId::parse(&flow_id_str)
2505 .map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
2506 let flow_part =
2507 flow_partition(&flow_id, &self.config.partition_config);
2508 let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
2509 let edge_ids: Vec<String> = self
2510 .client
2511 .cmd("SMEMBERS")
2512 .arg(flow_ctx.incoming(execution_id))
2513 .execute()
2514 .await
2515 .map_err(|e| crate::server::backend_context(e, "SMEMBERS replay edges"))?;
2516
2517 fcall_keys.push(idx.lane_blocked_dependencies(&lane)); fcall_keys.push(ctx.deps_meta()); fcall_keys.push(ctx.deps_unresolved()); for eid_str in &edge_ids {
2522 let edge_id = EdgeId::parse(eid_str)
2523 .unwrap_or_else(|_| EdgeId::new());
2524 fcall_keys.push(ctx.dep_edge(&edge_id)); fcall_args.push(eid_str.clone()); }
2527 }
2528
2529 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2530 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2531
2532 let raw: Value = self
2533 .fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
2534 .await?;
2535
2536 parse_replay_result(&raw)
2537 }
2538
    /// Read a bounded range of frames from one attempt's output stream.
    ///
    /// Concurrency-limited by `stream_semaphore`: when no permit is free the
    /// call fails fast with `ConcurrencyLimitExceeded` instead of queueing.
    pub async fn read_attempt_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        from_id: &str,
        to_id: &str,
        count_limit: u64,
    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
        use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};

        if count_limit == 0 {
            return Err(ServerError::InvalidInput(
                "count_limit must be >= 1".to_owned(),
            ));
        }

        // Try-acquire (no await) so an overloaded server sheds load; a closed
        // semaphore means shutdown() has already run.
        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded(
                    "stream_ops",
                    self.config.max_concurrent_stream_ops,
                ));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                return Err(ServerError::OperationFailed(
                    "stream semaphore closed (server shutting down)".into(),
                ));
            }
        };

        let args = ReadFramesArgs {
            execution_id: execution_id.clone(),
            attempt_index,
            from_id: from_id.to_owned(),
            to_id: to_id.to_owned(),
            count_limit,
        };

        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };

        // Uses `tail_client` (a separate client from `self.client`;
        // presumably dedicated to stream reads — confirm).
        let result = ff_script::functions::stream::ff_read_attempt_stream(
            &self.tail_client, &keys, &args,
        )
        .await
        .map_err(script_error_to_server);

        // Release the permit before `?` so it is freed on the error path too.
        drop(permit);

        match result? {
            ReadFramesResult::Frames(f) => Ok(f),
        }
    }
2612
    /// Blocking tail (XREAD BLOCK) of one attempt's output stream.
    ///
    /// Guarded by the same stream-op semaphore as `read_attempt_stream`
    /// (fail-fast load shedding), plus `xread_block_lock`, which serializes
    /// all blocking XREADs — presumably because a blocking read monopolizes
    /// its connection; confirm against the client's connection model.
    pub async fn tail_attempt_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        last_id: &str,
        block_ms: u64,
        count_limit: u64,
    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
        if count_limit == 0 {
            return Err(ServerError::InvalidInput(
                "count_limit must be >= 1".to_owned(),
            ));
        }

        // Try-acquire (no await): shed load instead of queueing; Closed means
        // shutdown() already ran.
        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded(
                    "stream_ops",
                    self.config.max_concurrent_stream_ops,
                ));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                return Err(ServerError::OperationFailed(
                    "stream semaphore closed (server shutting down)".into(),
                ));
            }
        };

        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let stream_key = ctx.stream(attempt_index);
        let stream_meta_key = ctx.stream_meta(attempt_index);

        // Held across the blocking read; released before the permit below.
        let _xread_guard = self.xread_block_lock.lock().await;

        let result = ff_script::stream_tail::xread_block(
            &self.tail_client,
            &stream_key,
            &stream_meta_key,
            last_id,
            block_ms,
            count_limit,
        )
        .await
        .map_err(script_error_to_server);

        // Drop order: XREAD lock first, then the semaphore permit, before
        // propagating any error via `result`.
        drop(_xread_guard);
        drop(permit);
        result
    }
2703
2704 pub async fn shutdown(self) {
2727 tracing::info!("shutting down FlowFabric server");
2728
2729 self.stream_semaphore.close();
2734 tracing::info!(
2735 "stream semaphore closed; no new read/tail attempts will be accepted"
2736 );
2737
2738 let drain_timeout = Duration::from_secs(15);
2742 let background = self.background_tasks.clone();
2743 let drain = async move {
2744 let mut guard = background.lock().await;
2745 while guard.join_next().await.is_some() {}
2746 };
2747 match tokio::time::timeout(drain_timeout, drain).await {
2748 Ok(()) => {}
2749 Err(_) => {
2750 tracing::warn!(
2751 timeout_s = drain_timeout.as_secs(),
2752 "shutdown: background tasks did not finish in time, aborting"
2753 );
2754 self.background_tasks.lock().await.abort_all();
2755 }
2756 }
2757
2758 self.engine.shutdown().await;
2759 tracing::info!("FlowFabric server shutdown complete");
2760 }
2761}
2762
/// Minimum supported Valkey server version (major part).
const REQUIRED_VALKEY_MAJOR: u32 = 7;
/// Minimum supported Valkey server version (minor part).
const REQUIRED_VALKEY_MINOR: u32 = 2;

/// Total wall-clock budget for retrying the startup version check before
/// surfacing the last error.
const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
2776
/// Verify the connected Valkey server(s) meet the minimum supported version,
/// retrying failures with exponential backoff.
///
/// Retries for up to `VERSION_CHECK_RETRY_BUDGET`; backoff starts at 200ms
/// and doubles up to a 5s cap. A too-low version is also retried until the
/// budget is exhausted (the node may still be mid-upgrade), after which the
/// classified error is returned.
async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
    let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
    let mut backoff = Duration::from_millis(200);
    loop {
        // Each probe is classified into:
        //   should_retry           — whether another attempt is worthwhile,
        //   err_for_budget_exhaust — the error surfaced if we stop retrying,
        //   log_detail             — context string for the retry log line.
        let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
            match query_valkey_version(client).await {
                // Version acceptable: done.
                Ok((detected_major, detected_minor))
                    if (detected_major, detected_minor)
                        >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR) =>
                {
                    tracing::info!(
                        detected_major,
                        detected_minor,
                        required_major = REQUIRED_VALKEY_MAJOR,
                        required_minor = REQUIRED_VALKEY_MINOR,
                        "Valkey version accepted"
                    );
                    return Ok(());
                }
                // Version too low: retry within the budget.
                Ok((detected_major, detected_minor)) => (
                    true,
                    ServerError::ValkeyVersionTooLow {
                        detected: format!("{detected_major}.{detected_minor}"),
                        required: format!("{REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"),
                    },
                    format!(
                        "detected={detected_major}.{detected_minor} < required={REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"
                    ),
                ),
                // Probe failed: retry only if the backend kind says so;
                // unknown kinds default to retryable.
                Err(e) => {
                    let retryable = e
                        .backend_kind()
                        .map(|k| k.is_retryable())
                        .unwrap_or(true);
                    let detail = e.to_string();
                    (retryable, e, detail)
                }
            };

        if !should_retry {
            return Err(err_for_budget_exhaust);
        }
        if tokio::time::Instant::now() >= deadline {
            return Err(err_for_budget_exhaust);
        }
        tracing::warn!(
            backoff_ms = backoff.as_millis() as u64,
            detail = %log_detail,
            "valkey version check transient failure; retrying"
        );
        tokio::time::sleep(backoff).await;
        backoff = (backoff * 2).min(Duration::from_secs(5));
    }
}
2865
2866async fn query_valkey_version(client: &Client) -> Result<(u32, u32), ServerError> {
2879 let raw: Value = client
2880 .cmd("INFO")
2881 .arg("server")
2882 .execute()
2883 .await
2884 .map_err(|e| crate::server::backend_context(e, "INFO server"))?;
2885 let bodies = extract_info_bodies(&raw)?;
2886 let mut min_version: Option<(u32, u32)> = None;
2892 for body in &bodies {
2893 let version = parse_valkey_version(body)?;
2894 min_version = Some(match min_version {
2895 None => version,
2896 Some(existing) => existing.min(version),
2897 });
2898 }
2899 min_version.ok_or_else(|| {
2900 ServerError::OperationFailed(
2901 "valkey version check: cluster INFO returned no node bodies".into(),
2902 )
2903 })
2904}
2905
2906fn extract_info_bodies(raw: &Value) -> Result<Vec<String>, ServerError> {
2912 match raw {
2913 Value::BulkString(bytes) => Ok(vec![String::from_utf8_lossy(bytes).into_owned()]),
2914 Value::VerbatimString { text, .. } => Ok(vec![text.clone()]),
2915 Value::SimpleString(s) => Ok(vec![s.clone()]),
2916 Value::Map(entries) => {
2917 if entries.is_empty() {
2918 return Err(ServerError::OperationFailed(
2919 "valkey version check: cluster INFO returned empty map".into(),
2920 ));
2921 }
2922 let mut out = Vec::with_capacity(entries.len());
2923 for (_, body) in entries {
2924 out.extend(extract_info_bodies(body)?);
2925 }
2926 Ok(out)
2927 }
2928 other => Err(ServerError::OperationFailed(format!(
2929 "valkey version check: unexpected INFO shape: {other:?}"
2930 ))),
2931 }
2932}
2933
2934fn parse_valkey_version(info: &str) -> Result<(u32, u32), ServerError> {
2949 let extract_major_minor = |line: &str| -> Result<(u32, u32), ServerError> {
2950 let trimmed = line.trim();
2951 let mut parts = trimmed.split('.');
2952 let major_str = parts.next().unwrap_or("").trim();
2953 if major_str.is_empty() {
2954 return Err(ServerError::OperationFailed(format!(
2955 "valkey version check: empty version field in '{trimmed}'"
2956 )));
2957 }
2958 let major = major_str.parse::<u32>().map_err(|_| {
2959 ServerError::OperationFailed(format!(
2960 "valkey version check: non-numeric major in '{trimmed}'"
2961 ))
2962 })?;
2963 let minor_str = parts.next().unwrap_or("").trim();
2967 if minor_str.is_empty() {
2968 return Err(ServerError::OperationFailed(format!(
2969 "valkey version check: missing minor component in '{trimmed}'"
2970 )));
2971 }
2972 let minor = minor_str.parse::<u32>().map_err(|_| {
2973 ServerError::OperationFailed(format!(
2974 "valkey version check: non-numeric minor in '{trimmed}'"
2975 ))
2976 })?;
2977 Ok((major, minor))
2978 };
2979 if let Some(valkey_line) = info
2981 .lines()
2982 .find_map(|line| line.strip_prefix("valkey_version:"))
2983 {
2984 return extract_major_minor(valkey_line);
2985 }
2986 let server_is_valkey = info
2991 .lines()
2992 .map(str::trim)
2993 .any(|line| line.eq_ignore_ascii_case("server_name:valkey"));
2994 if !server_is_valkey {
2995 return Err(ServerError::OperationFailed(
2996 "valkey version check: INFO missing valkey_version and server_name:valkey marker \
2997 (unsupported backend — FlowFabric requires Valkey >= 7.2; Redis is not supported)"
2998 .into(),
2999 ));
3000 }
3001 if let Some(redis_line) = info
3005 .lines()
3006 .find_map(|line| line.strip_prefix("redis_version:"))
3007 {
3008 return extract_major_minor(redis_line);
3009 }
3010 Err(ServerError::OperationFailed(
3011 "valkey version check: INFO has server_name:valkey but no redis_version or valkey_version field"
3012 .into(),
3013 ))
3014}
3015
3016async fn validate_or_create_partition_config(
3023 client: &Client,
3024 config: &PartitionConfig,
3025) -> Result<(), ServerError> {
3026 let key = keys::global_config_partitions();
3027
3028 let existing: HashMap<String, String> = client
3029 .hgetall(&key)
3030 .await
3031 .map_err(|e| crate::server::backend_context(e, format!("HGETALL {key}")))?;
3032
3033 if existing.is_empty() {
3034 tracing::info!("first boot: creating {key}");
3036 client
3037 .hset(&key, "num_flow_partitions", &config.num_flow_partitions.to_string())
3038 .await
3039 .map_err(|e| crate::server::backend_context(e, "HSET num_flow_partitions"))?;
3040 client
3041 .hset(&key, "num_budget_partitions", &config.num_budget_partitions.to_string())
3042 .await
3043 .map_err(|e| crate::server::backend_context(e, "HSET num_budget_partitions"))?;
3044 client
3045 .hset(&key, "num_quota_partitions", &config.num_quota_partitions.to_string())
3046 .await
3047 .map_err(|e| crate::server::backend_context(e, "HSET num_quota_partitions"))?;
3048 return Ok(());
3049 }
3050
3051 let check = |field: &str, expected: u16| -> Result<(), ServerError> {
3053 let stored: u16 = existing
3054 .get(field)
3055 .and_then(|v| v.parse().ok())
3056 .unwrap_or(0);
3057 if stored != expected {
3058 return Err(ServerError::PartitionMismatch(format!(
3059 "{field}: stored={stored}, config={expected}. \
3060 Partition counts are fixed at deployment time. \
3061 Either fix your config or migrate the data."
3062 )));
3063 }
3064 Ok(())
3065 };
3066
3067 check("num_flow_partitions", config.num_flow_partitions)?;
3068 check("num_budget_partitions", config.num_budget_partitions)?;
3069 check("num_quota_partitions", config.num_quota_partitions)?;
3070
3071 tracing::info!("partition config validated against stored {key}");
3072 Ok(())
3073}
3074
/// Key id used when installing the very first waitpoint HMAC secret.
const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";
3081
/// Result of probing/initializing one partition's waitpoint HMAC secret hash.
enum PartitionBootOutcome {
    /// Stored secret for the current kid matches the configured one.
    Match,
    /// A different secret is stored for the current kid; stored value wins.
    Mismatch,
    /// `current_kid` existed but its `secret:<kid>` field was missing (torn
    /// write, likely a crash during a prior boot); field was re-written.
    Repaired,
    /// No secret existed yet; the initial kid and secret were installed.
    Installed,
}
3094
/// Cap on concurrent per-partition calls during boot-time init and rotation.
const BOOT_INIT_CONCURRENCY: usize = 16;
3100
/// Probe one partition's waitpoint HMAC secret hash and bring it to a known
/// state: install the initial secret if absent, repair a torn prior write, or
/// report match/mismatch against the configured secret.
///
/// The probe order matters: `current_kid` is read first, and only an absent
/// kid triggers the atomic install path at the bottom.
async fn init_one_partition(
    client: &Client,
    partition: Partition,
    secret_hex: &str,
) -> Result<PartitionBootOutcome, ServerError> {
    let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();

    // Probe: does this partition already have a current kid?
    let stored_kid: Option<String> = client
        .cmd("HGET")
        .arg(&key)
        .arg("current_kid")
        .execute()
        .await
        .map_err(|e| crate::server::backend_context(e, format!("HGET {key} current_kid (init probe)")))?;

    if let Some(stored_kid) = stored_kid {
        let field = format!("secret:{stored_kid}");
        let stored_secret: Option<String> = client
            .hget(&key, &field)
            .await
            .map_err(|e| crate::server::backend_context(e, format!("HGET {key} secret:<kid> (init check)")))?;
        if stored_secret.is_none() {
            // Torn write: kid present but its secret field missing — re-write
            // the configured secret under the stored kid.
            client
                .hset(&key, &field, secret_hex)
                .await
                .map_err(|e| crate::server::backend_context(e, format!("HSET {key} secret:<kid> (repair torn write)")))?;
            return Ok(PartitionBootOutcome::Repaired);
        }
        if stored_secret.as_deref() != Some(secret_hex) {
            return Ok(PartitionBootOutcome::Mismatch);
        }
        return Ok(PartitionBootOutcome::Match);
    }

    // First install: write kid and secret in a single HSET so a crash cannot
    // leave one without the other.
    let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
    let _: i64 = client
        .cmd("HSET")
        .arg(&key)
        .arg("current_kid")
        .arg(WAITPOINT_HMAC_INITIAL_KID)
        .arg(&secret_field)
        .arg(secret_hex)
        .execute()
        .await
        .map_err(|e| crate::server::backend_context(e, format!("HSET {key} (init waitpoint HMAC atomic)")))?;
    Ok(PartitionBootOutcome::Installed)
}
3166
3167async fn initialize_waitpoint_hmac_secret(
3179 client: &Client,
3180 partition_config: &PartitionConfig,
3181 secret_hex: &str,
3182) -> Result<(), ServerError> {
3183 use futures::stream::{FuturesUnordered, StreamExt};
3184
3185 let n = partition_config.num_flow_partitions;
3186 tracing::info!(
3187 partitions = n,
3188 concurrency = BOOT_INIT_CONCURRENCY,
3189 "installing waitpoint HMAC secret across {n} execution partitions"
3190 );
3191
3192 let mut mismatch_count: u16 = 0;
3193 let mut repaired_count: u16 = 0;
3194 let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3195 let mut next_index: u16 = 0;
3196
3197 loop {
3198 while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3199 let partition = Partition {
3200 family: PartitionFamily::Execution,
3201 index: next_index,
3202 };
3203 let client = client.clone();
3204 let secret_hex = secret_hex.to_owned();
3205 pending.push(async move {
3206 init_one_partition(&client, partition, &secret_hex).await
3207 });
3208 next_index += 1;
3209 }
3210 match pending.next().await {
3211 Some(res) => match res? {
3212 PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
3213 PartitionBootOutcome::Mismatch => mismatch_count += 1,
3214 PartitionBootOutcome::Repaired => repaired_count += 1,
3215 },
3216 None => break,
3217 }
3218 }
3219
3220 if repaired_count > 0 {
3221 tracing::warn!(
3222 repaired_partitions = repaired_count,
3223 total_partitions = n,
3224 "repaired {repaired_count} partitions with torn waitpoint HMAC writes \
3225 (current_kid present but secret:<kid> missing, likely crash during prior boot)"
3226 );
3227 }
3228
3229 if mismatch_count > 0 {
3230 tracing::warn!(
3231 mismatched_partitions = mismatch_count,
3232 total_partitions = n,
3233 "stored/env secret mismatch on {mismatch_count} partitions — \
3234 env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
3235 run POST /v1/admin/rotate-waitpoint-secret to sync"
3236 );
3237 }
3238
3239 tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
3240 Ok(())
3241}
3242
/// Summary of a cluster-wide waitpoint HMAC secret rotation.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RotateWaitpointSecretResult {
    /// Number of partitions that accepted the new secret.
    pub rotated: u16,
    /// Partition indices where the rotation call failed (see audit log for
    /// per-partition errors).
    pub failed: Vec<u16>,
    /// The key id that was installed.
    pub new_kid: String,
}
3256
impl Server {
    /// Rotate the waitpoint HMAC secret to `new_kid`/`new_secret_hex` on every
    /// execution partition, fanning out with at most `BOOT_INIT_CONCURRENCY`
    /// calls in flight.
    ///
    /// Only one rotation may run at a time (guarded by a 1-permit semaphore).
    /// Per-partition failures are collected, not fatal: the result reports
    /// which partition indices failed so the operator can retry.
    ///
    /// # Errors
    /// Rejects an empty or ':'-containing `new_kid`, a non-hex or odd-length
    /// `new_secret_hex`, and concurrent rotation attempts.
    pub async fn rotate_waitpoint_secret(
        &self,
        new_kid: &str,
        new_secret_hex: &str,
    ) -> Result<RotateWaitpointSecretResult, ServerError> {
        // ':' is the separator inside the "secret:<kid>" hash field name.
        if new_kid.is_empty() || new_kid.contains(':') {
            return Err(ServerError::OperationFailed(
                "new_kid must be non-empty and must not contain ':'".into(),
            ));
        }
        if new_secret_hex.is_empty()
            || !new_secret_hex.len().is_multiple_of(2)
            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
        {
            return Err(ServerError::OperationFailed(
                "new_secret_hex must be a non-empty even-length hex string".into(),
            ));
        }

        // Exclusivity guard: holding the owned permit for the duration of the
        // fan-out ensures a single rotation at a time across the server.
        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                return Err(ServerError::OperationFailed(
                    "admin rotate semaphore closed (server shutting down)".into(),
                ));
            }
        };

        let n = self.config.partition_config.num_flow_partitions;
        let grace_ms = self.config.waitpoint_hmac_grace_ms;

        use futures::stream::{FuturesUnordered, StreamExt};

        let mut rotated = 0u16;
        let mut failed = Vec::new();
        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
        let mut next_index: u16 = 0;

        // Bounded-concurrency pump: refill the window, then await one
        // completion; exit when the window drains with nothing left to add.
        loop {
            while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
                let partition = Partition {
                    family: PartitionFamily::Execution,
                    index: next_index,
                };
                let idx = next_index;
                let new_kid_owned = new_kid.to_owned();
                let new_secret_owned = new_secret_hex.to_owned();
                let partition_owned = partition;
                let fut = async move {
                    let outcome = self
                        .rotate_single_partition(
                            &partition_owned,
                            &new_kid_owned,
                            &new_secret_owned,
                            grace_ms,
                        )
                        .await;
                    (idx, partition_owned, outcome)
                };
                pending.push(fut);
                next_index += 1;
            }
            match pending.next().await {
                Some((idx, partition, outcome)) => match outcome {
                    Ok(()) => {
                        rotated += 1;
                        tracing::debug!(
                            partition = %partition,
                            new_kid = %new_kid,
                            "waitpoint_hmac_rotated"
                        );
                    }
                    Err(e) => {
                        tracing::error!(
                            target: "audit",
                            partition = %partition,
                            err = %e,
                            "waitpoint_hmac_rotation_failed"
                        );
                        failed.push(idx);
                    }
                },
                None => break,
            }
        }

        tracing::info!(
            target: "audit",
            new_kid = %new_kid,
            total_partitions = n,
            rotated,
            failed_count = failed.len(),
            "waitpoint_hmac_rotation_complete"
        );

        Ok(RotateWaitpointSecretResult {
            rotated,
            failed,
            new_kid: new_kid.to_owned(),
        })
    }

    /// Rotate the waitpoint HMAC secret on a single partition by invoking the
    /// `ff_rotate_waitpoint_hmac_secret` Lua function, mapping script errors
    /// to server errors (rotation conflicts get an actionable message).
    async fn rotate_single_partition(
        &self,
        partition: &Partition,
        new_kid: &str,
        new_secret_hex: &str,
        grace_ms: u64,
    ) -> Result<(), ServerError> {
        let idx = IndexKeys::new(partition);
        let args = RotateWaitpointHmacSecretArgs {
            new_kid: new_kid.to_owned(),
            new_secret_hex: new_secret_hex.to_owned(),
            grace_ms,
        };
        let outcome = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            &self.client,
            &idx,
            &args,
        )
        .await
        .map_err(|e| match e {
            ff_script::ScriptError::RotationConflict(kid) => {
                ServerError::OperationFailed(format!(
                    "rotation conflict: kid {kid} already installed with a \
                     different secret. Either use a fresh kid or restore the \
                     original secret for this kid before retrying."
                ))
            }
            ff_script::ScriptError::Valkey(v) => crate::server::backend_context(
                v,
                format!("FCALL ff_rotate_waitpoint_hmac_secret partition={partition}"),
            ),
            other => ServerError::OperationFailed(format!(
                "rotation failed on partition {partition}: {other}"
            )),
        })?;
        // The script's success payload is not needed here; success is enough.
        let _ = outcome;
        Ok(())
    }
}
3457
3458fn parse_create_result(
3461 raw: &Value,
3462 execution_id: &ExecutionId,
3463) -> Result<CreateExecutionResult, ServerError> {
3464 let arr = match raw {
3465 Value::Array(arr) => arr,
3466 _ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
3467 };
3468
3469 let status = match arr.first() {
3470 Some(Ok(Value::Int(n))) => *n,
3471 _ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
3472 };
3473
3474 if status == 1 {
3475 let sub = arr
3477 .get(1)
3478 .and_then(|v| match v {
3479 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3480 Ok(Value::SimpleString(s)) => Some(s.clone()),
3481 _ => None,
3482 })
3483 .unwrap_or_default();
3484
3485 if sub == "DUPLICATE" {
3486 Ok(CreateExecutionResult::Duplicate {
3487 execution_id: execution_id.clone(),
3488 })
3489 } else {
3490 Ok(CreateExecutionResult::Created {
3491 execution_id: execution_id.clone(),
3492 public_state: PublicState::Waiting,
3493 })
3494 }
3495 } else {
3496 let error_code = arr
3497 .get(1)
3498 .and_then(|v| match v {
3499 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3500 Ok(Value::SimpleString(s)) => Some(s.clone()),
3501 _ => None,
3502 })
3503 .unwrap_or_else(|| "unknown".to_owned());
3504 Err(ServerError::OperationFailed(format!(
3505 "ff_create_execution failed: {error_code}"
3506 )))
3507 }
3508}
3509
3510fn parse_cancel_result(
3511 raw: &Value,
3512 execution_id: &ExecutionId,
3513) -> Result<CancelExecutionResult, ServerError> {
3514 let arr = match raw {
3515 Value::Array(arr) => arr,
3516 _ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
3517 };
3518
3519 let status = match arr.first() {
3520 Some(Ok(Value::Int(n))) => *n,
3521 _ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
3522 };
3523
3524 if status == 1 {
3525 Ok(CancelExecutionResult::Cancelled {
3526 execution_id: execution_id.clone(),
3527 public_state: PublicState::Cancelled,
3528 })
3529 } else {
3530 let error_code = arr
3531 .get(1)
3532 .and_then(|v| match v {
3533 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3534 Ok(Value::SimpleString(s)) => Some(s.clone()),
3535 _ => None,
3536 })
3537 .unwrap_or_else(|| "unknown".to_owned());
3538 Err(ServerError::OperationFailed(format!(
3539 "ff_cancel_execution failed: {error_code}"
3540 )))
3541 }
3542}
3543
3544fn parse_budget_create_result(
3545 raw: &Value,
3546 budget_id: &BudgetId,
3547) -> Result<CreateBudgetResult, ServerError> {
3548 let arr = match raw {
3549 Value::Array(arr) => arr,
3550 _ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
3551 };
3552
3553 let status = match arr.first() {
3554 Some(Ok(Value::Int(n))) => *n,
3555 _ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
3556 };
3557
3558 if status == 1 {
3559 let sub = arr
3560 .get(1)
3561 .and_then(|v| match v {
3562 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3563 Ok(Value::SimpleString(s)) => Some(s.clone()),
3564 _ => None,
3565 })
3566 .unwrap_or_default();
3567
3568 if sub == "ALREADY_SATISFIED" {
3569 Ok(CreateBudgetResult::AlreadySatisfied {
3570 budget_id: budget_id.clone(),
3571 })
3572 } else {
3573 Ok(CreateBudgetResult::Created {
3574 budget_id: budget_id.clone(),
3575 })
3576 }
3577 } else {
3578 let error_code = arr
3579 .get(1)
3580 .and_then(|v| match v {
3581 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3582 Ok(Value::SimpleString(s)) => Some(s.clone()),
3583 _ => None,
3584 })
3585 .unwrap_or_else(|| "unknown".to_owned());
3586 Err(ServerError::OperationFailed(format!(
3587 "ff_create_budget failed: {error_code}"
3588 )))
3589 }
3590}
3591
3592fn parse_quota_create_result(
3593 raw: &Value,
3594 quota_policy_id: &QuotaPolicyId,
3595) -> Result<CreateQuotaPolicyResult, ServerError> {
3596 let arr = match raw {
3597 Value::Array(arr) => arr,
3598 _ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
3599 };
3600
3601 let status = match arr.first() {
3602 Some(Ok(Value::Int(n))) => *n,
3603 _ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
3604 };
3605
3606 if status == 1 {
3607 let sub = arr
3608 .get(1)
3609 .and_then(|v| match v {
3610 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3611 Ok(Value::SimpleString(s)) => Some(s.clone()),
3612 _ => None,
3613 })
3614 .unwrap_or_default();
3615
3616 if sub == "ALREADY_SATISFIED" {
3617 Ok(CreateQuotaPolicyResult::AlreadySatisfied {
3618 quota_policy_id: quota_policy_id.clone(),
3619 })
3620 } else {
3621 Ok(CreateQuotaPolicyResult::Created {
3622 quota_policy_id: quota_policy_id.clone(),
3623 })
3624 }
3625 } else {
3626 let error_code = arr
3627 .get(1)
3628 .and_then(|v| match v {
3629 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3630 Ok(Value::SimpleString(s)) => Some(s.clone()),
3631 _ => None,
3632 })
3633 .unwrap_or_else(|| "unknown".to_owned());
3634 Err(ServerError::OperationFailed(format!(
3635 "ff_create_quota_policy failed: {error_code}"
3636 )))
3637 }
3638}
3639
3640fn parse_create_flow_result(
3643 raw: &Value,
3644 flow_id: &FlowId,
3645) -> Result<CreateFlowResult, ServerError> {
3646 let arr = match raw {
3647 Value::Array(arr) => arr,
3648 _ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
3649 };
3650 let status = match arr.first() {
3651 Some(Ok(Value::Int(n))) => *n,
3652 _ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
3653 };
3654 if status == 1 {
3655 let sub = fcall_field_str(arr, 1);
3656 if sub == "ALREADY_SATISFIED" {
3657 Ok(CreateFlowResult::AlreadySatisfied {
3658 flow_id: flow_id.clone(),
3659 })
3660 } else {
3661 Ok(CreateFlowResult::Created {
3662 flow_id: flow_id.clone(),
3663 })
3664 }
3665 } else {
3666 let error_code = fcall_field_str(arr, 1);
3667 Err(ServerError::OperationFailed(format!(
3668 "ff_create_flow failed: {error_code}"
3669 )))
3670 }
3671}
3672
3673fn parse_add_execution_to_flow_result(
3674 raw: &Value,
3675) -> Result<AddExecutionToFlowResult, ServerError> {
3676 let arr = match raw {
3677 Value::Array(arr) => arr,
3678 _ => {
3679 return Err(ServerError::Script(
3680 "ff_add_execution_to_flow: expected Array".into(),
3681 ))
3682 }
3683 };
3684 let status = match arr.first() {
3685 Some(Ok(Value::Int(n))) => *n,
3686 _ => {
3687 return Err(ServerError::Script(
3688 "ff_add_execution_to_flow: bad status code".into(),
3689 ))
3690 }
3691 };
3692 if status == 1 {
3693 let sub = fcall_field_str(arr, 1);
3694 let eid_str = fcall_field_str(arr, 2);
3695 let nc_str = fcall_field_str(arr, 3);
3696 let eid = ExecutionId::parse(&eid_str)
3697 .map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
3698 let nc: u32 = nc_str.parse().unwrap_or(0);
3699 if sub == "ALREADY_SATISFIED" {
3700 Ok(AddExecutionToFlowResult::AlreadyMember {
3701 execution_id: eid,
3702 node_count: nc,
3703 })
3704 } else {
3705 Ok(AddExecutionToFlowResult::Added {
3706 execution_id: eid,
3707 new_node_count: nc,
3708 })
3709 }
3710 } else {
3711 let error_code = fcall_field_str(arr, 1);
3712 Err(ServerError::OperationFailed(format!(
3713 "ff_add_execution_to_flow failed: {error_code}"
3714 )))
3715 }
3716}
3717
/// Decoded `ff_cancel_flow` reply.
enum ParsedCancelFlow {
    /// Flow was cancelled; carries the cancellation policy string and the
    /// member execution ids returned by the script.
    Cancelled {
        policy: String,
        member_execution_ids: Vec<String>,
    },
    /// The flow was already terminal; treated as success with nothing to do.
    AlreadyTerminal,
}
3730
3731fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
3737 let arr = match raw {
3738 Value::Array(arr) => arr,
3739 _ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
3740 };
3741 let status = match arr.first() {
3742 Some(Ok(Value::Int(n))) => *n,
3743 _ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
3744 };
3745 if status != 1 {
3746 let error_code = fcall_field_str(arr, 1);
3747 if error_code == "flow_already_terminal" {
3748 return Ok(ParsedCancelFlow::AlreadyTerminal);
3749 }
3750 return Err(ServerError::OperationFailed(format!(
3751 "ff_cancel_flow failed: {error_code}"
3752 )));
3753 }
3754 let policy = fcall_field_str(arr, 2);
3756 let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
3759 for i in 3..arr.len() {
3760 members.push(fcall_field_str(arr, i));
3761 }
3762 Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
3763}
3764
3765fn parse_stage_dependency_edge_result(
3766 raw: &Value,
3767) -> Result<StageDependencyEdgeResult, ServerError> {
3768 let arr = match raw {
3769 Value::Array(arr) => arr,
3770 _ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
3771 };
3772 let status = match arr.first() {
3773 Some(Ok(Value::Int(n))) => *n,
3774 _ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
3775 };
3776 if status == 1 {
3777 let edge_id_str = fcall_field_str(arr, 2);
3778 let rev_str = fcall_field_str(arr, 3);
3779 let edge_id = EdgeId::parse(&edge_id_str)
3780 .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
3781 let rev: u64 = rev_str.parse().unwrap_or(0);
3782 Ok(StageDependencyEdgeResult::Staged {
3783 edge_id,
3784 new_graph_revision: rev,
3785 })
3786 } else {
3787 let error_code = fcall_field_str(arr, 1);
3788 Err(ServerError::OperationFailed(format!(
3789 "ff_stage_dependency_edge failed: {error_code}"
3790 )))
3791 }
3792}
3793
3794fn parse_apply_dependency_result(
3795 raw: &Value,
3796) -> Result<ApplyDependencyToChildResult, ServerError> {
3797 let arr = match raw {
3798 Value::Array(arr) => arr,
3799 _ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
3800 };
3801 let status = match arr.first() {
3802 Some(Ok(Value::Int(n))) => *n,
3803 _ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
3804 };
3805 if status == 1 {
3806 let sub = fcall_field_str(arr, 1);
3807 if sub == "ALREADY_APPLIED" || sub == "already_applied" {
3808 Ok(ApplyDependencyToChildResult::AlreadyApplied)
3809 } else {
3810 let count_str = fcall_field_str(arr, 2);
3812 let count: u32 = count_str.parse().unwrap_or(0);
3813 Ok(ApplyDependencyToChildResult::Applied {
3814 unsatisfied_count: count,
3815 })
3816 }
3817 } else {
3818 let error_code = fcall_field_str(arr, 1);
3819 Err(ServerError::OperationFailed(format!(
3820 "ff_apply_dependency_to_child failed: {error_code}"
3821 )))
3822 }
3823}
3824
3825fn parse_deliver_signal_result(
3826 raw: &Value,
3827 signal_id: &SignalId,
3828) -> Result<DeliverSignalResult, ServerError> {
3829 let arr = match raw {
3830 Value::Array(arr) => arr,
3831 _ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
3832 };
3833 let status = match arr.first() {
3834 Some(Ok(Value::Int(n))) => *n,
3835 _ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
3836 };
3837 if status == 1 {
3838 let sub = fcall_field_str(arr, 1);
3839 if sub == "DUPLICATE" {
3840 let existing_str = fcall_field_str(arr, 2);
3842 let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
3843 Ok(DeliverSignalResult::Duplicate {
3844 existing_signal_id: existing_id,
3845 })
3846 } else {
3847 let effect = fcall_field_str(arr, 3);
3849 Ok(DeliverSignalResult::Accepted {
3850 signal_id: signal_id.clone(),
3851 effect,
3852 })
3853 }
3854 } else {
3855 let error_code = fcall_field_str(arr, 1);
3856 Err(ServerError::OperationFailed(format!(
3857 "ff_deliver_signal failed: {error_code}"
3858 )))
3859 }
3860}
3861
3862fn parse_change_priority_result(
3863 raw: &Value,
3864 execution_id: &ExecutionId,
3865) -> Result<ChangePriorityResult, ServerError> {
3866 let arr = match raw {
3867 Value::Array(arr) => arr,
3868 _ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
3869 };
3870 let status = match arr.first() {
3871 Some(Ok(Value::Int(n))) => *n,
3872 _ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
3873 };
3874 if status == 1 {
3875 Ok(ChangePriorityResult::Changed {
3876 execution_id: execution_id.clone(),
3877 })
3878 } else {
3879 let error_code = fcall_field_str(arr, 1);
3880 Err(ServerError::OperationFailed(format!(
3881 "ff_change_priority failed: {error_code}"
3882 )))
3883 }
3884}
3885
3886fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
3887 let arr = match raw {
3888 Value::Array(arr) => arr,
3889 _ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
3890 };
3891 let status = match arr.first() {
3892 Some(Ok(Value::Int(n))) => *n,
3893 _ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
3894 };
3895 if status == 1 {
3896 let unsatisfied = fcall_field_str(arr, 2);
3898 let ps = if unsatisfied == "0" {
3899 PublicState::Waiting
3900 } else {
3901 PublicState::WaitingChildren
3902 };
3903 Ok(ReplayExecutionResult::Replayed { public_state: ps })
3904 } else {
3905 let error_code = fcall_field_str(arr, 1);
3906 Err(ServerError::OperationFailed(format!(
3907 "ff_replay_execution failed: {error_code}"
3908 )))
3909 }
3910}
3911
3912fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
3923 match e {
3924 ff_script::error::ScriptError::Valkey(valkey_err) => {
3925 crate::server::backend_context(valkey_err, "stream FCALL transport")
3926 }
3927 other => ServerError::Script(other.to_string()),
3928 }
3929}
3930
3931fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
3932 match arr.get(index) {
3933 Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
3934 Some(Ok(Value::SimpleString(s))) => s.clone(),
3935 Some(Ok(Value::Int(n))) => n.to_string(),
3936 _ => String::new(),
3937 }
3938}
3939
3940fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
3944 let arr = match raw {
3945 Value::Array(arr) => arr,
3946 _ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
3947 };
3948 let status_code = match arr.first() {
3949 Some(Ok(Value::Int(n))) => *n,
3950 _ => {
3951 return Err(ServerError::Script(
3952 "ff_report_usage_and_check: expected Int status code".into(),
3953 ))
3954 }
3955 };
3956 if status_code != 1 {
3957 let error_code = fcall_field_str(arr, 1);
3958 return Err(ServerError::OperationFailed(format!(
3959 "ff_report_usage_and_check failed: {error_code}"
3960 )));
3961 }
3962 let sub_status = fcall_field_str(arr, 1);
3963 match sub_status.as_str() {
3964 "OK" => Ok(ReportUsageResult::Ok),
3965 "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
3966 "SOFT_BREACH" => {
3967 let dim = fcall_field_str(arr, 2);
3968 let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3969 let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3970 Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
3971 }
3972 "HARD_BREACH" => {
3973 let dim = fcall_field_str(arr, 2);
3974 let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3975 let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3976 Ok(ReportUsageResult::HardBreach {
3977 dimension: dim,
3978 current_usage: current,
3979 hard_limit: limit,
3980 })
3981 }
3982 _ => Err(ServerError::OperationFailed(format!(
3983 "ff_report_usage_and_check: unknown sub-status: {sub_status}"
3984 ))),
3985 }
3986}
3987
3988fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
3989 let arr = match raw {
3990 Value::Array(arr) => arr,
3991 _ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
3992 };
3993 let status = match arr.first() {
3994 Some(Ok(Value::Int(n))) => *n,
3995 _ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
3996 };
3997 if status == 1 {
3998 let sub = fcall_field_str(arr, 1);
3999 if sub == "ALREADY_SATISFIED" {
4000 let reason = fcall_field_str(arr, 2);
4001 Ok(RevokeLeaseResult::AlreadySatisfied { reason })
4002 } else {
4003 let lid = fcall_field_str(arr, 2);
4004 let epoch = fcall_field_str(arr, 3);
4005 Ok(RevokeLeaseResult::Revoked {
4006 lease_id: lid,
4007 lease_epoch: epoch,
4008 })
4009 }
4010 } else {
4011 let error_code = fcall_field_str(arr, 1);
4012 Err(ServerError::OperationFailed(format!(
4013 "ff_revoke_lease failed: {error_code}"
4014 )))
4015 }
4016}
4017
4018fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
4024 if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
4025 return true;
4026 }
4027 e.detail()
4028 .map(|d| {
4029 d.contains("Function not loaded")
4030 || d.contains("No matching function")
4031 || d.contains("function not found")
4032 })
4033 .unwrap_or(false)
4034 || e.to_string().contains("Function not loaded")
4035}
4036
4037async fn fcall_with_reload_on_client(
4040 client: &Client,
4041 function: &str,
4042 keys: &[&str],
4043 args: &[&str],
4044) -> Result<Value, ServerError> {
4045 match client.fcall(function, keys, args).await {
4046 Ok(v) => Ok(v),
4047 Err(e) if is_function_not_loaded(&e) => {
4048 tracing::warn!(function, "Lua library not found on server, reloading");
4049 ff_script::loader::ensure_library(client)
4050 .await
4051 .map_err(ServerError::LibraryLoad)?;
4052 client
4053 .fcall(function, keys, args)
4054 .await
4055 .map_err(ServerError::from)
4056 }
4057 Err(e) => Err(ServerError::from(e)),
4058 }
4059}
4060
4061async fn build_cancel_execution_fcall(
4065 client: &Client,
4066 partition_config: &PartitionConfig,
4067 args: &CancelExecutionArgs,
4068) -> Result<(Vec<String>, Vec<String>), ServerError> {
4069 let partition = execution_partition(&args.execution_id, partition_config);
4070 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
4071 let idx = IndexKeys::new(&partition);
4072
4073 let lane_str: Option<String> = client
4074 .hget(&ctx.core(), "lane_id")
4075 .await
4076 .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
4077 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
4078
4079 let dyn_fields: Vec<Option<String>> = client
4080 .cmd("HMGET")
4081 .arg(ctx.core())
4082 .arg("current_attempt_index")
4083 .arg("current_waitpoint_id")
4084 .arg("current_worker_instance_id")
4085 .execute()
4086 .await
4087 .map_err(|e| crate::server::backend_context(e, "HMGET cancel pre-read"))?;
4088
4089 let att_idx_val = dyn_fields.first()
4090 .and_then(|v| v.as_ref())
4091 .and_then(|s| s.parse::<u32>().ok())
4092 .unwrap_or(0);
4093 let att_idx = AttemptIndex::new(att_idx_val);
4094 let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4095 let wp_id = if wp_id_str.is_empty() {
4096 WaitpointId::new()
4097 } else {
4098 WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
4099 };
4100 let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4101 let wiid = WorkerInstanceId::new(&wiid_str);
4102
4103 let keys: Vec<String> = vec![
4104 ctx.core(), ctx.attempt_hash(att_idx), ctx.stream_meta(att_idx), ctx.lease_current(), ctx.lease_history(), idx.lease_expiry(), idx.worker_leases(&wiid), ctx.suspension_current(), ctx.waitpoint(&wp_id), ctx.waitpoint_condition(&wp_id), idx.suspension_timeout(), idx.lane_terminal(&lane), idx.attempt_timeout(), idx.execution_deadline(), idx.lane_eligible(&lane), idx.lane_delayed(&lane), idx.lane_blocked_dependencies(&lane), idx.lane_blocked_budget(&lane), idx.lane_blocked_quota(&lane), idx.lane_blocked_route(&lane), idx.lane_blocked_operator(&lane), ];
4126 let argv: Vec<String> = vec![
4127 args.execution_id.to_string(),
4128 args.reason.clone(),
4129 args.source.to_string(),
4130 args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
4131 args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
4132 ];
4133 Ok((keys, argv))
4134}
4135
/// Backoff schedule (milliseconds) used by `cancel_member_execution` when a
/// transient backend error occurs; the number of entries bounds the total
/// attempts made.
const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];
4140
/// Returns the backend error kind carried by `e`, if any, by delegating to
/// `ServerError::backend_kind`; `None` means the error did not originate from
/// the backend (e.g. validation or business-logic failures).
fn extract_backend_kind(e: &ServerError) -> Option<ff_core::BackendErrorKind> {
    e.backend_kind()
}
4154
4155async fn ack_cancel_member(
4172 client: &Client,
4173 pending_cancels_key: &str,
4174 cancel_backlog_key: &str,
4175 eid_str: &str,
4176 flow_id: &str,
4177) {
4178 let keys = [pending_cancels_key, cancel_backlog_key];
4179 let args_v = [eid_str, flow_id];
4180 let fut: Result<Value, _> =
4181 client.fcall("ff_ack_cancel_member", &keys, &args_v).await;
4182 if let Err(e) = fut {
4183 tracing::warn!(
4184 flow_id = %flow_id,
4185 execution_id = %eid_str,
4186 error = %e,
4187 "ff_ack_cancel_member failed; reconciler will drain on next pass"
4188 );
4189 }
4190}
4191
4192fn is_terminal_ack_error(err: &ServerError) -> bool {
4201 match err {
4202 ServerError::OperationFailed(msg) => {
4203 msg.contains("execution_not_active") || msg.contains("execution_not_found")
4204 }
4205 _ => false,
4206 }
4207}
4208
4209async fn cancel_member_execution(
4210 client: &Client,
4211 partition_config: &PartitionConfig,
4212 eid_str: &str,
4213 reason: &str,
4214 now: TimestampMs,
4215) -> Result<(), ServerError> {
4216 let execution_id = ExecutionId::parse(eid_str)
4217 .map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
4218 let args = CancelExecutionArgs {
4219 execution_id: execution_id.clone(),
4220 reason: reason.to_owned(),
4221 source: CancelSource::OperatorOverride,
4222 lease_id: None,
4223 lease_epoch: None,
4224 attempt_id: None,
4225 now,
4226 };
4227
4228 let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
4229 for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
4230 let is_last = attempt_idx + 1 == attempts;
4231 match try_cancel_member_once(client, partition_config, &args).await {
4232 Ok(()) => return Ok(()),
4233 Err(e) => {
4234 let retryable = extract_backend_kind(&e)
4238 .map(|k| k.is_retryable())
4239 .unwrap_or(false);
4240 if !retryable || is_last {
4241 return Err(e);
4242 }
4243 tracing::debug!(
4244 execution_id = %execution_id,
4245 attempt = attempt_idx + 1,
4246 delay_ms = *delay_ms,
4247 error = %e,
4248 "cancel_member_execution: transient error, retrying"
4249 );
4250 tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
4251 }
4252 }
4253 }
4254 Err(ServerError::OperationFailed(format!(
4258 "cancel_member_execution: retries exhausted for {execution_id}"
4259 )))
4260}
4261
4262async fn try_cancel_member_once(
4265 client: &Client,
4266 partition_config: &PartitionConfig,
4267 args: &CancelExecutionArgs,
4268) -> Result<(), ServerError> {
4269 let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
4270 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
4271 let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
4272 let raw =
4273 fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
4274 parse_cancel_result(&raw, &args.execution_id).map(|_| ())
4275}
4276
4277fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
4278 let arr = match raw {
4279 Value::Array(arr) => arr,
4280 _ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
4281 };
4282 let status = match arr.first() {
4283 Some(Ok(Value::Int(n))) => *n,
4284 _ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
4285 };
4286 if status == 1 {
4287 let next_str = fcall_field_str(arr, 2);
4288 let next_ms: i64 = next_str.parse().unwrap_or(0);
4289 Ok(ResetBudgetResult::Reset {
4290 next_reset_at: TimestampMs::from_millis(next_ms),
4291 })
4292 } else {
4293 let error_code = fcall_field_str(arr, 1);
4294 Err(ServerError::OperationFailed(format!(
4295 "ff_reset_budget failed: {error_code}"
4296 )))
4297 }
4298}
4299
// Unit tests: input validation caps, ServerError retryability classification,
// and Valkey version-gate parsing (INFO body extraction, min-across-cluster).
#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    // Builds a synthetic ferriskey error of the given kind.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn create_budget_rejects_over_cap_dimension_count() {
        let n = MAX_BUDGET_DIMENSIONS + 1;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let hard = vec![1u64; n];
        let soft = vec![0u64; n];
        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
                assert!(msg.contains(&format!("got={n}")), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn create_budget_accepts_exactly_cap_dimensions() {
        let n = MAX_BUDGET_DIMENSIONS;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let hard = vec![1u64; n];
        let soft = vec![0u64; n];
        assert!(validate_create_budget_dimensions(&dims, &hard, &soft).is_ok());
    }

    #[test]
    fn create_budget_rejects_hard_limit_length_mismatch() {
        let dims = vec!["a".to_string(), "b".to_string()];
        // Fix: the two statements below were squashed onto one line.
        let hard = vec![1u64];
        let soft = vec![0u64, 0u64];
        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
                assert!(msg.contains("hard_limits=1"), "got: {msg}");
                assert!(msg.contains("dimensions=2"), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn create_budget_rejects_soft_limit_length_mismatch() {
        let dims = vec!["a".to_string(), "b".to_string()];
        let hard = vec![1u64, 2u64];
        // Fix: declaration and call were squashed onto one line.
        let soft = vec![0u64, 0u64, 0u64];
        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
                assert!(msg.contains("soft_limits=3"), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn report_usage_rejects_over_cap_dimension_count() {
        let n = MAX_BUDGET_DIMENSIONS + 1;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let deltas = vec![1u64; n];
        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn report_usage_accepts_exactly_cap_dimensions() {
        let n = MAX_BUDGET_DIMENSIONS;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let deltas = vec![1u64; n];
        assert!(validate_report_usage_dimensions(&dims, &deltas).is_ok());
    }

    #[test]
    fn report_usage_rejects_delta_length_mismatch() {
        let dims = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        // Fix: declaration and call were squashed onto one line.
        let deltas = vec![1u64, 2u64];
        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("dimension_delta_array_mismatch"), "got: {msg}");
                assert!(msg.contains("dimensions=3"), "got: {msg}");
                assert!(msg.contains("deltas=2"), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn report_usage_accepts_empty_dimensions() {
        assert!(validate_report_usage_dimensions(&[], &[]).is_ok());
    }

    #[test]
    fn is_retryable_backend_variant_uses_kind_table() {
        assert!(ServerError::from(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::Moved)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::Ask)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());

        assert!(!ServerError::from(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
        assert!(!ServerError::from(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!ServerError::from(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
    }

    #[test]
    fn is_retryable_backend_context_uses_kind_table() {
        let err = crate::server::backend_context(mk_fk_err(ErrorKind::IoError), "HGET test");
        assert!(err.is_retryable());

        let err =
            crate::server::backend_context(mk_fk_err(ErrorKind::AuthenticationFailed), "auth");
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_library_load_delegates_to_inner_kind() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::IoError),
        ));
        assert!(err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::AuthenticationFailed),
        ));
        assert!(!err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_business_logic_variants_are_false() {
        assert!(!ServerError::NotFound("x".into()).is_retryable());
        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
        assert!(!ServerError::Script("x".into()).is_retryable());
        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
    }

    #[test]
    fn backend_kind_delegates_through_library_load() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::ClusterDown),
        ));
        assert_eq!(err.backend_kind(), Some(ff_core::BackendErrorKind::Cluster));

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert_eq!(err.backend_kind(), None);
    }

    #[test]
    fn parse_valkey_version_prefers_valkey_version_over_redis_version() {
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n\
server_mode:cluster\r\n\
os:Linux\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
    }

    #[test]
    fn parse_valkey_version_real_valkey_8_cluster_body() {
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
server_name:valkey\r\n\
valkey_version:9.0.3\r\n\
valkey_release_stage:ga\r\n\
redis_git_sha1:00000000\r\n\
server_mode:cluster\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
    }

    #[test]
    fn parse_valkey_version_falls_back_to_redis_version_on_valkey_7() {
        let info = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nfoo:bar\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
    }

    #[test]
    fn parse_valkey_version_rejects_redis_backend() {
        let info = "\
# Server\r\n\
redis_version:7.4.0\r\n\
redis_mode:standalone\r\n\
os:Linux\r\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        let msg = err.to_string();
        assert!(
            msg.contains("Redis is not supported") && msg.contains("server_name:valkey"),
            "expected Redis-rejection message, got: {msg}"
        );
    }

    #[test]
    fn parse_valkey_version_accepts_valkey_7_marker_case_insensitively() {
        let info = "redis_version:7.2.0\r\nSERVER_NAME:Valkey\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
    }

    #[test]
    fn parse_valkey_version_errors_when_no_version_field() {
        let info = "# Server\r\nfoo:bar\r\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(
            err.to_string().contains("missing"),
            "expected 'missing' in message, got: {err}"
        );
    }

    #[test]
    fn parse_valkey_version_errors_on_non_numeric_major() {
        let info = "valkey_version:invalid.x.y\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("non-numeric major"));
    }

    #[test]
    fn parse_valkey_version_errors_on_non_numeric_minor() {
        let info = "valkey_version:7.x.0\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("non-numeric minor"));
    }

    #[test]
    fn parse_valkey_version_errors_on_missing_minor() {
        let info = "valkey_version:7\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("missing minor"));
    }

    #[test]
    fn extract_info_bodies_unwraps_cluster_map_all_entries() {
        let body_a = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
        let body_b = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:8.0.0\r\n";
        let map = Value::Map(vec![
            (
                Value::SimpleString("127.0.0.1:7000".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_a.to_string(),
                },
            ),
            (
                Value::SimpleString("127.0.0.1:7001".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_b.to_string(),
                },
            ),
        ]);
        let bodies = extract_info_bodies(&map).unwrap();
        assert_eq!(bodies.len(), 2);
        assert_eq!(bodies[0], body_a);
        assert_eq!(bodies[1], body_b);
    }

    #[test]
    fn extract_info_bodies_handles_simple_string() {
        let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
        let v = Value::SimpleString(body_text.to_string());
        let bodies = extract_info_bodies(&v).unwrap();
        assert_eq!(bodies, vec![body_text.to_string()]);
    }

    #[test]
    fn extract_info_bodies_rejects_empty_cluster_map() {
        let map = Value::Map(vec![]);
        let err = extract_info_bodies(&map).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("empty map"));
    }

    #[test]
    fn parse_valkey_version_min_across_cluster_map_picks_lowest() {
        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
        let body_node2 = "# Server\r\nredis_version:7.1.0\r\nserver_name:valkey\r\n";
        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
        let map = Value::Map(vec![
            (
                Value::SimpleString("node1:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node1.to_string(),
                },
            ),
            (
                Value::SimpleString("node2:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node2.to_string(),
                },
            ),
            (
                Value::SimpleString("node3:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node3.to_string(),
                },
            ),
        ]);

        let bodies = extract_info_bodies(&map).unwrap();
        let min = bodies
            .iter()
            .map(|b| parse_valkey_version(b).unwrap())
            .min()
            .unwrap();

        assert_eq!(min, (7, 1), "min across cluster must be the lowest node");
        assert!(
            min < (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
            "mixed-version cluster with 7.1.0 node must fail the (7,2) gate"
        );
    }

    #[test]
    fn parse_valkey_version_all_nodes_at_or_above_floor_accepts() {
        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
        let body_node2 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:9.0.3\r\n";
        let map = Value::Map(vec![
            (
                Value::SimpleString("node1:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node1.to_string(),
                },
            ),
            (
                Value::SimpleString("node2:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node2.to_string(),
                },
            ),
            (
                Value::SimpleString("node3:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node3.to_string(),
                },
            ),
        ]);

        let bodies = extract_info_bodies(&map).unwrap();
        let min = bodies
            .iter()
            .map(|b| parse_valkey_version(b).unwrap())
            .min()
            .unwrap();

        assert_eq!(min, (7, 2), "min across cluster is the lowest node (7.2)");
        assert!(
            min >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
            "all-above-floor cluster must pass the gate"
        );
    }

    #[test]
    fn valkey_version_too_low_is_not_retryable() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7.0".into(),
            required: "7.2".into(),
        };
        assert!(!err.is_retryable());
        assert_eq!(err.backend_kind(), None);
    }

    #[test]
    fn valkey_version_too_low_error_message_includes_both_versions() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7.0".into(),
            required: "7.2".into(),
        };
        let msg = err.to_string();
        assert!(msg.contains("7.0"), "detected version in message: {msg}");
        assert!(msg.contains("7.2"), "required version in message: {msg}");
        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
    }
}