1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use ferriskey::{Client, ClientBuilder, Value};
6use tokio::sync::Mutex as AsyncMutex;
7use tokio::task::JoinSet;
8use ff_core::contracts::{
9 AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
10 CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
11 CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
12 CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
13 ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
14 DeliverSignalArgs, DeliverSignalResult, ExecutionInfo, ExecutionSummary,
15 ListExecutionsResult, PendingWaitpointInfo, ReplayExecutionResult,
16 ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
17 RevokeLeaseResult,
18 RotateWaitpointHmacSecretArgs,
19 StageDependencyEdgeArgs, StageDependencyEdgeResult,
20};
21use ff_core::keys::{
22 self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
23 IndexKeys, QuotaKeyContext,
24};
25use ff_core::partition::{
26 budget_partition, execution_partition, flow_partition, quota_partition, Partition,
27 PartitionConfig, PartitionFamily,
28};
29use ff_core::state::{PublicState, StateVector};
30use ff_core::types::*;
31use ff_engine::Engine;
32use ff_script::retry::is_retryable_kind;
33
34use crate::config::ServerConfig;
35
// Cap on how many already-terminal members are tracked/reported at once.
// NOTE(review): the usage site is outside this chunk — confirm exact
// semantics (likely bounds a member list in flow cancellation paths).
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;

// Re-export the budget-dimension cap defined by the Lua function layer so
// the validation helpers below stay in sync with the scripts.
pub(crate) use ff_script::functions::budget::MAX_BUDGET_DIMENSIONS;
52
53fn validate_create_budget_dimensions(
63 dimensions: &[String],
64 hard_limits: &[u64],
65 soft_limits: &[u64],
66) -> Result<(), ServerError> {
67 let dim_count = dimensions.len();
68 if dim_count > MAX_BUDGET_DIMENSIONS {
69 return Err(ServerError::InvalidInput(format!(
70 "too_many_dimensions: limit={}, got={}",
71 MAX_BUDGET_DIMENSIONS, dim_count
72 )));
73 }
74 if hard_limits.len() != dim_count {
75 return Err(ServerError::InvalidInput(format!(
76 "dimension_limit_array_mismatch: dimensions={} hard_limits={}",
77 dim_count,
78 hard_limits.len()
79 )));
80 }
81 if soft_limits.len() != dim_count {
82 return Err(ServerError::InvalidInput(format!(
83 "dimension_limit_array_mismatch: dimensions={} soft_limits={}",
84 dim_count,
85 soft_limits.len()
86 )));
87 }
88 Ok(())
89}
90
91fn validate_report_usage_dimensions(
97 dimensions: &[String],
98 deltas: &[u64],
99) -> Result<(), ServerError> {
100 let dim_count = dimensions.len();
101 if dim_count > MAX_BUDGET_DIMENSIONS {
102 return Err(ServerError::InvalidInput(format!(
103 "too_many_dimensions: limit={}, got={}",
104 MAX_BUDGET_DIMENSIONS, dim_count
105 )));
106 }
107 if deltas.len() != dim_count {
108 return Err(ServerError::InvalidInput(format!(
109 "dimension_delta_array_mismatch: dimensions={} deltas={}",
110 dim_count,
111 deltas.len()
112 )));
113 }
114 Ok(())
115}
116
/// FlowFabric server: owns the Valkey connections, the background engine,
/// the scheduler, and the shared metrics registry.
pub struct Server {
    // Primary Valkey client used for commands, pipelines, and FCALLs.
    client: Client,
    // Dedicated connection for tail (blocking stream-read) operations,
    // opened separately at startup so tails don't block the primary client.
    tail_client: Client,
    // Caps concurrent stream operations at
    // `config.max_concurrent_stream_ops`; shared by reads and tails.
    stream_semaphore: Arc<tokio::sync::Semaphore>,
    // Serializes blocking tail reads on `tail_client` (per the startup
    // log: "tails additionally serialize via xread_block_lock").
    xread_block_lock: Arc<tokio::sync::Mutex<()>>,
    // Single-permit semaphore: at most one admin secret rotation at a time.
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    // Background engine started with a completion-stream subscription.
    engine: Engine,
    // Server configuration captured at startup.
    config: ServerConfig,
    // Shared scheduler handle (created with the same metrics registry).
    scheduler: Arc<ff_scheduler::Scheduler>,
    // Background tasks owned by the server.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
    // Metrics registry shared with the engine and scheduler.
    metrics: Arc<ff_observability::Metrics>,
}
213
/// Error type for all [`Server`] operations.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Raw Valkey client error with no call-site context attached.
    #[error("valkey: {0}")]
    Valkey(#[from] ferriskey::Error),
    /// Valkey client error annotated with the command/operation that failed.
    #[error("valkey ({context}): {source}")]
    ValkeyContext {
        #[source]
        source: ferriskey::Error,
        context: String,
    },
    /// Server configuration problem.
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    /// Failure loading the flowfabric Lua library.
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    /// A key's partition did not match the expected partition.
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    /// The referenced entity (execution, budget, flow, …) does not exist.
    #[error("not found: {0}")]
    NotFound(String),
    /// Caller-supplied arguments failed validation.
    #[error("invalid input: {0}")]
    InvalidInput(String),
    /// The operation ran but produced an unexpected or failed outcome.
    #[error("operation failed: {0}")]
    OperationFailed(String),
    /// A server-side Lua script returned a malformed or error reply.
    #[error("script: {0}")]
    Script(String),
    /// A local concurrency limit was hit; treated as retryable
    /// (see [`ServerError::is_retryable`]).
    #[error("too many concurrent {0} calls (max: {1})")]
    ConcurrencyLimitExceeded(&'static str, u32),
    /// The connected Valkey is older than the minimum supported version.
    #[error(
        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
    )]
    ValkeyVersionTooLow {
        detected: String,
        required: String,
    },
}
263
264impl ServerError {
265 pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
269 match self {
270 Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
271 Self::LibraryLoad(e) => e.valkey_kind(),
272 _ => None,
273 }
274 }
275
276 pub fn is_retryable(&self) -> bool {
282 match self {
283 Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
284 is_retryable_kind(e.kind())
285 }
286 Self::LibraryLoad(load_err) => load_err
287 .valkey_kind()
288 .map(is_retryable_kind)
289 .unwrap_or(false),
290 Self::Config(_)
291 | Self::PartitionMismatch(_)
292 | Self::NotFound(_)
293 | Self::InvalidInput(_)
294 | Self::OperationFailed(_)
295 | Self::Script(_) => false,
296 Self::ConcurrencyLimitExceeded(_, _) => true,
300 Self::ValkeyVersionTooLow { .. } => false,
302 }
303 }
304}
305
306impl Server {
307 pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
315 Self::start_with_metrics(config, Arc::new(ff_observability::Metrics::new())).await
316 }
317
    /// Connect to Valkey, verify/boot shared state, start the engine and
    /// scheduler, and assemble a running [`Server`].
    ///
    /// Startup sequence (order matters — each step depends on the last):
    /// 1. build and PING the primary client;
    /// 2. verify the Valkey version and the stored partition config;
    /// 3. initialize the waitpoint HMAC secret;
    /// 4. load the flowfabric Lua library (unless `skip_library_load`);
    /// 5. start the engine with a completion-stream subscription;
    /// 6. open and PING a dedicated tail connection;
    /// 7. build the stream-op semaphore/lock and the scheduler.
    ///
    /// # Errors
    /// Any connection, PING, version-check, config-validation,
    /// secret-initialization, library-load, or subscription failure
    /// aborts startup.
    pub async fn start_with_metrics(
        config: ServerConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Self, ServerError> {
        tracing::info!(
            host = %config.host, port = config.port,
            tls = config.tls, cluster = config.cluster,
            "connecting to Valkey"
        );
        // Primary client: fixed 10s connect / 5s request timeouts.
        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder
            .build()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "connect".into() })?;

        // Sanity-check the connection before doing anything stateful.
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "PING".into() })?;
        if pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "unexpected PING response: {pong}"
            )));
        }
        tracing::info!("Valkey connection established");

        verify_valkey_version(&client).await?;

        // Partition layout must agree with what's stored server-side.
        validate_or_create_partition_config(&client, &config.partition_config).await?;

        initialize_waitpoint_hmac_secret(
            &client,
            &config.partition_config,
            &config.waitpoint_hmac_secret,
        )
        .await?;

        if !config.skip_library_load {
            tracing::info!("loading flowfabric Lua library");
            ff_script::loader::ensure_library(&client)
                .await
                .map_err(ServerError::LibraryLoad)?;
        } else {
            tracing::info!("skipping library load (skip_library_load=true)");
        }

        // Engine configuration: copies every scanner interval from the
        // server config one-for-one.
        let engine_cfg = ff_engine::EngineConfig {
            partition_config: config.partition_config,
            lanes: config.lanes.clone(),
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
            index_reconciler_interval: config.engine_config.index_reconciler_interval,
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
            unblock_interval: config.engine_config.unblock_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            flow_projector_interval: config.engine_config.flow_projector_interval,
            execution_deadline_interval: config.engine_config.execution_deadline_interval,
            cancel_reconciler_interval: config.engine_config.cancel_reconciler_interval,
            scanner_filter: config.engine_config.scanner_filter.clone(),
        };
        // Completion backend gets connection details so it can open its
        // own connections as needed (it shares the primary client too).
        let mut valkey_conn = ff_core::backend::ValkeyConnection::new(
            config.host.clone(),
            config.port,
        );
        valkey_conn.tls = config.tls;
        valkey_conn.cluster = config.cluster;
        let completion_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
            client.clone(),
            config.partition_config,
            valkey_conn,
        );
        let completion_stream = <ff_backend_valkey::ValkeyBackend as ff_core::completion_backend::CompletionBackend>::subscribe_completions(&completion_backend)
            .await
            .map_err(|e| ServerError::OperationFailed(format!(
                "subscribe_completions: {e}"
            )))?;

        let engine = Engine::start_with_completions(
            engine_cfg,
            client.clone(),
            metrics.clone(),
            completion_stream,
        );

        // Dedicated tail connection: same builder settings as the
        // primary client, PING-verified independently.
        tracing::info!("opening dedicated tail connection");
        let mut tail_builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            tail_builder = tail_builder.tls();
        }
        if config.cluster {
            tail_builder = tail_builder.cluster();
        }
        let tail_client = tail_builder
            .build()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "connect (tail)".into(),
            })?;
        let tail_pong: String = tail_client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "PING (tail)".into(),
            })?;
        if tail_pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "tail client unexpected PING response: {tail_pong}"
            )));
        }

        let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
            config.max_concurrent_stream_ops as usize,
        ));
        let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
        tracing::info!(
            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
            "stream-op client ready (read + tail share the semaphore; \
             tails additionally serialize via xread_block_lock)"
        );

        // Loudly warn when the API is left unauthenticated — both admin
        // endpoints and credential/PII-bearing read endpoints are exposed.
        if config.api_token.is_none() {
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
                 FF_API_TOKEN for any deployment reachable from untrusted \
                 networks."
            );
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
                 and GET /v1/executions/{{id}}/result returns raw completion payload \
                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
                 configuration."
            );
        }

        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            listen_addr = %config.listen_addr,
            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
            config.partition_config.num_flow_partitions,
            config.partition_config.num_budget_partitions,
            config.partition_config.num_quota_partitions,
        );

        let scheduler = Arc::new(ff_scheduler::Scheduler::with_metrics(
            client.clone(),
            config.partition_config,
            metrics.clone(),
        ));

        Ok(Self {
            client,
            tail_client,
            stream_semaphore,
            xread_block_lock,
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine,
            config,
            scheduler,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
            metrics,
        })
    }
570
    /// Metrics registry shared with the engine and scheduler.
    pub fn metrics(&self) -> &Arc<ff_observability::Metrics> {
        &self.metrics
    }
575
    /// The primary Valkey client connection.
    pub fn client(&self) -> &Client {
        &self.client
    }
580
    /// Invoke a server-side Lua function on the primary client.
    ///
    /// Delegates to the free function `fcall_with_reload_on_client`
    /// (defined outside this chunk) — presumably it reloads the Lua
    /// library and retries when the function is missing; confirm there.
    async fn fcall_with_reload(
        &self,
        function: &str,
        keys: &[&str],
        args: &[&str],
    ) -> Result<Value, ServerError> {
        fcall_with_reload_on_client(&self.client, function, keys, args).await
    }
595
    /// Server configuration captured at startup.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }
600
    /// Partition layout this server was started with.
    pub fn partition_config(&self) -> &PartitionConfig {
        &self.config.partition_config
    }
605
606 pub async fn create_execution(
612 &self,
613 args: &CreateExecutionArgs,
614 ) -> Result<CreateExecutionResult, ServerError> {
615 let partition = execution_partition(&args.execution_id, &self.config.partition_config);
616 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
617 let idx = IndexKeys::new(&partition);
618
619 let lane = &args.lane_id;
620 let tag = partition.hash_tag();
621 let idem_key = match &args.idempotency_key {
622 Some(k) if !k.is_empty() => {
623 keys::idempotency_key(&tag, args.namespace.as_str(), k)
624 }
625 _ => ctx.noop(),
626 };
627
628 let delay_str = args
629 .delay_until
630 .map(|d| d.0.to_string())
631 .unwrap_or_default();
632 let is_delayed = !delay_str.is_empty();
633
634 let scheduling_zset = if is_delayed {
639 idx.lane_delayed(lane)
640 } else {
641 idx.lane_eligible(lane)
642 };
643
644 let fcall_keys: Vec<String> = vec![
645 ctx.core(), ctx.payload(), ctx.policy(), ctx.tags(), scheduling_zset, idem_key, idx.execution_deadline(), idx.all_executions(), ];
654
655 let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());
656
657 let fcall_args: Vec<String> = vec![
663 args.execution_id.to_string(), args.namespace.to_string(), args.lane_id.to_string(), args.execution_kind.clone(), args.priority.to_string(), args.creator_identity.clone(), args.policy.as_ref()
670 .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
671 .unwrap_or_else(|| "{}".to_owned()), String::from_utf8_lossy(&args.input_payload).into_owned(), delay_str, args.idempotency_key.as_ref()
675 .map(|_| "86400000".to_string())
676 .unwrap_or_default(), tags_json, args.execution_deadline_at
679 .map(|d| d.to_string())
680 .unwrap_or_default(), args.partition_id.to_string(), ];
683
684 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
685 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
686
687 let raw: Value = self
688 .fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
689 .await?;
690
691 parse_create_result(&raw, &args.execution_id)
692 }
693
694 pub async fn cancel_execution(
696 &self,
697 args: &CancelExecutionArgs,
698 ) -> Result<CancelExecutionResult, ServerError> {
699 let raw = self
700 .fcall_cancel_execution_with_reload(args)
701 .await?;
702 parse_cancel_result(&raw, &args.execution_id)
703 }
704
705 async fn fcall_cancel_execution_with_reload(
709 &self,
710 args: &CancelExecutionArgs,
711 ) -> Result<Value, ServerError> {
712 let (keys, argv) = build_cancel_execution_fcall(
713 &self.client,
714 &self.config.partition_config,
715 args,
716 )
717 .await?;
718 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
719 let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
720 self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
721 }
722
723 pub async fn get_execution_state(
728 &self,
729 execution_id: &ExecutionId,
730 ) -> Result<PublicState, ServerError> {
731 let partition = execution_partition(execution_id, &self.config.partition_config);
732 let ctx = ExecKeyContext::new(&partition, execution_id);
733
734 let state_str: Option<String> = self
735 .client
736 .hget(&ctx.core(), "public_state")
737 .await
738 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET public_state".into() })?;
739
740 match state_str {
741 Some(s) => {
742 let quoted = format!("\"{s}\"");
743 serde_json::from_str("ed).map_err(|e| {
744 ServerError::Script(format!(
745 "invalid public_state '{s}' for {execution_id}: {e}"
746 ))
747 })
748 }
749 None => Err(ServerError::NotFound(format!(
750 "execution not found: {execution_id}"
751 ))),
752 }
753 }
754
755 pub async fn get_execution_result(
782 &self,
783 execution_id: &ExecutionId,
784 ) -> Result<Option<Vec<u8>>, ServerError> {
785 let partition = execution_partition(execution_id, &self.config.partition_config);
786 let ctx = ExecKeyContext::new(&partition, execution_id);
787
788 let payload: Option<Vec<u8>> = self
798 .client
799 .cmd("GET")
800 .arg(ctx.result())
801 .execute()
802 .await
803 .map_err(|e| ServerError::ValkeyContext {
804 source: e,
805 context: "GET exec result".into(),
806 })?;
807 Ok(payload)
808 }
809
    /// List the pending/active waitpoints of an execution, including
    /// their HMAC tokens and required signal names.
    ///
    /// Flow:
    /// 1. existence check on the execution's core hash (404 otherwise);
    /// 2. SSCAN the waitpoint-id set to completion, then sort + dedup;
    /// 3. pipeline pass 1: HMGET each waitpoint hash + HGET its
    ///    condition's `total_matchers`;
    /// 4. filter to valid `pending`/`active` entries with a token;
    /// 5. pipeline pass 2: HMGET the matcher names for entries that
    ///    declared matchers;
    /// 6. assemble [`PendingWaitpointInfo`] records.
    ///
    /// Unparseable ids and token-less hashes are logged and skipped
    /// rather than failing the whole listing.
    pub async fn list_pending_waitpoints(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        let core_exists: bool = self
            .client
            .cmd("EXISTS")
            .arg(ctx.core())
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "EXISTS exec_core (pending waitpoints)".into(),
            })?;
        if !core_exists {
            return Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            )));
        }

        // Drain the waitpoint-id set with SSCAN (cursor "0" terminates).
        const WAITPOINTS_SSCAN_COUNT: usize = 100;
        let waitpoints_key = ctx.waitpoints();
        let mut wp_ids_raw: Vec<String> = Vec::new();
        let mut cursor: String = "0".to_owned();
        loop {
            let reply: (String, Vec<String>) = self
                .client
                .cmd("SSCAN")
                .arg(&waitpoints_key)
                .arg(&cursor)
                .arg("COUNT")
                .arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
                .execute()
                .await
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: "SSCAN waitpoints".into(),
                })?;
            cursor = reply.0;
            wp_ids_raw.extend(reply.1);
            if cursor == "0" {
                break;
            }
        }

        // SSCAN may return duplicates across cursor iterations; sorting
        // also makes the response order deterministic.
        wp_ids_raw.sort_unstable();
        wp_ids_raw.dedup();

        if wp_ids_raw.is_empty() {
            return Ok(Vec::new());
        }

        // Best-effort parse: malformed ids are logged and dropped.
        let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
        for raw in &wp_ids_raw {
            match WaitpointId::parse(raw) {
                Ok(id) => wp_ids.push(id),
                Err(e) => tracing::warn!(
                    raw_id = %raw,
                    error = %e,
                    execution_id = %execution_id,
                    "list_pending_waitpoints: skipping unparseable waitpoint_id"
                ),
            }
        }
        if wp_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Field order matters: positions 0..=5 are read back by index
        // in the assembly loop at the bottom (`get(0)` = state, etc.).
        const WP_FIELDS: [&str; 6] = [
            "state",
            "waitpoint_key",
            "waitpoint_token",
            "created_at",
            "activated_at",
            "expires_at",
        ];

        // Pass 1: one HMGET (waitpoint fields) + one HGET (matcher
        // count) per id, in a single pipeline round-trip.
        let mut pass1 = self.client.pipeline();
        let mut wp_slots = Vec::with_capacity(wp_ids.len());
        let mut cond_slots = Vec::with_capacity(wp_ids.len());
        for wp_id in &wp_ids {
            let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint(wp_id));
            for f in WP_FIELDS {
                cmd = cmd.arg(f);
            }
            wp_slots.push(cmd.finish());

            cond_slots.push(
                pass1
                    .cmd::<Option<String>>("HGET")
                    .arg(ctx.waitpoint_condition(wp_id))
                    .arg("total_matchers")
                    .finish(),
            );
        }
        pass1
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "pipeline HMGET waitpoints + HGET total_matchers".into(),
            })?;

        // Waitpoints that survive pass-1 filtering, with the data needed
        // for pass 2 and final assembly.
        struct Kept {
            wp_id: WaitpointId,
            wp_fields: Vec<Option<String>>,
            total_matchers: usize,
        }
        let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
        for ((wp_id, wp_slot), cond_slot) in wp_ids
            .iter()
            .zip(wp_slots)
            .zip(cond_slots)
        {
            let wp_fields: Vec<Option<String>> =
                wp_slot.value().map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("pipeline slot HMGET waitpoint {wp_id}"),
                })?;

            // All-None means the hash no longer exists (deleted between
            // SSCAN and the pipeline). NOTE: `cond_slot.value()` is still
            // drained on every skip path so the slot isn't left unread.
            if wp_fields.iter().all(Option::is_none) {
                let _ = cond_slot.value();
                continue;
            }
            let state_ref = wp_fields
                .first()
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            // Only pending/active waitpoints are reported.
            if state_ref != "pending" && state_ref != "active" {
                let _ = cond_slot.value();
                continue;
            }
            let token_ref = wp_fields
                .get(2)
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if token_ref.is_empty() {
                let _ = cond_slot.value();
                tracing::warn!(
                    waitpoint_id = %wp_id,
                    execution_id = %execution_id,
                    waitpoint_hash_key = %ctx.waitpoint(wp_id),
                    state = %state_ref,
                    "list_pending_waitpoints: waitpoint hash present but waitpoint_token \
                     field is empty — likely storage corruption (half-populated write, \
                     operator edit, or interrupted script). Skipping this entry in the \
                     response. HGETALL the waitpoint_hash_key to inspect."
                );
                continue;
            }

            // Missing/non-numeric matcher count degrades to 0 (no pass 2).
            let total_matchers = cond_slot
                .value()
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("pipeline slot HGET total_matchers {wp_id}"),
                })?
                .and_then(|s| s.parse::<usize>().ok())
                .unwrap_or(0);

            kept.push(Kept {
                wp_id: wp_id.clone(),
                wp_fields,
                total_matchers,
            });
        }

        if kept.is_empty() {
            return Ok(Vec::new());
        }

        // Pass 2: fetch matcher names, only for waitpoints that declared
        // matchers. `matcher_slots[i]` is None for the others so the two
        // vectors stay positionally aligned with `kept`.
        let mut pass2 = self.client.pipeline();
        let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
        let mut pass2_needed = false;
        for k in &kept {
            if k.total_matchers == 0 {
                matcher_slots.push(None);
                continue;
            }
            pass2_needed = true;
            let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
            for i in 0..k.total_matchers {
                cmd = cmd.arg(format!("matcher:{i}:name"));
            }
            matcher_slots.push(Some(cmd.finish()));
        }
        if pass2_needed {
            pass2.execute().await.map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "pipeline HMGET wp_condition matchers".into(),
            })?;
        }

        // Empty timestamp fields mean "unset", not epoch zero.
        let parse_ts = |raw: &str| -> Option<TimestampMs> {
            if raw.is_empty() {
                None
            } else {
                raw.parse::<i64>().ok().map(TimestampMs)
            }
        };

        // Final assembly: indices into `wp_fields` follow WP_FIELDS order.
        let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
        for (k, slot) in kept.into_iter().zip(matcher_slots) {
            let get = |i: usize| -> &str {
                k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
            };

            let required_signal_names: Vec<String> = match slot {
                None => Vec::new(),
                Some(s) => {
                    let vals: Vec<Option<String>> =
                        s.value().map_err(|e| ServerError::ValkeyContext {
                            source: e,
                            context: format!(
                                "pipeline slot HMGET wp_condition matchers {}",
                                k.wp_id
                            ),
                        })?;
                    vals.into_iter()
                        .flatten()
                        .filter(|name| !name.is_empty())
                        .collect()
                }
            };

            out.push(PendingWaitpointInfo {
                waitpoint_id: k.wp_id,
                waitpoint_key: get(1).to_owned(),
                state: get(0).to_owned(),
                waitpoint_token: WaitpointToken(get(2).to_owned()),
                required_signal_names,
                created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
                activated_at: parse_ts(get(4)),
                expires_at: parse_ts(get(5)),
            });
        }

        Ok(out)
    }
1137
1138 pub async fn create_budget(
1142 &self,
1143 args: &CreateBudgetArgs,
1144 ) -> Result<CreateBudgetResult, ServerError> {
1145 validate_create_budget_dimensions(
1147 &args.dimensions,
1148 &args.hard_limits,
1149 &args.soft_limits,
1150 )?;
1151 let partition = budget_partition(&args.budget_id, &self.config.partition_config);
1152 let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
1153 let resets_key = keys::budget_resets_key(bctx.hash_tag());
1154 let policies_index = keys::budget_policies_index(bctx.hash_tag());
1155
1156 let fcall_keys: Vec<String> = vec![
1159 bctx.definition(),
1160 bctx.limits(),
1161 bctx.usage(),
1162 resets_key,
1163 policies_index,
1164 ];
1165
1166 let dim_count = args.dimensions.len();
1170 let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
1171 fcall_args.push(args.budget_id.to_string());
1172 fcall_args.push(args.scope_type.clone());
1173 fcall_args.push(args.scope_id.clone());
1174 fcall_args.push(args.enforcement_mode.clone());
1175 fcall_args.push(args.on_hard_limit.clone());
1176 fcall_args.push(args.on_soft_limit.clone());
1177 fcall_args.push(args.reset_interval_ms.to_string());
1178 fcall_args.push(args.now.to_string());
1179 fcall_args.push(dim_count.to_string());
1180 for dim in &args.dimensions {
1181 fcall_args.push(dim.clone());
1182 }
1183 for hard in &args.hard_limits {
1184 fcall_args.push(hard.to_string());
1185 }
1186 for soft in &args.soft_limits {
1187 fcall_args.push(soft.to_string());
1188 }
1189
1190 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1191 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1192
1193 let raw: Value = self
1194 .fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
1195 .await?;
1196
1197 parse_budget_create_result(&raw, &args.budget_id)
1198 }
1199
1200 pub async fn create_quota_policy(
1202 &self,
1203 args: &CreateQuotaPolicyArgs,
1204 ) -> Result<CreateQuotaPolicyResult, ServerError> {
1205 let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
1206 let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);
1207
1208 let fcall_keys: Vec<String> = vec![
1211 qctx.definition(),
1212 qctx.window("requests_per_window"),
1213 qctx.concurrency(),
1214 qctx.admitted_set(),
1215 keys::quota_policies_index(qctx.hash_tag()),
1216 ];
1217
1218 let fcall_args: Vec<String> = vec![
1221 args.quota_policy_id.to_string(),
1222 args.window_seconds.to_string(),
1223 args.max_requests_per_window.to_string(),
1224 args.max_concurrent.to_string(),
1225 args.now.to_string(),
1226 ];
1227
1228 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1229 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1230
1231 let raw: Value = self
1232 .fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
1233 .await?;
1234
1235 parse_quota_create_result(&raw, &args.quota_policy_id)
1236 }
1237
1238 pub async fn get_budget_status(
1240 &self,
1241 budget_id: &BudgetId,
1242 ) -> Result<BudgetStatus, ServerError> {
1243 let partition = budget_partition(budget_id, &self.config.partition_config);
1244 let bctx = BudgetKeyContext::new(&partition, budget_id);
1245
1246 let def: HashMap<String, String> = self
1248 .client
1249 .hgetall(&bctx.definition())
1250 .await
1251 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_def".into() })?;
1252
1253 if def.is_empty() {
1254 return Err(ServerError::NotFound(format!(
1255 "budget not found: {budget_id}"
1256 )));
1257 }
1258
1259 let usage_raw: HashMap<String, String> = self
1261 .client
1262 .hgetall(&bctx.usage())
1263 .await
1264 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_usage".into() })?;
1265 let usage: HashMap<String, u64> = usage_raw
1266 .into_iter()
1267 .filter(|(k, _)| k != "_init")
1268 .map(|(k, v)| (k, v.parse().unwrap_or(0)))
1269 .collect();
1270
1271 let limits_raw: HashMap<String, String> = self
1273 .client
1274 .hgetall(&bctx.limits())
1275 .await
1276 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_limits".into() })?;
1277 let mut hard_limits = HashMap::new();
1278 let mut soft_limits = HashMap::new();
1279 for (k, v) in &limits_raw {
1280 if let Some(dim) = k.strip_prefix("hard:") {
1281 hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
1282 } else if let Some(dim) = k.strip_prefix("soft:") {
1283 soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
1284 }
1285 }
1286
1287 let non_empty = |s: Option<&String>| -> Option<String> {
1288 s.filter(|v| !v.is_empty()).cloned()
1289 };
1290
1291 Ok(BudgetStatus {
1292 budget_id: budget_id.to_string(),
1293 scope_type: def.get("scope_type").cloned().unwrap_or_default(),
1294 scope_id: def.get("scope_id").cloned().unwrap_or_default(),
1295 enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
1296 usage,
1297 hard_limits,
1298 soft_limits,
1299 breach_count: def
1300 .get("breach_count")
1301 .and_then(|v| v.parse().ok())
1302 .unwrap_or(0),
1303 soft_breach_count: def
1304 .get("soft_breach_count")
1305 .and_then(|v| v.parse().ok())
1306 .unwrap_or(0),
1307 last_breach_at: non_empty(def.get("last_breach_at")),
1308 last_breach_dim: non_empty(def.get("last_breach_dim")),
1309 next_reset_at: non_empty(def.get("next_reset_at")),
1310 created_at: non_empty(def.get("created_at")),
1311 })
1312 }
1313
1314 pub async fn report_usage(
1316 &self,
1317 budget_id: &BudgetId,
1318 args: &ReportUsageArgs,
1319 ) -> Result<ReportUsageResult, ServerError> {
1320 validate_report_usage_dimensions(&args.dimensions, &args.deltas)?;
1322 let partition = budget_partition(budget_id, &self.config.partition_config);
1323 let bctx = BudgetKeyContext::new(&partition, budget_id);
1324
1325 let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
1327
1328 let dim_count = args.dimensions.len();
1330 let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
1331 fcall_args.push(dim_count.to_string());
1332 for dim in &args.dimensions {
1333 fcall_args.push(dim.clone());
1334 }
1335 for delta in &args.deltas {
1336 fcall_args.push(delta.to_string());
1337 }
1338 fcall_args.push(args.now.to_string());
1339 let dedup_key_val = args
1340 .dedup_key
1341 .as_ref()
1342 .filter(|k| !k.is_empty())
1343 .map(|k| usage_dedup_key(bctx.hash_tag(), k))
1344 .unwrap_or_default();
1345 fcall_args.push(dedup_key_val);
1346
1347 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1348 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1349
1350 let raw: Value = self
1351 .fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
1352 .await?;
1353
1354 parse_report_usage_result(&raw)
1355 }
1356
1357 pub async fn reset_budget(
1359 &self,
1360 budget_id: &BudgetId,
1361 ) -> Result<ResetBudgetResult, ServerError> {
1362 let partition = budget_partition(budget_id, &self.config.partition_config);
1363 let bctx = BudgetKeyContext::new(&partition, budget_id);
1364 let resets_key = keys::budget_resets_key(bctx.hash_tag());
1365
1366 let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];
1368
1369 let now = TimestampMs::now();
1371 let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];
1372
1373 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1374 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1375
1376 let raw: Value = self
1377 .fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
1378 .await?;
1379
1380 parse_reset_budget_result(&raw)
1381 }
1382
1383 pub async fn create_flow(
1387 &self,
1388 args: &CreateFlowArgs,
1389 ) -> Result<CreateFlowResult, ServerError> {
1390 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1391 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1392 let fidx = FlowIndexKeys::new(&partition);
1393
1394 let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];
1396
1397 let fcall_args: Vec<String> = vec![
1399 args.flow_id.to_string(),
1400 args.flow_kind.clone(),
1401 args.namespace.to_string(),
1402 args.now.to_string(),
1403 ];
1404
1405 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1406 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1407
1408 let raw: Value = self
1409 .fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
1410 .await?;
1411
1412 parse_create_flow_result(&raw, &args.flow_id)
1413 }
1414
1415 pub async fn add_execution_to_flow(
1453 &self,
1454 args: &AddExecutionToFlowArgs,
1455 ) -> Result<AddExecutionToFlowResult, ServerError> {
1456 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1457 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1458 let fidx = FlowIndexKeys::new(&partition);
1459
1460 let exec_partition =
1464 execution_partition(&args.execution_id, &self.config.partition_config);
1465 let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);
1466
1467 if exec_partition.index != partition.index {
1476 return Err(ServerError::PartitionMismatch(format!(
1477 "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
1478 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
1479 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
1480 exec_p = exec_partition.index,
1481 flow_p = partition.index,
1482 )));
1483 }
1484
1485 let fcall_keys: Vec<String> = vec![
1487 fctx.core(),
1488 fctx.members(),
1489 fidx.flow_index(),
1490 ectx.core(),
1491 ];
1492
1493 let fcall_args: Vec<String> = vec![
1495 args.flow_id.to_string(),
1496 args.execution_id.to_string(),
1497 args.now.to_string(),
1498 ];
1499
1500 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1501 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1502
1503 let raw: Value = self
1504 .fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
1505 .await?;
1506
1507 parse_add_execution_to_flow_result(&raw)
1508 }
1509
1510 pub async fn cancel_flow(
1552 &self,
1553 args: &CancelFlowArgs,
1554 ) -> Result<CancelFlowResult, ServerError> {
1555 self.cancel_flow_inner(args, false).await
1556 }
1557
1558 pub async fn cancel_flow_wait(
1562 &self,
1563 args: &CancelFlowArgs,
1564 ) -> Result<CancelFlowResult, ServerError> {
1565 self.cancel_flow_inner(args, true).await
1566 }
1567
    /// Shared implementation behind `cancel_flow` / `cancel_flow_wait`.
    ///
    /// Calls the server-side `ff_cancel_flow`, then — when the stored policy
    /// is `"cancel_all"` and members exist — cancels each member execution
    /// either inline (`wait == true`, returning `PartiallyCancelled` on
    /// partial failure) or on a background task (`wait == false`, returning
    /// `CancellationScheduled` immediately). An already-terminal flow is
    /// answered idempotently with a `Cancelled` result rebuilt from stored
    /// flow metadata.
    async fn cancel_flow_inner(
        &self,
        args: &CancelFlowArgs,
        wait: bool,
    ) -> Result<CancelFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        // Grace window handed to the server-side function; presumably the
        // delay before the reconciler treats un-acked member cancels as
        // backlog — TODO confirm against ff_cancel_flow's contract.
        const CANCEL_RECONCILER_GRACE_MS: u64 = 30_000;

        let fcall_keys: Vec<String> = vec![
            fctx.core(),
            fctx.members(),
            fidx.flow_index(),
            fctx.pending_cancels(),
            fidx.cancel_backlog(),
        ];

        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.reason.clone(),
            args.cancellation_policy.clone(),
            args.now.to_string(),
            CANCEL_RECONCILER_GRACE_MS.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
            .await?;

        let (policy, members) = match parse_cancel_flow_raw(&raw)? {
            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
                (policy, member_execution_ids)
            }
            ParsedCancelFlow::AlreadyTerminal => {
                // Idempotent path: rebuild a Cancelled response from what is
                // stored on the flow, preferring the stored policy/reason
                // over the caller's.
                let flow_meta: Vec<Option<String>> = self
                    .client
                    .cmd("HMGET")
                    .arg(fctx.core())
                    .arg("cancellation_policy")
                    .arg("cancel_reason")
                    .execute()
                    .await
                    .map_err(|e| ServerError::ValkeyContext {
                        source: e,
                        context: "HMGET flow_core cancellation_policy,cancel_reason".into(),
                    })?;
                // Empty strings are treated as "not stored".
                let stored_policy = flow_meta
                    .first()
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let stored_reason = flow_meta
                    .get(1)
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let all_members: Vec<String> = self
                    .client
                    .cmd("SMEMBERS")
                    .arg(fctx.members())
                    .execute()
                    .await
                    .map_err(|e| ServerError::ValkeyContext {
                        source: e,
                        context: "SMEMBERS flow members (already terminal)".into(),
                    })?;
                let total_members = all_members.len();
                // Cap the echoed member list to bound response size; the log
                // line below records both totals for observability.
                let stored_members: Vec<String> = all_members
                    .into_iter()
                    .take(ALREADY_TERMINAL_MEMBER_CAP)
                    .collect();
                tracing::debug!(
                    flow_id = %args.flow_id,
                    stored_policy = stored_policy.as_deref().unwrap_or(""),
                    stored_reason = stored_reason.as_deref().unwrap_or(""),
                    total_members,
                    returned_members = stored_members.len(),
                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
                );
                return Ok(CancelFlowResult::Cancelled {
                    cancellation_policy: stored_policy
                        .unwrap_or_else(|| args.cancellation_policy.clone()),
                    member_execution_ids: stored_members,
                });
            }
        };
        // Only the cancel_all policy requires fanning out to members.
        let needs_dispatch = policy == "cancel_all" && !members.is_empty();

        if !needs_dispatch {
            return Ok(CancelFlowResult::Cancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
            });
        }

        let pending_cancels_key = fctx.pending_cancels();
        let cancel_backlog_key = fidx.cancel_backlog();

        if wait {
            // Synchronous fan-out: cancel each member in order, ack on
            // success (and on terminal-ack errors, which mean "nothing left
            // to cancel"), and collect hard failures for the caller.
            let mut failed: Vec<String> = Vec::new();
            for eid_str in &members {
                match cancel_member_execution(
                    &self.client,
                    &self.config.partition_config,
                    eid_str,
                    &args.reason,
                    args.now,
                )
                .await
                {
                    Ok(()) => {
                        ack_cancel_member(
                            &self.client,
                            &pending_cancels_key,
                            &cancel_backlog_key,
                            eid_str,
                            &args.flow_id.to_string(),
                        )
                        .await;
                    }
                    Err(e) => {
                        // A "terminal" error still acks: the member needs no
                        // further cancel work.
                        if is_terminal_ack_error(&e) {
                            ack_cancel_member(
                                &self.client,
                                &pending_cancels_key,
                                &cancel_backlog_key,
                                eid_str,
                                &args.flow_id.to_string(),
                            )
                            .await;
                            continue;
                        }
                        tracing::warn!(
                            execution_id = %eid_str,
                            error = %e,
                            "cancel_flow(wait): individual execution cancel failed \
                            (transport/contract fault; reconciler will retry if transient)"
                        );
                        failed.push(eid_str.clone());
                    }
                }
            }
            if failed.is_empty() {
                return Ok(CancelFlowResult::Cancelled {
                    cancellation_policy: policy,
                    member_execution_ids: members,
                });
            }
            return Ok(CancelFlowResult::PartiallyCancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
                failed_member_execution_ids: failed,
            });
        }

        // Async fan-out: clone everything the detached task needs.
        let client = self.client.clone();
        let partition_config = self.config.partition_config;
        let reason = args.reason.clone();
        let now = args.now;
        let dispatch_members = members.clone();
        let flow_id = args.flow_id.clone();
        let mut guard = self.background_tasks.lock().await;

        // Opportunistically reap finished background tasks so the JoinSet
        // does not accumulate completed handles; surface panics/aborts.
        while let Some(joined) = guard.try_join_next() {
            if let Err(e) = joined {
                tracing::warn!(
                    error = %e,
                    "cancel_flow: background dispatch task panicked or was aborted"
                );
            }
        }

        let pending_key_owned = pending_cancels_key.clone();
        let backlog_key_owned = cancel_backlog_key.clone();
        let flow_id_str = args.flow_id.to_string();

        guard.spawn(async move {
            use futures::stream::StreamExt;
            // Bounded parallelism for member cancels within this one flow.
            const CONCURRENCY: usize = 16;

            let member_count = dispatch_members.len();
            let flow_id_for_log = flow_id.clone();
            futures::stream::iter(dispatch_members)
                .map(|eid_str| {
                    let client = client.clone();
                    let reason = reason.clone();
                    let flow_id = flow_id.clone();
                    let pending = pending_key_owned.clone();
                    let backlog = backlog_key_owned.clone();
                    let flow_id_str = flow_id_str.clone();
                    async move {
                        match cancel_member_execution(
                            &client,
                            &partition_config,
                            &eid_str,
                            &reason,
                            now,
                        )
                        .await
                        {
                            Ok(()) => {
                                ack_cancel_member(
                                    &client,
                                    &pending,
                                    &backlog,
                                    &eid_str,
                                    &flow_id_str,
                                )
                                .await;
                            }
                            Err(e) => {
                                // Same ack-on-terminal semantics as the
                                // synchronous path above.
                                if is_terminal_ack_error(&e) {
                                    ack_cancel_member(
                                        &client,
                                        &pending,
                                        &backlog,
                                        &eid_str,
                                        &flow_id_str,
                                    )
                                    .await;
                                } else {
                                    tracing::warn!(
                                        flow_id = %flow_id,
                                        execution_id = %eid_str,
                                        error = %e,
                                        "cancel_flow(async): individual execution cancel failed \
                                        (transport/contract fault; reconciler will retry if transient)"
                                    );
                                }
                            }
                        }
                    }
                })
                .buffer_unordered(CONCURRENCY)
                .for_each(|()| async {})
                .await;

            tracing::debug!(
                flow_id = %flow_id_for_log,
                member_count,
                concurrency = CONCURRENCY,
                "cancel_flow: background member dispatch complete"
            );
        });
        drop(guard);

        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
        Ok(CancelFlowResult::CancellationScheduled {
            cancellation_policy: policy,
            member_count,
            member_execution_ids: members,
        })
    }
1879
1880 pub async fn stage_dependency_edge(
1885 &self,
1886 args: &StageDependencyEdgeArgs,
1887 ) -> Result<StageDependencyEdgeResult, ServerError> {
1888 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1889 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1890
1891 let fcall_keys: Vec<String> = vec![
1893 fctx.core(),
1894 fctx.members(),
1895 fctx.edge(&args.edge_id),
1896 fctx.outgoing(&args.upstream_execution_id),
1897 fctx.incoming(&args.downstream_execution_id),
1898 fctx.grant(&args.edge_id.to_string()),
1899 ];
1900
1901 let fcall_args: Vec<String> = vec![
1904 args.flow_id.to_string(),
1905 args.edge_id.to_string(),
1906 args.upstream_execution_id.to_string(),
1907 args.downstream_execution_id.to_string(),
1908 args.dependency_kind.clone(),
1909 args.data_passing_ref.clone().unwrap_or_default(),
1910 args.expected_graph_revision.to_string(),
1911 args.now.to_string(),
1912 ];
1913
1914 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1915 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1916
1917 let raw: Value = self
1918 .fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
1919 .await?;
1920
1921 parse_stage_dependency_edge_result(&raw)
1922 }
1923
1924 pub async fn apply_dependency_to_child(
1929 &self,
1930 args: &ApplyDependencyToChildArgs,
1931 ) -> Result<ApplyDependencyToChildResult, ServerError> {
1932 let partition = execution_partition(
1933 &args.downstream_execution_id,
1934 &self.config.partition_config,
1935 );
1936 let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
1937 let idx = IndexKeys::new(&partition);
1938
1939 let lane_str: Option<String> = self
1941 .client
1942 .hget(&ctx.core(), "lane_id")
1943 .await
1944 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1945 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1946
1947 let fcall_keys: Vec<String> = vec![
1950 ctx.core(),
1951 ctx.deps_meta(),
1952 ctx.deps_unresolved(),
1953 ctx.dep_edge(&args.edge_id),
1954 idx.lane_eligible(&lane),
1955 idx.lane_blocked_dependencies(&lane),
1956 ctx.deps_all_edges(),
1957 ];
1958
1959 let fcall_args: Vec<String> = vec![
1962 args.flow_id.to_string(),
1963 args.edge_id.to_string(),
1964 args.upstream_execution_id.to_string(),
1965 args.graph_revision.to_string(),
1966 args.dependency_kind.clone(),
1967 args.data_passing_ref.clone().unwrap_or_default(),
1968 args.now.to_string(),
1969 ];
1970
1971 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1972 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1973
1974 let raw: Value = self
1975 .fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
1976 .await?;
1977
1978 parse_apply_dependency_result(&raw)
1979 }
1980
1981 pub async fn deliver_signal(
1988 &self,
1989 args: &DeliverSignalArgs,
1990 ) -> Result<DeliverSignalResult, ServerError> {
1991 let partition = execution_partition(&args.execution_id, &self.config.partition_config);
1992 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
1993 let idx = IndexKeys::new(&partition);
1994
1995 let lane_str: Option<String> = self
1997 .client
1998 .hget(&ctx.core(), "lane_id")
1999 .await
2000 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
2001 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2002
2003 let wp_id = &args.waitpoint_id;
2004 let sig_id = &args.signal_id;
2005 let idem_key = args
2006 .idempotency_key
2007 .as_ref()
2008 .filter(|k| !k.is_empty())
2009 .map(|k| ctx.signal_dedup(wp_id, k))
2010 .unwrap_or_else(|| ctx.noop());
2011
2012 let fcall_keys: Vec<String> = vec![
2018 ctx.core(), ctx.waitpoint_condition(wp_id), ctx.waitpoint_signals(wp_id), ctx.exec_signals(), ctx.signal(sig_id), ctx.signal_payload(sig_id), idem_key, ctx.waitpoint(wp_id), ctx.suspension_current(), idx.lane_eligible(&lane), idx.lane_suspended(&lane), idx.lane_delayed(&lane), idx.suspension_timeout(), idx.waitpoint_hmac_secrets(), ];
2033
2034 let fcall_args: Vec<String> = vec![
2041 args.signal_id.to_string(), args.execution_id.to_string(), args.waitpoint_id.to_string(), args.signal_name.clone(), args.signal_category.clone(), args.source_type.clone(), args.source_identity.clone(), args.payload.as_ref()
2049 .map(|p| String::from_utf8_lossy(p).into_owned())
2050 .unwrap_or_default(), args.payload_encoding
2052 .clone()
2053 .unwrap_or_else(|| "json".to_owned()), args.idempotency_key
2055 .clone()
2056 .unwrap_or_default(), args.correlation_id
2058 .clone()
2059 .unwrap_or_default(), args.target_scope.clone(), args.created_at
2062 .map(|ts| ts.to_string())
2063 .unwrap_or_else(|| args.now.to_string()), args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), args.resume_delay_ms.unwrap_or(0).to_string(), args.signal_maxlen.unwrap_or(1000).to_string(), args.max_signals_per_execution
2068 .unwrap_or(10_000)
2069 .to_string(), args.waitpoint_token.as_str().to_owned(), ];
2074
2075 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2076 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2077
2078 let raw: Value = self
2079 .fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
2080 .await?;
2081
2082 parse_deliver_signal_result(&raw, &args.signal_id)
2083 }
2084
2085 pub async fn change_priority(
2089 &self,
2090 execution_id: &ExecutionId,
2091 new_priority: i32,
2092 ) -> Result<ChangePriorityResult, ServerError> {
2093 let partition = execution_partition(execution_id, &self.config.partition_config);
2094 let ctx = ExecKeyContext::new(&partition, execution_id);
2095 let idx = IndexKeys::new(&partition);
2096
2097 let lane_str: Option<String> = self
2099 .client
2100 .hget(&ctx.core(), "lane_id")
2101 .await
2102 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
2103 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
2104
2105 let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];
2107
2108 let fcall_args: Vec<String> = vec![
2110 execution_id.to_string(),
2111 new_priority.to_string(),
2112 ];
2113
2114 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2115 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2116
2117 let raw: Value = self
2118 .fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
2119 .await?;
2120
2121 parse_change_priority_result(&raw, execution_id)
2122 }
2123
2124 pub async fn claim_for_worker(
2138 &self,
2139 lane: &LaneId,
2140 worker_id: &WorkerId,
2141 worker_instance_id: &WorkerInstanceId,
2142 worker_capabilities: &std::collections::BTreeSet<String>,
2143 grant_ttl_ms: u64,
2144 ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
2145 self.scheduler
2146 .claim_for_worker(
2147 lane,
2148 worker_id,
2149 worker_instance_id,
2150 worker_capabilities,
2151 grant_ttl_ms,
2152 )
2153 .await
2154 .map_err(|e| match e {
2155 ff_scheduler::SchedulerError::Valkey(inner) => {
2156 ServerError::Valkey(inner)
2157 }
2158 ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
2159 ServerError::ValkeyContext { source, context }
2160 }
2161 ff_scheduler::SchedulerError::Config(msg) => {
2162 ServerError::InvalidInput(msg)
2163 }
2164 })
2165 }
2166
2167 pub async fn revoke_lease(
2169 &self,
2170 execution_id: &ExecutionId,
2171 ) -> Result<RevokeLeaseResult, ServerError> {
2172 let partition = execution_partition(execution_id, &self.config.partition_config);
2173 let ctx = ExecKeyContext::new(&partition, execution_id);
2174 let idx = IndexKeys::new(&partition);
2175
2176 let wiid_str: Option<String> = self
2178 .client
2179 .hget(&ctx.core(), "current_worker_instance_id")
2180 .await
2181 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET worker_instance_id".into() })?;
2182 let wiid = match wiid_str {
2183 Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
2184 _ => {
2185 return Err(ServerError::NotFound(format!(
2186 "no active lease for execution {execution_id} (no current_worker_instance_id)"
2187 )));
2188 }
2189 };
2190
2191 let fcall_keys: Vec<String> = vec![
2193 ctx.core(),
2194 ctx.lease_current(),
2195 ctx.lease_history(),
2196 idx.lease_expiry(),
2197 idx.worker_leases(&wiid),
2198 ];
2199
2200 let fcall_args: Vec<String> = vec![
2202 execution_id.to_string(),
2203 String::new(), "operator_revoke".to_owned(),
2205 ];
2206
2207 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
2208 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
2209
2210 let raw: Value = self
2211 .fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
2212 .await?;
2213
2214 parse_revoke_lease_result(&raw)
2215 }
2216
2217 pub async fn get_execution(
2219 &self,
2220 execution_id: &ExecutionId,
2221 ) -> Result<ExecutionInfo, ServerError> {
2222 let partition = execution_partition(execution_id, &self.config.partition_config);
2223 let ctx = ExecKeyContext::new(&partition, execution_id);
2224
2225 let fields: HashMap<String, String> = self
2226 .client
2227 .hgetall(&ctx.core())
2228 .await
2229 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL exec_core".into() })?;
2230
2231 if fields.is_empty() {
2232 return Err(ServerError::NotFound(format!(
2233 "execution not found: {execution_id}"
2234 )));
2235 }
2236
2237 let parse_enum = |field: &str| -> String {
2238 fields.get(field).cloned().unwrap_or_default()
2239 };
2240 fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
2241 let quoted = format!("\"{raw}\"");
2242 serde_json::from_str("ed).map_err(|e| {
2243 ServerError::Script(format!("invalid {field} '{raw}': {e}"))
2244 })
2245 }
2246
2247 let lp_str = parse_enum("lifecycle_phase");
2248 let os_str = parse_enum("ownership_state");
2249 let es_str = parse_enum("eligibility_state");
2250 let br_str = parse_enum("blocking_reason");
2251 let to_str = parse_enum("terminal_outcome");
2252 let as_str = parse_enum("attempt_state");
2253 let ps_str = parse_enum("public_state");
2254
2255 let state_vector = StateVector {
2256 lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
2257 ownership_state: deserialize("ownership_state", &os_str)?,
2258 eligibility_state: deserialize("eligibility_state", &es_str)?,
2259 blocking_reason: deserialize("blocking_reason", &br_str)?,
2260 terminal_outcome: deserialize("terminal_outcome", &to_str)?,
2261 attempt_state: deserialize("attempt_state", &as_str)?,
2262 public_state: deserialize("public_state", &ps_str)?,
2263 };
2264
2265 let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();
2272
2273 let started_at_opt = fields
2280 .get("started_at")
2281 .filter(|s| !s.is_empty())
2282 .cloned();
2283 let completed_at_opt = fields
2284 .get("completed_at")
2285 .filter(|s| !s.is_empty())
2286 .cloned();
2287
2288 Ok(ExecutionInfo {
2289 execution_id: execution_id.clone(),
2290 namespace: parse_enum("namespace"),
2291 lane_id: parse_enum("lane_id"),
2292 priority: fields
2293 .get("priority")
2294 .and_then(|v| v.parse().ok())
2295 .unwrap_or(0),
2296 execution_kind: parse_enum("execution_kind"),
2297 state_vector,
2298 public_state: deserialize("public_state", &ps_str)?,
2299 created_at: parse_enum("created_at"),
2300 started_at: started_at_opt,
2301 completed_at: completed_at_opt,
2302 current_attempt_index: fields
2303 .get("current_attempt_index")
2304 .and_then(|v| v.parse().ok())
2305 .unwrap_or(0),
2306 flow_id: flow_id_val,
2307 blocking_detail: parse_enum("blocking_detail"),
2308 })
2309 }
2310
    /// List executions in one lane-state ZSET of a caller-chosen execution
    /// partition, paged by `offset`/`limit`, returning lightweight summaries.
    ///
    /// `state_filter` must be one of: `eligible`, `delayed`, `terminal`,
    /// `suspended`, `active`; anything else yields `InvalidInput`. ZSET
    /// members that fail to parse as execution ids are logged and skipped
    /// rather than failing the whole page.
    pub async fn list_executions(
        &self,
        partition_id: u16,
        lane: &LaneId,
        state_filter: &str,
        offset: u64,
        limit: u64,
    ) -> Result<ListExecutionsResult, ServerError> {
        // The partition is addressed directly by index (no id to hash).
        let partition = ff_core::partition::Partition {
            family: ff_core::partition::PartitionFamily::Execution,
            index: partition_id,
        };
        let idx = IndexKeys::new(&partition);

        // Map the textual filter onto the matching lane-state index key.
        let zset_key = match state_filter {
            "eligible" => idx.lane_eligible(lane),
            "delayed" => idx.lane_delayed(lane),
            "terminal" => idx.lane_terminal(lane),
            "suspended" => idx.lane_suspended(lane),
            "active" => idx.lane_active(lane),
            other => {
                return Err(ServerError::InvalidInput(format!(
                    "invalid state_filter: {other}. Use: eligible, delayed, terminal, suspended, active"
                )));
            }
        };

        // Page member ids in score order; LIMIT applies offset/limit.
        let eids: Vec<String> = self
            .client
            .cmd("ZRANGE")
            .arg(&zset_key)
            .arg("-inf")
            .arg("+inf")
            .arg("BYSCORE")
            .arg("LIMIT")
            .arg(offset)
            .arg(limit)
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("ZRANGE {zset_key}") })?;

        if eids.is_empty() {
            return Ok(ListExecutionsResult {
                executions: vec![],
                total_returned: 0,
            });
        }

        // Parse ids defensively: corrupt members are logged, not fatal.
        let mut parsed = Vec::with_capacity(eids.len());
        for eid_str in &eids {
            match ExecutionId::parse(eid_str) {
                Ok(id) => parsed.push(id),
                Err(e) => {
                    tracing::warn!(
                        raw_id = %eid_str,
                        error = %e,
                        zset = %zset_key,
                        "list_executions: ZSET member failed to parse as ExecutionId (data corruption?)"
                    );
                }
            }
        }

        if parsed.is_empty() {
            return Ok(ListExecutionsResult {
                executions: vec![],
                total_returned: 0,
            });
        }

        // One pipelined HMGET per execution core hash; slot handles are read
        // back after the single round trip below.
        let mut pipe = self.client.pipeline();
        let mut slots = Vec::with_capacity(parsed.len());
        for eid in &parsed {
            let ep = execution_partition(eid, &self.config.partition_config);
            let ctx = ExecKeyContext::new(&ep, eid);
            let slot = pipe
                .cmd::<Vec<Option<String>>>("HMGET")
                .arg(ctx.core())
                .arg("namespace")
                .arg("lane_id")
                .arg("execution_kind")
                .arg("public_state")
                .arg("priority")
                .arg("created_at")
                .finish();
            slots.push(slot);
        }

        pipe.execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline HMGET".into() })?;

        let mut summaries = Vec::with_capacity(parsed.len());
        for (eid, slot) in parsed.into_iter().zip(slots) {
            let fields: Vec<Option<String>> = slot.value()
                .map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline slot".into() })?;

            // Missing/NULL fields surface as "" in the summary; index order
            // mirrors the HMGET field order above.
            let field = |i: usize| -> String {
                fields
                    .get(i)
                    .and_then(|v| v.as_ref())
                    .cloned()
                    .unwrap_or_default()
            };

            summaries.push(ExecutionSummary {
                execution_id: eid,
                namespace: field(0),
                lane_id: field(1),
                execution_kind: field(2),
                public_state: field(3),
                priority: field(4).parse().unwrap_or(0),
                created_at: field(5),
            });
        }

        let total = summaries.len();
        Ok(ListExecutionsResult {
            executions: summaries,
            total_returned: total,
        })
    }
2439
    /// Replay a terminal execution via `ff_replay_execution` (moving it from
    /// the lane's terminal index back toward eligible).
    ///
    /// For a flow member whose `terminal_outcome` is `"skipped"`, its incoming
    /// dependency edges are additionally handed to the function (extra keys
    /// plus edge-id args) so dependency state can be re-armed.
    pub async fn replay_execution(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ReplayExecutionResult, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let idx = IndexKeys::new(&partition);

        // Pre-read the fields that decide lane targeting and whether the
        // skipped-flow-member path applies.
        let dyn_fields: Vec<Option<String>> = self
            .client
            .cmd("HMGET")
            .arg(ctx.core())
            .arg("lane_id")
            .arg("flow_id")
            .arg("terminal_outcome")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET replay pre-read".into() })?;
        let lane = LaneId::new(
            dyn_fields
                .first()
                .and_then(|v| v.as_ref())
                .cloned()
                .unwrap_or_else(|| "default".to_owned()),
        );
        let flow_id_str = dyn_fields
            .get(1)
            .and_then(|v| v.as_ref())
            .cloned()
            .unwrap_or_default();
        let terminal_outcome = dyn_fields
            .get(2)
            .and_then(|v| v.as_ref())
            .cloned()
            .unwrap_or_default();

        let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();

        // Base keys; extended below for the skipped-flow-member case.
        let mut fcall_keys: Vec<String> = vec![
            ctx.core(),
            idx.lane_terminal(&lane),
            idx.lane_eligible(&lane),
            ctx.lease_history(),
        ];

        let now = TimestampMs::now();
        let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];

        if is_skipped_flow_member {
            let flow_id = FlowId::parse(&flow_id_str)
                .map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
            let flow_part =
                flow_partition(&flow_id, &self.config.partition_config);
            let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
            // All incoming dependency edges recorded for this member.
            let edge_ids: Vec<String> = self
                .client
                .cmd("SMEMBERS")
                .arg(flow_ctx.incoming(execution_id))
                .execute()
                .await
                .map_err(|e| ServerError::ValkeyContext { source: e, context: "SMEMBERS replay edges".into() })?;

            fcall_keys.push(idx.lane_blocked_dependencies(&lane));
            fcall_keys.push(ctx.deps_meta());
            fcall_keys.push(ctx.deps_unresolved());
            for eid_str in &edge_ids {
                // NOTE(review): an unparsable stored edge id falls back to a
                // freshly minted EdgeId, so the pushed dep_edge key would not
                // match any existing edge record — confirm this fallback is
                // intended rather than an error path.
                let edge_id = EdgeId::parse(eid_str)
                    .unwrap_or_else(|_| EdgeId::new());
                fcall_keys.push(ctx.dep_edge(&edge_id));
                fcall_args.push(eid_str.clone());
            }
        }

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
            .await?;

        parse_replay_result(&raw)
    }
2543
2544 pub async fn read_attempt_stream(
2556 &self,
2557 execution_id: &ExecutionId,
2558 attempt_index: AttemptIndex,
2559 from_id: &str,
2560 to_id: &str,
2561 count_limit: u64,
2562 ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2563 use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};
2564
2565 if count_limit == 0 {
2566 return Err(ServerError::InvalidInput(
2567 "count_limit must be >= 1".to_owned(),
2568 ));
2569 }
2570
2571 let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2576 Ok(p) => p,
2577 Err(tokio::sync::TryAcquireError::NoPermits) => {
2578 return Err(ServerError::ConcurrencyLimitExceeded(
2579 "stream_ops",
2580 self.config.max_concurrent_stream_ops,
2581 ));
2582 }
2583 Err(tokio::sync::TryAcquireError::Closed) => {
2584 return Err(ServerError::OperationFailed(
2585 "stream semaphore closed (server shutting down)".into(),
2586 ));
2587 }
2588 };
2589
2590 let args = ReadFramesArgs {
2591 execution_id: execution_id.clone(),
2592 attempt_index,
2593 from_id: from_id.to_owned(),
2594 to_id: to_id.to_owned(),
2595 count_limit,
2596 };
2597
2598 let partition = execution_partition(execution_id, &self.config.partition_config);
2599 let ctx = ExecKeyContext::new(&partition, execution_id);
2600 let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };
2601
2602 let result = ff_script::functions::stream::ff_read_attempt_stream(
2606 &self.tail_client, &keys, &args,
2607 )
2608 .await
2609 .map_err(script_error_to_server);
2610
2611 drop(permit);
2612
2613 match result? {
2614 ReadFramesResult::Frames(f) => Ok(f),
2615 }
2616 }
2617
2618 pub async fn tail_attempt_stream(
2636 &self,
2637 execution_id: &ExecutionId,
2638 attempt_index: AttemptIndex,
2639 last_id: &str,
2640 block_ms: u64,
2641 count_limit: u64,
2642 ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
2643 if count_limit == 0 {
2644 return Err(ServerError::InvalidInput(
2645 "count_limit must be >= 1".to_owned(),
2646 ));
2647 }
2648
2649 let permit = match self.stream_semaphore.clone().try_acquire_owned() {
2665 Ok(p) => p,
2666 Err(tokio::sync::TryAcquireError::NoPermits) => {
2667 return Err(ServerError::ConcurrencyLimitExceeded(
2668 "stream_ops",
2669 self.config.max_concurrent_stream_ops,
2670 ));
2671 }
2672 Err(tokio::sync::TryAcquireError::Closed) => {
2673 return Err(ServerError::OperationFailed(
2674 "stream semaphore closed (server shutting down)".into(),
2675 ));
2676 }
2677 };
2678
2679 let partition = execution_partition(execution_id, &self.config.partition_config);
2680 let ctx = ExecKeyContext::new(&partition, execution_id);
2681 let stream_key = ctx.stream(attempt_index);
2682 let stream_meta_key = ctx.stream_meta(attempt_index);
2683
2684 let _xread_guard = self.xread_block_lock.lock().await;
2692
2693 let result = ff_script::stream_tail::xread_block(
2694 &self.tail_client,
2695 &stream_key,
2696 &stream_meta_key,
2697 last_id,
2698 block_ms,
2699 count_limit,
2700 )
2701 .await
2702 .map_err(script_error_to_server);
2703
2704 drop(_xread_guard);
2705 drop(permit);
2706 result
2707 }
2708
2709 pub async fn shutdown(self) {
2732 tracing::info!("shutting down FlowFabric server");
2733
2734 self.stream_semaphore.close();
2739 tracing::info!(
2740 "stream semaphore closed; no new read/tail attempts will be accepted"
2741 );
2742
2743 let drain_timeout = Duration::from_secs(15);
2747 let background = self.background_tasks.clone();
2748 let drain = async move {
2749 let mut guard = background.lock().await;
2750 while guard.join_next().await.is_some() {}
2751 };
2752 match tokio::time::timeout(drain_timeout, drain).await {
2753 Ok(()) => {}
2754 Err(_) => {
2755 tracing::warn!(
2756 timeout_s = drain_timeout.as_secs(),
2757 "shutdown: background tasks did not finish in time, aborting"
2758 );
2759 self.background_tasks.lock().await.abort_all();
2760 }
2761 }
2762
2763 self.engine.shutdown().await;
2764 tracing::info!("FlowFabric server shutdown complete");
2765 }
2766}
2767
/// Minimum accepted Valkey server version at startup (major component).
const REQUIRED_VALKEY_MAJOR: u32 = 7;
/// Minimum accepted Valkey server version at startup (minor component).
const REQUIRED_VALKEY_MINOR: u32 = 2;

/// Total wall-clock budget for retrying the startup version check before
/// surfacing the last classification error.
const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
2781
/// Gate startup on the backend version: accept only Valkey >=
/// `REQUIRED_VALKEY_MAJOR.REQUIRED_VALKEY_MINOR`.
///
/// Retryable failures — and too-low readings — are retried with exponential
/// backoff (200ms doubling, capped at 5s) until `VERSION_CHECK_RETRY_BUDGET`
/// elapses, after which the last error is returned. Errors whose Valkey kind
/// is classified non-retryable fail immediately.
async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
    let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
    let mut backoff = Duration::from_millis(200);
    loop {
        // Each probe either returns early (version accepted) or produces:
        // whether to retry, the error to surface once the budget is spent,
        // and a detail string for the retry log line.
        let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
            match query_valkey_version(client).await {
                Ok((detected_major, detected_minor))
                    if (detected_major, detected_minor)
                        >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR) =>
                {
                    tracing::info!(
                        detected_major,
                        detected_minor,
                        required_major = REQUIRED_VALKEY_MAJOR,
                        required_minor = REQUIRED_VALKEY_MINOR,
                        "Valkey version accepted"
                    );
                    return Ok(());
                }
                // Version readable but too low: still retried within the
                // budget (should_retry = true), so a node restarting into a
                // newer version can be picked up.
                Ok((detected_major, detected_minor)) => (
                    true,
                    ServerError::ValkeyVersionTooLow {
                        detected: format!("{detected_major}.{detected_minor}"),
                        required: format!("{REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"),
                    },
                    format!(
                        "detected={detected_major}.{detected_minor} < required={REQUIRED_VALKEY_MAJOR}.{REQUIRED_VALKEY_MINOR}"
                    ),
                ),
                Err(e) => {
                    // Errors with no Valkey kind default to retryable.
                    let retryable = e
                        .valkey_kind()
                        .map(ff_script::retry::is_retryable_kind)
                        .unwrap_or(true);
                    let detail = e.to_string();
                    (retryable, e, detail)
                }
            };

        if !should_retry {
            return Err(err_for_budget_exhaust);
        }
        if tokio::time::Instant::now() >= deadline {
            return Err(err_for_budget_exhaust);
        }
        tracing::warn!(
            backoff_ms = backoff.as_millis() as u64,
            detail = %log_detail,
            "valkey version check transient failure; retrying"
        );
        tokio::time::sleep(backoff).await;
        backoff = (backoff * 2).min(Duration::from_secs(5));
    }
}
2870
2871async fn query_valkey_version(client: &Client) -> Result<(u32, u32), ServerError> {
2884 let raw: Value = client
2885 .cmd("INFO")
2886 .arg("server")
2887 .execute()
2888 .await
2889 .map_err(|e| ServerError::ValkeyContext {
2890 source: e,
2891 context: "INFO server".into(),
2892 })?;
2893 let bodies = extract_info_bodies(&raw)?;
2894 let mut min_version: Option<(u32, u32)> = None;
2900 for body in &bodies {
2901 let version = parse_valkey_version(body)?;
2902 min_version = Some(match min_version {
2903 None => version,
2904 Some(existing) => existing.min(version),
2905 });
2906 }
2907 min_version.ok_or_else(|| {
2908 ServerError::OperationFailed(
2909 "valkey version check: cluster INFO returned no node bodies".into(),
2910 )
2911 })
2912}
2913
2914fn extract_info_bodies(raw: &Value) -> Result<Vec<String>, ServerError> {
2920 match raw {
2921 Value::BulkString(bytes) => Ok(vec![String::from_utf8_lossy(bytes).into_owned()]),
2922 Value::VerbatimString { text, .. } => Ok(vec![text.clone()]),
2923 Value::SimpleString(s) => Ok(vec![s.clone()]),
2924 Value::Map(entries) => {
2925 if entries.is_empty() {
2926 return Err(ServerError::OperationFailed(
2927 "valkey version check: cluster INFO returned empty map".into(),
2928 ));
2929 }
2930 let mut out = Vec::with_capacity(entries.len());
2931 for (_, body) in entries {
2932 out.extend(extract_info_bodies(body)?);
2933 }
2934 Ok(out)
2935 }
2936 other => Err(ServerError::OperationFailed(format!(
2937 "valkey version check: unexpected INFO shape: {other:?}"
2938 ))),
2939 }
2940}
2941
2942fn parse_valkey_version(info: &str) -> Result<(u32, u32), ServerError> {
2957 let extract_major_minor = |line: &str| -> Result<(u32, u32), ServerError> {
2958 let trimmed = line.trim();
2959 let mut parts = trimmed.split('.');
2960 let major_str = parts.next().unwrap_or("").trim();
2961 if major_str.is_empty() {
2962 return Err(ServerError::OperationFailed(format!(
2963 "valkey version check: empty version field in '{trimmed}'"
2964 )));
2965 }
2966 let major = major_str.parse::<u32>().map_err(|_| {
2967 ServerError::OperationFailed(format!(
2968 "valkey version check: non-numeric major in '{trimmed}'"
2969 ))
2970 })?;
2971 let minor_str = parts.next().unwrap_or("").trim();
2975 if minor_str.is_empty() {
2976 return Err(ServerError::OperationFailed(format!(
2977 "valkey version check: missing minor component in '{trimmed}'"
2978 )));
2979 }
2980 let minor = minor_str.parse::<u32>().map_err(|_| {
2981 ServerError::OperationFailed(format!(
2982 "valkey version check: non-numeric minor in '{trimmed}'"
2983 ))
2984 })?;
2985 Ok((major, minor))
2986 };
2987 if let Some(valkey_line) = info
2989 .lines()
2990 .find_map(|line| line.strip_prefix("valkey_version:"))
2991 {
2992 return extract_major_minor(valkey_line);
2993 }
2994 let server_is_valkey = info
2999 .lines()
3000 .map(str::trim)
3001 .any(|line| line.eq_ignore_ascii_case("server_name:valkey"));
3002 if !server_is_valkey {
3003 return Err(ServerError::OperationFailed(
3004 "valkey version check: INFO missing valkey_version and server_name:valkey marker \
3005 (unsupported backend — FlowFabric requires Valkey >= 7.2; Redis is not supported)"
3006 .into(),
3007 ));
3008 }
3009 if let Some(redis_line) = info
3013 .lines()
3014 .find_map(|line| line.strip_prefix("redis_version:"))
3015 {
3016 return extract_major_minor(redis_line);
3017 }
3018 Err(ServerError::OperationFailed(
3019 "valkey version check: INFO has server_name:valkey but no redis_version or valkey_version field"
3020 .into(),
3021 ))
3022}
3023
3024async fn validate_or_create_partition_config(
3031 client: &Client,
3032 config: &PartitionConfig,
3033) -> Result<(), ServerError> {
3034 let key = keys::global_config_partitions();
3035
3036 let existing: HashMap<String, String> = client
3037 .hgetall(&key)
3038 .await
3039 .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;
3040
3041 if existing.is_empty() {
3042 tracing::info!("first boot: creating {key}");
3044 client
3045 .hset(&key, "num_flow_partitions", &config.num_flow_partitions.to_string())
3046 .await
3047 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_flow_partitions".into() })?;
3048 client
3049 .hset(&key, "num_budget_partitions", &config.num_budget_partitions.to_string())
3050 .await
3051 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_budget_partitions".into() })?;
3052 client
3053 .hset(&key, "num_quota_partitions", &config.num_quota_partitions.to_string())
3054 .await
3055 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_quota_partitions".into() })?;
3056 return Ok(());
3057 }
3058
3059 let check = |field: &str, expected: u16| -> Result<(), ServerError> {
3061 let stored: u16 = existing
3062 .get(field)
3063 .and_then(|v| v.parse().ok())
3064 .unwrap_or(0);
3065 if stored != expected {
3066 return Err(ServerError::PartitionMismatch(format!(
3067 "{field}: stored={stored}, config={expected}. \
3068 Partition counts are fixed at deployment time. \
3069 Either fix your config or migrate the data."
3070 )));
3071 }
3072 Ok(())
3073 };
3074
3075 check("num_flow_partitions", config.num_flow_partitions)?;
3076 check("num_budget_partitions", config.num_budget_partitions)?;
3077 check("num_quota_partitions", config.num_quota_partitions)?;
3078
3079 tracing::info!("partition config validated against stored {key}");
3080 Ok(())
3081}
3082
/// Key id installed for the waitpoint HMAC secret on a partition's first boot.
const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";

/// Per-partition result of `init_one_partition`.
enum PartitionBootOutcome {
    /// A secret was already stored and it equals the env-provided one.
    Match,
    /// A secret was already stored but differs from the env-provided one.
    Mismatch,
    /// `current_kid` was present but its `secret:<kid>` field was missing
    /// (torn prior write); the secret was re-written.
    Repaired,
    /// Nothing was stored; the initial kid and secret were installed.
    Installed,
}

/// Maximum in-flight per-partition operations during boot install / rotation.
const BOOT_INIT_CONCURRENCY: usize = 16;
3108
/// Probe one partition's waitpoint-HMAC hash and install, repair, or verify
/// the secret. See `PartitionBootOutcome` for the four possible results.
///
/// Decision order:
/// - no `current_kid` stored  -> install kid + secret in one atomic HSET
///   (`Installed`);
/// - `current_kid` stored but `secret:<kid>` missing -> re-write the secret
///   from `secret_hex` (`Repaired`; torn write from a prior crash);
/// - stored secret != `secret_hex` -> `Mismatch` (stored value is kept);
/// - otherwise -> `Match`.
async fn init_one_partition(
    client: &Client,
    partition: Partition,
    secret_hex: &str,
) -> Result<PartitionBootOutcome, ServerError> {
    let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();

    // Probe: which key id (if any) is currently active on this partition?
    let stored_kid: Option<String> = client
        .cmd("HGET")
        .arg(&key)
        .arg("current_kid")
        .execute()
        .await
        .map_err(|e| ServerError::ValkeyContext {
            source: e,
            context: format!("HGET {key} current_kid (init probe)"),
        })?;

    if let Some(stored_kid) = stored_kid {
        let field = format!("secret:{stored_kid}");
        let stored_secret: Option<String> = client
            .hget(&key, &field)
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: format!("HGET {key} secret:<kid> (init check)"),
            })?;
        if stored_secret.is_none() {
            // current_kid exists but its secret field does not: repair by
            // writing the env-provided secret under the stored kid.
            client
                .hset(&key, &field, secret_hex)
                .await
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("HSET {key} secret:<kid> (repair torn write)"),
                })?;
            return Ok(PartitionBootOutcome::Repaired);
        }
        if stored_secret.as_deref() != Some(secret_hex) {
            return Ok(PartitionBootOutcome::Mismatch);
        }
        return Ok(PartitionBootOutcome::Match);
    }

    // First install: kid pointer and secret written in a single HSET so a
    // crash cannot leave current_kid without its secret.
    let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
    let _: i64 = client
        .cmd("HSET")
        .arg(&key)
        .arg("current_kid")
        .arg(WAITPOINT_HMAC_INITIAL_KID)
        .arg(&secret_field)
        .arg(secret_hex)
        .execute()
        .await
        .map_err(|e| ServerError::ValkeyContext {
            source: e,
            context: format!("HSET {key} (init waitpoint HMAC atomic)"),
        })?;
    Ok(PartitionBootOutcome::Installed)
}
3186
/// Install (or verify) the waitpoint HMAC secret on every execution partition
/// at boot, with at most `BOOT_INIT_CONCURRENCY` partitions in flight.
///
/// Mismatches and torn-write repairs are counted and logged but do not fail
/// boot; the first per-partition transport error aborts with `Err` (via the
/// `?` on each completed future's result).
async fn initialize_waitpoint_hmac_secret(
    client: &Client,
    partition_config: &PartitionConfig,
    secret_hex: &str,
) -> Result<(), ServerError> {
    use futures::stream::{FuturesUnordered, StreamExt};

    let n = partition_config.num_flow_partitions;
    tracing::info!(
        partitions = n,
        concurrency = BOOT_INIT_CONCURRENCY,
        "installing waitpoint HMAC secret across {n} execution partitions"
    );

    let mut mismatch_count: u16 = 0;
    let mut repaired_count: u16 = 0;
    let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
    let mut next_index: u16 = 0;

    loop {
        // Top up the in-flight window, then wait for any one future.
        while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
            let partition = Partition {
                family: PartitionFamily::Execution,
                index: next_index,
            };
            let client = client.clone();
            let secret_hex = secret_hex.to_owned();
            pending.push(async move {
                init_one_partition(&client, partition, &secret_hex).await
            });
            next_index += 1;
        }
        match pending.next().await {
            Some(res) => match res? {
                PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
                PartitionBootOutcome::Mismatch => mismatch_count += 1,
                PartitionBootOutcome::Repaired => repaired_count += 1,
            },
            // Drained with nothing left to schedule: done.
            None => break,
        }
    }

    if repaired_count > 0 {
        tracing::warn!(
            repaired_partitions = repaired_count,
            total_partitions = n,
            "repaired {repaired_count} partitions with torn waitpoint HMAC writes \
            (current_kid present but secret:<kid> missing, likely crash during prior boot)"
        );
    }

    if mismatch_count > 0 {
        tracing::warn!(
            mismatched_partitions = mismatch_count,
            total_partitions = n,
            "stored/env secret mismatch on {mismatch_count} partitions — \
            env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
            run POST /v1/admin/rotate-waitpoint-secret to sync"
        );
    }

    tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
    Ok(())
}
3262
/// Outcome of a waitpoint-HMAC secret rotation across execution partitions.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RotateWaitpointSecretResult {
    /// Number of partitions rotated successfully.
    pub rotated: u16,
    /// Indices of partitions whose rotation failed (candidates for retry).
    pub failed: Vec<u16>,
    /// The key id that was installed by this rotation.
    pub new_kid: String,
}
3276
impl Server {
    /// Rotate the waitpoint HMAC secret on every execution partition.
    ///
    /// Validates `new_kid` (non-empty, no ':' — it is embedded in the
    /// `secret:<kid>` field name) and `new_secret_hex` (non-empty even-length
    /// hex). At most one rotation runs at a time, enforced by
    /// `admin_rotate_semaphore`. Partitions are rotated with bounded
    /// concurrency (`BOOT_INIT_CONCURRENCY`); individual failures are
    /// collected into `failed` rather than aborting the sweep.
    pub async fn rotate_waitpoint_secret(
        &self,
        new_kid: &str,
        new_secret_hex: &str,
    ) -> Result<RotateWaitpointSecretResult, ServerError> {
        if new_kid.is_empty() || new_kid.contains(':') {
            return Err(ServerError::OperationFailed(
                "new_kid must be non-empty and must not contain ':'".into(),
            ));
        }
        if new_secret_hex.is_empty()
            || !new_secret_hex.len().is_multiple_of(2)
            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
        {
            return Err(ServerError::OperationFailed(
                "new_secret_hex must be a non-empty even-length hex string".into(),
            ));
        }

        // Single-flight guard: permit is held (RAII) for the whole rotation.
        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                return Err(ServerError::OperationFailed(
                    "admin rotate semaphore closed (server shutting down)".into(),
                ));
            }
        };

        let n = self.config.partition_config.num_flow_partitions;
        let grace_ms = self.config.waitpoint_hmac_grace_ms;

        use futures::stream::{FuturesUnordered, StreamExt};

        let mut rotated = 0u16;
        let mut failed = Vec::new();
        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
        let mut next_index: u16 = 0;

        loop {
            // Keep the in-flight window full, then await one completion.
            while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
                let partition = Partition {
                    family: PartitionFamily::Execution,
                    index: next_index,
                };
                let idx = next_index;
                // Owned copies so the async block is 'moved' cleanly; the
                // &self borrow itself is Copy and shared across futures.
                let new_kid_owned = new_kid.to_owned();
                let new_secret_owned = new_secret_hex.to_owned();
                let partition_owned = partition;
                let fut = async move {
                    let outcome = self
                        .rotate_single_partition(
                            &partition_owned,
                            &new_kid_owned,
                            &new_secret_owned,
                            grace_ms,
                        )
                        .await;
                    (idx, partition_owned, outcome)
                };
                pending.push(fut);
                next_index += 1;
            }
            match pending.next().await {
                Some((idx, partition, outcome)) => match outcome {
                    Ok(()) => {
                        rotated += 1;
                        tracing::debug!(
                            partition = %partition,
                            new_kid = %new_kid,
                            "waitpoint_hmac_rotated"
                        );
                    }
                    Err(e) => {
                        // Failure is recorded, not fatal: remaining
                        // partitions still rotate.
                        tracing::error!(
                            target: "audit",
                            partition = %partition,
                            err = %e,
                            "waitpoint_hmac_rotation_failed"
                        );
                        failed.push(idx);
                    }
                },
                None => break,
            }
        }

        tracing::info!(
            target: "audit",
            new_kid = %new_kid,
            total_partitions = n,
            rotated,
            failed_count = failed.len(),
            "waitpoint_hmac_rotation_complete"
        );

        Ok(RotateWaitpointSecretResult {
            rotated,
            failed,
            new_kid: new_kid.to_owned(),
        })
    }

    /// Rotate the waitpoint HMAC secret on a single partition via the
    /// `ff_rotate_waitpoint_hmac_secret` server-side function.
    ///
    /// `RotationConflict` (same kid, different secret already installed) and
    /// transport errors are mapped to server-level errors; the function's
    /// success payload is intentionally discarded.
    async fn rotate_single_partition(
        &self,
        partition: &Partition,
        new_kid: &str,
        new_secret_hex: &str,
        grace_ms: u64,
    ) -> Result<(), ServerError> {
        let idx = IndexKeys::new(partition);
        let args = RotateWaitpointHmacSecretArgs {
            new_kid: new_kid.to_owned(),
            new_secret_hex: new_secret_hex.to_owned(),
            grace_ms,
        };
        let outcome = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            &self.client,
            &idx,
            &args,
        )
        .await
        .map_err(|e| match e {
            ff_script::ScriptError::RotationConflict(kid) => {
                ServerError::OperationFailed(format!(
                    "rotation conflict: kid {kid} already installed with a \
                    different secret. Either use a fresh kid or restore the \
                    original secret for this kid before retrying."
                ))
            }
            ff_script::ScriptError::Valkey(v) => ServerError::ValkeyContext {
                source: v,
                context: format!("FCALL ff_rotate_waitpoint_hmac_secret partition={partition}"),
            },
            other => ServerError::OperationFailed(format!(
                "rotation failed on partition {partition}: {other}"
            )),
        })?;
        let _ = outcome;
        Ok(())
    }
}
3477
3478fn parse_create_result(
3481 raw: &Value,
3482 execution_id: &ExecutionId,
3483) -> Result<CreateExecutionResult, ServerError> {
3484 let arr = match raw {
3485 Value::Array(arr) => arr,
3486 _ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
3487 };
3488
3489 let status = match arr.first() {
3490 Some(Ok(Value::Int(n))) => *n,
3491 _ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
3492 };
3493
3494 if status == 1 {
3495 let sub = arr
3497 .get(1)
3498 .and_then(|v| match v {
3499 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3500 Ok(Value::SimpleString(s)) => Some(s.clone()),
3501 _ => None,
3502 })
3503 .unwrap_or_default();
3504
3505 if sub == "DUPLICATE" {
3506 Ok(CreateExecutionResult::Duplicate {
3507 execution_id: execution_id.clone(),
3508 })
3509 } else {
3510 Ok(CreateExecutionResult::Created {
3511 execution_id: execution_id.clone(),
3512 public_state: PublicState::Waiting,
3513 })
3514 }
3515 } else {
3516 let error_code = arr
3517 .get(1)
3518 .and_then(|v| match v {
3519 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3520 Ok(Value::SimpleString(s)) => Some(s.clone()),
3521 _ => None,
3522 })
3523 .unwrap_or_else(|| "unknown".to_owned());
3524 Err(ServerError::OperationFailed(format!(
3525 "ff_create_execution failed: {error_code}"
3526 )))
3527 }
3528}
3529
3530fn parse_cancel_result(
3531 raw: &Value,
3532 execution_id: &ExecutionId,
3533) -> Result<CancelExecutionResult, ServerError> {
3534 let arr = match raw {
3535 Value::Array(arr) => arr,
3536 _ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
3537 };
3538
3539 let status = match arr.first() {
3540 Some(Ok(Value::Int(n))) => *n,
3541 _ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
3542 };
3543
3544 if status == 1 {
3545 Ok(CancelExecutionResult::Cancelled {
3546 execution_id: execution_id.clone(),
3547 public_state: PublicState::Cancelled,
3548 })
3549 } else {
3550 let error_code = arr
3551 .get(1)
3552 .and_then(|v| match v {
3553 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3554 Ok(Value::SimpleString(s)) => Some(s.clone()),
3555 _ => None,
3556 })
3557 .unwrap_or_else(|| "unknown".to_owned());
3558 Err(ServerError::OperationFailed(format!(
3559 "ff_cancel_execution failed: {error_code}"
3560 )))
3561 }
3562}
3563
3564fn parse_budget_create_result(
3565 raw: &Value,
3566 budget_id: &BudgetId,
3567) -> Result<CreateBudgetResult, ServerError> {
3568 let arr = match raw {
3569 Value::Array(arr) => arr,
3570 _ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
3571 };
3572
3573 let status = match arr.first() {
3574 Some(Ok(Value::Int(n))) => *n,
3575 _ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
3576 };
3577
3578 if status == 1 {
3579 let sub = arr
3580 .get(1)
3581 .and_then(|v| match v {
3582 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3583 Ok(Value::SimpleString(s)) => Some(s.clone()),
3584 _ => None,
3585 })
3586 .unwrap_or_default();
3587
3588 if sub == "ALREADY_SATISFIED" {
3589 Ok(CreateBudgetResult::AlreadySatisfied {
3590 budget_id: budget_id.clone(),
3591 })
3592 } else {
3593 Ok(CreateBudgetResult::Created {
3594 budget_id: budget_id.clone(),
3595 })
3596 }
3597 } else {
3598 let error_code = arr
3599 .get(1)
3600 .and_then(|v| match v {
3601 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3602 Ok(Value::SimpleString(s)) => Some(s.clone()),
3603 _ => None,
3604 })
3605 .unwrap_or_else(|| "unknown".to_owned());
3606 Err(ServerError::OperationFailed(format!(
3607 "ff_create_budget failed: {error_code}"
3608 )))
3609 }
3610}
3611
3612fn parse_quota_create_result(
3613 raw: &Value,
3614 quota_policy_id: &QuotaPolicyId,
3615) -> Result<CreateQuotaPolicyResult, ServerError> {
3616 let arr = match raw {
3617 Value::Array(arr) => arr,
3618 _ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
3619 };
3620
3621 let status = match arr.first() {
3622 Some(Ok(Value::Int(n))) => *n,
3623 _ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
3624 };
3625
3626 if status == 1 {
3627 let sub = arr
3628 .get(1)
3629 .and_then(|v| match v {
3630 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3631 Ok(Value::SimpleString(s)) => Some(s.clone()),
3632 _ => None,
3633 })
3634 .unwrap_or_default();
3635
3636 if sub == "ALREADY_SATISFIED" {
3637 Ok(CreateQuotaPolicyResult::AlreadySatisfied {
3638 quota_policy_id: quota_policy_id.clone(),
3639 })
3640 } else {
3641 Ok(CreateQuotaPolicyResult::Created {
3642 quota_policy_id: quota_policy_id.clone(),
3643 })
3644 }
3645 } else {
3646 let error_code = arr
3647 .get(1)
3648 .and_then(|v| match v {
3649 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3650 Ok(Value::SimpleString(s)) => Some(s.clone()),
3651 _ => None,
3652 })
3653 .unwrap_or_else(|| "unknown".to_owned());
3654 Err(ServerError::OperationFailed(format!(
3655 "ff_create_quota_policy failed: {error_code}"
3656 )))
3657 }
3658}
3659
3660fn parse_create_flow_result(
3663 raw: &Value,
3664 flow_id: &FlowId,
3665) -> Result<CreateFlowResult, ServerError> {
3666 let arr = match raw {
3667 Value::Array(arr) => arr,
3668 _ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
3669 };
3670 let status = match arr.first() {
3671 Some(Ok(Value::Int(n))) => *n,
3672 _ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
3673 };
3674 if status == 1 {
3675 let sub = fcall_field_str(arr, 1);
3676 if sub == "ALREADY_SATISFIED" {
3677 Ok(CreateFlowResult::AlreadySatisfied {
3678 flow_id: flow_id.clone(),
3679 })
3680 } else {
3681 Ok(CreateFlowResult::Created {
3682 flow_id: flow_id.clone(),
3683 })
3684 }
3685 } else {
3686 let error_code = fcall_field_str(arr, 1);
3687 Err(ServerError::OperationFailed(format!(
3688 "ff_create_flow failed: {error_code}"
3689 )))
3690 }
3691}
3692
3693fn parse_add_execution_to_flow_result(
3694 raw: &Value,
3695) -> Result<AddExecutionToFlowResult, ServerError> {
3696 let arr = match raw {
3697 Value::Array(arr) => arr,
3698 _ => {
3699 return Err(ServerError::Script(
3700 "ff_add_execution_to_flow: expected Array".into(),
3701 ))
3702 }
3703 };
3704 let status = match arr.first() {
3705 Some(Ok(Value::Int(n))) => *n,
3706 _ => {
3707 return Err(ServerError::Script(
3708 "ff_add_execution_to_flow: bad status code".into(),
3709 ))
3710 }
3711 };
3712 if status == 1 {
3713 let sub = fcall_field_str(arr, 1);
3714 let eid_str = fcall_field_str(arr, 2);
3715 let nc_str = fcall_field_str(arr, 3);
3716 let eid = ExecutionId::parse(&eid_str)
3717 .map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
3718 let nc: u32 = nc_str.parse().unwrap_or(0);
3719 if sub == "ALREADY_SATISFIED" {
3720 Ok(AddExecutionToFlowResult::AlreadyMember {
3721 execution_id: eid,
3722 node_count: nc,
3723 })
3724 } else {
3725 Ok(AddExecutionToFlowResult::Added {
3726 execution_id: eid,
3727 new_node_count: nc,
3728 })
3729 }
3730 } else {
3731 let error_code = fcall_field_str(arr, 1);
3732 Err(ServerError::OperationFailed(format!(
3733 "ff_add_execution_to_flow failed: {error_code}"
3734 )))
3735 }
3736}
3737
/// Decoded outcome of `ff_cancel_flow`, as produced by
/// `parse_cancel_flow_raw`.
enum ParsedCancelFlow {
    /// The flow was cancelled; carries the cancellation policy string and
    /// the member execution ids returned by the script (reply index 3+).
    Cancelled {
        policy: String,
        member_execution_ids: Vec<String>,
    },
    /// The script reported `flow_already_terminal`.
    AlreadyTerminal,
}
3750
3751fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
3757 let arr = match raw {
3758 Value::Array(arr) => arr,
3759 _ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
3760 };
3761 let status = match arr.first() {
3762 Some(Ok(Value::Int(n))) => *n,
3763 _ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
3764 };
3765 if status != 1 {
3766 let error_code = fcall_field_str(arr, 1);
3767 if error_code == "flow_already_terminal" {
3768 return Ok(ParsedCancelFlow::AlreadyTerminal);
3769 }
3770 return Err(ServerError::OperationFailed(format!(
3771 "ff_cancel_flow failed: {error_code}"
3772 )));
3773 }
3774 let policy = fcall_field_str(arr, 2);
3776 let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
3779 for i in 3..arr.len() {
3780 members.push(fcall_field_str(arr, i));
3781 }
3782 Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
3783}
3784
3785fn parse_stage_dependency_edge_result(
3786 raw: &Value,
3787) -> Result<StageDependencyEdgeResult, ServerError> {
3788 let arr = match raw {
3789 Value::Array(arr) => arr,
3790 _ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
3791 };
3792 let status = match arr.first() {
3793 Some(Ok(Value::Int(n))) => *n,
3794 _ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
3795 };
3796 if status == 1 {
3797 let edge_id_str = fcall_field_str(arr, 2);
3798 let rev_str = fcall_field_str(arr, 3);
3799 let edge_id = EdgeId::parse(&edge_id_str)
3800 .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
3801 let rev: u64 = rev_str.parse().unwrap_or(0);
3802 Ok(StageDependencyEdgeResult::Staged {
3803 edge_id,
3804 new_graph_revision: rev,
3805 })
3806 } else {
3807 let error_code = fcall_field_str(arr, 1);
3808 Err(ServerError::OperationFailed(format!(
3809 "ff_stage_dependency_edge failed: {error_code}"
3810 )))
3811 }
3812}
3813
3814fn parse_apply_dependency_result(
3815 raw: &Value,
3816) -> Result<ApplyDependencyToChildResult, ServerError> {
3817 let arr = match raw {
3818 Value::Array(arr) => arr,
3819 _ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
3820 };
3821 let status = match arr.first() {
3822 Some(Ok(Value::Int(n))) => *n,
3823 _ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
3824 };
3825 if status == 1 {
3826 let sub = fcall_field_str(arr, 1);
3827 if sub == "ALREADY_APPLIED" || sub == "already_applied" {
3828 Ok(ApplyDependencyToChildResult::AlreadyApplied)
3829 } else {
3830 let count_str = fcall_field_str(arr, 2);
3832 let count: u32 = count_str.parse().unwrap_or(0);
3833 Ok(ApplyDependencyToChildResult::Applied {
3834 unsatisfied_count: count,
3835 })
3836 }
3837 } else {
3838 let error_code = fcall_field_str(arr, 1);
3839 Err(ServerError::OperationFailed(format!(
3840 "ff_apply_dependency_to_child failed: {error_code}"
3841 )))
3842 }
3843}
3844
3845fn parse_deliver_signal_result(
3846 raw: &Value,
3847 signal_id: &SignalId,
3848) -> Result<DeliverSignalResult, ServerError> {
3849 let arr = match raw {
3850 Value::Array(arr) => arr,
3851 _ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
3852 };
3853 let status = match arr.first() {
3854 Some(Ok(Value::Int(n))) => *n,
3855 _ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
3856 };
3857 if status == 1 {
3858 let sub = fcall_field_str(arr, 1);
3859 if sub == "DUPLICATE" {
3860 let existing_str = fcall_field_str(arr, 2);
3862 let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
3863 Ok(DeliverSignalResult::Duplicate {
3864 existing_signal_id: existing_id,
3865 })
3866 } else {
3867 let effect = fcall_field_str(arr, 3);
3869 Ok(DeliverSignalResult::Accepted {
3870 signal_id: signal_id.clone(),
3871 effect,
3872 })
3873 }
3874 } else {
3875 let error_code = fcall_field_str(arr, 1);
3876 Err(ServerError::OperationFailed(format!(
3877 "ff_deliver_signal failed: {error_code}"
3878 )))
3879 }
3880}
3881
3882fn parse_change_priority_result(
3883 raw: &Value,
3884 execution_id: &ExecutionId,
3885) -> Result<ChangePriorityResult, ServerError> {
3886 let arr = match raw {
3887 Value::Array(arr) => arr,
3888 _ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
3889 };
3890 let status = match arr.first() {
3891 Some(Ok(Value::Int(n))) => *n,
3892 _ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
3893 };
3894 if status == 1 {
3895 Ok(ChangePriorityResult::Changed {
3896 execution_id: execution_id.clone(),
3897 })
3898 } else {
3899 let error_code = fcall_field_str(arr, 1);
3900 Err(ServerError::OperationFailed(format!(
3901 "ff_change_priority failed: {error_code}"
3902 )))
3903 }
3904}
3905
3906fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
3907 let arr = match raw {
3908 Value::Array(arr) => arr,
3909 _ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
3910 };
3911 let status = match arr.first() {
3912 Some(Ok(Value::Int(n))) => *n,
3913 _ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
3914 };
3915 if status == 1 {
3916 let unsatisfied = fcall_field_str(arr, 2);
3918 let ps = if unsatisfied == "0" {
3919 PublicState::Waiting
3920 } else {
3921 PublicState::WaitingChildren
3922 };
3923 Ok(ReplayExecutionResult::Replayed { public_state: ps })
3924 } else {
3925 let error_code = fcall_field_str(arr, 1);
3926 Err(ServerError::OperationFailed(format!(
3927 "ff_replay_execution failed: {error_code}"
3928 )))
3929 }
3930}
3931
3932fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
3943 match e {
3944 ff_script::error::ScriptError::Valkey(valkey_err) => ServerError::ValkeyContext {
3945 source: valkey_err,
3946 context: "stream FCALL transport".into(),
3947 },
3948 other => ServerError::Script(other.to_string()),
3949 }
3950}
3951
3952fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
3953 match arr.get(index) {
3954 Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
3955 Some(Ok(Value::SimpleString(s))) => s.clone(),
3956 Some(Ok(Value::Int(n))) => n.to_string(),
3957 _ => String::new(),
3958 }
3959}
3960
3961fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
3965 let arr = match raw {
3966 Value::Array(arr) => arr,
3967 _ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
3968 };
3969 let status_code = match arr.first() {
3970 Some(Ok(Value::Int(n))) => *n,
3971 _ => {
3972 return Err(ServerError::Script(
3973 "ff_report_usage_and_check: expected Int status code".into(),
3974 ))
3975 }
3976 };
3977 if status_code != 1 {
3978 let error_code = fcall_field_str(arr, 1);
3979 return Err(ServerError::OperationFailed(format!(
3980 "ff_report_usage_and_check failed: {error_code}"
3981 )));
3982 }
3983 let sub_status = fcall_field_str(arr, 1);
3984 match sub_status.as_str() {
3985 "OK" => Ok(ReportUsageResult::Ok),
3986 "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
3987 "SOFT_BREACH" => {
3988 let dim = fcall_field_str(arr, 2);
3989 let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3990 let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3991 Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
3992 }
3993 "HARD_BREACH" => {
3994 let dim = fcall_field_str(arr, 2);
3995 let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
3996 let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
3997 Ok(ReportUsageResult::HardBreach {
3998 dimension: dim,
3999 current_usage: current,
4000 hard_limit: limit,
4001 })
4002 }
4003 _ => Err(ServerError::OperationFailed(format!(
4004 "ff_report_usage_and_check: unknown sub-status: {sub_status}"
4005 ))),
4006 }
4007}
4008
4009fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
4010 let arr = match raw {
4011 Value::Array(arr) => arr,
4012 _ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
4013 };
4014 let status = match arr.first() {
4015 Some(Ok(Value::Int(n))) => *n,
4016 _ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
4017 };
4018 if status == 1 {
4019 let sub = fcall_field_str(arr, 1);
4020 if sub == "ALREADY_SATISFIED" {
4021 let reason = fcall_field_str(arr, 2);
4022 Ok(RevokeLeaseResult::AlreadySatisfied { reason })
4023 } else {
4024 let lid = fcall_field_str(arr, 2);
4025 let epoch = fcall_field_str(arr, 3);
4026 Ok(RevokeLeaseResult::Revoked {
4027 lease_id: lid,
4028 lease_epoch: epoch,
4029 })
4030 }
4031 } else {
4032 let error_code = fcall_field_str(arr, 1);
4033 Err(ServerError::OperationFailed(format!(
4034 "ff_revoke_lease failed: {error_code}"
4035 )))
4036 }
4037}
4038
4039fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
4045 if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
4046 return true;
4047 }
4048 e.detail()
4049 .map(|d| {
4050 d.contains("Function not loaded")
4051 || d.contains("No matching function")
4052 || d.contains("function not found")
4053 })
4054 .unwrap_or(false)
4055 || e.to_string().contains("Function not loaded")
4056}
4057
/// Invoke a server-side function via FCALL, transparently reloading the Lua
/// library and retrying once when the server reports the function missing.
///
/// The retry is single-shot: if the second FCALL also fails, that error is
/// surfaced as-is. Library reload failures surface as `LibraryLoad`.
async fn fcall_with_reload_on_client(
    client: &Client,
    function: &str,
    keys: &[&str],
    args: &[&str],
) -> Result<Value, ServerError> {
    match client.fcall(function, keys, args).await {
        Ok(v) => Ok(v),
        Err(e) if is_function_not_loaded(&e) => {
            tracing::warn!(function, "Lua library not found on server, reloading");
            ff_script::loader::ensure_library(client)
                .await
                .map_err(ServerError::LibraryLoad)?;
            client
                .fcall(function, keys, args)
                .await
                .map_err(ServerError::Valkey)
        }
        Err(e) => Err(ServerError::Valkey(e)),
    }
}
4081
4082async fn build_cancel_execution_fcall(
4086 client: &Client,
4087 partition_config: &PartitionConfig,
4088 args: &CancelExecutionArgs,
4089) -> Result<(Vec<String>, Vec<String>), ServerError> {
4090 let partition = execution_partition(&args.execution_id, partition_config);
4091 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
4092 let idx = IndexKeys::new(&partition);
4093
4094 let lane_str: Option<String> = client
4095 .hget(&ctx.core(), "lane_id")
4096 .await
4097 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
4098 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
4099
4100 let dyn_fields: Vec<Option<String>> = client
4101 .cmd("HMGET")
4102 .arg(ctx.core())
4103 .arg("current_attempt_index")
4104 .arg("current_waitpoint_id")
4105 .arg("current_worker_instance_id")
4106 .execute()
4107 .await
4108 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET cancel pre-read".into() })?;
4109
4110 let att_idx_val = dyn_fields.first()
4111 .and_then(|v| v.as_ref())
4112 .and_then(|s| s.parse::<u32>().ok())
4113 .unwrap_or(0);
4114 let att_idx = AttemptIndex::new(att_idx_val);
4115 let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4116 let wp_id = if wp_id_str.is_empty() {
4117 WaitpointId::new()
4118 } else {
4119 WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
4120 };
4121 let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
4122 let wiid = WorkerInstanceId::new(&wiid_str);
4123
4124 let keys: Vec<String> = vec![
4125 ctx.core(), ctx.attempt_hash(att_idx), ctx.stream_meta(att_idx), ctx.lease_current(), ctx.lease_history(), idx.lease_expiry(), idx.worker_leases(&wiid), ctx.suspension_current(), ctx.waitpoint(&wp_id), ctx.waitpoint_condition(&wp_id), idx.suspension_timeout(), idx.lane_terminal(&lane), idx.attempt_timeout(), idx.execution_deadline(), idx.lane_eligible(&lane), idx.lane_delayed(&lane), idx.lane_blocked_dependencies(&lane), idx.lane_blocked_budget(&lane), idx.lane_blocked_quota(&lane), idx.lane_blocked_route(&lane), idx.lane_blocked_operator(&lane), ];
4147 let argv: Vec<String> = vec![
4148 args.execution_id.to_string(),
4149 args.reason.clone(),
4150 args.source.to_string(),
4151 args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
4152 args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
4153 ];
4154 Ok((keys, argv))
4155}
4156
/// Backoff schedule (milliseconds) for retrying cancellation of a single flow
/// member on transient errors; one attempt is made per entry.
const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];
4161
4162fn extract_valkey_kind(e: &ServerError) -> Option<ferriskey::ErrorKind> {
4167 match e {
4168 ServerError::Valkey(err) | ServerError::ValkeyContext { source: err, .. } => {
4169 Some(err.kind())
4170 }
4171 ServerError::LibraryLoad(load_err) => load_err.valkey_kind(),
4172 _ => None,
4173 }
4174}
4175
4176async fn ack_cancel_member(
4193 client: &Client,
4194 pending_cancels_key: &str,
4195 cancel_backlog_key: &str,
4196 eid_str: &str,
4197 flow_id: &str,
4198) {
4199 let keys = [pending_cancels_key, cancel_backlog_key];
4200 let args_v = [eid_str, flow_id];
4201 let fut: Result<Value, _> =
4202 client.fcall("ff_ack_cancel_member", &keys, &args_v).await;
4203 if let Err(e) = fut {
4204 tracing::warn!(
4205 flow_id = %flow_id,
4206 execution_id = %eid_str,
4207 error = %e,
4208 "ff_ack_cancel_member failed; reconciler will drain on next pass"
4209 );
4210 }
4211}
4212
4213fn is_terminal_ack_error(err: &ServerError) -> bool {
4222 match err {
4223 ServerError::OperationFailed(msg) => {
4224 msg.contains("execution_not_active") || msg.contains("execution_not_found")
4225 }
4226 _ => false,
4227 }
4228}
4229
4230async fn cancel_member_execution(
4231 client: &Client,
4232 partition_config: &PartitionConfig,
4233 eid_str: &str,
4234 reason: &str,
4235 now: TimestampMs,
4236) -> Result<(), ServerError> {
4237 let execution_id = ExecutionId::parse(eid_str)
4238 .map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
4239 let args = CancelExecutionArgs {
4240 execution_id: execution_id.clone(),
4241 reason: reason.to_owned(),
4242 source: CancelSource::OperatorOverride,
4243 lease_id: None,
4244 lease_epoch: None,
4245 attempt_id: None,
4246 now,
4247 };
4248
4249 let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
4250 for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
4251 let is_last = attempt_idx + 1 == attempts;
4252 match try_cancel_member_once(client, partition_config, &args).await {
4253 Ok(()) => return Ok(()),
4254 Err(e) => {
4255 let retryable = extract_valkey_kind(&e)
4259 .map(ff_script::retry::is_retryable_kind)
4260 .unwrap_or(false);
4261 if !retryable || is_last {
4262 return Err(e);
4263 }
4264 tracing::debug!(
4265 execution_id = %execution_id,
4266 attempt = attempt_idx + 1,
4267 delay_ms = *delay_ms,
4268 error = %e,
4269 "cancel_member_execution: transient error, retrying"
4270 );
4271 tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
4272 }
4273 }
4274 }
4275 Err(ServerError::OperationFailed(format!(
4279 "cancel_member_execution: retries exhausted for {execution_id}"
4280 )))
4281}
4282
4283async fn try_cancel_member_once(
4286 client: &Client,
4287 partition_config: &PartitionConfig,
4288 args: &CancelExecutionArgs,
4289) -> Result<(), ServerError> {
4290 let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
4291 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
4292 let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
4293 let raw =
4294 fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
4295 parse_cancel_result(&raw, &args.execution_id).map(|_| ())
4296}
4297
4298fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
4299 let arr = match raw {
4300 Value::Array(arr) => arr,
4301 _ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
4302 };
4303 let status = match arr.first() {
4304 Some(Ok(Value::Int(n))) => *n,
4305 _ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
4306 };
4307 if status == 1 {
4308 let next_str = fcall_field_str(arr, 2);
4309 let next_ms: i64 = next_str.parse().unwrap_or(0);
4310 Ok(ResetBudgetResult::Reset {
4311 next_reset_at: TimestampMs::from_millis(next_ms),
4312 })
4313 } else {
4314 let error_code = fcall_field_str(arr, 1);
4315 Err(ServerError::OperationFailed(format!(
4316 "ff_reset_budget failed: {error_code}"
4317 )))
4318 }
4319}
4320
#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    // Builds a synthetic ferriskey error of the given kind; lets the
    // retryability tests run without a live server.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    // --- create-budget dimension validation ---

    #[test]
    fn create_budget_rejects_over_cap_dimension_count() {
        let n = MAX_BUDGET_DIMENSIONS + 1;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let hard = vec![1u64; n];
        let soft = vec![0u64; n];
        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
                assert!(msg.contains(&format!("got={n}")), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn create_budget_accepts_exactly_cap_dimensions() {
        let n = MAX_BUDGET_DIMENSIONS;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let hard = vec![1u64; n];
        let soft = vec![0u64; n];
        assert!(validate_create_budget_dimensions(&dims, &hard, &soft).is_ok());
    }

    #[test]
    fn create_budget_rejects_hard_limit_length_mismatch() {
        let dims = vec!["a".to_string(), "b".to_string()];
        // hard_limits deliberately shorter than dimensions.
        let hard = vec![1u64]; let soft = vec![0u64, 0u64];
        let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
                assert!(msg.contains("hard_limits=1"), "got: {msg}");
                assert!(msg.contains("dimensions=2"), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn create_budget_rejects_soft_limit_length_mismatch() {
        let dims = vec!["a".to_string(), "b".to_string()];
        let hard = vec![1u64, 2u64];
        // soft_limits deliberately longer than dimensions.
        let soft = vec![0u64, 0u64, 0u64]; let err = validate_create_budget_dimensions(&dims, &hard, &soft).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("dimension_limit_array_mismatch"), "got: {msg}");
                assert!(msg.contains("soft_limits=3"), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    // --- report-usage dimension validation ---

    #[test]
    fn report_usage_rejects_over_cap_dimension_count() {
        let n = MAX_BUDGET_DIMENSIONS + 1;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let deltas = vec![1u64; n];
        let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("too_many_dimensions"), "got: {msg}");
                assert!(msg.contains(&format!("limit={}", MAX_BUDGET_DIMENSIONS)), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn report_usage_accepts_exactly_cap_dimensions() {
        let n = MAX_BUDGET_DIMENSIONS;
        let dims: Vec<String> = (0..n).map(|i| format!("d{i}")).collect();
        let deltas = vec![1u64; n];
        assert!(validate_report_usage_dimensions(&dims, &deltas).is_ok());
    }

    #[test]
    fn report_usage_rejects_delta_length_mismatch() {
        let dims = vec!["a".to_string(), "b".to_string(), "c".to_string()];
        // deltas deliberately shorter than dimensions.
        let deltas = vec![1u64, 2u64]; let err = validate_report_usage_dimensions(&dims, &deltas).unwrap_err();
        match err {
            ServerError::InvalidInput(msg) => {
                assert!(msg.contains("dimension_delta_array_mismatch"), "got: {msg}");
                assert!(msg.contains("dimensions=3"), "got: {msg}");
                assert!(msg.contains("deltas=2"), "got: {msg}");
            }
            other => panic!("expected InvalidInput, got {other:?}"),
        }
    }

    #[test]
    fn report_usage_accepts_empty_dimensions() {
        assert!(validate_report_usage_dimensions(&[], &[]).is_ok());
    }

    // --- retryability classification across ServerError variants ---

    #[test]
    fn is_retryable_valkey_variant_uses_kind_table() {
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());

        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Ask)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
    }

    #[test]
    fn is_retryable_valkey_context_uses_kind_table() {
        let err = ServerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::IoError),
            context: "HGET test".into(),
        };
        assert!(err.is_retryable());

        let err = ServerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::AuthenticationFailed),
            context: "auth".into(),
        };
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_library_load_delegates_to_inner_kind() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::IoError),
        ));
        assert!(err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::AuthenticationFailed),
        ));
        assert!(!err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_business_logic_variants_are_false() {
        assert!(!ServerError::NotFound("x".into()).is_retryable());
        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
        assert!(!ServerError::Script("x".into()).is_retryable());
        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
    }

    #[test]
    fn valkey_kind_delegates_through_library_load() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::ClusterDown),
        ));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert_eq!(err.valkey_kind(), None);
    }

    // --- server version gating: INFO body parsing ---

    #[test]
    fn parse_valkey_version_prefers_valkey_version_over_redis_version() {
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n\
server_mode:cluster\r\n\
os:Linux\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
    }

    #[test]
    fn parse_valkey_version_real_valkey_8_cluster_body() {
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
server_name:valkey\r\n\
valkey_version:9.0.3\r\n\
valkey_release_stage:ga\r\n\
redis_git_sha1:00000000\r\n\
server_mode:cluster\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (9, 0));
    }

    #[test]
    fn parse_valkey_version_falls_back_to_redis_version_on_valkey_7() {
        let info = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nfoo:bar\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
    }

    #[test]
    fn parse_valkey_version_rejects_redis_backend() {
        // No valkey_version and no server_name:valkey marker => plain Redis.
        let info = "\
# Server\r\n\
redis_version:7.4.0\r\n\
redis_mode:standalone\r\n\
os:Linux\r\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        let msg = err.to_string();
        assert!(
            msg.contains("Redis is not supported") && msg.contains("server_name:valkey"),
            "expected Redis-rejection message, got: {msg}"
        );
    }

    #[test]
    fn parse_valkey_version_accepts_valkey_7_marker_case_insensitively() {
        let info = "redis_version:7.2.0\r\nSERVER_NAME:Valkey\r\n";
        assert_eq!(parse_valkey_version(info).unwrap(), (7, 2));
    }

    #[test]
    fn parse_valkey_version_errors_when_no_version_field() {
        let info = "# Server\r\nfoo:bar\r\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(
            err.to_string().contains("missing"),
            "expected 'missing' in message, got: {err}"
        );
    }

    #[test]
    fn parse_valkey_version_errors_on_non_numeric_major() {
        let info = "valkey_version:invalid.x.y\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("non-numeric major"));
    }

    #[test]
    fn parse_valkey_version_errors_on_non_numeric_minor() {
        let info = "valkey_version:7.x.0\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("non-numeric minor"));
    }

    #[test]
    fn parse_valkey_version_errors_on_missing_minor() {
        let info = "valkey_version:7\n";
        let err = parse_valkey_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("missing minor"));
    }

    // --- cluster INFO extraction (per-node map vs single body) ---

    #[test]
    fn extract_info_bodies_unwraps_cluster_map_all_entries() {
        let body_a = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
        let body_b = "# Server\r\nredis_version:7.2.4\r\nvalkey_version:8.0.0\r\n";
        let map = Value::Map(vec![
            (
                Value::SimpleString("127.0.0.1:7000".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_a.to_string(),
                },
            ),
            (
                Value::SimpleString("127.0.0.1:7001".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_b.to_string(),
                },
            ),
        ]);
        let bodies = extract_info_bodies(&map).unwrap();
        assert_eq!(bodies.len(), 2);
        assert_eq!(bodies[0], body_a);
        assert_eq!(bodies[1], body_b);
    }

    #[test]
    fn extract_info_bodies_handles_simple_string() {
        let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
        let v = Value::SimpleString(body_text.to_string());
        let bodies = extract_info_bodies(&v).unwrap();
        assert_eq!(bodies, vec![body_text.to_string()]);
    }

    #[test]
    fn extract_info_bodies_rejects_empty_cluster_map() {
        let map = Value::Map(vec![]);
        let err = extract_info_bodies(&map).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("empty map"));
    }

    #[test]
    fn parse_valkey_version_min_across_cluster_map_picks_lowest() {
        // node2 has no valkey_version; falls back to redis_version 7.1.
        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
        let body_node2 = "# Server\r\nredis_version:7.1.0\r\nserver_name:valkey\r\n";
        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
        let map = Value::Map(vec![
            (
                Value::SimpleString("node1:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node1.to_string(),
                },
            ),
            (
                Value::SimpleString("node2:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node2.to_string(),
                },
            ),
            (
                Value::SimpleString("node3:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node3.to_string(),
                },
            ),
        ]);

        let bodies = extract_info_bodies(&map).unwrap();
        let min = bodies
            .iter()
            .map(|b| parse_valkey_version(b).unwrap())
            .min()
            .unwrap();

        assert_eq!(min, (7, 1), "min across cluster must be the lowest node");
        assert!(
            min < (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
            "mixed-version cluster with 7.1.0 node must fail the (7,2) gate"
        );
    }

    #[test]
    fn parse_valkey_version_all_nodes_at_or_above_floor_accepts() {
        let body_node1 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:8.0.0\r\n";
        let body_node2 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:7.2.0\r\n";
        let body_node3 = "# Server\r\nredis_version:7.2.4\r\nserver_name:valkey\r\nvalkey_version:9.0.3\r\n";
        let map = Value::Map(vec![
            (
                Value::SimpleString("node1:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node1.to_string(),
                },
            ),
            (
                Value::SimpleString("node2:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node2.to_string(),
                },
            ),
            (
                Value::SimpleString("node3:6379".to_string()),
                Value::VerbatimString {
                    format: ferriskey::value::VerbatimFormat::Text,
                    text: body_node3.to_string(),
                },
            ),
        ]);

        let bodies = extract_info_bodies(&map).unwrap();
        let min = bodies
            .iter()
            .map(|b| parse_valkey_version(b).unwrap())
            .min()
            .unwrap();

        assert_eq!(min, (7, 2), "min across cluster is the lowest node (7.2)");
        assert!(
            min >= (REQUIRED_VALKEY_MAJOR, REQUIRED_VALKEY_MINOR),
            "all-above-floor cluster must pass the gate"
        );
    }

    // --- ValkeyVersionTooLow error variant ---

    #[test]
    fn valkey_version_too_low_is_not_retryable() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7.0".into(),
            required: "7.2".into(),
        };
        assert!(!err.is_retryable());
        assert_eq!(err.valkey_kind(), None);
    }

    #[test]
    fn valkey_version_too_low_error_message_includes_both_versions() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7.0".into(),
            required: "7.2".into(),
        };
        let msg = err.to_string();
        assert!(msg.contains("7.0"), "detected version in message: {msg}");
        assert!(msg.contains("7.2"), "required version in message: {msg}");
        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
    }
}