1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use ferriskey::{Client, ClientBuilder, Value};
6use tokio::sync::Mutex as AsyncMutex;
7use tokio::task::JoinSet;
8use ff_core::contracts::{
9 AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
10 CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
11 CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
12 CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
13 ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
14 DeliverSignalArgs, DeliverSignalResult, ExecutionInfo,
15 ListExecutionsPage, PendingWaitpointInfo, ReplayExecutionResult,
16 ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
17 RevokeLeaseResult,
18 RotateWaitpointHmacSecretArgs,
19 StageDependencyEdgeArgs, StageDependencyEdgeResult,
20};
21use ff_core::keys::{
22 self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
23 IndexKeys, QuotaKeyContext,
24};
25use ff_core::partition::{
26 budget_partition, execution_partition, flow_partition, quota_partition, Partition,
27 PartitionConfig, PartitionFamily,
28};
29use ff_core::state::{PublicState, StateVector};
30use ff_core::types::*;
31use ff_engine::Engine;
32use ff_script::retry::is_retryable_kind;
33
34use crate::config::ServerConfig;
35
// NOTE(review): usage not visible in this chunk — presumably bounds how many
// flow members are inspected when checking for already-terminal members;
// confirm at the call sites before relying on this description.
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;

// Re-exported so server-side dimension validation (see
// `validate_create_budget_dimensions` / `validate_report_usage_dimensions`)
// uses the same limit as the Lua budget functions.
pub(crate) use ff_script::functions::budget::MAX_BUDGET_DIMENSIONS;
52
53fn validate_create_budget_dimensions(
63 dimensions: &[String],
64 hard_limits: &[u64],
65 soft_limits: &[u64],
66) -> Result<(), ServerError> {
67 let dim_count = dimensions.len();
68 if dim_count > MAX_BUDGET_DIMENSIONS {
69 return Err(ServerError::InvalidInput(format!(
70 "too_many_dimensions: limit={}, got={}",
71 MAX_BUDGET_DIMENSIONS, dim_count
72 )));
73 }
74 if hard_limits.len() != dim_count {
75 return Err(ServerError::InvalidInput(format!(
76 "dimension_limit_array_mismatch: dimensions={} hard_limits={}",
77 dim_count,
78 hard_limits.len()
79 )));
80 }
81 if soft_limits.len() != dim_count {
82 return Err(ServerError::InvalidInput(format!(
83 "dimension_limit_array_mismatch: dimensions={} soft_limits={}",
84 dim_count,
85 soft_limits.len()
86 )));
87 }
88 Ok(())
89}
90
91fn validate_report_usage_dimensions(
97 dimensions: &[String],
98 deltas: &[u64],
99) -> Result<(), ServerError> {
100 let dim_count = dimensions.len();
101 if dim_count > MAX_BUDGET_DIMENSIONS {
102 return Err(ServerError::InvalidInput(format!(
103 "too_many_dimensions: limit={}, got={}",
104 MAX_BUDGET_DIMENSIONS, dim_count
105 )));
106 }
107 if deltas.len() != dim_count {
108 return Err(ServerError::InvalidInput(format!(
109 "dimension_delta_array_mismatch: dimensions={} deltas={}",
110 dim_count,
111 deltas.len()
112 )));
113 }
114 Ok(())
115}
116
/// FlowFabric server: owns the Valkey connections, the background engine and
/// scheduler, and exposes the public operation API (executions, flows,
/// budgets, quotas, waitpoints).
pub struct Server {
    // Primary Valkey client for regular commands and FCALLs.
    client: Client,
    // Dedicated connection for tail (blocking-read) stream operations.
    tail_client: Client,
    // Bounds concurrent stream operations; shared by reads and tails
    // (sized from `max_concurrent_stream_ops` at startup).
    stream_semaphore: Arc<tokio::sync::Semaphore>,
    // Serializes blocking XREAD-style tails on the tail connection.
    xread_block_lock: Arc<tokio::sync::Mutex<()>>,
    // Single-permit semaphore: admin secret-rotation runs one at a time.
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    // Background scanner engine started during `start_with_metrics`.
    engine: Engine,
    config: ServerConfig,
    scheduler: Arc<ff_scheduler::Scheduler>,
    // Spawned background tasks owned by the server.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
    metrics: Arc<ff_observability::Metrics>,
}
213
/// Unified error type for all server-level operations.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    // Backend (Valkey) failure with no additional call-site context.
    #[error("backend: {0}")]
    Backend(#[from] ff_core::BackendError),
    // Backend failure annotated with the command/operation that failed
    // (built via `backend_context`).
    #[error("backend ({context}): {source}")]
    BackendContext {
        #[source]
        source: ff_core::BackendError,
        context: String,
    },
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    // Failure loading the server-side Lua library.
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    #[error("not found: {0}")]
    NotFound(String),
    #[error("invalid input: {0}")]
    InvalidInput(String),
    #[error("operation failed: {0}")]
    OperationFailed(String),
    #[error("script: {0}")]
    Script(String),
    // (operation name, configured concurrency limit).
    #[error("too many concurrent {0} calls (max: {1})")]
    ConcurrencyLimitExceeded(&'static str, u32),
    #[error(
        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
    )]
    ValkeyVersionTooLow {
        detected: String,
        required: String,
    },
}
269
// Converts a raw ferriskey client error into the classified backend error,
// with no call-site context (use `backend_context` when context is known).
impl From<ferriskey::Error> for ServerError {
    fn from(err: ferriskey::Error) -> Self {
        Self::Backend(ff_backend_valkey::backend_error_from_ferriskey(&err))
    }
}
279
280pub(crate) fn backend_context(
283 err: ferriskey::Error,
284 context: impl Into<String>,
285) -> ServerError {
286 ServerError::BackendContext {
287 source: ff_backend_valkey::backend_error_from_ferriskey(&err),
288 context: context.into(),
289 }
290}
291
292impl ServerError {
293 pub fn backend_kind(&self) -> Option<ff_core::BackendErrorKind> {
301 match self {
302 Self::Backend(be) | Self::BackendContext { source: be, .. } => Some(be.kind()),
303 Self::LibraryLoad(e) => e
304 .valkey_kind()
305 .map(ff_backend_valkey::classify_ferriskey_kind),
306 _ => None,
307 }
308 }
309
310 pub fn is_retryable(&self) -> bool {
317 match self {
318 Self::Backend(be) | Self::BackendContext { source: be, .. } => {
319 be.kind().is_retryable()
320 }
321 Self::LibraryLoad(load_err) => load_err
322 .valkey_kind()
323 .map(is_retryable_kind)
324 .unwrap_or(false),
325 Self::Config(_)
326 | Self::PartitionMismatch(_)
327 | Self::NotFound(_)
328 | Self::InvalidInput(_)
329 | Self::OperationFailed(_)
330 | Self::Script(_) => false,
331 Self::ConcurrencyLimitExceeded(_, _) => true,
335 Self::ValkeyVersionTooLow { .. } => false,
337 }
338 }
339}
340
341impl Server {
    /// Starts the server with a freshly-created metrics registry.
    ///
    /// Convenience wrapper around [`Self::start_with_metrics`].
    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
        Self::start_with_metrics(config, Arc::new(ff_observability::Metrics::new())).await
    }
352
    /// Connects to Valkey, validates the deployment, starts the background
    /// engine and scheduler, and returns a fully-wired [`Server`].
    ///
    /// Startup order (load-bearing): connect + PING sanity check, version
    /// check, partition-config validation, waitpoint HMAC secret init,
    /// optional Lua library load, lane-index seeding, engine start (with a
    /// completion subscription), dedicated tail connection, then the
    /// scheduler and final `Server` assembly.
    pub async fn start_with_metrics(
        config: ServerConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Self, ServerError> {
        tracing::info!(
            host = %config.host, port = config.port,
            tls = config.tls, cluster = config.cluster,
            "connecting to Valkey"
        );
        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect"))?;

        // Round-trip sanity check before touching any state.
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING"))?;
        if pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "unexpected PING response: {pong}"
            )));
        }
        tracing::info!("Valkey connection established");

        verify_valkey_version(&client).await?;

        validate_or_create_partition_config(&client, &config.partition_config).await?;

        initialize_waitpoint_hmac_secret(
            &client,
            &config.partition_config,
            &config.waitpoint_hmac_secret,
        )
        .await?;

        // The Lua library can be pre-loaded out of band (e.g. by an operator);
        // skip_library_load opts out of loading it here.
        if !config.skip_library_load {
            tracing::info!("loading flowfabric Lua library");
            ff_script::loader::ensure_library(&client)
                .await
                .map_err(ServerError::LibraryLoad)?;
        } else {
            tracing::info!("skipping library load (skip_library_load=true)");
        }

        // Seed the global lanes index with the configured lanes (SADD is
        // idempotent, so re-seeding on restart is harmless).
        if !config.lanes.is_empty() {
            let lane_strs: Vec<&str> = config.lanes.iter().map(|l| l.as_str()).collect();
            let _: i64 = client
                .cmd("SADD")
                .arg(ff_core::keys::lanes_index_key().as_str())
                .arg(lane_strs.as_slice())
                .execute()
                .await
                .map_err(|e| crate::server::backend_context(e, "SADD ff:idx:lanes (seed)"))?;
            tracing::info!(
                lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
                "seeded lanes index (ff:idx:lanes)"
            );
        }

        // Flatten the nested engine config into the engine's own config type.
        let engine_cfg = ff_engine::EngineConfig {
            partition_config: config.partition_config,
            lanes: config.lanes.clone(),
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
            index_reconciler_interval: config.engine_config.index_reconciler_interval,
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
            unblock_interval: config.engine_config.unblock_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            flow_projector_interval: config.engine_config.flow_projector_interval,
            execution_deadline_interval: config.engine_config.execution_deadline_interval,
            cancel_reconciler_interval: config.engine_config.cancel_reconciler_interval,
            edge_cancel_dispatcher_interval: config.engine_config.edge_cancel_dispatcher_interval,
            edge_cancel_reconciler_interval: config.engine_config.edge_cancel_reconciler_interval,
            scanner_filter: config.engine_config.scanner_filter.clone(),
        };
        // Subscribe to completion events before starting the engine so the
        // engine is fed a live stream from the outset.
        let mut valkey_conn = ff_core::backend::ValkeyConnection::new(
            config.host.clone(),
            config.port,
        );
        valkey_conn.tls = config.tls;
        valkey_conn.cluster = config.cluster;
        let completion_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
            client.clone(),
            config.partition_config,
            valkey_conn,
        );
        let completion_stream = <ff_backend_valkey::ValkeyBackend as ff_core::completion_backend::CompletionBackend>::subscribe_completions(&completion_backend)
            .await
            .map_err(|e| ServerError::OperationFailed(format!(
                "subscribe_completions: {e}"
            )))?;

        let engine = Engine::start_with_completions(
            engine_cfg,
            client.clone(),
            metrics.clone(),
            completion_stream,
        );

        // Separate connection for blocking tail reads, so they never stall
        // the primary client; verified with its own PING.
        tracing::info!("opening dedicated tail connection");
        let mut tail_builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            tail_builder = tail_builder.tls();
        }
        if config.cluster {
            tail_builder = tail_builder.cluster();
        }
        let tail_client = tail_builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect (tail)"))?;
        let tail_pong: String = tail_client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING (tail)"))?;
        if tail_pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "tail client unexpected PING response: {tail_pong}"
            )));
        }

        let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
            config.max_concurrent_stream_ops as usize,
        ));
        let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
        tracing::info!(
            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
            "stream-op client ready (read + tail share the semaphore; \
             tails additionally serialize via xread_block_lock)"
        );

        // Loudly warn when running without an API token: several endpoints
        // expose credentials or raw payloads unauthenticated in that mode.
        if config.api_token.is_none() {
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
                 FF_API_TOKEN for any deployment reachable from untrusted \
                 networks."
            );
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
                 and GET /v1/executions/{{id}}/result returns raw completion payload \
                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
                 configuration."
            );
        }

        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            listen_addr = %config.listen_addr,
            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
            config.partition_config.num_flow_partitions,
            config.partition_config.num_budget_partitions,
            config.partition_config.num_quota_partitions,
        );

        let scheduler = Arc::new(ff_scheduler::Scheduler::with_metrics(
            client.clone(),
            config.partition_config,
            metrics.clone(),
        ));

        Ok(Self {
            client,
            tail_client,
            stream_semaphore,
            xread_block_lock,
            // Single permit: admin rotations are serialized.
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine,
            config,
            scheduler,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
            metrics,
        })
    }
629
    /// Shared metrics registry handle.
    pub fn metrics(&self) -> &Arc<ff_observability::Metrics> {
        &self.metrics
    }
634
    /// Primary Valkey client.
    pub fn client(&self) -> &Client {
        &self.client
    }
639
    /// Invokes a Lua library function (FCALL) on the primary client.
    ///
    /// Delegates to `fcall_with_reload_on_client`; per its name that helper
    /// presumably reloads the library and retries when the function is
    /// missing — NOTE(review): helper body not visible in this chunk.
    async fn fcall_with_reload(
        &self,
        function: &str,
        keys: &[&str],
        args: &[&str],
    ) -> Result<Value, ServerError> {
        fcall_with_reload_on_client(&self.client, function, keys, args).await
    }
654
    /// Server configuration this instance was started with.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }
659
    /// Partition layout (flow/budget/quota partition counts).
    pub fn partition_config(&self) -> &PartitionConfig {
        &self.config.partition_config
    }
664
665 pub async fn create_execution(
671 &self,
672 args: &CreateExecutionArgs,
673 ) -> Result<CreateExecutionResult, ServerError> {
674 let partition = execution_partition(&args.execution_id, &self.config.partition_config);
675 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
676 let idx = IndexKeys::new(&partition);
677
678 let lane = &args.lane_id;
679 let tag = partition.hash_tag();
680 let idem_key = match &args.idempotency_key {
681 Some(k) if !k.is_empty() => {
682 keys::idempotency_key(&tag, args.namespace.as_str(), k)
683 }
684 _ => ctx.noop(),
685 };
686
687 let delay_str = args
688 .delay_until
689 .map(|d| d.0.to_string())
690 .unwrap_or_default();
691 let is_delayed = !delay_str.is_empty();
692
693 let scheduling_zset = if is_delayed {
698 idx.lane_delayed(lane)
699 } else {
700 idx.lane_eligible(lane)
701 };
702
703 let fcall_keys: Vec<String> = vec![
704 ctx.core(), ctx.payload(), ctx.policy(), ctx.tags(), scheduling_zset, idem_key, idx.execution_deadline(), idx.all_executions(), ];
713
714 let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());
715
716 let fcall_args: Vec<String> = vec![
722 args.execution_id.to_string(), args.namespace.to_string(), args.lane_id.to_string(), args.execution_kind.clone(), args.priority.to_string(), args.creator_identity.clone(), args.policy.as_ref()
729 .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
730 .unwrap_or_else(|| "{}".to_owned()), String::from_utf8_lossy(&args.input_payload).into_owned(), delay_str, args.idempotency_key.as_ref()
734 .map(|_| "86400000".to_string())
735 .unwrap_or_default(), tags_json, args.execution_deadline_at
738 .map(|d| d.to_string())
739 .unwrap_or_default(), args.partition_id.to_string(), ];
742
743 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
744 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
745
746 let raw: Value = self
747 .fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
748 .await?;
749
750 parse_create_result(&raw, &args.execution_id)
751 }
752
753 pub async fn cancel_execution(
755 &self,
756 args: &CancelExecutionArgs,
757 ) -> Result<CancelExecutionResult, ServerError> {
758 let raw = self
759 .fcall_cancel_execution_with_reload(args)
760 .await?;
761 parse_cancel_result(&raw, &args.execution_id)
762 }
763
764 async fn fcall_cancel_execution_with_reload(
768 &self,
769 args: &CancelExecutionArgs,
770 ) -> Result<Value, ServerError> {
771 let (keys, argv) = build_cancel_execution_fcall(
772 &self.client,
773 &self.config.partition_config,
774 args,
775 )
776 .await?;
777 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
778 let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
779 self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
780 }
781
782 pub async fn get_execution_state(
787 &self,
788 execution_id: &ExecutionId,
789 ) -> Result<PublicState, ServerError> {
790 let partition = execution_partition(execution_id, &self.config.partition_config);
791 let ctx = ExecKeyContext::new(&partition, execution_id);
792
793 let state_str: Option<String> = self
794 .client
795 .hget(&ctx.core(), "public_state")
796 .await
797 .map_err(|e| crate::server::backend_context(e, "HGET public_state"))?;
798
799 match state_str {
800 Some(s) => {
801 let quoted = format!("\"{s}\"");
802 serde_json::from_str("ed).map_err(|e| {
803 ServerError::Script(format!(
804 "invalid public_state '{s}' for {execution_id}: {e}"
805 ))
806 })
807 }
808 None => Err(ServerError::NotFound(format!(
809 "execution not found: {execution_id}"
810 ))),
811 }
812 }
813
814 pub async fn get_execution_result(
841 &self,
842 execution_id: &ExecutionId,
843 ) -> Result<Option<Vec<u8>>, ServerError> {
844 let partition = execution_partition(execution_id, &self.config.partition_config);
845 let ctx = ExecKeyContext::new(&partition, execution_id);
846
847 let payload: Option<Vec<u8>> = self
857 .client
858 .cmd("GET")
859 .arg(ctx.result())
860 .execute()
861 .await
862 .map_err(|e| crate::server::backend_context(e, "GET exec result"))?;
863 Ok(payload)
864 }
865
    /// Lists an execution's pending/active waitpoints, including their HMAC
    /// tokens and (when present) the signal names their conditions require.
    ///
    /// Strategy: SSCAN the waitpoint-id set, then two pipelined passes —
    /// pass 1 fetches each waitpoint's fields plus its condition's
    /// `total_matchers`; pass 2 fetches matcher names only for waitpoints
    /// that survived filtering and have matchers. Statement order (slot
    /// creation vs. consumption) is load-bearing throughout.
    pub async fn list_pending_waitpoints(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        // Distinguish "no waitpoints" from "no such execution".
        let core_exists: bool = self
            .client
            .cmd("EXISTS")
            .arg(ctx.core())
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "EXISTS exec_core (pending waitpoints)"))?;
        if !core_exists {
            return Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            )));
        }

        const WAITPOINTS_SSCAN_COUNT: usize = 100;
        let waitpoints_key = ctx.waitpoints();
        let mut wp_ids_raw: Vec<String> = Vec::new();
        let mut cursor: String = "0".to_owned();
        // Full cursor loop: SSCAN terminates when it returns cursor "0".
        loop {
            let reply: (String, Vec<String>) = self
                .client
                .cmd("SSCAN")
                .arg(&waitpoints_key)
                .arg(&cursor)
                .arg("COUNT")
                .arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
                .execute()
                .await
                .map_err(|e| crate::server::backend_context(e, "SSCAN waitpoints"))?;
            cursor = reply.0;
            wp_ids_raw.extend(reply.1);
            if cursor == "0" {
                break;
            }
        }

        // SSCAN may return duplicates across cursor iterations; sort + dedup
        // also makes the output order deterministic.
        wp_ids_raw.sort_unstable();
        wp_ids_raw.dedup();

        if wp_ids_raw.is_empty() {
            return Ok(Vec::new());
        }

        // Unparseable ids are logged and skipped rather than failing the call.
        let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
        for raw in &wp_ids_raw {
            match WaitpointId::parse(raw) {
                Ok(id) => wp_ids.push(id),
                Err(e) => tracing::warn!(
                    raw_id = %raw,
                    error = %e,
                    execution_id = %execution_id,
                    "list_pending_waitpoints: skipping unparseable waitpoint_id"
                ),
            }
        }
        if wp_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Field order here fixes the indices used by `get(i)` below.
        const WP_FIELDS: [&str; 6] = [
            "state",
            "waitpoint_key",
            "waitpoint_token",
            "created_at",
            "activated_at",
            "expires_at",
        ];

        // Pass 1: per waitpoint, HMGET its fields and HGET its condition's
        // total_matchers — two slots per waitpoint, in lockstep.
        let mut pass1 = self.client.pipeline();
        let mut wp_slots = Vec::with_capacity(wp_ids.len());
        let mut cond_slots = Vec::with_capacity(wp_ids.len());
        for wp_id in &wp_ids {
            let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint(wp_id));
            for f in WP_FIELDS {
                cmd = cmd.arg(f);
            }
            wp_slots.push(cmd.finish());

            cond_slots.push(
                pass1
                    .cmd::<Option<String>>("HGET")
                    .arg(ctx.waitpoint_condition(wp_id))
                    .arg("total_matchers")
                    .finish(),
            );
        }
        pass1
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "pipeline HMGET waitpoints + HGET total_matchers"))?;

        // Waitpoints that survive filtering, carried into pass 2.
        struct Kept {
            wp_id: WaitpointId,
            wp_fields: Vec<Option<String>>,
            total_matchers: usize,
        }
        let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
        for ((wp_id, wp_slot), cond_slot) in wp_ids
            .iter()
            .zip(wp_slots)
            .zip(cond_slots)
        {
            let wp_fields: Vec<Option<String>> =
                wp_slot.value().map_err(|e| crate::server::backend_context(e, format!("pipeline slot HMGET waitpoint {wp_id}")))?;

            // All-nil HMGET: the waitpoint hash no longer exists (deleted
            // between SSCAN and the pipeline). Consume the paired condition
            // slot so the result isn't left unread, then skip.
            if wp_fields.iter().all(Option::is_none) {
                let _ = cond_slot.value();
                continue;
            }
            // Only pending/active waitpoints are reported.
            let state_ref = wp_fields
                .first()
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if state_ref != "pending" && state_ref != "active" {
                let _ = cond_slot.value();
                continue;
            }
            let token_ref = wp_fields
                .get(2)
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            // Hash exists but the token is empty: treated as corruption —
            // log loudly and omit the entry rather than emit a useless token.
            if token_ref.is_empty() {
                let _ = cond_slot.value();
                tracing::warn!(
                    waitpoint_id = %wp_id,
                    execution_id = %execution_id,
                    waitpoint_hash_key = %ctx.waitpoint(wp_id),
                    state = %state_ref,
                    "list_pending_waitpoints: waitpoint hash present but waitpoint_token \
                     field is empty — likely storage corruption (half-populated write, \
                     operator edit, or interrupted script). Skipping this entry in the \
                     response. HGETALL the waitpoint_hash_key to inspect."
                );
                continue;
            }

            // Missing/garbled total_matchers degrades to 0 (no pass-2 fetch).
            let total_matchers = cond_slot
                .value()
                .map_err(|e| crate::server::backend_context(e, format!("pipeline slot HGET total_matchers {wp_id}")))?
                .and_then(|s| s.parse::<usize>().ok())
                .unwrap_or(0);

            kept.push(Kept {
                wp_id: wp_id.clone(),
                wp_fields,
                total_matchers,
            });
        }

        if kept.is_empty() {
            return Ok(Vec::new());
        }

        // Pass 2: matcher names, only for waitpoints with matchers. The
        // Option in matcher_slots keeps it parallel with `kept`.
        let mut pass2 = self.client.pipeline();
        let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
        let mut pass2_needed = false;
        for k in &kept {
            if k.total_matchers == 0 {
                matcher_slots.push(None);
                continue;
            }
            pass2_needed = true;
            let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
            for i in 0..k.total_matchers {
                cmd = cmd.arg(format!("matcher:{i}:name"));
            }
            matcher_slots.push(Some(cmd.finish()));
        }
        if pass2_needed {
            pass2.execute().await.map_err(|e| crate::server::backend_context(e, "pipeline HMGET wp_condition matchers"))?;
        }

        // Empty string ⇒ absent timestamp; otherwise parse as epoch millis.
        let parse_ts = |raw: &str| -> Option<TimestampMs> {
            if raw.is_empty() {
                None
            } else {
                raw.parse::<i64>().ok().map(TimestampMs)
            }
        };

        let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
        for (k, slot) in kept.into_iter().zip(matcher_slots) {
            // Index accessor over WP_FIELDS order; missing fields read as "".
            let get = |i: usize| -> &str {
                k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
            };

            let required_signal_names: Vec<String> = match slot {
                None => Vec::new(),
                Some(s) => {
                    let vals: Vec<Option<String>> =
                        s.value().map_err(|e| crate::server::backend_context(e, format!(
                            "pipeline slot HMGET wp_condition matchers {}",
                            k.wp_id
                        )))?;
                    vals.into_iter()
                        .flatten()
                        .filter(|name| !name.is_empty())
                        .collect()
                }
            };

            out.push(PendingWaitpointInfo {
                waitpoint_id: k.wp_id,
                waitpoint_key: get(1).to_owned(),
                state: get(0).to_owned(),
                waitpoint_token: WaitpointToken(get(2).to_owned()),
                required_signal_names,
                created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
                activated_at: parse_ts(get(4)),
                expires_at: parse_ts(get(5)),
            });
        }

        Ok(out)
    }
1172
1173 pub async fn create_budget(
1177 &self,
1178 args: &CreateBudgetArgs,
1179 ) -> Result<CreateBudgetResult, ServerError> {
1180 validate_create_budget_dimensions(
1182 &args.dimensions,
1183 &args.hard_limits,
1184 &args.soft_limits,
1185 )?;
1186 let partition = budget_partition(&args.budget_id, &self.config.partition_config);
1187 let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
1188 let resets_key = keys::budget_resets_key(bctx.hash_tag());
1189 let policies_index = keys::budget_policies_index(bctx.hash_tag());
1190
1191 let fcall_keys: Vec<String> = vec![
1194 bctx.definition(),
1195 bctx.limits(),
1196 bctx.usage(),
1197 resets_key,
1198 policies_index,
1199 ];
1200
1201 let dim_count = args.dimensions.len();
1205 let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
1206 fcall_args.push(args.budget_id.to_string());
1207 fcall_args.push(args.scope_type.clone());
1208 fcall_args.push(args.scope_id.clone());
1209 fcall_args.push(args.enforcement_mode.clone());
1210 fcall_args.push(args.on_hard_limit.clone());
1211 fcall_args.push(args.on_soft_limit.clone());
1212 fcall_args.push(args.reset_interval_ms.to_string());
1213 fcall_args.push(args.now.to_string());
1214 fcall_args.push(dim_count.to_string());
1215 for dim in &args.dimensions {
1216 fcall_args.push(dim.clone());
1217 }
1218 for hard in &args.hard_limits {
1219 fcall_args.push(hard.to_string());
1220 }
1221 for soft in &args.soft_limits {
1222 fcall_args.push(soft.to_string());
1223 }
1224
1225 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1226 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1227
1228 let raw: Value = self
1229 .fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
1230 .await?;
1231
1232 parse_budget_create_result(&raw, &args.budget_id)
1233 }
1234
1235 pub async fn create_quota_policy(
1237 &self,
1238 args: &CreateQuotaPolicyArgs,
1239 ) -> Result<CreateQuotaPolicyResult, ServerError> {
1240 let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
1241 let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);
1242
1243 let fcall_keys: Vec<String> = vec![
1246 qctx.definition(),
1247 qctx.window("requests_per_window"),
1248 qctx.concurrency(),
1249 qctx.admitted_set(),
1250 keys::quota_policies_index(qctx.hash_tag()),
1251 ];
1252
1253 let fcall_args: Vec<String> = vec![
1256 args.quota_policy_id.to_string(),
1257 args.window_seconds.to_string(),
1258 args.max_requests_per_window.to_string(),
1259 args.max_concurrent.to_string(),
1260 args.now.to_string(),
1261 ];
1262
1263 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1264 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1265
1266 let raw: Value = self
1267 .fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
1268 .await?;
1269
1270 parse_quota_create_result(&raw, &args.quota_policy_id)
1271 }
1272
1273 pub async fn get_budget_status(
1275 &self,
1276 budget_id: &BudgetId,
1277 ) -> Result<BudgetStatus, ServerError> {
1278 let partition = budget_partition(budget_id, &self.config.partition_config);
1279 let bctx = BudgetKeyContext::new(&partition, budget_id);
1280
1281 let def: HashMap<String, String> = self
1283 .client
1284 .hgetall(&bctx.definition())
1285 .await
1286 .map_err(|e| crate::server::backend_context(e, "HGETALL budget_def"))?;
1287
1288 if def.is_empty() {
1289 return Err(ServerError::NotFound(format!(
1290 "budget not found: {budget_id}"
1291 )));
1292 }
1293
1294 let usage_raw: HashMap<String, String> = self
1296 .client
1297 .hgetall(&bctx.usage())
1298 .await
1299 .map_err(|e| crate::server::backend_context(e, "HGETALL budget_usage"))?;
1300 let usage: HashMap<String, u64> = usage_raw
1301 .into_iter()
1302 .filter(|(k, _)| k != "_init")
1303 .map(|(k, v)| (k, v.parse().unwrap_or(0)))
1304 .collect();
1305
1306 let limits_raw: HashMap<String, String> = self
1308 .client
1309 .hgetall(&bctx.limits())
1310 .await
1311 .map_err(|e| crate::server::backend_context(e, "HGETALL budget_limits"))?;
1312 let mut hard_limits = HashMap::new();
1313 let mut soft_limits = HashMap::new();
1314 for (k, v) in &limits_raw {
1315 if let Some(dim) = k.strip_prefix("hard:") {
1316 hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
1317 } else if let Some(dim) = k.strip_prefix("soft:") {
1318 soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
1319 }
1320 }
1321
1322 let non_empty = |s: Option<&String>| -> Option<String> {
1323 s.filter(|v| !v.is_empty()).cloned()
1324 };
1325
1326 Ok(BudgetStatus {
1327 budget_id: budget_id.to_string(),
1328 scope_type: def.get("scope_type").cloned().unwrap_or_default(),
1329 scope_id: def.get("scope_id").cloned().unwrap_or_default(),
1330 enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
1331 usage,
1332 hard_limits,
1333 soft_limits,
1334 breach_count: def
1335 .get("breach_count")
1336 .and_then(|v| v.parse().ok())
1337 .unwrap_or(0),
1338 soft_breach_count: def
1339 .get("soft_breach_count")
1340 .and_then(|v| v.parse().ok())
1341 .unwrap_or(0),
1342 last_breach_at: non_empty(def.get("last_breach_at")),
1343 last_breach_dim: non_empty(def.get("last_breach_dim")),
1344 next_reset_at: non_empty(def.get("next_reset_at")),
1345 created_at: non_empty(def.get("created_at")),
1346 })
1347 }
1348
1349 pub async fn report_usage(
1351 &self,
1352 budget_id: &BudgetId,
1353 args: &ReportUsageArgs,
1354 ) -> Result<ReportUsageResult, ServerError> {
1355 validate_report_usage_dimensions(&args.dimensions, &args.deltas)?;
1357 let partition = budget_partition(budget_id, &self.config.partition_config);
1358 let bctx = BudgetKeyContext::new(&partition, budget_id);
1359
1360 let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
1362
1363 let dim_count = args.dimensions.len();
1365 let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
1366 fcall_args.push(dim_count.to_string());
1367 for dim in &args.dimensions {
1368 fcall_args.push(dim.clone());
1369 }
1370 for delta in &args.deltas {
1371 fcall_args.push(delta.to_string());
1372 }
1373 fcall_args.push(args.now.to_string());
1374 let dedup_key_val = args
1375 .dedup_key
1376 .as_ref()
1377 .filter(|k| !k.is_empty())
1378 .map(|k| usage_dedup_key(bctx.hash_tag(), k))
1379 .unwrap_or_default();
1380 fcall_args.push(dedup_key_val);
1381
1382 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1383 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1384
1385 let raw: Value = self
1386 .fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
1387 .await?;
1388
1389 parse_report_usage_result(&raw)
1390 }
1391
1392 pub async fn reset_budget(
1394 &self,
1395 budget_id: &BudgetId,
1396 ) -> Result<ResetBudgetResult, ServerError> {
1397 let partition = budget_partition(budget_id, &self.config.partition_config);
1398 let bctx = BudgetKeyContext::new(&partition, budget_id);
1399 let resets_key = keys::budget_resets_key(bctx.hash_tag());
1400
1401 let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];
1403
1404 let now = TimestampMs::now();
1406 let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];
1407
1408 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1409 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1410
1411 let raw: Value = self
1412 .fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
1413 .await?;
1414
1415 parse_reset_budget_result(&raw)
1416 }
1417
1418 pub async fn create_flow(
1422 &self,
1423 args: &CreateFlowArgs,
1424 ) -> Result<CreateFlowResult, ServerError> {
1425 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1426 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1427 let fidx = FlowIndexKeys::new(&partition);
1428
1429 let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];
1431
1432 let fcall_args: Vec<String> = vec![
1434 args.flow_id.to_string(),
1435 args.flow_kind.clone(),
1436 args.namespace.to_string(),
1437 args.now.to_string(),
1438 ];
1439
1440 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1441 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1442
1443 let raw: Value = self
1444 .fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
1445 .await?;
1446
1447 parse_create_flow_result(&raw, &args.flow_id)
1448 }
1449
1450 pub async fn add_execution_to_flow(
1488 &self,
1489 args: &AddExecutionToFlowArgs,
1490 ) -> Result<AddExecutionToFlowResult, ServerError> {
1491 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1492 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1493 let fidx = FlowIndexKeys::new(&partition);
1494
1495 let exec_partition =
1499 execution_partition(&args.execution_id, &self.config.partition_config);
1500 let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);
1501
1502 if exec_partition.index != partition.index {
1511 return Err(ServerError::PartitionMismatch(format!(
1512 "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
1513 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
1514 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
1515 exec_p = exec_partition.index,
1516 flow_p = partition.index,
1517 )));
1518 }
1519
1520 let fcall_keys: Vec<String> = vec![
1522 fctx.core(),
1523 fctx.members(),
1524 fidx.flow_index(),
1525 ectx.core(),
1526 ];
1527
1528 let fcall_args: Vec<String> = vec![
1530 args.flow_id.to_string(),
1531 args.execution_id.to_string(),
1532 args.now.to_string(),
1533 ];
1534
1535 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1536 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1537
1538 let raw: Value = self
1539 .fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
1540 .await?;
1541
1542 parse_add_execution_to_flow_result(&raw)
1543 }
1544
1545 pub async fn cancel_flow(
1587 &self,
1588 args: &CancelFlowArgs,
1589 ) -> Result<CancelFlowResult, ServerError> {
1590 self.cancel_flow_inner(args, false).await
1591 }
1592
1593 pub async fn cancel_flow_wait(
1597 &self,
1598 args: &CancelFlowArgs,
1599 ) -> Result<CancelFlowResult, ServerError> {
1600 self.cancel_flow_inner(args, true).await
1601 }
1602
    /// Shared implementation behind [`Self::cancel_flow`] /
    /// [`Self::cancel_flow_wait`].
    ///
    /// Marks the flow cancelled via the `ff_cancel_flow` server-side
    /// function, then — when the resulting policy is `cancel_all` and the
    /// flow has members — cancels each member execution individually:
    /// synchronously when `wait` is true, otherwise on a spawned background
    /// task. Successful (or already-terminal) member cancels are acked out
    /// of the pending-cancels / cancel-backlog keys so a reconciler only
    /// retries members this process failed to reach.
    async fn cancel_flow_inner(
        &self,
        args: &CancelFlowArgs,
        wait: bool,
    ) -> Result<CancelFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        // Grace period passed to the script; presumably how long the
        // reconciler waits before re-driving unacked member cancels —
        // confirm against ff_cancel_flow.
        const CANCEL_RECONCILER_GRACE_MS: u64 = 30_000;

        let fcall_keys: Vec<String> = vec![
            fctx.core(),
            fctx.members(),
            fidx.flow_index(),
            fctx.pending_cancels(),
            fidx.cancel_backlog(),
        ];

        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.reason.clone(),
            args.cancellation_policy.clone(),
            args.now.to_string(),
            CANCEL_RECONCILER_GRACE_MS.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
            .await?;

        let (policy, members) = match parse_cancel_flow_raw(&raw)? {
            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
                (policy, member_execution_ids)
            }
            // Idempotent path: the flow is already terminal. Re-read the
            // stored policy/reason/members and report Cancelled as if this
            // call had done the work.
            ParsedCancelFlow::AlreadyTerminal => {
                let flow_meta: Vec<Option<String>> = self
                    .client
                    .cmd("HMGET")
                    .arg(fctx.core())
                    .arg("cancellation_policy")
                    .arg("cancel_reason")
                    .execute()
                    .await
                    .map_err(|e| crate::server::backend_context(e, "HMGET flow_core cancellation_policy,cancel_reason"))?;
                let stored_policy = flow_meta
                    .first()
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let stored_reason = flow_meta
                    .get(1)
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let all_members: Vec<String> = self
                    .client
                    .cmd("SMEMBERS")
                    .arg(fctx.members())
                    .execute()
                    .await
                    .map_err(|e| crate::server::backend_context(e, "SMEMBERS flow members (already terminal)"))?;
                let total_members = all_members.len();
                // Cap the echoed member list so a huge flow doesn't bloat
                // the response on this informational-only path.
                let stored_members: Vec<String> = all_members
                    .into_iter()
                    .take(ALREADY_TERMINAL_MEMBER_CAP)
                    .collect();
                tracing::debug!(
                    flow_id = %args.flow_id,
                    stored_policy = stored_policy.as_deref().unwrap_or(""),
                    stored_reason = stored_reason.as_deref().unwrap_or(""),
                    total_members,
                    returned_members = stored_members.len(),
                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
                );
                return Ok(CancelFlowResult::Cancelled {
                    cancellation_policy: stored_policy
                        .unwrap_or_else(|| args.cancellation_policy.clone()),
                    member_execution_ids: stored_members,
                });
            }
        };
        // Only the cancel_all policy requires touching member executions.
        let needs_dispatch = policy == "cancel_all" && !members.is_empty();

        if !needs_dispatch {
            return Ok(CancelFlowResult::Cancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
            });
        }

        let pending_cancels_key = fctx.pending_cancels();
        let cancel_backlog_key = fidx.cancel_backlog();

        // Synchronous path: cancel members one by one in-line and report
        // which (if any) failed.
        if wait {
            let mut failed: Vec<String> = Vec::new();
            for eid_str in &members {
                match cancel_member_execution(
                    &self.client,
                    &self.config.partition_config,
                    eid_str,
                    &args.reason,
                    args.now,
                )
                .await
                {
                    Ok(()) => {
                        ack_cancel_member(
                            &self.client,
                            &pending_cancels_key,
                            &cancel_backlog_key,
                            eid_str,
                            &args.flow_id.to_string(),
                        )
                        .await;
                    }
                    Err(e) => {
                        // An "already terminal" style error still counts as
                        // done — ack it so the reconciler won't retry.
                        if is_terminal_ack_error(&e) {
                            ack_cancel_member(
                                &self.client,
                                &pending_cancels_key,
                                &cancel_backlog_key,
                                eid_str,
                                &args.flow_id.to_string(),
                            )
                            .await;
                            continue;
                        }
                        tracing::warn!(
                            execution_id = %eid_str,
                            error = %e,
                            "cancel_flow(wait): individual execution cancel failed \
                             (transport/contract fault; reconciler will retry if transient)"
                        );
                        failed.push(eid_str.clone());
                    }
                }
            }
            if failed.is_empty() {
                return Ok(CancelFlowResult::Cancelled {
                    cancellation_policy: policy,
                    member_execution_ids: members,
                });
            }
            return Ok(CancelFlowResult::PartiallyCancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
                failed_member_execution_ids: failed,
            });
        }

        // Async path: move owned copies of everything into a background
        // task tracked by self.background_tasks.
        let client = self.client.clone();
        let partition_config = self.config.partition_config;
        let reason = args.reason.clone();
        let now = args.now;
        let dispatch_members = members.clone();
        let flow_id = args.flow_id.clone();
        let mut guard = self.background_tasks.lock().await;

        // Opportunistically reap finished tasks so the JoinSet doesn't grow
        // without bound across many cancel_flow calls.
        while let Some(joined) = guard.try_join_next() {
            if let Err(e) = joined {
                tracing::warn!(
                    error = %e,
                    "cancel_flow: background dispatch task panicked or was aborted"
                );
            }
        }

        let pending_key_owned = pending_cancels_key.clone();
        let backlog_key_owned = cancel_backlog_key.clone();
        let flow_id_str = args.flow_id.to_string();

        guard.spawn(async move {
            use futures::stream::StreamExt;
            // Bounded fan-out: at most this many member cancels in flight.
            const CONCURRENCY: usize = 16;

            let member_count = dispatch_members.len();
            let flow_id_for_log = flow_id.clone();
            futures::stream::iter(dispatch_members)
                .map(|eid_str| {
                    let client = client.clone();
                    let reason = reason.clone();
                    let flow_id = flow_id.clone();
                    let pending = pending_key_owned.clone();
                    let backlog = backlog_key_owned.clone();
                    let flow_id_str = flow_id_str.clone();
                    async move {
                        match cancel_member_execution(
                            &client,
                            &partition_config,
                            &eid_str,
                            &reason,
                            now,
                        )
                        .await
                        {
                            Ok(()) => {
                                ack_cancel_member(
                                    &client,
                                    &pending,
                                    &backlog,
                                    &eid_str,
                                    &flow_id_str,
                                )
                                .await;
                            }
                            Err(e) => {
                                // Same terminal-ack rule as the wait path.
                                if is_terminal_ack_error(&e) {
                                    ack_cancel_member(
                                        &client,
                                        &pending,
                                        &backlog,
                                        &eid_str,
                                        &flow_id_str,
                                    )
                                    .await;
                                } else {
                                    tracing::warn!(
                                        flow_id = %flow_id,
                                        execution_id = %eid_str,
                                        error = %e,
                                        "cancel_flow(async): individual execution cancel failed \
                                         (transport/contract fault; reconciler will retry if transient)"
                                    );
                                }
                            }
                        }
                    }
                })
                .buffer_unordered(CONCURRENCY)
                .for_each(|()| async {})
                .await;

            tracing::debug!(
                flow_id = %flow_id_for_log,
                member_count,
                concurrency = CONCURRENCY,
                "cancel_flow: background member dispatch complete"
            );
        });
        drop(guard);

        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
        Ok(CancelFlowResult::CancellationScheduled {
            cancellation_policy: policy,
            member_count,
            member_execution_ids: members,
        })
    }
1908
1909 pub async fn stage_dependency_edge(
1914 &self,
1915 args: &StageDependencyEdgeArgs,
1916 ) -> Result<StageDependencyEdgeResult, ServerError> {
1917 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1918 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1919
1920 let fcall_keys: Vec<String> = vec![
1922 fctx.core(),
1923 fctx.members(),
1924 fctx.edge(&args.edge_id),
1925 fctx.outgoing(&args.upstream_execution_id),
1926 fctx.incoming(&args.downstream_execution_id),
1927 fctx.grant(&args.edge_id.to_string()),
1928 ];
1929
1930 let fcall_args: Vec<String> = vec![
1933 args.flow_id.to_string(),
1934 args.edge_id.to_string(),
1935 args.upstream_execution_id.to_string(),
1936 args.downstream_execution_id.to_string(),
1937 args.dependency_kind.clone(),
1938 args.data_passing_ref.clone().unwrap_or_default(),
1939 args.expected_graph_revision.to_string(),
1940 args.now.to_string(),
1941 ];
1942
1943 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1944 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1945
1946 let raw: Value = self
1947 .fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
1948 .await?;
1949
1950 parse_stage_dependency_edge_result(&raw)
1951 }
1952
1953 pub async fn apply_dependency_to_child(
1958 &self,
1959 args: &ApplyDependencyToChildArgs,
1960 ) -> Result<ApplyDependencyToChildResult, ServerError> {
1961 let partition = execution_partition(
1962 &args.downstream_execution_id,
1963 &self.config.partition_config,
1964 );
1965 let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
1966 let idx = IndexKeys::new(&partition);
1967 let flow_partition = ff_core::partition::flow_partition(
1968 &args.flow_id,
1969 &self.config.partition_config,
1970 );
1971 let flow_ctx = ff_core::keys::FlowKeyContext::new(&flow_partition, &args.flow_id);
1972
1973 let lane_str: Option<String> = self
1975 .client
1976 .hget(&ctx.core(), "lane_id")
1977 .await
1978 .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
1979 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1980
1981 let fcall_keys: Vec<String> = vec![
1984 ctx.core(),
1985 ctx.deps_meta(),
1986 ctx.deps_unresolved(),
1987 ctx.dep_edge(&args.edge_id),
1988 idx.lane_eligible(&lane),
1989 idx.lane_blocked_dependencies(&lane),
1990 ctx.deps_all_edges(),
1991 flow_ctx.edgegroup(&args.downstream_execution_id),
1992 ];
1993
1994 let fcall_args: Vec<String> = vec![
1997 args.flow_id.to_string(),
1998 args.edge_id.to_string(),
1999 args.upstream_execution_id.to_string(),
2000 args.graph_revision.to_string(),
2001 args.dependency_kind.clone(),
2002 args.data_passing_ref.clone().unwrap_or_default(),
2003 args.now.to_string(),
2004 ];
2005
2006 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2007 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2008
2009 let raw: Value = self
2010 .fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
2011 .await?;
2012
2013 parse_apply_dependency_result(&raw)
2014 }
2015
2016 pub async fn deliver_signal(
2023 &self,
2024 args: &DeliverSignalArgs,
2025 ) -> Result<DeliverSignalResult, ServerError> {
2026 let partition = execution_partition(&args.execution_id, &self.config.partition_config);
2027 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
2028 let idx = IndexKeys::new(&partition);
2029
2030 let lane_str: Option<String> = self
2032 .client
2033 .hget(&ctx.core(), "lane_id")
2034 .await
2035 .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
2036 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2037
2038 let wp_id = &args.waitpoint_id;
2039 let sig_id = &args.signal_id;
2040 let idem_key = args
2041 .idempotency_key
2042 .as_ref()
2043 .filter(|k| !k.is_empty())
2044 .map(|k| ctx.signal_dedup(wp_id, k))
2045 .unwrap_or_else(|| ctx.noop());
2046
2047 let fcall_keys: Vec<String> = vec![
2053 ctx.core(), ctx.waitpoint_condition(wp_id), ctx.waitpoint_signals(wp_id), ctx.exec_signals(), ctx.signal(sig_id), ctx.signal_payload(sig_id), idem_key, ctx.waitpoint(wp_id), ctx.suspension_current(), idx.lane_eligible(&lane), idx.lane_suspended(&lane), idx.lane_delayed(&lane), idx.suspension_timeout(), idx.waitpoint_hmac_secrets(), ];
2068
2069 let fcall_args: Vec<String> = vec![
2076 args.signal_id.to_string(), args.execution_id.to_string(), args.waitpoint_id.to_string(), args.signal_name.clone(), args.signal_category.clone(), args.source_type.clone(), args.source_identity.clone(), args.payload.as_ref()
2084 .map(|p| String::from_utf8_lossy(p).into_owned())
2085 .unwrap_or_default(), args.payload_encoding
2087 .clone()
2088 .unwrap_or_else(|| "json".to_owned()), args.idempotency_key
2090 .clone()
2091 .unwrap_or_default(), args.correlation_id
2093 .clone()
2094 .unwrap_or_default(), args.target_scope.clone(), args.created_at
2097 .map(|ts| ts.to_string())
2098 .unwrap_or_else(|| args.now.to_string()), args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), args.resume_delay_ms.unwrap_or(0).to_string(), args.signal_maxlen.unwrap_or(1000).to_string(), args.max_signals_per_execution
2103 .unwrap_or(10_000)
2104 .to_string(), args.waitpoint_token.as_str().to_owned(), ];
2109
2110 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2111 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2112
2113 let raw: Value = self
2114 .fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
2115 .await?;
2116
2117 parse_deliver_signal_result(&raw, &args.signal_id)
2118 }
2119
2120 pub async fn change_priority(
2124 &self,
2125 execution_id: &ExecutionId,
2126 new_priority: i32,
2127 ) -> Result<ChangePriorityResult, ServerError> {
2128 let partition = execution_partition(execution_id, &self.config.partition_config);
2129 let ctx = ExecKeyContext::new(&partition, execution_id);
2130 let idx = IndexKeys::new(&partition);
2131
2132 let lane_str: Option<String> = self
2134 .client
2135 .hget(&ctx.core(), "lane_id")
2136 .await
2137 .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
2138 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2139
2140 let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];
2142
2143 let fcall_args: Vec<String> = vec![
2145 execution_id.to_string(),
2146 new_priority.to_string(),
2147 ];
2148
2149 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2150 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2151
2152 let raw: Value = self
2153 .fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
2154 .await?;
2155
2156 parse_change_priority_result(&raw, execution_id)
2157 }
2158
2159 pub async fn claim_for_worker(
2173 &self,
2174 lane: &LaneId,
2175 worker_id: &WorkerId,
2176 worker_instance_id: &WorkerInstanceId,
2177 worker_capabilities: &std::collections::BTreeSet<String>,
2178 grant_ttl_ms: u64,
2179 ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
2180 self.scheduler
2181 .claim_for_worker(
2182 lane,
2183 worker_id,
2184 worker_instance_id,
2185 worker_capabilities,
2186 grant_ttl_ms,
2187 )
2188 .await
2189 .map_err(|e| match e {
2190 ff_scheduler::SchedulerError::Valkey(inner) => ServerError::from(inner),
2191 ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
2192 crate::server::backend_context(source, context)
2193 }
2194 ff_scheduler::SchedulerError::Config(msg) => ServerError::InvalidInput(msg),
2195 })
2196 }
2197
2198 pub async fn revoke_lease(
2200 &self,
2201 execution_id: &ExecutionId,
2202 ) -> Result<RevokeLeaseResult, ServerError> {
2203 let partition = execution_partition(execution_id, &self.config.partition_config);
2204 let ctx = ExecKeyContext::new(&partition, execution_id);
2205 let idx = IndexKeys::new(&partition);
2206
2207 let wiid_str: Option<String> = self
2209 .client
2210 .hget(&ctx.core(), "current_worker_instance_id")
2211 .await
2212 .map_err(|e| crate::server::backend_context(e, "HGET worker_instance_id"))?;
2213 let wiid = match wiid_str {
2214 Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
2215 _ => {
2216 return Err(ServerError::NotFound(format!(
2217 "no active lease for execution {execution_id} (no current_worker_instance_id)"
2218 )));
2219 }
2220 };
2221
2222 let fcall_keys: Vec<String> = vec![
2224 ctx.core(),
2225 ctx.lease_current(),
2226 ctx.lease_history(),
2227 idx.lease_expiry(),
2228 idx.worker_leases(&wiid),
2229 ];
2230
2231 let fcall_args: Vec<String> = vec![
2233 execution_id.to_string(),
2234 String::new(), "operator_revoke".to_owned(),
2236 ];
2237
2238 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2239 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2240
2241 let raw: Value = self
2242 .fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
2243 .await?;
2244
2245 parse_revoke_lease_result(&raw)
2246 }
2247
2248 pub async fn get_execution(
2250 &self,
2251 execution_id: &ExecutionId,
2252 ) -> Result<ExecutionInfo, ServerError> {
2253 let partition = execution_partition(execution_id, &self.config.partition_config);
2254 let ctx = ExecKeyContext::new(&partition, execution_id);
2255
2256 let fields: HashMap<String, String> = self
2257 .client
2258 .hgetall(&ctx.core())
2259 .await
2260 .map_err(|e| crate::server::backend_context(e, "HGETALL exec_core"))?;
2261
2262 if fields.is_empty() {
2263 return Err(ServerError::NotFound(format!(
2264 "execution not found: {execution_id}"
2265 )));
2266 }
2267
2268 let parse_enum = |field: &str| -> String {
2269 fields.get(field).cloned().unwrap_or_default()
2270 };
2271 fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
2272 let quoted = format!("\"{raw}\"");
2273 serde_json::from_str("ed).map_err(|e| {
2274 ServerError::Script(format!("invalid {field} '{raw}': {e}"))
2275 })
2276 }
2277
2278 let lp_str = parse_enum("lifecycle_phase");
2279 let os_str = parse_enum("ownership_state");
2280 let es_str = parse_enum("eligibility_state");
2281 let br_str = parse_enum("blocking_reason");
2282 let to_str = parse_enum("terminal_outcome");
2283 let as_str = parse_enum("attempt_state");
2284 let ps_str = parse_enum("public_state");
2285
2286 let state_vector = StateVector {
2287 lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
2288 ownership_state: deserialize("ownership_state", &os_str)?,
2289 eligibility_state: deserialize("eligibility_state", &es_str)?,
2290 blocking_reason: deserialize("blocking_reason", &br_str)?,
2291 terminal_outcome: deserialize("terminal_outcome", &to_str)?,
2292 attempt_state: deserialize("attempt_state", &as_str)?,
2293 public_state: deserialize("public_state", &ps_str)?,
2294 };
2295
2296 let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();
2303
2304 let started_at_opt = fields
2311 .get("started_at")
2312 .filter(|s| !s.is_empty())
2313 .cloned();
2314 let completed_at_opt = fields
2315 .get("completed_at")
2316 .filter(|s| !s.is_empty())
2317 .cloned();
2318
2319 Ok(ExecutionInfo {
2320 execution_id: execution_id.clone(),
2321 namespace: parse_enum("namespace"),
2322 lane_id: parse_enum("lane_id"),
2323 priority: fields
2324 .get("priority")
2325 .and_then(|v| v.parse().ok())
2326 .unwrap_or(0),
2327 execution_kind: parse_enum("execution_kind"),
2328 state_vector,
2329 public_state: deserialize("public_state", &ps_str)?,
2330 created_at: parse_enum("created_at"),
2331 started_at: started_at_opt,
2332 completed_at: completed_at_opt,
2333 current_attempt_index: fields
2334 .get("current_attempt_index")
2335 .and_then(|v| v.parse().ok())
2336 .unwrap_or(0),
2337 flow_id: flow_id_val,
2338 blocking_detail: parse_enum("blocking_detail"),
2339 })
2340 }
2341
2342 pub async fn list_executions_page(
2352 &self,
2353 partition_id: u16,
2354 cursor: Option<ExecutionId>,
2355 limit: usize,
2356 ) -> Result<ListExecutionsPage, ServerError> {
2357 if limit == 0 {
2358 return Ok(ListExecutionsPage::new(Vec::new(), None));
2359 }
2360 let partition = ff_core::partition::Partition {
2361 family: ff_core::partition::PartitionFamily::Execution,
2362 index: partition_id,
2363 };
2364 let idx = IndexKeys::new(&partition);
2365 let all_key = idx.all_executions();
2366
2367 let raw_members: Vec<String> = self
2368 .client
2369 .cmd("SMEMBERS")
2370 .arg(&all_key)
2371 .execute()
2372 .await
2373 .map_err(|e| crate::server::backend_context(e, format!("SMEMBERS {all_key}")))?;
2374
2375 if raw_members.is_empty() {
2376 return Ok(ListExecutionsPage::new(Vec::new(), None));
2377 }
2378
2379 let mut parsed: Vec<ExecutionId> = Vec::with_capacity(raw_members.len());
2380 for raw in &raw_members {
2381 match ExecutionId::parse(raw) {
2382 Ok(id) => parsed.push(id),
2383 Err(e) => {
2384 tracing::warn!(
2385 raw_id = %raw,
2386 error = %e,
2387 set = %all_key,
2388 "list_executions_page: SMEMBERS member failed to parse as ExecutionId \
2389 (data corruption?)"
2390 );
2391 }
2392 }
2393 }
2394 parsed.sort_by(|a, b| a.as_str().cmp(b.as_str()));
2395
2396 let filtered: Vec<ExecutionId> = if let Some(c) = cursor.as_ref() {
2397 let cs = c.as_str();
2398 parsed.into_iter().filter(|e| e.as_str() > cs).collect()
2399 } else {
2400 parsed
2401 };
2402
2403 let effective_limit = limit.min(1000);
2404 let has_more = filtered.len() > effective_limit;
2405 let page: Vec<ExecutionId> = filtered.into_iter().take(effective_limit).collect();
2406 let next_cursor = if has_more { page.last().cloned() } else { None };
2407 Ok(ListExecutionsPage::new(page, next_cursor))
2408 }
2409
2410 pub async fn replay_execution(
2415 &self,
2416 execution_id: &ExecutionId,
2417 ) -> Result<ReplayExecutionResult, ServerError> {
2418 let partition = execution_partition(execution_id, &self.config.partition_config);
2419 let ctx = ExecKeyContext::new(&partition, execution_id);
2420 let idx = IndexKeys::new(&partition);
2421
2422 let dyn_fields: Vec<Option<String>> = self
2434 .client
2435 .cmd("HMGET")
2436 .arg(ctx.core())
2437 .arg("lane_id")
2438 .arg("flow_id")
2439 .arg("terminal_outcome")
2440 .execute()
2441 .await
2442 .map_err(|e| crate::server::backend_context(e, "HMGET replay pre-read"))?;
2443 let lane = LaneId::new(
2444 dyn_fields
2445 .first()
2446 .and_then(|v| v.as_ref())
2447 .cloned()
2448 .unwrap_or_else(|| "default".to_owned()),
2449 );
2450 let flow_id_str = dyn_fields
2451 .get(1)
2452 .and_then(|v| v.as_ref())
2453 .cloned()
2454 .unwrap_or_default();
2455 let terminal_outcome = dyn_fields
2456 .get(2)
2457 .and_then(|v| v.as_ref())
2458 .cloned()
2459 .unwrap_or_default();
2460
2461 let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();
2462
2463 let mut fcall_keys: Vec<String> = vec![
2465 ctx.core(),
2466 idx.lane_terminal(&lane),
2467 idx.lane_eligible(&lane),
2468 ctx.lease_history(),
2469 ];
2470
2471 let now = TimestampMs::now();
2473 let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];
2474
2475 if is_skipped_flow_member {
2476 let flow_id = FlowId::parse(&flow_id_str)
2480 .map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
2481 let flow_part =
2482 flow_partition(&flow_id, &self.config.partition_config);
2483 let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
2484 let edge_ids: Vec<String> = self
2485 .client
2486 .cmd("SMEMBERS")
2487 .arg(flow_ctx.incoming(execution_id))
2488 .execute()
2489 .await
2490 .map_err(|e| crate::server::backend_context(e, "SMEMBERS replay edges"))?;
2491
2492 fcall_keys.push(idx.lane_blocked_dependencies(&lane)); fcall_keys.push(ctx.deps_meta()); fcall_keys.push(ctx.deps_unresolved()); for eid_str in &edge_ids {
2497 let edge_id = EdgeId::parse(eid_str)
2498 .unwrap_or_else(|_| EdgeId::new());
2499 fcall_keys.push(ctx.dep_edge(&edge_id)); fcall_args.push(eid_str.clone()); }
2502 }
2503
2504 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2505 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2506
2507 let raw: Value = self
2508 .fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
2509 .await?;
2510
2511 parse_replay_result(&raw)
2512 }
2513
2514 pub async fn read_attempt_stream(
2526 &self,
2527 execution_id: &ExecutionId,
2528 attempt_index: AttemptIndex,
2529 from_id: &str,
2530 to_id: &str,
2531 count_limit: u64,
2532 ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2533 use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
2534
2535 if count_limit == 0 {
2536 return Err(ServerError::InvalidInput(
2537 "count_limit must be >= 1".to_owned(),
2538 ));
2539 }
2540
2541 let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2546 Ok(p) => p,
2547 Err(tokio::sync::TryAcquireError::NoPermits) => {
2548 return Err(ServerError::ConcurrencyLimitExceeded(
2549 "stream_ops",
2550 self.config.max_concurrent_stream_ops,
2551 ));
2552 }
2553 Err(tokio::sync::TryAcquireError::Closed) => {
2554 return Err(ServerError::OperationFailed(
2555 "stream semaphore closed (server shutting down)".into(),
2556 ));
2557 }
2558 };
2559
2560 let args = ReadFramesArgs {
2561 execution_id: execution_id.clone(),
2562 attempt_index,
2563 from_id: from_id.to_owned(),
2564 to_id: to_id.to_owned(),
2565 count_limit,
2566 };
2567
2568 let partition = execution_partition(execution_id, &self.config.partition_config);
2569 let ctx = ExecKeyContext::new(&partition, execution_id);
2570 let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
2571
2572 let result = ff_script::functions::stream::ff_read_attempt_stream(
2576 &self.tail_client, &keys, &args,
2577 )
2578 .await
2579 .map_err(script_error_to_server);
2580
2581 drop(permit);
2582
2583 match result? {
2584 ReadFramesResult::Frames(f) => Ok(f),
2585 }
2586 }
2587
2588 pub async fn tail_attempt_stream(
2606 &self,
2607 execution_id: &ExecutionId,
2608 attempt_index: AttemptIndex,
2609 last_id: &str,
2610 block_ms: u64,
2611 count_limit: u64,
2612 ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2613 if count_limit == 0 {
2614 return Err(ServerError::InvalidInput(
2615 "count_limit must be >= 1".to_owned(),
2616 ));
2617 }
2618
2619 let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2635 Ok(p) => p,
2636 Err(tokio::sync::TryAcquireError::NoPermits) => {
2637 return Err(ServerError::ConcurrencyLimitExceeded(
2638 "stream_ops",
2639 self.config.max_concurrent_stream_ops,
2640 ));
2641 }
2642 Err(tokio::sync::TryAcquireError::Closed) => {
2643 return Err(ServerError::OperationFailed(
2644 "stream semaphore closed (server shutting down)".into(),
2645 ));
2646 }
2647 };
2648
2649 let partition = execution_partition(execution_id, &self.config.partition_config);
2650 let ctx = ExecKeyContext::new(&partition, execution_id);
2651 let stream_key = ctx.stream(attempt_index);
2652 let stream_meta_key = ctx.stream_meta(attempt_index);
2653
2654 let _xread_guard = self.xread_block_lock.lock().await;
2662
2663 let result = ff_script::stream_tail::xread_block(
2664 &self.tail_client,
2665 &stream_key,
2666 &stream_meta_key,
2667 last_id,
2668 block_ms,
2669 count_limit,
2670 )
2671 .await
2672 .map_err(script_error_to_server);
2673
2674 drop(_xread_guard);
2675 drop(permit);
2676 result
2677 }
2678
2679 pub async fn shutdown(self) {
2702 tracing::info!("shutting down FlowFabric server");
2703
2704 self.stream_semaphore.close();
2709 tracing::info!(
2710 "stream semaphore closed; no new read/tail attempts will be accepted"
2711 );
2712
2713 let drain_timeout = Duration::from_secs(15);
2717 let background = self.background_tasks.clone();
2718 let drain = async move {
2719 let mut guard = background.lock().await;
2720 while guard.join_next().await.is_some() {}
2721 };
2722 match tokio::time::timeout(drain_timeout, drain).await {
2723 Ok(()) => {}
2724 Err(_) => {
2725 tracing::warn!(
2726 timeout_s = drain_timeout.as_secs(),
2727 "shutdown: background tasks did not finish in time, aborting"
2728 );
2729 self.background_tasks.lock().await.abort_all();
2730 }
2731 }
2732
2733 self.engine.shutdown().await;
2734 tracing::info!("FlowFabric server shutdown complete");
2735 }
2736}
2737
/// Minimum supported Valkey server major version, checked at startup.
const REQUIRED_VALKEY_MAJOR: u32 = 7;
/// Minimum supported Valkey server minor version, checked at startup.
const REQUIRED_VALKEY_MINOR: u32 = 2;

/// Total wall-clock budget `verify_valkey_version` spends retrying
/// transient failures before reporting the last error.
const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
2751
/// Verifies at startup that the connected Valkey server(s) meet the
/// minimum supported version, retrying within [`VERSION_CHECK_RETRY_BUDGET`].
///
/// Backoff starts at 200 ms and doubles up to a 5 s cap. Note that a
/// too-low detected version is also classified retryable — presumably to
/// ride out rolling upgrades — and only surfaces as
/// `ServerError::ValkeyVersionTooLow` once the budget is exhausted;
/// confirm that intent before changing.
async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
    let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
    let mut backoff = Duration::from_millis(200);
    loop {
        // Each probe is classified into: accepted (early return),
        // retryable failure, or fatal failure.
        let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
            match query_valkey_version(client).await {
                Ok((detected_major, detected_minor))
                    if (detected_major, detected_minor)
                        >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR) =>
                {
                    tracing::info!(
                        detected_major,
                        detected_minor,
                        required_major = REQUIRED_VALKEY_MAJOR,
                        required_minor = REQUIRED_VALKEY_MINOR,
                        "Valkey version accepted"
                    );
                    return Ok(());
                }
                // Version readable but below the floor: keep retrying
                // within the budget.
                Ok((detected_major, detected_minor)) => (
                    true,
                    ServerError::ValkeyVersionTooLow {
                        detected: format!("{detected_major}.{detected_minor}"),
                        required: format!("{REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"),
                    },
                    format!(
                        "detected={detected_major}.{detected_minor} < required={REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"
                    ),
                ),
                Err(e) => {
                    // Errors with no classified backend kind default to
                    // retryable.
                    let retryable = e
                        .backend_kind()
                        .map(|k| k.is_retryable())
                        .unwrap_or(true);
                    let detail = e.to_string();
                    (retryable, e, detail)
                }
            };

        if !should_retry {
            return Err(err_for_budget_exhaust);
        }
        if tokio::time::Instant::now() >= deadline {
            return Err(err_for_budget_exhaust);
        }
        tracing::warn!(
            backoff_ms = backoff.as_millis() as u64,
            detail = %log_detail,
            "valkey version check transient failure; retrying"
        );
        tokio::time::sleep(backoff).await;
        backoff = (backoff * 2).min(Duration::from_secs(5));
    }
}
2840
2841async fn query_valkey_version(client: &Client) -> Result<(u32, u32), ServerError> {
2854 let raw: Value = client
2855 .cmd("INFO")
2856 .arg("server")
2857 .execute()
2858 .await
2859 .map_err(|e| crate::server::backend_context(e, "INFO server"))?;
2860 let bodies = extract_info_bodies(&raw)?;
2861 let mut min_version: Option<(u32, u32)> = None;
2867 for body in &bodies {
2868 let version = parse_valkey_version(body)?;
2869 min_version = Some(match min_version {
2870 None => version,
2871 Some(existing) => existing.min(version),
2872 });
2873 }
2874 min_version.ok_or_else(|| {
2875 ServerError::OperationFailed(
2876 "valkey version check: cluster INFO returned no node bodies".into(),
2877 )
2878 })
2879}
2880
2881fn extract_info_bodies(raw: &Value) -> Result<Vec<String>, ServerError> {
2887 match raw {
2888 Value::BulkString(bytes) => Ok(vec![String::from_utf8_lossy(bytes).into_owned()]),
2889 Value::VerbatimString { text, .. } => Ok(vec![text.clone()]),
2890 Value::SimpleString(s) => Ok(vec![s.clone()]),
2891 Value::Map(entries) => {
2892 if entries.is_empty() {
2893 return Err(ServerError::OperationFailed(
2894 "valkey version check: cluster INFO returned empty map".into(),
2895 ));
2896 }
2897 let mut out = Vec::with_capacity(entries.len());
2898 for (_, body) in entries {
2899 out.extend(extract_info_bodies(body)?);
2900 }
2901 Ok(out)
2902 }
2903 other => Err(ServerError::OperationFailed(format!(
2904 "valkey version check: unexpected INFO shape: {other:?}"
2905 ))),
2906 }
2907}
2908
2909fn parse_valkey_version(info: &str) -> Result<(u32, u32), ServerError> {
2924 let extract_major_minor = |line: &str| -> Result<(u32, u32), ServerError> {
2925 let trimmed = line.trim();
2926 let mut parts = trimmed.split('.');
2927 let major_str = parts.next().unwrap_or("").trim();
2928 if major_str.is_empty() {
2929 return Err(ServerError::OperationFailed(format!(
2930 "valkey version check: empty version field in '{trimmed}'"
2931 )));
2932 }
2933 let major = major_str.parse::<u32>().map_err(|_| {
2934 ServerError::OperationFailed(format!(
2935 "valkey version check: non-numeric major in '{trimmed}'"
2936 ))
2937 })?;
2938 let minor_str = parts.next().unwrap_or("").trim();
2942 if minor_str.is_empty() {
2943 return Err(ServerError::OperationFailed(format!(
2944 "valkey version check: missing minor component in '{trimmed}'"
2945 )));
2946 }
2947 let minor = minor_str.parse::<u32>().map_err(|_| {
2948 ServerError::OperationFailed(format!(
2949 "valkey version check: non-numeric minor in '{trimmed}'"
2950 ))
2951 })?;
2952 Ok((major, minor))
2953 };
2954 if let Some(valkey_line) = info
2956 .lines()
2957 .find_map(|line| line.strip_prefix("valkey_version:"))
2958 {
2959 return extract_major_minor(valkey_line);
2960 }
2961 let server_is_valkey = info
2966 .lines()
2967 .map(str::trim)
2968 .any(|line| line.eq_ignore_ascii_case("server_name:valkey"));
2969 if !server_is_valkey {
2970 return Err(ServerError::OperationFailed(
2971 "valkey version check: INFO missing valkey_version and server_name:valkey marker \
2972 (unsupported backend — FlowFabric requires Valkey >= 7.2; Redis is not supported)"
2973 .into(),
2974 ));
2975 }
2976 if let Some(redis_line) = info
2980 .lines()
2981 .find_map(|line| line.strip_prefix("redis_version:"))
2982 {
2983 return extract_major_minor(redis_line);
2984 }
2985 Err(ServerError::OperationFailed(
2986 "valkey version check: INFO has server_name:valkey but no redis_version or valkey_version field"
2987 .into(),
2988 ))
2989}
2990
2991async fn validate_or_create_partition_config(
2998 client: &Client,
2999 config: &PartitionConfig,
3000) -> Result<(), ServerError> {
3001 let key = keys::global_config_partitions();
3002
3003 let existing: HashMap<String, String> = client
3004 .hgetall(&key)
3005 .await
3006 .map_err(|e| crate::server::backend_context(e, format!("HGETALL {key}")))?;
3007
3008 if existing.is_empty() {
3009 tracing::info!("first boot: creating {key}");
3011 client
3012 .hset(&key, "num_flow_partitions", &config.num_flow_partitions.to_string())
3013 .await
3014 .map_err(|e| crate::server::backend_context(e, "HSET num_flow_partitions"))?;
3015 client
3016 .hset(&key, "num_budget_partitions", &config.num_budget_partitions.to_string())
3017 .await
3018 .map_err(|e| crate::server::backend_context(e, "HSET num_budget_partitions"))?;
3019 client
3020 .hset(&key, "num_quota_partitions", &config.num_quota_partitions.to_string())
3021 .await
3022 .map_err(|e| crate::server::backend_context(e, "HSET num_quota_partitions"))?;
3023 return Ok(());
3024 }
3025
3026 let check = |field: &str, expected: u16| -> Result<(), ServerError> {
3028 let stored: u16 = existing
3029 .get(field)
3030 .and_then(|v| v.parse().ok())
3031 .unwrap_or(0);
3032 if stored != expected {
3033 return Err(ServerError::PartitionMismatch(format!(
3034 "{field}: stored={stored}, config={expected}. \
3035 Partition counts are fixed at deployment time. \
3036 Either fix your config or migrate the data."
3037 )));
3038 }
3039 Ok(())
3040 };
3041
3042 check("num_flow_partitions", config.num_flow_partitions)?;
3043 check("num_budget_partitions", config.num_budget_partitions)?;
3044 check("num_quota_partitions", config.num_quota_partitions)?;
3045
3046 tracing::info!("partition config validated against stored {key}");
3047 Ok(())
3048}
3049
/// Key id assigned to the first waitpoint HMAC secret installed at boot.
const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";

/// Per-partition result of probing/installing the waitpoint HMAC secret
/// during boot (see `init_one_partition`).
enum PartitionBootOutcome {
    /// Stored secret matches the env-provided secret; nothing to do.
    Match,
    /// Stored secret differs from the env-provided one; the stored value is
    /// kept and the mismatch is surfaced via a warning in the caller.
    Mismatch,
    /// `current_kid` existed but its `secret:<kid>` field was missing (torn
    /// write from a prior crash); the field was re-written from the env secret.
    Repaired,
    /// No secret was present; the env secret was installed fresh.
    Installed,
}

/// Maximum number of partitions probed/initialized concurrently at boot.
const BOOT_INIT_CONCURRENCY: usize = 16;
3075
/// Probe one execution partition's waitpoint HMAC hash and install or repair
/// the secret as needed.
///
/// The operation order is crash-safety-critical: the probe reads `current_kid`
/// first; if it exists but `secret:<kid>` is missing, a prior boot died
/// between writes and the secret field is repaired. Fresh installs write both
/// fields in a single HSET so no such torn state can be created going forward.
async fn init_one_partition(
    client: &Client,
    partition: Partition,
    secret_hex: &str,
) -> Result<PartitionBootOutcome, ServerError> {
    let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();

    // Probe: which kid (if any) is currently active on this partition?
    let stored_kid: Option<String> = client
        .cmd("HGET")
        .arg(&key)
        .arg("current_kid")
        .execute()
        .await
        .map_err(|e| crate::server::backend_context(e, format!("HGET {key} current_kid (init probe)")))?;

    if let Some(stored_kid) = stored_kid {
        let field = format!("secret:{stored_kid}");
        let stored_secret: Option<String> = client
            .hget(&key, &field)
            .await
            .map_err(|e| crate::server::backend_context(e, format!("HGET {key} secret:<kid> (init check)")))?;
        if stored_secret.is_none() {
            // Torn write: kid pointer exists but the secret body is missing.
            // Re-write the secret from the env value and report the repair.
            client
                .hset(&key, &field, secret_hex)
                .await
                .map_err(|e| crate::server::backend_context(e, format!("HSET {key} secret:<kid> (repair torn write)")))?;
            return Ok(PartitionBootOutcome::Repaired);
        }
        // Stored secret wins on mismatch; the caller logs the divergence.
        if stored_secret.as_deref() != Some(secret_hex) {
            return Ok(PartitionBootOutcome::Mismatch);
        }
        return Ok(PartitionBootOutcome::Match);
    }

    // Fresh install: write kid pointer and secret body in one atomic HSET.
    let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
    let _: i64 = client
        .cmd("HSET")
        .arg(&key)
        .arg("current_kid")
        .arg(WAITPOINT_HMAC_INITIAL_KID)
        .arg(&secret_field)
        .arg(secret_hex)
        .execute()
        .await
        .map_err(|e| crate::server::backend_context(e, format!("HSET {key} (init waitpoint HMAC atomic)")))?;
    Ok(PartitionBootOutcome::Installed)
}
3141
3142async fn initialize_waitpoint_hmac_secret(
3154 client: &Client,
3155 partition_config: &PartitionConfig,
3156 secret_hex: &str,
3157) -> Result<(), ServerError> {
3158 use futures::stream::{FuturesUnordered, StreamExt};
3159
3160 let n = partition_config.num_flow_partitions;
3161 tracing::info!(
3162 partitions = n,
3163 concurrency = BOOT_INIT_CONCURRENCY,
3164 "installing waitpoint HMAC secret across {n} execution partitions"
3165 );
3166
3167 let mut mismatch_count: u16 = 0;
3168 let mut repaired_count: u16 = 0;
3169 let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
3170 let mut next_index: u16 = 0;
3171
3172 loop {
3173 while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
3174 let partition = Partition {
3175 family: PartitionFamily::Execution,
3176 index: next_index,
3177 };
3178 let client = client.clone();
3179 let secret_hex = secret_hex.to_owned();
3180 pending.push(async move {
3181 init_one_partition(&client, partition, &secret_hex).await
3182 });
3183 next_index += 1;
3184 }
3185 match pending.next().await {
3186 Some(res) => match res? {
3187 PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
3188 PartitionBootOutcome::Mismatch => mismatch_count += 1,
3189 PartitionBootOutcome::Repaired => repaired_count += 1,
3190 },
3191 None => break,
3192 }
3193 }
3194
3195 if repaired_count > 0 {
3196 tracing::warn!(
3197 repaired_partitions = repaired_count,
3198 total_partitions = n,
3199 "repaired {repaired_count} partitions with torn waitpoint HMAC writes \
3200 (current_kid present but secret:<kid> missing, likely crash during prior boot)"
3201 );
3202 }
3203
3204 if mismatch_count > 0 {
3205 tracing::warn!(
3206 mismatched_partitions = mismatch_count,
3207 total_partitions = n,
3208 "stored/env secret mismatch on {mismatch_count} partitions — \
3209 env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
3210 run POST /v1/admin/rotate-waitpoint-secret to sync"
3211 );
3212 }
3213
3214 tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
3215 Ok(())
3216}
3217
/// Summary returned by `Server::rotate_waitpoint_secret`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RotateWaitpointSecretResult {
    // Number of partitions on which the rotation succeeded.
    pub rotated: u16,
    // Partition indexes where rotation failed (details are emitted to the
    // audit log by the rotation loop).
    pub failed: Vec<u16>,
    // Key id requested for this rotation, echoed back to the caller.
    pub new_kid: String,
}
3231
impl Server {
    /// Rotate the waitpoint HMAC secret across every execution partition.
    ///
    /// `new_kid` must be non-empty and colon-free (colons delimit the
    /// `secret:<kid>` field names); `new_secret_hex` must be non-empty,
    /// even-length hex. Only one rotation may run at a time, enforced by a
    /// single-permit semaphore. Per-partition failures are collected rather
    /// than aborting, so the result can report partial success.
    ///
    /// # Errors
    /// - `OperationFailed` for invalid inputs or a closed semaphore.
    /// - `ConcurrencyLimitExceeded` if a rotation is already in flight.
    pub async fn rotate_waitpoint_secret(
        &self,
        new_kid: &str,
        new_secret_hex: &str,
    ) -> Result<RotateWaitpointSecretResult, ServerError> {
        if new_kid.is_empty() || new_kid.contains(':') {
            return Err(ServerError::OperationFailed(
                "new_kid must be non-empty and must not contain ':'".into(),
            ));
        }
        if new_secret_hex.is_empty()
            || !new_secret_hex.len().is_multiple_of(2)
            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
        {
            return Err(ServerError::OperationFailed(
                "new_secret_hex must be a non-empty even-length hex string".into(),
            ));
        }

        // Single-flight guard: the permit is held (via `_permit`) for the
        // whole rotation and released on drop.
        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                return Err(ServerError::OperationFailed(
                    "admin rotate semaphore closed (server shutting down)".into(),
                ));
            }
        };

        let n = self.config.partition_config.num_flow_partitions;
        let grace_ms = self.config.waitpoint_hmac_grace_ms;

        use futures::stream::{FuturesUnordered, StreamExt};

        let mut rotated = 0u16;
        let mut failed = Vec::new();
        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
        let mut next_index: u16 = 0;

        // Bounded fan-out: keep up to BOOT_INIT_CONCURRENCY rotations in
        // flight, draining one completion per loop iteration.
        loop {
            while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
                let partition = Partition {
                    family: PartitionFamily::Execution,
                    index: next_index,
                };
                let idx = next_index;
                let new_kid_owned = new_kid.to_owned();
                let new_secret_owned = new_secret_hex.to_owned();
                let partition_owned = partition;
                let fut = async move {
                    let outcome = self
                        .rotate_single_partition(
                            &partition_owned,
                            &new_kid_owned,
                            &new_secret_owned,
                            grace_ms,
                        )
                        .await;
                    (idx, partition_owned, outcome)
                };
                pending.push(fut);
                next_index += 1;
            }
            match pending.next().await {
                Some((idx, partition, outcome)) => match outcome {
                    Ok(()) => {
                        rotated += 1;
                        tracing::debug!(
                            partition = %partition,
                            new_kid = %new_kid,
                            "waitpoint_hmac_rotated"
                        );
                    }
                    Err(e) => {
                        // Failures are recorded, not fatal: the caller gets
                        // the failed partition indexes in the result.
                        tracing::error!(
                            target: "audit",
                            partition = %partition,
                            err = %e,
                            "waitpoint_hmac_rotation_failed"
                        );
                        failed.push(idx);
                    }
                },
                None => break,
            }
        }

        tracing::info!(
            target: "audit",
            new_kid = %new_kid,
            total_partitions = n,
            rotated,
            failed_count = failed.len(),
            "waitpoint_hmac_rotation_complete"
        );

        Ok(RotateWaitpointSecretResult {
            rotated,
            failed,
            new_kid: new_kid.to_owned(),
        })
    }

    /// Rotate the waitpoint HMAC secret on one partition by invoking the
    /// `ff_rotate_waitpoint_hmac_secret` server-side function.
    ///
    /// Maps `RotationConflict` (same kid, different secret already installed)
    /// and transport errors into `ServerError` with actionable context.
    async fn rotate_single_partition(
        &self,
        partition: &Partition,
        new_kid: &str,
        new_secret_hex: &str,
        grace_ms: u64,
    ) -> Result<(), ServerError> {
        let idx = IndexKeys::new(partition);
        let args = RotateWaitpointHmacSecretArgs {
            new_kid: new_kid.to_owned(),
            new_secret_hex: new_secret_hex.to_owned(),
            grace_ms,
        };
        let outcome = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            &self.client,
            &idx,
            &args,
        )
        .await
        .map_err(|e| match e {
            ff_script::ScriptError::RotationConflict(kid) => {
                ServerError::OperationFailed(format!(
                    "rotation conflict: kid {kid} already installed with a \
                     different secret. Either use a fresh kid or restore the \
                     original secret for this kid before retrying."
                ))
            }
            ff_script::ScriptError::Valkey(v) => crate::server::backend_context(
                v,
                format!("FCALL ff_rotate_waitpoint_hmac_secret partition={partition}"),
            ),
            other => ServerError::OperationFailed(format!(
                "rotation failed on partition {partition}: {other}"
            )),
        })?;
        // The script's success payload carries no information we act on here.
        let _ = outcome;
        Ok(())
    }
}
3432
3433fn parse_create_result(
3436 raw: &Value,
3437 execution_id: &ExecutionId,
3438) -> Result<CreateExecutionResult, ServerError> {
3439 let arr = match raw {
3440 Value::Array(arr) => arr,
3441 _ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
3442 };
3443
3444 let status = match arr.first() {
3445 Some(Ok(Value::Int(n))) => *n,
3446 _ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
3447 };
3448
3449 if status == 1 {
3450 let sub = arr
3452 .get(1)
3453 .and_then(|v| match v {
3454 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3455 Ok(Value::SimpleString(s)) => Some(s.clone()),
3456 _ => None,
3457 })
3458 .unwrap_or_default();
3459
3460 if sub == "DUPLICATE" {
3461 Ok(CreateExecutionResult::Duplicate {
3462 execution_id: execution_id.clone(),
3463 })
3464 } else {
3465 Ok(CreateExecutionResult::Created {
3466 execution_id: execution_id.clone(),
3467 public_state: PublicState::Waiting,
3468 })
3469 }
3470 } else {
3471 let error_code = arr
3472 .get(1)
3473 .and_then(|v| match v {
3474 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3475 Ok(Value::SimpleString(s)) => Some(s.clone()),
3476 _ => None,
3477 })
3478 .unwrap_or_else(|| "unknown".to_owned());
3479 Err(ServerError::OperationFailed(format!(
3480 "ff_create_execution failed: {error_code}"
3481 )))
3482 }
3483}
3484
3485fn parse_cancel_result(
3486 raw: &Value,
3487 execution_id: &ExecutionId,
3488) -> Result<CancelExecutionResult, ServerError> {
3489 let arr = match raw {
3490 Value::Array(arr) => arr,
3491 _ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
3492 };
3493
3494 let status = match arr.first() {
3495 Some(Ok(Value::Int(n))) => *n,
3496 _ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
3497 };
3498
3499 if status == 1 {
3500 Ok(CancelExecutionResult::Cancelled {
3501 execution_id: execution_id.clone(),
3502 public_state: PublicState::Cancelled,
3503 })
3504 } else {
3505 let error_code = arr
3506 .get(1)
3507 .and_then(|v| match v {
3508 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3509 Ok(Value::SimpleString(s)) => Some(s.clone()),
3510 _ => None,
3511 })
3512 .unwrap_or_else(|| "unknown".to_owned());
3513 Err(ServerError::OperationFailed(format!(
3514 "ff_cancel_execution failed: {error_code}"
3515 )))
3516 }
3517}
3518
3519fn parse_budget_create_result(
3520 raw: &Value,
3521 budget_id: &BudgetId,
3522) -> Result<CreateBudgetResult, ServerError> {
3523 let arr = match raw {
3524 Value::Array(arr) => arr,
3525 _ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
3526 };
3527
3528 let status = match arr.first() {
3529 Some(Ok(Value::Int(n))) => *n,
3530 _ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
3531 };
3532
3533 if status == 1 {
3534 let sub = arr
3535 .get(1)
3536 .and_then(|v| match v {
3537 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3538 Ok(Value::SimpleString(s)) => Some(s.clone()),
3539 _ => None,
3540 })
3541 .unwrap_or_default();
3542
3543 if sub == "ALREADY_SATISFIED" {
3544 Ok(CreateBudgetResult::AlreadySatisfied {
3545 budget_id: budget_id.clone(),
3546 })
3547 } else {
3548 Ok(CreateBudgetResult::Created {
3549 budget_id: budget_id.clone(),
3550 })
3551 }
3552 } else {
3553 let error_code = arr
3554 .get(1)
3555 .and_then(|v| match v {
3556 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3557 Ok(Value::SimpleString(s)) => Some(s.clone()),
3558 _ => None,
3559 })
3560 .unwrap_or_else(|| "unknown".to_owned());
3561 Err(ServerError::OperationFailed(format!(
3562 "ff_create_budget failed: {error_code}"
3563 )))
3564 }
3565}
3566
3567fn parse_quota_create_result(
3568 raw: &Value,
3569 quota_policy_id: &QuotaPolicyId,
3570) -> Result<CreateQuotaPolicyResult, ServerError> {
3571 let arr = match raw {
3572 Value::Array(arr) => arr,
3573 _ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
3574 };
3575
3576 let status = match arr.first() {
3577 Some(Ok(Value::Int(n))) => *n,
3578 _ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
3579 };
3580
3581 if status == 1 {
3582 let sub = arr
3583 .get(1)
3584 .and_then(|v| match v {
3585 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3586 Ok(Value::SimpleString(s)) => Some(s.clone()),
3587 _ => None,
3588 })
3589 .unwrap_or_default();
3590
3591 if sub == "ALREADY_SATISFIED" {
3592 Ok(CreateQuotaPolicyResult::AlreadySatisfied {
3593 quota_policy_id: quota_policy_id.clone(),
3594 })
3595 } else {
3596 Ok(CreateQuotaPolicyResult::Created {
3597 quota_policy_id: quota_policy_id.clone(),
3598 })
3599 }
3600 } else {
3601 let error_code = arr
3602 .get(1)
3603 .and_then(|v| match v {
3604 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3605 Ok(Value::SimpleString(s)) => Some(s.clone()),
3606 _ => None,
3607 })
3608 .unwrap_or_else(|| "unknown".to_owned());
3609 Err(ServerError::OperationFailed(format!(
3610 "ff_create_quota_policy failed: {error_code}"
3611 )))
3612 }
3613}
3614
3615fn parse_create_flow_result(
3618 raw: &Value,
3619 flow_id: &FlowId,
3620) -> Result<CreateFlowResult, ServerError> {
3621 let arr = match raw {
3622 Value::Array(arr) => arr,
3623 _ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
3624 };
3625 let status = match arr.first() {
3626 Some(Ok(Value::Int(n))) => *n,
3627 _ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
3628 };
3629 if status == 1 {
3630 let sub = fcall_field_str(arr, 1);
3631 if sub == "ALREADY_SATISFIED" {
3632 Ok(CreateFlowResult::AlreadySatisfied {
3633 flow_id: flow_id.clone(),
3634 })
3635 } else {
3636 Ok(CreateFlowResult::Created {
3637 flow_id: flow_id.clone(),
3638 })
3639 }
3640 } else {
3641 let error_code = fcall_field_str(arr, 1);
3642 Err(ServerError::OperationFailed(format!(
3643 "ff_create_flow failed: {error_code}"
3644 )))
3645 }
3646}
3647
3648fn parse_add_execution_to_flow_result(
3649 raw: &Value,
3650) -> Result<AddExecutionToFlowResult, ServerError> {
3651 let arr = match raw {
3652 Value::Array(arr) => arr,
3653 _ => {
3654 return Err(ServerError::Script(
3655 "ff_add_execution_to_flow: expected Array".into(),
3656 ))
3657 }
3658 };
3659 let status = match arr.first() {
3660 Some(Ok(Value::Int(n))) => *n,
3661 _ => {
3662 return Err(ServerError::Script(
3663 "ff_add_execution_to_flow: bad status code".into(),
3664 ))
3665 }
3666 };
3667 if status == 1 {
3668 let sub = fcall_field_str(arr, 1);
3669 let eid_str = fcall_field_str(arr, 2);
3670 let nc_str = fcall_field_str(arr, 3);
3671 let eid = ExecutionId::parse(&eid_str)
3672 .map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
3673 let nc: u32 = nc_str.parse().unwrap_or(0);
3674 if sub == "ALREADY_SATISFIED" {
3675 Ok(AddExecutionToFlowResult::AlreadyMember {
3676 execution_id: eid,
3677 node_count: nc,
3678 })
3679 } else {
3680 Ok(AddExecutionToFlowResult::Added {
3681 execution_id: eid,
3682 new_node_count: nc,
3683 })
3684 }
3685 } else {
3686 let error_code = fcall_field_str(arr, 1);
3687 Err(ServerError::OperationFailed(format!(
3688 "ff_add_execution_to_flow failed: {error_code}"
3689 )))
3690 }
3691}
3692
/// Decoded `ff_cancel_flow` reply, before mapping into the public contract.
enum ParsedCancelFlow {
    /// The flow was cancelled; carries the policy string and the member
    /// execution ids taken from the tail of the script reply.
    Cancelled {
        policy: String,
        member_execution_ids: Vec<String>,
    },
    /// The script reported `flow_already_terminal`; no state was changed.
    AlreadyTerminal,
}
3705
3706fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
3712 let arr = match raw {
3713 Value::Array(arr) => arr,
3714 _ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
3715 };
3716 let status = match arr.first() {
3717 Some(Ok(Value::Int(n))) => *n,
3718 _ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
3719 };
3720 if status != 1 {
3721 let error_code = fcall_field_str(arr, 1);
3722 if error_code == "flow_already_terminal" {
3723 return Ok(ParsedCancelFlow::AlreadyTerminal);
3724 }
3725 return Err(ServerError::OperationFailed(format!(
3726 "ff_cancel_flow failed: {error_code}"
3727 )));
3728 }
3729 let policy = fcall_field_str(arr, 2);
3731 let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
3734 for i in 3..arr.len() {
3735 members.push(fcall_field_str(arr, i));
3736 }
3737 Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
3738}
3739
3740fn parse_stage_dependency_edge_result(
3741 raw: &Value,
3742) -> Result<StageDependencyEdgeResult, ServerError> {
3743 let arr = match raw {
3744 Value::Array(arr) => arr,
3745 _ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
3746 };
3747 let status = match arr.first() {
3748 Some(Ok(Value::Int(n))) => *n,
3749 _ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
3750 };
3751 if status == 1 {
3752 let edge_id_str = fcall_field_str(arr, 2);
3753 let rev_str = fcall_field_str(arr, 3);
3754 let edge_id = EdgeId::parse(&edge_id_str)
3755 .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
3756 let rev: u64 = rev_str.parse().unwrap_or(0);
3757 Ok(StageDependencyEdgeResult::Staged {
3758 edge_id,
3759 new_graph_revision: rev,
3760 })
3761 } else {
3762 let error_code = fcall_field_str(arr, 1);
3763 Err(ServerError::OperationFailed(format!(
3764 "ff_stage_dependency_edge failed: {error_code}"
3765 )))
3766 }
3767}
3768
3769fn parse_apply_dependency_result(
3770 raw: &Value,
3771) -> Result<ApplyDependencyToChildResult, ServerError> {
3772 let arr = match raw {
3773 Value::Array(arr) => arr,
3774 _ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
3775 };
3776 let status = match arr.first() {
3777 Some(Ok(Value::Int(n))) => *n,
3778 _ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
3779 };
3780 if status == 1 {
3781 let sub = fcall_field_str(arr, 1);
3782 if sub == "ALREADY_APPLIED" || sub == "already_applied" {
3783 Ok(ApplyDependencyToChildResult::AlreadyApplied)
3784 } else {
3785 let count_str = fcall_field_str(arr, 2);
3787 let count: u32 = count_str.parse().unwrap_or(0);
3788 Ok(ApplyDependencyToChildResult::Applied {
3789 unsatisfied_count: count,
3790 })
3791 }
3792 } else {
3793 let error_code = fcall_field_str(arr, 1);
3794 Err(ServerError::OperationFailed(format!(
3795 "ff_apply_dependency_to_child failed: {error_code}"
3796 )))
3797 }
3798}
3799
3800fn parse_deliver_signal_result(
3801 raw: &Value,
3802 signal_id: &SignalId,
3803) -> Result<DeliverSignalResult, ServerError> {
3804 let arr = match raw {
3805 Value::Array(arr) => arr,
3806 _ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
3807 };
3808 let status = match arr.first() {
3809 Some(Ok(Value::Int(n))) => *n,
3810 _ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
3811 };
3812 if status == 1 {
3813 let sub = fcall_field_str(arr, 1);
3814 if sub == "DUPLICATE" {
3815 let existing_str = fcall_field_str(arr, 2);
3817 let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
3818 Ok(DeliverSignalResult::Duplicate {
3819 existing_signal_id: existing_id,
3820 })
3821 } else {
3822 let effect = fcall_field_str(arr, 3);
3824 Ok(DeliverSignalResult::Accepted {
3825 signal_id: signal_id.clone(),
3826 effect,
3827 })
3828 }
3829 } else {
3830 let error_code = fcall_field_str(arr, 1);
3831 Err(ServerError::OperationFailed(format!(
3832 "ff_deliver_signal failed: {error_code}"
3833 )))
3834 }
3835}
3836
3837fn parse_change_priority_result(
3838 raw: &Value,
3839 execution_id: &ExecutionId,
3840) -> Result<ChangePriorityResult, ServerError> {
3841 let arr = match raw {
3842 Value::Array(arr) => arr,
3843 _ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
3844 };
3845 let status = match arr.first() {
3846 Some(Ok(Value::Int(n))) => *n,
3847 _ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
3848 };
3849 if status == 1 {
3850 Ok(ChangePriorityResult::Changed {
3851 execution_id: execution_id.clone(),
3852 })
3853 } else {
3854 let error_code = fcall_field_str(arr, 1);
3855 Err(ServerError::OperationFailed(format!(
3856 "ff_change_priority failed: {error_code}"
3857 )))
3858 }
3859}
3860
3861fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
3862 let arr = match raw {
3863 Value::Array(arr) => arr,
3864 _ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
3865 };
3866 let status = match arr.first() {
3867 Some(Ok(Value::Int(n))) => *n,
3868 _ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
3869 };
3870 if status == 1 {
3871 let unsatisfied = fcall_field_str(arr, 2);
3873 let ps = if unsatisfied == "0" {
3874 PublicState::Waiting
3875 } else {
3876 PublicState::WaitingChildren
3877 };
3878 Ok(ReplayExecutionResult::Replayed { public_state: ps })
3879 } else {
3880 let error_code = fcall_field_str(arr, 1);
3881 Err(ServerError::OperationFailed(format!(
3882 "ff_replay_execution failed: {error_code}"
3883 )))
3884 }
3885}
3886
3887fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
3898 match e {
3899 ff_script::error::ScriptError::Valkey(valkey_err) => {
3900 crate::server::backend_context(valkey_err, "stream FCALL transport")
3901 }
3902 other => ServerError::Script(other.to_string()),
3903 }
3904}
3905
3906fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
3907 match arr.get(index) {
3908 Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
3909 Some(Ok(Value::SimpleString(s))) => s.clone(),
3910 Some(Ok(Value::Int(n))) => n.to_string(),
3911 _ => String::new(),
3912 }
3913}
3914
3915fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
3919 let arr = match raw {
3920 Value::Array(arr) => arr,
3921 _ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
3922 };
3923 let status_code = match arr.first() {
3924 Some(Ok(Value::Int(n))) => *n,
3925 _ => {
3926 return Err(ServerError::Script(
3927 "ff_report_usage_and_check: expected Int status code".into(),
3928 ))
3929 }
3930 };
3931 if status_code != 1 {
3932 let error_code = fcall_field_str(arr, 1);
3933 return Err(ServerError::OperationFailed(format!(
3934 "ff_report_usage_and_check failed: {error_code}"
3935 )));
3936 }
3937 let sub_status = fcall_field_str(arr, 1);
3938 match sub_status.as_str() {
3939 "OK" => Ok(ReportUsageResult::Ok),
3940 "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
3941 "SOFT_BREACH" => {
3942 let dim = fcall_field_str(arr, 2);
3943 let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3944 let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3945 Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
3946 }
3947 "HARD_BREACH" => {
3948 let dim = fcall_field_str(arr, 2);
3949 let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3950 let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3951 Ok(ReportUsageResult::HardBreach {
3952 dimension: dim,
3953 current_usage: current,
3954 hard_limit: limit,
3955 })
3956 }
3957 _ => Err(ServerError::OperationFailed(format!(
3958 "ff_report_usage_and_check: unknown sub-status: {sub_status}"
3959 ))),
3960 }
3961}
3962
3963fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
3964 let arr = match raw {
3965 Value::Array(arr) => arr,
3966 _ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
3967 };
3968 let status = match arr.first() {
3969 Some(Ok(Value::Int(n))) => *n,
3970 _ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
3971 };
3972 if status == 1 {
3973 let sub = fcall_field_str(arr, 1);
3974 if sub == "ALREADY_SATISFIED" {
3975 let reason = fcall_field_str(arr, 2);
3976 Ok(RevokeLeaseResult::AlreadySatisfied { reason })
3977 } else {
3978 let lid = fcall_field_str(arr, 2);
3979 let epoch = fcall_field_str(arr, 3);
3980 Ok(RevokeLeaseResult::Revoked {
3981 lease_id: lid,
3982 lease_epoch: epoch,
3983 })
3984 }
3985 } else {
3986 let error_code = fcall_field_str(arr, 1);
3987 Err(ServerError::OperationFailed(format!(
3988 "ff_revoke_lease failed: {error_code}"
3989 )))
3990 }
3991}
3992
3993fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
3999 if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
4000 return true;
4001 }
4002 e.detail()
4003 .map(|d| {
4004 d.contains("Function not loaded")
4005 || d.contains("No matching function")
4006 || d.contains("function not found")
4007 })
4008 .unwrap_or(false)
4009 || e.to_string().contains("Function not loaded")
4010}
4011
4012async fn fcall_with_reload_on_client(
4015 client: &Client,
4016 function: &str,
4017 keys: &[&str],
4018 args: &[&str],
4019) -> Result<Value, ServerError> {
4020 match client.fcall(function, keys, args).await {
4021 Ok(v) => Ok(v),
4022 Err(e) if is_function_not_loaded(&e) => {
4023 tracing::warn!(function, "Lua library not found on server, reloading");
4024 ff_script::loader::ensure_library(client)
4025 .await
4026 .map_err(ServerError::LibraryLoad)?;
4027 client
4028 .fcall(function, keys, args)
4029 .await
4030 .map_err(ServerError::from)
4031 }
4032 Err(e) => Err(ServerError::from(e)),
4033 }
4034}
4035
/// Assemble the (KEYS, ARGV) vectors for the `ff_cancel_execution` FCALL.
///
/// Pre-reads the execution's lane and dynamic fields (attempt index,
/// waitpoint id, worker instance id) so every key the script may touch can be
/// declared up front. Missing/unparseable fields fall back to defaults
/// (attempt 0, fresh waitpoint id, empty worker id) so cancellation still
/// proceeds for partially-initialized executions.
async fn build_cancel_execution_fcall(
    client: &Client,
    partition_config: &PartitionConfig,
    args: &CancelExecutionArgs,
) -> Result<(Vec<String>, Vec<String>), ServerError> {
    let partition = execution_partition(&args.execution_id, partition_config);
    let ctx = ExecKeyContext::new(&partition, &args.execution_id);
    let idx = IndexKeys::new(&partition);

    // Lane is needed to name the per-lane index keys below.
    let lane_str: Option<String> = client
        .hget(&ctx.core(), "lane_id")
        .await
        .map_err(|e| crate::server::backend_context(e, "HGET lane_id"))?;
    let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));

    let dyn_fields: Vec<Option<String>> = client
        .cmd("HMGET")
        .arg(ctx.core())
        .arg("current_attempt_index")
        .arg("current_waitpoint_id")
        .arg("current_worker_instance_id")
        .execute()
        .await
        .map_err(|e| crate::server::backend_context(e, "HMGET cancel pre-read"))?;

    let att_idx_val = dyn_fields.first()
        .and_then(|v| v.as_ref())
        .and_then(|s| s.parse::<u32>().ok())
        .unwrap_or(0);
    let att_idx = AttemptIndex::new(att_idx_val);
    let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
    // No current waitpoint: use a fresh id so the waitpoint keys still resolve
    // to (empty) keys the script can safely touch.
    let wp_id = if wp_id_str.is_empty() {
        WaitpointId::new()
    } else {
        WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
    };
    let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
    let wiid = WorkerInstanceId::new(&wiid_str);

    // NOTE: KEYS are positional — this order must match the Lua function's
    // KEYS table exactly; do not reorder or reformat.
    let keys: Vec<String> = vec![
        ctx.core(), ctx.attempt_hash(att_idx), ctx.stream_meta(att_idx), ctx.lease_current(), ctx.lease_history(), idx.lease_expiry(), idx.worker_leases(&wiid), ctx.suspension_current(), ctx.waitpoint(&wp_id), ctx.waitpoint_condition(&wp_id), idx.suspension_timeout(), idx.lane_terminal(&lane), idx.attempt_timeout(), idx.execution_deadline(), idx.lane_eligible(&lane), idx.lane_delayed(&lane), idx.lane_blocked_dependencies(&lane), idx.lane_blocked_budget(&lane), idx.lane_blocked_quota(&lane), idx.lane_blocked_route(&lane), idx.lane_blocked_operator(&lane), ];
    // ARGV: absent lease id/epoch are sent as empty strings.
    let argv: Vec<String> = vec![
        args.execution_id.to_string(),
        args.reason.clone(),
        args.source.to_string(),
        args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
        args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
    ];
    Ok((keys, argv))
}
4110
/// Backoff schedule for `cancel_member_execution`: one fcall attempt per
/// entry; after a retryable failure the loop sleeps the current entry's delay
/// before the next attempt. The final entry's delay is never slept, because
/// the last attempt returns its error immediately.
const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];
4115
/// Returns the storage-backend error kind carried by `e`, or `None` for
/// errors with no backend origin. Thin alias over `ServerError::backend_kind`
/// used by the cancel retry loop to decide retryability.
fn extract_backend_kind(e: &ServerError) -> Option<ff_core::BackendErrorKind> {
    e.backend_kind()
}
4129
4130async fn ack_cancel_member(
4147 client: &Client,
4148 pending_cancels_key: &str,
4149 cancel_backlog_key: &str,
4150 eid_str: &str,
4151 flow_id: &str,
4152) {
4153 let keys = [pending_cancels_key, cancel_backlog_key];
4154 let args_v = [eid_str, flow_id];
4155 let fut: Result<Value, _> =
4156 client.fcall("ff_ack_cancel_member", &keys, &args_v).await;
4157 if let Err(e) = fut {
4158 tracing::warn!(
4159 flow_id = %flow_id,
4160 execution_id = %eid_str,
4161 error = %e,
4162 "ff_ack_cancel_member failed; reconciler will drain on next pass"
4163 );
4164 }
4165}
4166
4167fn is_terminal_ack_error(err: &ServerError) -> bool {
4176 match err {
4177 ServerError::OperationFailed(msg) => {
4178 msg.contains("execution_not_active") || msg.contains("execution_not_found")
4179 }
4180 _ => false,
4181 }
4182}
4183
4184async fn cancel_member_execution(
4185 client: &Client,
4186 partition_config: &PartitionConfig,
4187 eid_str: &str,
4188 reason: &str,
4189 now: TimestampMs,
4190) -> Result<(), ServerError> {
4191 let execution_id = ExecutionId::parse(eid_str)
4192 .map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
4193 let args = CancelExecutionArgs {
4194 execution_id: execution_id.clone(),
4195 reason: reason.to_owned(),
4196 source: CancelSource::OperatorOverride,
4197 lease_id: None,
4198 lease_epoch: None,
4199 attempt_id: None,
4200 now,
4201 };
4202
4203 let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
4204 for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
4205 let is_last = attempt_idx + 1 == attempts;
4206 match try_cancel_member_once(client, partition_config, &args).await {
4207 Ok(()) => return Ok(()),
4208 Err(e) => {
4209 let retryable = extract_backend_kind(&e)
4213 .map(|k| k.is_retryable())
4214 .unwrap_or(false);
4215 if !retryable || is_last {
4216 return Err(e);
4217 }
4218 tracing::debug!(
4219 execution_id = %execution_id,
4220 attempt = attempt_idx + 1,
4221 delay_ms = *delay_ms,
4222 error = %e,
4223 "cancel_member_execution: transient error, retrying"
4224 );
4225 tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
4226 }
4227 }
4228 }
4229 Err(ServerError::OperationFailed(format!(
4233 "cancel_member_execution: retries exhausted for {execution_id}"
4234 )))
4235}
4236
4237async fn try_cancel_member_once(
4240 client: &Client,
4241 partition_config: &PartitionConfig,
4242 args: &CancelExecutionArgs,
4243) -> Result<(), ServerError> {
4244 let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
4245 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
4246 let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
4247 let raw =
4248 fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
4249 parse_cancel_result(&raw, &args.execution_id).map(|_| ())
4250}
4251
4252fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
4253 let arr = match raw {
4254 Value::Array(arr) => arr,
4255 _ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
4256 };
4257 let status = match arr.first() {
4258 Some(Ok(Value::Int(n))) => *n,
4259 _ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
4260 };
4261 if status == 1 {
4262 let next_str = fcall_field_str(arr, 2);
4263 let next_ms: i64 = next_str.parse().unwrap_or(0);
4264 Ok(ResetBudgetResult::Reset {
4265 next_reset_at: TimestampMs::from_millis(next_ms),
4266 })
4267 } else {
4268 let error_code = fcall_field_str(arr, 1);
4269 Err(ServerError::OperationFailed(format!(
4270 "ff_reset_budget failed: {error_code}"
4271 )))
4272 }
4273}
4274
#[cfg(test)]
mod tests {
    // Covers three helper families: budget/usage dimension validation,
    // ServerError retryability / backend-kind classification, and Valkey
    // INFO-body extraction plus the version-gate parser.
    use super::*;
    use ferriskey::ErrorKind;

    // Synthetic ferriskey error of the given kind; the message payload is
    // irrelevant to the classification assertions below.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    // ---- validate_create_budget_dimensions ----

    #[test]
    fn create_budget_rejects_over_cap_dimension_count() {
        let n = MAX_BUDGET_DIMENSIONS + 1;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let hard = vec![1u64; n];
        let soft = vec![0u64; n];
        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
                assert!(msg.contains(&format!("got={n}")), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    // Boundary case: exactly MAX_BUDGET_DIMENSIONS must be accepted.
    #[test]
    fn create_budget_accepts_exactly_cap_dimensions() {
        let n = MAX_BUDGET_DIMENSIONS;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let hard = vec![1u64; n];
        let soft = vec![0u64; n];
        assert!(validate_create_budget_dimensions(&dims, &hard, &soft).is_ok());
    }

    #[test]
    fn create_budget_rejects_hard_limit_length_mismatch() {
        let dims = vec!["a".to_string(), "b".to_string()];
        let hard = vec![1u64]; let soft = vec![0u64, 0u64];
        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
                assert!(msg.contains("hard_limits=1"), "got: {msg}");
                assert!(msg.contains("dimensions=2"), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn create_budget_rejects_soft_limit_length_mismatch() {
        let dims = vec!["a".to_string(), "b".to_string()];
        let hard = vec![1u64, 2u64];
        let soft = vec![0u64, 0u64, 0u64]; let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
                assert!(msg.contains("soft_limits=3"), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    // ---- validate_report_usage_dimensions ----

    #[test]
    fn report_usage_rejects_over_cap_dimension_count() {
        let n = MAX_BUDGET_DIMENSIONS + 1;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let deltas = vec![1u64; n];
        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn report_usage_accepts_exactly_cap_dimensions() {
        let n = MAX_BUDGET_DIMENSIONS;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let deltas = vec![1u64; n];
        assert!(validate_report_usage_dimensions(&dims, &deltas).is_ok());
    }

    #[test]
    fn report_usage_rejects_delta_length_mismatch() {
        let dims = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        let deltas = vec![1u64, 2u64]; let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("dimension_delta_array_mismatch"), "got: {msg}");
                assert!(msg.contains("dimensions=3"), "got: {msg}");
                assert!(msg.contains("deltas=2"), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    // Zero dimensions is a valid (no-op) usage report.
    #[test]
    fn report_usage_accepts_empty_dimensions() {
        assert!(validate_report_usage_dimensions(&[], &[]).is_ok());
    }

    // ---- ServerError retryability / backend-kind classification ----

    #[test]
    fn is_retryable_backend_variant_uses_kind_table() {
        // Transient transport / cluster-topology kinds are retryable...
        assert!(ServerError::from(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::Moved)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::Ask)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());

        // ...while auth/config-shaped kinds are not.
        assert!(!ServerError::from(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
        assert!(!ServerError::from(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!ServerError::from(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
    }

    // backend_context wrapping must preserve the kind-based classification.
    #[test]
    fn is_retryable_backend_context_uses_kind_table() {
        let err = crate::server::backend_context(mk_fk_err(ErrorKind::IoError), "HGET test");
        assert!(err.is_retryable());

        let err =
            crate::server::backend_context(mk_fk_err(ErrorKind::AuthenticationFailed), "auth");
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_library_load_delegates_to_inner_kind() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::IoError),
        ));
        assert!(err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::AuthenticationFailed),
        ));
        assert!(!err.is_retryable());

        // Version mismatch is a deployment problem, never retryable.
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_business_logic_variants_are_false() {
        assert!(!ServerError::NotFound("x".into()).is_retryable());
        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
        assert!(!ServerError::Script("x".into()).is_retryable());
        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
    }

    #[test]
    fn backend_kind_delegates_through_library_load() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::ClusterDown),
        ));
        assert_eq!(err.backend_kind(), Some(ff_core::BackendErrorKind::Cluster));

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert_eq!(err.backend_kind(), None);
    }

    // ---- parse_valkey_version / extract_info_bodies ----

    #[test]
    fn parse_valkey_version_prefers_valkey_version_over_redis_version() {
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n\
server_mode:cluster\r\n\
os:Linux\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
    }

    #[test]
    fn parse_valkey_version_real_valkey_8_cluster_body() {
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
server_name:valkey\r\n\
valkey_version:9.0.3\r\n\
valkey_release_stage:ga\r\n\
redis_git_sha1:00000000\r\n\
server_mode:cluster\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
    }

    // Valkey 7 bodies have no valkey_version field but do carry server_name.
    #[test]
    fn parse_valkey_version_falls_back_to_redis_version_on_valkey_7() {
        let info = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nfoo:bar\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
    }

    // A body with neither valkey_version nor a valkey server_name marker is
    // plain Redis and must be rejected with guidance.
    #[test]
    fn parse_valkey_version_rejects_redis_backend() {
        let info = "\
# Server\r\n\
redis_version:7.4.0\r\n\
redis_mode:standalone\r\n\
os:Linux\r\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        let msg = err.to_string();
        assert!(
            msg.contains("Redis is not supported") && msg.contains("server_name:valkey"),
            "expected Redis-rejection message, got: {msg}"
        );
    }

    #[test]
    fn parse_valkey_version_accepts_valkey_7_marker_case_insensitively() {
        let info = "redis_version:7.2.0\r\nSERVER_NAME:Valkey\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
    }

    #[test]
    fn parse_valkey_version_errors_when_no_version_field() {
        let info = "# Server\r\nfoo:bar\r\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(
            err.to_string().contains("missing"),
            "expected 'missing' in message, got: {err}"
        );
    }

    #[test]
    fn parse_valkey_version_errors_on_non_numeric_major() {
        let info = "valkey_version:invalid.x.y\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("non-numeric major"));
    }

    #[test]
    fn parse_valkey_version_errors_on_non_numeric_minor() {
        let info = "valkey_version:7.x.0\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("non-numeric minor"));
    }

    #[test]
    fn parse_valkey_version_errors_on_missing_minor() {
        let info = "valkey_version:7\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("missing minor"));
    }

    // Cluster-mode INFO replies arrive as a node-address -> body map; every
    // entry must be extracted.
    #[test]
    fn extract_info_bodies_unwraps_cluster_map_all_entries() {
        let body_a = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
        let body_b = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:8.0.0\r\n";
        let map = Value::Map(vec![
            (
                Value::SimpleString("127.0.0.1:7000".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_a.to_string(),
                },
            ),
            (
                Value::SimpleString("127.0.0.1:7001".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_b.to_string(),
                },
            ),
        ]);
        let bodies = extract_info_bodies(&map).unwrap();
        assert_eq!(bodies.len(), 2);
        assert_eq!(bodies[0], body_a);
        assert_eq!(bodies[1], body_b);
    }

    #[test]
    fn extract_info_bodies_handles_simple_string() {
        let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
        let v = Value::SimpleString(body_text.to_string());
        let bodies = extract_info_bodies(&v).unwrap();
        assert_eq!(bodies, vec![body_text.to_string()]);
    }

    #[test]
    fn extract_info_bodies_rejects_empty_cluster_map() {
        let map = Value::Map(vec![]);
        let err = extract_info_bodies(&map).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("empty map"));
    }

    // The version gate takes the minimum across all cluster nodes: a single
    // below-floor node (here 7.1 via redis_version fallback) must fail it.
    #[test]
    fn parse_valkey_version_min_across_cluster_map_picks_lowest() {
        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
        let body_node2 = "# Server\r\nredis_version:7.1.0\r\nserver_name:valkey\r\n";
        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
        let map = Value::Map(vec![
            (
                Value::SimpleString("node1:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node1.to_string(),
                },
            ),
            (
                Value::SimpleString("node2:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node2.to_string(),
                },
            ),
            (
                Value::SimpleString("node3:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node3.to_string(),
                },
            ),
        ]);

        let bodies = extract_info_bodies(&map).unwrap();
        let min = bodies
            .iter()
            .map(|b| parse_valkey_version(b).unwrap())
            .min()
            .unwrap();

        assert_eq!(min, (7, 1), "min across cluster must be the lowest node");
        assert!(
            min < (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
            "mixed-version cluster with 7.1.0 node must fail the (7,2) gate"
        );
    }

    // Counterpart: a cluster whose lowest node is exactly at the floor passes.
    #[test]
    fn parse_valkey_version_all_nodes_at_or_above_floor_accepts() {
        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
        let body_node2 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:9.0.3\r\n";
        let map = Value::Map(vec![
            (
                Value::SimpleString("node1:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node1.to_string(),
                },
            ),
            (
                Value::SimpleString("node2:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node2.to_string(),
                },
            ),
            (
                Value::SimpleString("node3:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node3.to_string(),
                },
            ),
        ]);

        let bodies = extract_info_bodies(&map).unwrap();
        let min = bodies
            .iter()
            .map(|b| parse_valkey_version(b).unwrap())
            .min()
            .unwrap();

        assert_eq!(min, (7, 2), "min across cluster is the lowest node (7.2)");
        assert!(
            min >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
            "all-above-floor cluster must pass the gate"
        );
    }

    #[test]
    fn valkey_version_too_low_is_not_retryable() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7.0".into(),
            required: "7.2".into(),
        };
        assert!(!err.is_retryable());
        assert_eq!(err.backend_kind(), None);
    }

    #[test]
    fn valkey_version_too_low_error_message_includes_both_versions() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7.0".into(),
            required: "7.2".into(),
        };
        let msg = err.to_string();
        assert!(msg.contains("7.0"), "detected version in message: {msg}");
        assert!(msg.contains("7.2"), "required version in message: {msg}");
        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
    }
}