1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use ferriskey::{Client, ClientBuilder, Value};
6use tokio::sync::Mutex as AsyncMutex;
7use tokio::task::JoinSet;
8use ff_core::contracts::{
9 AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
10 CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
11 CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
12 CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
13 ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
14 DeliverSignalArgs, DeliverSignalResult, ExecutionInfo, ExecutionSummary,
15 ListExecutionsResult, PendingWaitpointInfo, ReplayExecutionResult,
16 ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
17 RevokeLeaseResult,
18 RotateWaitpointHmacSecretArgs,
19 StageDependencyEdgeArgs, StageDependencyEdgeResult,
20};
21use ff_core::keys::{
22 self, usage_dedup_key, BudgetKeyContext, ExecKeyContext, FlowIndexKeys, FlowKeyContext,
23 IndexKeys, QuotaKeyContext,
24};
25use ff_core::partition::{
26 budget_partition, execution_partition, flow_partition, quota_partition, Partition,
27 PartitionConfig, PartitionFamily,
28};
29use ff_core::state::{PublicState, StateVector};
30use ff_core::types::*;
31use ff_engine::Engine;
32use ff_script::retry::is_retryable_kind;
33
34use crate::config::ServerConfig;
35
/// Cap on the number of member execution IDs echoed back when `cancel_flow`
/// is invoked on a flow that is already terminal; bounds the size of the
/// `SMEMBERS` result returned to the caller.
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;
/// FlowFabric server: owns the Valkey connections, the background engine,
/// and the concurrency primitives shared by request handlers.
pub struct Server {
    /// Primary Valkey client used for regular commands and Lua FCALLs.
    client: Client,
    /// Dedicated connection used for tail (stream-read) operations.
    tail_client: Client,
    /// Caps concurrent stream operations (`max_concurrent_stream_ops`
    /// permits; shared by reads and tails — see the startup log message).
    stream_semaphore: Arc<tokio::sync::Semaphore>,
    /// Serializes blocking XREAD tails on the shared tail connection.
    xread_block_lock: Arc<tokio::sync::Mutex<()>>,
    /// Single-permit semaphore: admin rotate operations run one at a time.
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    /// Background engine (lease expiry, promoters, reconcilers, trimmers).
    engine: Engine,
    /// Full server configuration captured at startup.
    config: ServerConfig,
    /// Spawned background tasks, guarded by an async mutex.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
}
125
/// Errors produced by [`Server`] operations.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Raw Valkey error with no additional context.
    #[error("valkey: {0}")]
    Valkey(#[from] ferriskey::Error),
    /// Valkey error annotated with the operation that failed.
    #[error("valkey ({context}): {source}")]
    ValkeyContext {
        #[source]
        source: ferriskey::Error,
        context: String,
    },
    /// Server configuration was invalid.
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    /// Loading the flowfabric Lua library failed.
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    /// Two keys that must be co-located hash to different partitions.
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    /// The requested entity does not exist.
    #[error("not found: {0}")]
    NotFound(String),
    /// Caller-supplied input failed validation.
    #[error("invalid input: {0}")]
    InvalidInput(String),
    /// The operation ran but did not succeed.
    #[error("operation failed: {0}")]
    OperationFailed(String),
    /// A Lua script returned an unexpected or malformed reply.
    #[error("script: {0}")]
    Script(String),
    /// A concurrency semaphore was exhausted (treated as retryable —
    /// see [`ServerError::is_retryable`]).
    #[error("too many concurrent {0} calls (max: {1})")]
    ConcurrencyLimitExceeded(&'static str, u32),
    /// The connected Valkey is below the minimum supported version.
    #[error(
        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
    )]
    ValkeyVersionTooLow {
        detected: String,
        required: String,
    },
}
175
176impl ServerError {
177 pub fn valkey_kind(&self) -> Option<ferriskey::ErrorKind> {
181 match self {
182 Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => Some(e.kind()),
183 Self::LibraryLoad(e) => e.valkey_kind(),
184 _ => None,
185 }
186 }
187
188 pub fn is_retryable(&self) -> bool {
194 match self {
195 Self::Valkey(e) | Self::ValkeyContext { source: e, .. } => {
196 is_retryable_kind(e.kind())
197 }
198 Self::LibraryLoad(load_err) => load_err
199 .valkey_kind()
200 .map(is_retryable_kind)
201 .unwrap_or(false),
202 Self::Config(_)
203 | Self::PartitionMismatch(_)
204 | Self::NotFound(_)
205 | Self::InvalidInput(_)
206 | Self::OperationFailed(_)
207 | Self::Script(_) => false,
208 Self::ConcurrencyLimitExceeded(_, _) => true,
212 Self::ValkeyVersionTooLow { .. } => false,
214 }
215 }
216}
217
218impl Server {
    /// Connect to Valkey, run startup validation (PING, version check,
    /// partition config, waitpoint HMAC secret, Lua library), start the
    /// background engine, open the dedicated tail connection, and return
    /// the fully-initialized `Server`.
    ///
    /// # Errors
    /// Returns `ServerError` on connection failure, an unexpected PING
    /// reply, version/partition/secret validation failure, or a failed
    /// library load.
    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
        tracing::info!(
            host = %config.host, port = config.port,
            tls = config.tls, cluster = config.cluster,
            "connecting to Valkey"
        );
        // Primary client used for regular commands and FCALLs.
        let mut builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            builder = builder.tls();
        }
        if config.cluster {
            builder = builder.cluster();
        }
        let client = builder
            .build()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "connect".into() })?;

        // Round-trip a PING before any bootstrap work touches keys.
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "PING".into() })?;
        if pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "unexpected PING response: {pong}"
            )));
        }
        tracing::info!("Valkey connection established");

        // Refuse to start against a Valkey below the required version.
        verify_valkey_version(&client).await?;

        // Persist or cross-check the partition layout.
        validate_or_create_partition_config(&client, &config.partition_config).await?;

        // Seed or validate the secret used to mint waitpoint HMAC tokens.
        initialize_waitpoint_hmac_secret(
            &client,
            &config.partition_config,
            &config.waitpoint_hmac_secret,
        )
        .await?;

        if !config.skip_library_load {
            tracing::info!("loading flowfabric Lua library");
            ff_script::loader::ensure_library(&client)
                .await
                .map_err(ServerError::LibraryLoad)?;
        } else {
            tracing::info!("skipping library load (skip_library_load=true)");
        }

        // Background engine: scanner intervals come straight from config;
        // the completion listener gets its own connection parameters.
        let engine_cfg = ff_engine::EngineConfig {
            partition_config: config.partition_config,
            lanes: config.lanes.clone(),
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
            index_reconciler_interval: config.engine_config.index_reconciler_interval,
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
            unblock_interval: config.engine_config.unblock_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            completion_listener: Some(ff_engine::CompletionListenerConfig {
                addresses: vec![(config.host.clone(), config.port)],
                tls: config.tls,
                cluster: config.cluster,
            }),
            flow_projector_interval: config.engine_config.flow_projector_interval,
            execution_deadline_interval: config.engine_config.execution_deadline_interval,
        };
        let engine = Engine::start(engine_cfg, client.clone());

        // Second connection, used for tail (stream) operations.
        tracing::info!("opening dedicated tail connection");
        let mut tail_builder = ClientBuilder::new()
            .host(&config.host, config.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.tls {
            tail_builder = tail_builder.tls();
        }
        if config.cluster {
            tail_builder = tail_builder.cluster();
        }
        let tail_client = tail_builder
            .build()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "connect (tail)".into(),
            })?;
        let tail_pong: String = tail_client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "PING (tail)".into(),
            })?;
        if tail_pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "tail client unexpected PING response: {tail_pong}"
            )));
        }

        // Stream-op backpressure: reads and tails share this semaphore;
        // blocking tails additionally serialize via xread_block_lock.
        let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(
            config.max_concurrent_stream_ops as usize,
        ));
        let xread_block_lock = Arc::new(tokio::sync::Mutex::new(()));
        tracing::info!(
            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
            "stream-op client ready (read + tail share the semaphore; \
             tails additionally serialize via xread_block_lock)"
        );

        // Loudly warn when token-less deployments leave sensitive endpoints
        // unauthenticated.
        if config.api_token.is_none() {
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
                 FF_API_TOKEN for any deployment reachable from untrusted \
                 networks."
            );
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
                 and GET /v1/executions/{{id}}/result returns raw completion payload \
                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
                 configuration."
            );
        }

        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            listen_addr = %config.listen_addr,
            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
            config.partition_config.num_flow_partitions,
            config.partition_config.num_budget_partitions,
            config.partition_config.num_quota_partitions,
        );

        Ok(Self {
            client,
            tail_client,
            stream_semaphore,
            xread_block_lock,
            // Admin rotation is deliberately serialized (one permit).
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine,
            config,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
        })
    }
438
    /// Borrow the primary Valkey client.
    pub fn client(&self) -> &Client {
        &self.client
    }
443
    /// Invoke a flowfabric Lua function on the primary client, delegating
    /// to the shared `fcall_with_reload_on_client` helper (named for its
    /// reload-on-demand behavior — see that helper for details).
    async fn fcall_with_reload(
        &self,
        function: &str,
        keys: &[&str],
        args: &[&str],
    ) -> Result<Value, ServerError> {
        fcall_with_reload_on_client(&self.client, function, keys, args).await
    }
458
    /// The full server configuration this instance was started with.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }
463
    /// The partition layout used for key routing.
    pub fn partition_config(&self) -> &PartitionConfig {
        &self.config.partition_config
    }
468
469 pub async fn create_execution(
475 &self,
476 args: &CreateExecutionArgs,
477 ) -> Result<CreateExecutionResult, ServerError> {
478 let partition = execution_partition(&args.execution_id, &self.config.partition_config);
479 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
480 let idx = IndexKeys::new(&partition);
481
482 let lane = &args.lane_id;
483 let tag = partition.hash_tag();
484 let idem_key = match &args.idempotency_key {
485 Some(k) if !k.is_empty() => {
486 keys::idempotency_key(&tag, args.namespace.as_str(), k)
487 }
488 _ => ctx.noop(),
489 };
490
491 let delay_str = args
492 .delay_until
493 .map(|d| d.0.to_string())
494 .unwrap_or_default();
495 let is_delayed = !delay_str.is_empty();
496
497 let scheduling_zset = if is_delayed {
502 idx.lane_delayed(lane)
503 } else {
504 idx.lane_eligible(lane)
505 };
506
507 let fcall_keys: Vec<String> = vec![
508 ctx.core(), ctx.payload(), ctx.policy(), ctx.tags(), scheduling_zset, idem_key, idx.execution_deadline(), idx.all_executions(), ];
517
518 let tags_json = serde_json::to_string(&args.tags).unwrap_or_else(|_| "{}".to_owned());
519
520 let fcall_args: Vec<String> = vec![
526 args.execution_id.to_string(), args.namespace.to_string(), args.lane_id.to_string(), args.execution_kind.clone(), args.priority.to_string(), args.creator_identity.clone(), args.policy.as_ref()
533 .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_owned()))
534 .unwrap_or_else(|| "{}".to_owned()), String::from_utf8_lossy(&args.input_payload).into_owned(), delay_str, args.idempotency_key.as_ref()
538 .map(|_| "86400000".to_string())
539 .unwrap_or_default(), tags_json, args.execution_deadline_at
542 .map(|d| d.to_string())
543 .unwrap_or_default(), args.partition_id.to_string(), ];
546
547 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
548 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
549
550 let raw: Value = self
551 .fcall_with_reload("ff_create_execution", &key_refs, &arg_refs)
552 .await?;
553
554 parse_create_result(&raw, &args.execution_id)
555 }
556
557 pub async fn cancel_execution(
559 &self,
560 args: &CancelExecutionArgs,
561 ) -> Result<CancelExecutionResult, ServerError> {
562 let raw = self
563 .fcall_cancel_execution_with_reload(args)
564 .await?;
565 parse_cancel_result(&raw, &args.execution_id)
566 }
567
568 async fn fcall_cancel_execution_with_reload(
572 &self,
573 args: &CancelExecutionArgs,
574 ) -> Result<Value, ServerError> {
575 let (keys, argv) = build_cancel_execution_fcall(
576 &self.client,
577 &self.config.partition_config,
578 args,
579 )
580 .await?;
581 let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
582 let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
583 self.fcall_with_reload("ff_cancel_execution", &key_refs, &arg_refs).await
584 }
585
586 pub async fn get_execution_state(
591 &self,
592 execution_id: &ExecutionId,
593 ) -> Result<PublicState, ServerError> {
594 let partition = execution_partition(execution_id, &self.config.partition_config);
595 let ctx = ExecKeyContext::new(&partition, execution_id);
596
597 let state_str: Option<String> = self
598 .client
599 .hget(&ctx.core(), "public_state")
600 .await
601 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET public_state".into() })?;
602
603 match state_str {
604 Some(s) => {
605 let quoted = format!("\"{s}\"");
606 serde_json::from_str("ed).map_err(|e| {
607 ServerError::Script(format!(
608 "invalid public_state '{s}' for {execution_id}: {e}"
609 ))
610 })
611 }
612 None => Err(ServerError::NotFound(format!(
613 "execution not found: {execution_id}"
614 ))),
615 }
616 }
617
618 pub async fn get_execution_result(
645 &self,
646 execution_id: &ExecutionId,
647 ) -> Result<Option<Vec<u8>>, ServerError> {
648 let partition = execution_partition(execution_id, &self.config.partition_config);
649 let ctx = ExecKeyContext::new(&partition, execution_id);
650
651 let payload: Option<Vec<u8>> = self
661 .client
662 .cmd("GET")
663 .arg(ctx.result())
664 .execute()
665 .await
666 .map_err(|e| ServerError::ValkeyContext {
667 source: e,
668 context: "GET exec result".into(),
669 })?;
670 Ok(payload)
671 }
672
    /// List the waitpoints of `execution_id` that are still `pending` or
    /// `active`, including each one's HMAC token and — when the waitpoint
    /// has a condition — the signal names it requires.
    ///
    /// # Errors
    /// `NotFound` when the execution's core hash is absent; `ValkeyContext`
    /// on any underlying command/pipeline failure.
    pub async fn list_pending_waitpoints(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Vec<PendingWaitpointInfo>, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);

        // Distinguish "execution missing" (NotFound) from "no waitpoints"
        // (empty Vec) by checking the core hash first.
        let core_exists: bool = self
            .client
            .cmd("EXISTS")
            .arg(ctx.core())
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "EXISTS exec_core (pending waitpoints)".into(),
            })?;
        if !core_exists {
            return Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            )));
        }

        // Collect every waitpoint ID via cursor-based SSCAN (COUNT pages),
        // until the cursor wraps back to "0".
        const WAITPOINTS_SSCAN_COUNT: usize = 100;
        let waitpoints_key = ctx.waitpoints();
        let mut wp_ids_raw: Vec<String> = Vec::new();
        let mut cursor: String = "0".to_owned();
        loop {
            let reply: (String, Vec<String>) = self
                .client
                .cmd("SSCAN")
                .arg(&waitpoints_key)
                .arg(&cursor)
                .arg("COUNT")
                .arg(WAITPOINTS_SSCAN_COUNT.to_string().as_str())
                .execute()
                .await
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: "SSCAN waitpoints".into(),
                })?;
            cursor = reply.0;
            wp_ids_raw.extend(reply.1);
            if cursor == "0" {
                break;
            }
        }

        // SSCAN may return the same member more than once across pages;
        // sort + dedup gives a unique, deterministically ordered list.
        wp_ids_raw.sort_unstable();
        wp_ids_raw.dedup();

        if wp_ids_raw.is_empty() {
            return Ok(Vec::new());
        }

        // Parse IDs leniently: skip (and log) anything unparseable rather
        // than failing the whole listing.
        let mut wp_ids: Vec<WaitpointId> = Vec::with_capacity(wp_ids_raw.len());
        for raw in &wp_ids_raw {
            match WaitpointId::parse(raw) {
                Ok(id) => wp_ids.push(id),
                Err(e) => tracing::warn!(
                    raw_id = %raw,
                    error = %e,
                    execution_id = %execution_id,
                    "list_pending_waitpoints: skipping unparseable waitpoint_id"
                ),
            }
        }
        if wp_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Field order here fixes the index meaning used by `get(i)` below:
        // 0=state, 1=waitpoint_key, 2=waitpoint_token, 3=created_at,
        // 4=activated_at, 5=expires_at.
        const WP_FIELDS: [&str; 6] = [
            "state",
            "waitpoint_key",
            "waitpoint_token",
            "created_at",
            "activated_at",
            "expires_at",
        ];

        // Pass 1: one pipeline that, per waitpoint, HMGETs its fields and
        // HGETs its condition's total_matchers. Slots are consumed in the
        // same order they were queued, so the zip below stays aligned.
        let mut pass1 = self.client.pipeline();
        let mut wp_slots = Vec::with_capacity(wp_ids.len());
        let mut cond_slots = Vec::with_capacity(wp_ids.len());
        for wp_id in &wp_ids {
            let mut cmd = pass1.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint(wp_id));
            for f in WP_FIELDS {
                cmd = cmd.arg(f);
            }
            wp_slots.push(cmd.finish());

            cond_slots.push(
                pass1
                    .cmd::<Option<String>>("HGET")
                    .arg(ctx.waitpoint_condition(wp_id))
                    .arg("total_matchers")
                    .finish(),
            );
        }
        pass1
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "pipeline HMGET waitpoints + HGET total_matchers".into(),
            })?;

        // Waitpoints that survive pass-1 filtering, with the matcher count
        // needed to size the pass-2 HMGET.
        struct Kept {
            wp_id: WaitpointId,
            wp_fields: Vec<Option<String>>,
            total_matchers: usize,
        }
        let mut kept: Vec<Kept> = Vec::with_capacity(wp_ids.len());
        for ((wp_id, wp_slot), cond_slot) in wp_ids
            .iter()
            .zip(wp_slots)
            .zip(cond_slots)
        {
            let wp_fields: Vec<Option<String>> =
                wp_slot.value().map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("pipeline slot HMGET waitpoint {wp_id}"),
                })?;

            // Hash vanished between SSCAN and HMGET: drop it, but still
            // consume the paired condition slot.
            if wp_fields.iter().all(Option::is_none) {
                let _ = cond_slot.value();
                continue;
            }
            // Only pending/active waitpoints are reported.
            let state_ref = wp_fields
                .first()
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if state_ref != "pending" && state_ref != "active" {
                let _ = cond_slot.value();
                continue;
            }
            let token_ref = wp_fields
                .get(2)
                .and_then(|v| v.as_deref())
                .unwrap_or("");
            if token_ref.is_empty() {
                let _ = cond_slot.value();
                tracing::warn!(
                    waitpoint_id = %wp_id,
                    execution_id = %execution_id,
                    waitpoint_hash_key = %ctx.waitpoint(wp_id),
                    state = %state_ref,
                    "list_pending_waitpoints: waitpoint hash present but waitpoint_token \
                     field is empty — likely storage corruption (half-populated write, \
                     operator edit, or interrupted script). Skipping this entry in the \
                     response. HGETALL the waitpoint_hash_key to inspect."
                );
                continue;
            }

            // Missing/unparseable total_matchers is treated as "no matchers".
            let total_matchers = cond_slot
                .value()
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("pipeline slot HGET total_matchers {wp_id}"),
                })?
                .and_then(|s| s.parse::<usize>().ok())
                .unwrap_or(0);

            kept.push(Kept {
                wp_id: wp_id.clone(),
                wp_fields,
                total_matchers,
            });
        }

        if kept.is_empty() {
            return Ok(Vec::new());
        }

        // Pass 2: for waitpoints with matchers, fetch each matcher's name
        // in one pipelined HMGET per waitpoint. `matcher_slots` keeps a
        // None placeholder per matcher-less waitpoint so it zips with
        // `kept` below.
        let mut pass2 = self.client.pipeline();
        let mut matcher_slots: Vec<Option<_>> = Vec::with_capacity(kept.len());
        let mut pass2_needed = false;
        for k in &kept {
            if k.total_matchers == 0 {
                matcher_slots.push(None);
                continue;
            }
            pass2_needed = true;
            let mut cmd = pass2.cmd::<Vec<Option<String>>>("HMGET");
            cmd = cmd.arg(ctx.waitpoint_condition(&k.wp_id));
            for i in 0..k.total_matchers {
                cmd = cmd.arg(format!("matcher:{i}:name"));
            }
            matcher_slots.push(Some(cmd.finish()));
        }
        if pass2_needed {
            pass2.execute().await.map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: "pipeline HMGET wp_condition matchers".into(),
            })?;
        }

        // Empty string => no timestamp stored.
        let parse_ts = |raw: &str| -> Option<TimestampMs> {
            if raw.is_empty() {
                None
            } else {
                raw.parse::<i64>().ok().map(TimestampMs)
            }
        };

        // Assemble the response; field indexes follow WP_FIELDS order.
        let mut out: Vec<PendingWaitpointInfo> = Vec::with_capacity(kept.len());
        for (k, slot) in kept.into_iter().zip(matcher_slots) {
            let get = |i: usize| -> &str {
                k.wp_fields.get(i).and_then(|v| v.as_deref()).unwrap_or("")
            };

            let required_signal_names: Vec<String> = match slot {
                None => Vec::new(),
                Some(s) => {
                    let vals: Vec<Option<String>> =
                        s.value().map_err(|e| ServerError::ValkeyContext {
                            source: e,
                            context: format!(
                                "pipeline slot HMGET wp_condition matchers {}",
                                k.wp_id
                            ),
                        })?;
                    vals.into_iter()
                        .flatten()
                        .filter(|name| !name.is_empty())
                        .collect()
                }
            };

            out.push(PendingWaitpointInfo {
                waitpoint_id: k.wp_id,
                waitpoint_key: get(1).to_owned(),
                state: get(0).to_owned(),
                waitpoint_token: WaitpointToken(get(2).to_owned()),
                required_signal_names,
                created_at: parse_ts(get(3)).unwrap_or(TimestampMs(0)),
                activated_at: parse_ts(get(4)),
                expires_at: parse_ts(get(5)),
            });
        }

        Ok(out)
    }
1000
1001 pub async fn create_budget(
1005 &self,
1006 args: &CreateBudgetArgs,
1007 ) -> Result<CreateBudgetResult, ServerError> {
1008 let partition = budget_partition(&args.budget_id, &self.config.partition_config);
1009 let bctx = BudgetKeyContext::new(&partition, &args.budget_id);
1010 let resets_key = keys::budget_resets_key(bctx.hash_tag());
1011 let policies_index = keys::budget_policies_index(bctx.hash_tag());
1012
1013 let fcall_keys: Vec<String> = vec![
1016 bctx.definition(),
1017 bctx.limits(),
1018 bctx.usage(),
1019 resets_key,
1020 policies_index,
1021 ];
1022
1023 let dim_count = args.dimensions.len();
1027 let mut fcall_args: Vec<String> = Vec::with_capacity(9 + dim_count * 3);
1028 fcall_args.push(args.budget_id.to_string());
1029 fcall_args.push(args.scope_type.clone());
1030 fcall_args.push(args.scope_id.clone());
1031 fcall_args.push(args.enforcement_mode.clone());
1032 fcall_args.push(args.on_hard_limit.clone());
1033 fcall_args.push(args.on_soft_limit.clone());
1034 fcall_args.push(args.reset_interval_ms.to_string());
1035 fcall_args.push(args.now.to_string());
1036 fcall_args.push(dim_count.to_string());
1037 for dim in &args.dimensions {
1038 fcall_args.push(dim.clone());
1039 }
1040 for hard in &args.hard_limits {
1041 fcall_args.push(hard.to_string());
1042 }
1043 for soft in &args.soft_limits {
1044 fcall_args.push(soft.to_string());
1045 }
1046
1047 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1048 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1049
1050 let raw: Value = self
1051 .fcall_with_reload("ff_create_budget", &key_refs, &arg_refs)
1052 .await?;
1053
1054 parse_budget_create_result(&raw, &args.budget_id)
1055 }
1056
1057 pub async fn create_quota_policy(
1059 &self,
1060 args: &CreateQuotaPolicyArgs,
1061 ) -> Result<CreateQuotaPolicyResult, ServerError> {
1062 let partition = quota_partition(&args.quota_policy_id, &self.config.partition_config);
1063 let qctx = QuotaKeyContext::new(&partition, &args.quota_policy_id);
1064
1065 let fcall_keys: Vec<String> = vec![
1068 qctx.definition(),
1069 qctx.window("requests_per_window"),
1070 qctx.concurrency(),
1071 qctx.admitted_set(),
1072 keys::quota_policies_index(qctx.hash_tag()),
1073 ];
1074
1075 let fcall_args: Vec<String> = vec![
1078 args.quota_policy_id.to_string(),
1079 args.window_seconds.to_string(),
1080 args.max_requests_per_window.to_string(),
1081 args.max_concurrent.to_string(),
1082 args.now.to_string(),
1083 ];
1084
1085 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1086 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1087
1088 let raw: Value = self
1089 .fcall_with_reload("ff_create_quota_policy", &key_refs, &arg_refs)
1090 .await?;
1091
1092 parse_quota_create_result(&raw, &args.quota_policy_id)
1093 }
1094
    /// Assemble a [`BudgetStatus`] snapshot from the budget's definition,
    /// usage, and limits hashes.
    ///
    /// # Errors
    /// `NotFound` when the definition hash is empty (key absent);
    /// `ValkeyContext` on any HGETALL failure.
    pub async fn get_budget_status(
        &self,
        budget_id: &BudgetId,
    ) -> Result<BudgetStatus, ServerError> {
        let partition = budget_partition(budget_id, &self.config.partition_config);
        let bctx = BudgetKeyContext::new(&partition, budget_id);

        let def: HashMap<String, String> = self
            .client
            .hgetall(&bctx.definition())
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_def".into() })?;

        // An empty hash means the key does not exist => budget unknown.
        if def.is_empty() {
            return Err(ServerError::NotFound(format!(
                "budget not found: {budget_id}"
            )));
        }

        // Usage counters; the "_init" field is bookkeeping, not a dimension.
        // Unparseable values fall back to 0 rather than failing the read.
        let usage_raw: HashMap<String, String> = self
            .client
            .hgetall(&bctx.usage())
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_usage".into() })?;
        let usage: HashMap<String, u64> = usage_raw
            .into_iter()
            .filter(|(k, _)| k != "_init")
            .map(|(k, v)| (k, v.parse().unwrap_or(0)))
            .collect();

        // Limits are stored flat with "hard:<dim>" / "soft:<dim>" fields.
        let limits_raw: HashMap<String, String> = self
            .client
            .hgetall(&bctx.limits())
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL budget_limits".into() })?;
        let mut hard_limits = HashMap::new();
        let mut soft_limits = HashMap::new();
        for (k, v) in &limits_raw {
            if let Some(dim) = k.strip_prefix("hard:") {
                hard_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
            } else if let Some(dim) = k.strip_prefix("soft:") {
                soft_limits.insert(dim.to_string(), v.parse().unwrap_or(0));
            }
        }

        // Empty-string fields in the definition hash mean "never set".
        let non_empty = |s: Option<&String>| -> Option<String> {
            s.filter(|v| !v.is_empty()).cloned()
        };

        Ok(BudgetStatus {
            budget_id: budget_id.to_string(),
            scope_type: def.get("scope_type").cloned().unwrap_or_default(),
            scope_id: def.get("scope_id").cloned().unwrap_or_default(),
            enforcement_mode: def.get("enforcement_mode").cloned().unwrap_or_default(),
            usage,
            hard_limits,
            soft_limits,
            breach_count: def
                .get("breach_count")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            soft_breach_count: def
                .get("soft_breach_count")
                .and_then(|v| v.parse().ok())
                .unwrap_or(0),
            last_breach_at: non_empty(def.get("last_breach_at")),
            last_breach_dim: non_empty(def.get("last_breach_dim")),
            next_reset_at: non_empty(def.get("next_reset_at")),
            created_at: non_empty(def.get("created_at")),
        })
    }
1170
1171 pub async fn report_usage(
1173 &self,
1174 budget_id: &BudgetId,
1175 args: &ReportUsageArgs,
1176 ) -> Result<ReportUsageResult, ServerError> {
1177 let partition = budget_partition(budget_id, &self.config.partition_config);
1178 let bctx = BudgetKeyContext::new(&partition, budget_id);
1179
1180 let fcall_keys: Vec<String> = vec![bctx.usage(), bctx.limits(), bctx.definition()];
1182
1183 let dim_count = args.dimensions.len();
1185 let mut fcall_args: Vec<String> = Vec::with_capacity(3 + dim_count * 2);
1186 fcall_args.push(dim_count.to_string());
1187 for dim in &args.dimensions {
1188 fcall_args.push(dim.clone());
1189 }
1190 for delta in &args.deltas {
1191 fcall_args.push(delta.to_string());
1192 }
1193 fcall_args.push(args.now.to_string());
1194 let dedup_key_val = args
1195 .dedup_key
1196 .as_ref()
1197 .filter(|k| !k.is_empty())
1198 .map(|k| usage_dedup_key(bctx.hash_tag(), k))
1199 .unwrap_or_default();
1200 fcall_args.push(dedup_key_val);
1201
1202 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1203 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1204
1205 let raw: Value = self
1206 .fcall_with_reload("ff_report_usage_and_check", &key_refs, &arg_refs)
1207 .await?;
1208
1209 parse_report_usage_result(&raw)
1210 }
1211
1212 pub async fn reset_budget(
1214 &self,
1215 budget_id: &BudgetId,
1216 ) -> Result<ResetBudgetResult, ServerError> {
1217 let partition = budget_partition(budget_id, &self.config.partition_config);
1218 let bctx = BudgetKeyContext::new(&partition, budget_id);
1219 let resets_key = keys::budget_resets_key(bctx.hash_tag());
1220
1221 let fcall_keys: Vec<String> = vec![bctx.definition(), bctx.usage(), resets_key];
1223
1224 let now = TimestampMs::now();
1226 let fcall_args: Vec<String> = vec![budget_id.to_string(), now.to_string()];
1227
1228 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1229 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1230
1231 let raw: Value = self
1232 .fcall_with_reload("ff_reset_budget", &key_refs, &arg_refs)
1233 .await?;
1234
1235 parse_reset_budget_result(&raw)
1236 }
1237
1238 pub async fn create_flow(
1242 &self,
1243 args: &CreateFlowArgs,
1244 ) -> Result<CreateFlowResult, ServerError> {
1245 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1246 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1247 let fidx = FlowIndexKeys::new(&partition);
1248
1249 let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];
1251
1252 let fcall_args: Vec<String> = vec![
1254 args.flow_id.to_string(),
1255 args.flow_kind.clone(),
1256 args.namespace.to_string(),
1257 args.now.to_string(),
1258 ];
1259
1260 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1261 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1262
1263 let raw: Value = self
1264 .fcall_with_reload("ff_create_flow", &key_refs, &arg_refs)
1265 .await?;
1266
1267 parse_create_flow_result(&raw, &args.flow_id)
1268 }
1269
    /// Add an execution to a flow via `ff_add_execution_to_flow`.
    ///
    /// # Errors
    /// `PartitionMismatch` when the execution was not minted co-located
    /// with the flow (their partition indexes differ), since the script's
    /// KEYS must all live in the same slot.
    pub async fn add_execution_to_flow(
        &self,
        args: &AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        let exec_partition =
            execution_partition(&args.execution_id, &self.config.partition_config);
        let ectx = ExecKeyContext::new(&exec_partition, &args.execution_id);

        // Co-location check: the exec's hash-tag must match the flow's so
        // one FCALL can touch both (RFC-011 §7.3).
        if exec_partition.index != partition.index {
            return Err(ServerError::PartitionMismatch(format!(
                "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
                 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
                 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
                exec_p = exec_partition.index,
                flow_p = partition.index,
            )));
        }

        let fcall_keys: Vec<String> = vec![
            fctx.core(),
            fctx.members(),
            fidx.flow_index(),
            ectx.core(),
        ];

        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.execution_id.to_string(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_add_execution_to_flow", &key_refs, &arg_refs)
            .await?;

        parse_add_execution_to_flow_result(&raw)
    }
1364
    /// Cancel a flow; delegates to the shared implementation with
    /// `wait = false`.
    pub async fn cancel_flow(
        &self,
        args: &CancelFlowArgs,
    ) -> Result<CancelFlowResult, ServerError> {
        self.cancel_flow_inner(args, false).await
    }
1412
    /// Cancel a flow; delegates to the shared implementation with
    /// `wait = true` (waiting variant of [`Self::cancel_flow`]).
    pub async fn cancel_flow_wait(
        &self,
        args: &CancelFlowArgs,
    ) -> Result<CancelFlowResult, ServerError> {
        self.cancel_flow_inner(args, true).await
    }
1422
    /// Shared implementation behind [`Self::cancel_flow`] and
    /// [`Self::cancel_flow_wait`].
    ///
    /// Invokes the `ff_cancel_flow` server-side function, then — when the
    /// returned policy is `cancel_all` and the flow has members — cancels each
    /// member execution either inline (`wait == true`) or on a background
    /// task. A flow that is already terminal is handled idempotently: the
    /// stored policy and a capped member list are returned as `Cancelled`.
    async fn cancel_flow_inner(
        &self,
        args: &CancelFlowArgs,
        wait: bool,
    ) -> Result<CancelFlowResult, ServerError> {
        let partition = flow_partition(&args.flow_id, &self.config.partition_config);
        let fctx = FlowKeyContext::new(&partition, &args.flow_id);
        let fidx = FlowIndexKeys::new(&partition);

        let fcall_keys: Vec<String> = vec![fctx.core(), fctx.members(), fidx.flow_index()];

        let fcall_args: Vec<String> = vec![
            args.flow_id.to_string(),
            args.reason.clone(),
            args.cancellation_policy.clone(),
            args.now.to_string(),
        ];

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_cancel_flow", &key_refs, &arg_refs)
            .await?;

        let (policy, members) = match parse_cancel_flow_raw(&raw)? {
            ParsedCancelFlow::Cancelled { policy, member_execution_ids } => {
                (policy, member_execution_ids)
            }
            ParsedCancelFlow::AlreadyTerminal => {
                // Idempotent path: the flow already reached a terminal state.
                // Re-read what was stored so repeat callers get the same
                // answer as the original cancel did.
                let flow_meta: Vec<Option<String>> = self
                    .client
                    .cmd("HMGET")
                    .arg(fctx.core())
                    .arg("cancellation_policy")
                    .arg("cancel_reason")
                    .execute()
                    .await
                    .map_err(|e| ServerError::ValkeyContext {
                        source: e,
                        context: "HMGET flow_core cancellation_policy,cancel_reason".into(),
                    })?;
                // Empty strings count as "not stored".
                let stored_policy = flow_meta
                    .first()
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let stored_reason = flow_meta
                    .get(1)
                    .and_then(|v| v.as_ref())
                    .filter(|s| !s.is_empty())
                    .cloned();
                let all_members: Vec<String> = self
                    .client
                    .cmd("SMEMBERS")
                    .arg(fctx.members())
                    .execute()
                    .await
                    .map_err(|e| ServerError::ValkeyContext {
                        source: e,
                        context: "SMEMBERS flow members (already terminal)".into(),
                    })?;
                let total_members = all_members.len();
                // Cap the returned member list so a huge flow cannot blow up
                // the response on this read-only path.
                let stored_members: Vec<String> = all_members
                    .into_iter()
                    .take(ALREADY_TERMINAL_MEMBER_CAP)
                    .collect();
                tracing::debug!(
                    flow_id = %args.flow_id,
                    stored_policy = stored_policy.as_deref().unwrap_or(""),
                    stored_reason = stored_reason.as_deref().unwrap_or(""),
                    total_members,
                    returned_members = stored_members.len(),
                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
                );
                return Ok(CancelFlowResult::Cancelled {
                    // Fall back to the caller-supplied policy when none is stored.
                    cancellation_policy: stored_policy
                        .unwrap_or_else(|| args.cancellation_policy.clone()),
                    member_execution_ids: stored_members,
                });
            }
        };
        // Only `cancel_all` requires touching member executions.
        let needs_dispatch = policy == "cancel_all" && !members.is_empty();

        if !needs_dispatch {
            return Ok(CancelFlowResult::Cancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
            });
        }

        if wait {
            // Synchronous variant: cancel each member before returning.
            // Individual failures are logged and skipped (the member may
            // already be terminal).
            for eid_str in &members {
                if let Err(e) = cancel_member_execution(
                    &self.client,
                    &self.config.partition_config,
                    eid_str,
                    &args.reason,
                    args.now,
                )
                .await
                {
                    tracing::warn!(
                        execution_id = %eid_str,
                        error = %e,
                        "cancel_flow(wait): individual execution cancel failed (may be terminal)"
                    );
                }
            }
            return Ok(CancelFlowResult::Cancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
            });
        }

        // Asynchronous variant: dispatch member cancellation on a task owned
        // by `self.background_tasks`, so shutdown can drain or abort it.
        let client = self.client.clone();
        let partition_config = self.config.partition_config;
        let reason = args.reason.clone();
        let now = args.now;
        let dispatch_members = members.clone();
        let flow_id = args.flow_id.clone();
        let mut guard = self.background_tasks.lock().await;

        // Reap already-finished tasks so the JoinSet does not grow without
        // bound, surfacing panics/aborts from earlier dispatches.
        while let Some(joined) = guard.try_join_next() {
            if let Err(e) = joined {
                tracing::warn!(
                    error = %e,
                    "cancel_flow: background dispatch task panicked or was aborted"
                );
            }
        }

        guard.spawn(async move {
            use futures::stream::StreamExt;
            // Bounded fan-out so one big flow cannot monopolize connections.
            const CONCURRENCY: usize = 16;

            let member_count = dispatch_members.len();
            let flow_id_for_log = flow_id.clone();
            futures::stream::iter(dispatch_members)
                .map(|eid_str| {
                    let client = client.clone();
                    let reason = reason.clone();
                    let flow_id = flow_id.clone();
                    async move {
                        if let Err(e) = cancel_member_execution(
                            &client,
                            &partition_config,
                            &eid_str,
                            &reason,
                            now,
                        )
                        .await
                        {
                            tracing::warn!(
                                flow_id = %flow_id,
                                execution_id = %eid_str,
                                error = %e,
                                "cancel_flow(async): individual execution cancel failed (may be terminal)"
                            );
                        }
                    }
                })
                .buffer_unordered(CONCURRENCY)
                .for_each(|()| async {})
                .await;

            tracing::debug!(
                flow_id = %flow_id_for_log,
                member_count,
                concurrency = CONCURRENCY,
                "cancel_flow: background member dispatch complete"
            );
        });
        drop(guard);

        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
        Ok(CancelFlowResult::CancellationScheduled {
            cancellation_policy: policy,
            member_count,
            member_execution_ids: members,
        })
    }
1641
1642 pub async fn stage_dependency_edge(
1647 &self,
1648 args: &StageDependencyEdgeArgs,
1649 ) -> Result<StageDependencyEdgeResult, ServerError> {
1650 let partition = flow_partition(&args.flow_id, &self.config.partition_config);
1651 let fctx = FlowKeyContext::new(&partition, &args.flow_id);
1652
1653 let fcall_keys: Vec<String> = vec![
1655 fctx.core(),
1656 fctx.members(),
1657 fctx.edge(&args.edge_id),
1658 fctx.outgoing(&args.upstream_execution_id),
1659 fctx.incoming(&args.downstream_execution_id),
1660 fctx.grant(&args.edge_id.to_string()),
1661 ];
1662
1663 let fcall_args: Vec<String> = vec![
1666 args.flow_id.to_string(),
1667 args.edge_id.to_string(),
1668 args.upstream_execution_id.to_string(),
1669 args.downstream_execution_id.to_string(),
1670 args.dependency_kind.clone(),
1671 args.data_passing_ref.clone().unwrap_or_default(),
1672 args.expected_graph_revision.to_string(),
1673 args.now.to_string(),
1674 ];
1675
1676 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1677 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1678
1679 let raw: Value = self
1680 .fcall_with_reload("ff_stage_dependency_edge", &key_refs, &arg_refs)
1681 .await?;
1682
1683 parse_stage_dependency_edge_result(&raw)
1684 }
1685
1686 pub async fn apply_dependency_to_child(
1691 &self,
1692 args: &ApplyDependencyToChildArgs,
1693 ) -> Result<ApplyDependencyToChildResult, ServerError> {
1694 let partition = execution_partition(
1695 &args.downstream_execution_id,
1696 &self.config.partition_config,
1697 );
1698 let ctx = ExecKeyContext::new(&partition, &args.downstream_execution_id);
1699 let idx = IndexKeys::new(&partition);
1700
1701 let lane_str: Option<String> = self
1703 .client
1704 .hget(&ctx.core(), "lane_id")
1705 .await
1706 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1707 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1708
1709 let fcall_keys: Vec<String> = vec![
1712 ctx.core(),
1713 ctx.deps_meta(),
1714 ctx.deps_unresolved(),
1715 ctx.dep_edge(&args.edge_id),
1716 idx.lane_eligible(&lane),
1717 idx.lane_blocked_dependencies(&lane),
1718 ctx.deps_all_edges(),
1719 ];
1720
1721 let fcall_args: Vec<String> = vec![
1724 args.flow_id.to_string(),
1725 args.edge_id.to_string(),
1726 args.upstream_execution_id.to_string(),
1727 args.graph_revision.to_string(),
1728 args.dependency_kind.clone(),
1729 args.data_passing_ref.clone().unwrap_or_default(),
1730 args.now.to_string(),
1731 ];
1732
1733 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1734 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1735
1736 let raw: Value = self
1737 .fcall_with_reload("ff_apply_dependency_to_child", &key_refs, &arg_refs)
1738 .await?;
1739
1740 parse_apply_dependency_result(&raw)
1741 }
1742
1743 pub async fn deliver_signal(
1750 &self,
1751 args: &DeliverSignalArgs,
1752 ) -> Result<DeliverSignalResult, ServerError> {
1753 let partition = execution_partition(&args.execution_id, &self.config.partition_config);
1754 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
1755 let idx = IndexKeys::new(&partition);
1756
1757 let lane_str: Option<String> = self
1759 .client
1760 .hget(&ctx.core(), "lane_id")
1761 .await
1762 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1763 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1764
1765 let wp_id = &args.waitpoint_id;
1766 let sig_id = &args.signal_id;
1767 let idem_key = args
1768 .idempotency_key
1769 .as_ref()
1770 .filter(|k| !k.is_empty())
1771 .map(|k| ctx.signal_dedup(wp_id, k))
1772 .unwrap_or_else(|| ctx.noop());
1773
1774 let fcall_keys: Vec<String> = vec![
1780 ctx.core(), ctx.waitpoint_condition(wp_id), ctx.waitpoint_signals(wp_id), ctx.exec_signals(), ctx.signal(sig_id), ctx.signal_payload(sig_id), idem_key, ctx.waitpoint(wp_id), ctx.suspension_current(), idx.lane_eligible(&lane), idx.lane_suspended(&lane), idx.lane_delayed(&lane), idx.suspension_timeout(), idx.waitpoint_hmac_secrets(), ];
1795
1796 let fcall_args: Vec<String> = vec![
1803 args.signal_id.to_string(), args.execution_id.to_string(), args.waitpoint_id.to_string(), args.signal_name.clone(), args.signal_category.clone(), args.source_type.clone(), args.source_identity.clone(), args.payload.as_ref()
1811 .map(|p| String::from_utf8_lossy(p).into_owned())
1812 .unwrap_or_default(), args.payload_encoding
1814 .clone()
1815 .unwrap_or_else(|| "json".to_owned()), args.idempotency_key
1817 .clone()
1818 .unwrap_or_default(), args.correlation_id
1820 .clone()
1821 .unwrap_or_default(), args.target_scope.clone(), args.created_at
1824 .map(|ts| ts.to_string())
1825 .unwrap_or_else(|| args.now.to_string()), args.dedup_ttl_ms.unwrap_or(86_400_000).to_string(), args.resume_delay_ms.unwrap_or(0).to_string(), args.signal_maxlen.unwrap_or(1000).to_string(), args.max_signals_per_execution
1830 .unwrap_or(10_000)
1831 .to_string(), args.waitpoint_token.as_str().to_owned(), ];
1836
1837 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1838 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1839
1840 let raw: Value = self
1841 .fcall_with_reload("ff_deliver_signal", &key_refs, &arg_refs)
1842 .await?;
1843
1844 parse_deliver_signal_result(&raw, &args.signal_id)
1845 }
1846
1847 pub async fn change_priority(
1851 &self,
1852 execution_id: &ExecutionId,
1853 new_priority: i32,
1854 ) -> Result<ChangePriorityResult, ServerError> {
1855 let partition = execution_partition(execution_id, &self.config.partition_config);
1856 let ctx = ExecKeyContext::new(&partition, execution_id);
1857 let idx = IndexKeys::new(&partition);
1858
1859 let lane_str: Option<String> = self
1861 .client
1862 .hget(&ctx.core(), "lane_id")
1863 .await
1864 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
1865 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
1866
1867 let fcall_keys: Vec<String> = vec![ctx.core(), idx.lane_eligible(&lane)];
1869
1870 let fcall_args: Vec<String> = vec![
1872 execution_id.to_string(),
1873 new_priority.to_string(),
1874 ];
1875
1876 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1877 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1878
1879 let raw: Value = self
1880 .fcall_with_reload("ff_change_priority", &key_refs, &arg_refs)
1881 .await?;
1882
1883 parse_change_priority_result(&raw, execution_id)
1884 }
1885
1886 pub async fn claim_for_worker(
1900 &self,
1901 lane: &LaneId,
1902 worker_id: &WorkerId,
1903 worker_instance_id: &WorkerInstanceId,
1904 worker_capabilities: &std::collections::BTreeSet<String>,
1905 grant_ttl_ms: u64,
1906 ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
1907 let scheduler = ff_scheduler::Scheduler::new(
1908 self.client.clone(),
1909 self.config.partition_config,
1910 );
1911 scheduler
1912 .claim_for_worker(
1913 lane,
1914 worker_id,
1915 worker_instance_id,
1916 worker_capabilities,
1917 grant_ttl_ms,
1918 )
1919 .await
1920 .map_err(|e| match e {
1921 ff_scheduler::SchedulerError::Valkey(inner) => {
1922 ServerError::Valkey(inner)
1923 }
1924 ff_scheduler::SchedulerError::ValkeyContext { source, context } => {
1925 ServerError::ValkeyContext { source, context }
1926 }
1927 ff_scheduler::SchedulerError::Config(msg) => {
1928 ServerError::InvalidInput(msg)
1929 }
1930 })
1931 }
1932
1933 pub async fn revoke_lease(
1935 &self,
1936 execution_id: &ExecutionId,
1937 ) -> Result<RevokeLeaseResult, ServerError> {
1938 let partition = execution_partition(execution_id, &self.config.partition_config);
1939 let ctx = ExecKeyContext::new(&partition, execution_id);
1940 let idx = IndexKeys::new(&partition);
1941
1942 let wiid_str: Option<String> = self
1944 .client
1945 .hget(&ctx.core(), "current_worker_instance_id")
1946 .await
1947 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET worker_instance_id".into() })?;
1948 let wiid = match wiid_str {
1949 Some(ref s) if !s.is_empty() => WorkerInstanceId::new(s),
1950 _ => {
1951 return Err(ServerError::NotFound(format!(
1952 "no active lease for execution {execution_id} (no current_worker_instance_id)"
1953 )));
1954 }
1955 };
1956
1957 let fcall_keys: Vec<String> = vec![
1959 ctx.core(),
1960 ctx.lease_current(),
1961 ctx.lease_history(),
1962 idx.lease_expiry(),
1963 idx.worker_leases(&wiid),
1964 ];
1965
1966 let fcall_args: Vec<String> = vec![
1968 execution_id.to_string(),
1969 String::new(), "operator_revoke".to_owned(),
1971 ];
1972
1973 let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
1974 let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();
1975
1976 let raw: Value = self
1977 .fcall_with_reload("ff_revoke_lease", &key_refs, &arg_refs)
1978 .await?;
1979
1980 parse_revoke_lease_result(&raw)
1981 }
1982
1983 pub async fn get_execution(
1985 &self,
1986 execution_id: &ExecutionId,
1987 ) -> Result<ExecutionInfo, ServerError> {
1988 let partition = execution_partition(execution_id, &self.config.partition_config);
1989 let ctx = ExecKeyContext::new(&partition, execution_id);
1990
1991 let fields: HashMap<String, String> = self
1992 .client
1993 .hgetall(&ctx.core())
1994 .await
1995 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGETALL exec_core".into() })?;
1996
1997 if fields.is_empty() {
1998 return Err(ServerError::NotFound(format!(
1999 "execution not found: {execution_id}"
2000 )));
2001 }
2002
2003 let parse_enum = |field: &str| -> String {
2004 fields.get(field).cloned().unwrap_or_default()
2005 };
2006 fn deserialize<T: serde::de::DeserializeOwned>(field: &str, raw: &str) -> Result<T, ServerError> {
2007 let quoted = format!("\"{raw}\"");
2008 serde_json::from_str("ed).map_err(|e| {
2009 ServerError::Script(format!("invalid {field} '{raw}': {e}"))
2010 })
2011 }
2012
2013 let lp_str = parse_enum("lifecycle_phase");
2014 let os_str = parse_enum("ownership_state");
2015 let es_str = parse_enum("eligibility_state");
2016 let br_str = parse_enum("blocking_reason");
2017 let to_str = parse_enum("terminal_outcome");
2018 let as_str = parse_enum("attempt_state");
2019 let ps_str = parse_enum("public_state");
2020
2021 let state_vector = StateVector {
2022 lifecycle_phase: deserialize("lifecycle_phase", &lp_str)?,
2023 ownership_state: deserialize("ownership_state", &os_str)?,
2024 eligibility_state: deserialize("eligibility_state", &es_str)?,
2025 blocking_reason: deserialize("blocking_reason", &br_str)?,
2026 terminal_outcome: deserialize("terminal_outcome", &to_str)?,
2027 attempt_state: deserialize("attempt_state", &as_str)?,
2028 public_state: deserialize("public_state", &ps_str)?,
2029 };
2030
2031 let flow_id_val = fields.get("flow_id").filter(|s| !s.is_empty()).cloned();
2038
2039 let started_at_opt = fields
2046 .get("started_at")
2047 .filter(|s| !s.is_empty())
2048 .cloned();
2049 let completed_at_opt = fields
2050 .get("completed_at")
2051 .filter(|s| !s.is_empty())
2052 .cloned();
2053
2054 Ok(ExecutionInfo {
2055 execution_id: execution_id.clone(),
2056 namespace: parse_enum("namespace"),
2057 lane_id: parse_enum("lane_id"),
2058 priority: fields
2059 .get("priority")
2060 .and_then(|v| v.parse().ok())
2061 .unwrap_or(0),
2062 execution_kind: parse_enum("execution_kind"),
2063 state_vector,
2064 public_state: deserialize("public_state", &ps_str)?,
2065 created_at: parse_enum("created_at"),
2066 started_at: started_at_opt,
2067 completed_at: completed_at_opt,
2068 current_attempt_index: fields
2069 .get("current_attempt_index")
2070 .and_then(|v| v.parse().ok())
2071 .unwrap_or(0),
2072 flow_id: flow_id_val,
2073 blocking_detail: parse_enum("blocking_detail"),
2074 })
2075 }
2076
    /// Lists executions in one lane of one execution partition, filtered by a
    /// named scheduling state, with offset/limit pagination.
    ///
    /// `state_filter` selects a lane-scoped sorted-set index and must be one
    /// of: `eligible`, `delayed`, `terminal`, `suspended`, `active`.
    ///
    /// # Errors
    /// [`ServerError::InvalidInput`] for an unknown `state_filter`; Valkey
    /// errors are wrapped with context.
    pub async fn list_executions(
        &self,
        partition_id: u16,
        lane: &LaneId,
        state_filter: &str,
        offset: u64,
        limit: u64,
    ) -> Result<ListExecutionsResult, ServerError> {
        let partition = ff_core::partition::Partition {
            family: ff_core::partition::PartitionFamily::Execution,
            index: partition_id,
        };
        let idx = IndexKeys::new(&partition);

        let zset_key = match state_filter {
            "eligible" => idx.lane_eligible(lane),
            "delayed" => idx.lane_delayed(lane),
            "terminal" => idx.lane_terminal(lane),
            "suspended" => idx.lane_suspended(lane),
            "active" => idx.lane_active(lane),
            other => {
                return Err(ServerError::InvalidInput(format!(
                    "invalid state_filter: {other}. Use: eligible, delayed, terminal, suspended, active"
                )));
            }
        };

        // Page through the index by score over the full range, applying
        // LIMIT offset/limit server-side.
        let eids: Vec<String> = self
            .client
            .cmd("ZRANGE")
            .arg(&zset_key)
            .arg("-inf")
            .arg("+inf")
            .arg("BYSCORE")
            .arg("LIMIT")
            .arg(offset)
            .arg(limit)
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("ZRANGE {zset_key}") })?;

        if eids.is_empty() {
            return Ok(ListExecutionsResult {
                executions: vec![],
                total_returned: 0,
            });
        }

        // Unparseable ZSET members are logged and skipped rather than failing
        // the whole page.
        let mut parsed = Vec::with_capacity(eids.len());
        for eid_str in &eids {
            match ExecutionId::parse(eid_str) {
                Ok(id) => parsed.push(id),
                Err(e) => {
                    tracing::warn!(
                        raw_id = %eid_str,
                        error = %e,
                        zset = %zset_key,
                        "list_executions: ZSET member failed to parse as ExecutionId (data corruption?)"
                    );
                }
            }
        }

        if parsed.is_empty() {
            return Ok(ListExecutionsResult {
                executions: vec![],
                total_returned: 0,
            });
        }

        // One pipelined HMGET per execution; each `slot` is a handle that is
        // resolved after `pipe.execute()` completes.
        let mut pipe = self.client.pipeline();
        let mut slots = Vec::with_capacity(parsed.len());
        for eid in &parsed {
            let ep = execution_partition(eid, &self.config.partition_config);
            let ctx = ExecKeyContext::new(&ep, eid);
            let slot = pipe
                .cmd::<Vec<Option<String>>>("HMGET")
                .arg(ctx.core())
                .arg("namespace")
                .arg("lane_id")
                .arg("execution_kind")
                .arg("public_state")
                .arg("priority")
                .arg("created_at")
                .finish();
            slots.push(slot);
        }

        pipe.execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline HMGET".into() })?;

        let mut summaries = Vec::with_capacity(parsed.len());
        for (eid, slot) in parsed.into_iter().zip(slots) {
            let fields: Vec<Option<String>> = slot.value()
                .map_err(|e| ServerError::ValkeyContext { source: e, context: "pipeline slot".into() })?;

            // Index positions mirror the HMGET argument order above; a missing
            // field reads as "".
            let field = |i: usize| -> String {
                fields
                    .get(i)
                    .and_then(|v| v.as_ref())
                    .cloned()
                    .unwrap_or_default()
            };

            summaries.push(ExecutionSummary {
                execution_id: eid,
                namespace: field(0),
                lane_id: field(1),
                execution_kind: field(2),
                public_state: field(3),
                priority: field(4).parse().unwrap_or(0),
                created_at: field(5),
            });
        }

        let total = summaries.len();
        Ok(ListExecutionsResult {
            executions: summaries,
            total_returned: total,
        })
    }
2205
    /// Re-queues an execution for another run via the `ff_replay_execution`
    /// server-side function.
    ///
    /// A pre-read of the core hash resolves the lane, owning flow, and
    /// terminal outcome. For a flow member whose terminal outcome is
    /// `skipped`, the incoming dependency-edge keys are appended so the
    /// script can also operate on the execution's dependency state.
    pub async fn replay_execution(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ReplayExecutionResult, ServerError> {
        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let idx = IndexKeys::new(&partition);

        let dyn_fields: Vec<Option<String>> = self
            .client
            .cmd("HMGET")
            .arg(ctx.core())
            .arg("lane_id")
            .arg("flow_id")
            .arg("terminal_outcome")
            .execute()
            .await
            .map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET replay pre-read".into() })?;
        let lane = LaneId::new(
            dyn_fields
                .first()
                .and_then(|v| v.as_ref())
                .cloned()
                .unwrap_or_else(|| "default".to_owned()),
        );
        let flow_id_str = dyn_fields
            .get(1)
            .and_then(|v| v.as_ref())
            .cloned()
            .unwrap_or_default();
        let terminal_outcome = dyn_fields
            .get(2)
            .and_then(|v| v.as_ref())
            .cloned()
            .unwrap_or_default();

        let is_skipped_flow_member = terminal_outcome == "skipped" && !flow_id_str.is_empty();

        let mut fcall_keys: Vec<String> = vec![
            ctx.core(),
            idx.lane_terminal(&lane),
            idx.lane_eligible(&lane),
            ctx.lease_history(),
        ];

        let now = TimestampMs::now();
        let mut fcall_args: Vec<String> = vec![execution_id.to_string(), now.to_string()];

        if is_skipped_flow_member {
            let flow_id = FlowId::parse(&flow_id_str)
                .map_err(|e| ServerError::Script(format!("bad flow_id: {e}")))?;
            let flow_part =
                flow_partition(&flow_id, &self.config.partition_config);
            let flow_ctx = FlowKeyContext::new(&flow_part, &flow_id);
            // Edge ids for this execution's incoming dependencies in the flow.
            let edge_ids: Vec<String> = self
                .client
                .cmd("SMEMBERS")
                .arg(flow_ctx.incoming(execution_id))
                .execute()
                .await
                .map_err(|e| ServerError::ValkeyContext { source: e, context: "SMEMBERS replay edges".into() })?;

            fcall_keys.push(idx.lane_blocked_dependencies(&lane));
            fcall_keys.push(ctx.deps_meta());
            fcall_keys.push(ctx.deps_unresolved());
            for eid_str in &edge_ids {
                // NOTE(review): on parse failure this falls back to a freshly
                // generated EdgeId, yielding a dep_edge key that cannot match
                // any stored edge — confirm this silent fallback is intended.
                let edge_id = EdgeId::parse(eid_str)
                    .unwrap_or_else(|_| EdgeId::new());
                fcall_keys.push(ctx.dep_edge(&edge_id));
                fcall_args.push(eid_str.clone());
            }
        }

        let key_refs: Vec<&str> = fcall_keys.iter().map(|s| s.as_str()).collect();
        let arg_refs: Vec<&str> = fcall_args.iter().map(|s| s.as_str()).collect();

        let raw: Value = self
            .fcall_with_reload("ff_replay_execution", &key_refs, &arg_refs)
            .await?;

        parse_replay_result(&raw)
    }
2309
    /// Reads a bounded range of frames from one attempt's output stream.
    ///
    /// Admission is gated by `stream_semaphore` via a non-blocking
    /// `try_acquire`: when no permit is available the call fails fast with
    /// [`ServerError::ConcurrencyLimitExceeded`] instead of queueing. Reads go
    /// through `tail_client`.
    pub async fn read_attempt_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        from_id: &str,
        to_id: &str,
        count_limit: u64,
    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
        use ff_core::contracts::{ReadFramesArgs, ReadFramesResult};

        if count_limit == 0 {
            return Err(ServerError::InvalidInput(
                "count_limit must be >= 1".to_owned(),
            ));
        }

        // A closed semaphore signals that shutdown has begun.
        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded(
                    "stream_ops",
                    self.config.max_concurrent_stream_ops,
                ));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                return Err(ServerError::OperationFailed(
                    "stream semaphore closed (server shutting down)".into(),
                ));
            }
        };

        let args = ReadFramesArgs {
            execution_id: execution_id.clone(),
            attempt_index,
            from_id: from_id.to_owned(),
            to_id: to_id.to_owned(),
            count_limit,
        };

        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let keys = ff_script::functions::stream::StreamOpKeys { ctx: &ctx };

        let result = ff_script::functions::stream::ff_read_attempt_stream(
            &self.tail_client, &keys, &args,
        )
        .await
        .map_err(script_error_to_server);

        // Release the permit before inspecting the result so the slot frees
        // on the error path too.
        drop(permit);

        match result? {
            ReadFramesResult::Frames(f) => Ok(f),
        }
    }
2383
    /// Tails an attempt's output stream with a blocking XREAD, waiting up to
    /// `block_ms` for frames newer than `last_id`.
    ///
    /// Gated by the same `stream_semaphore` as [`Self::read_attempt_stream`],
    /// and additionally serialized through `xread_block_lock` — presumably
    /// because a blocking XREAD monopolizes the shared `tail_client`
    /// connection (NOTE(review): concurrent tails therefore queue behind one
    /// another; confirm this trade-off is intended).
    pub async fn tail_attempt_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        last_id: &str,
        block_ms: u64,
        count_limit: u64,
    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
        if count_limit == 0 {
            return Err(ServerError::InvalidInput(
                "count_limit must be >= 1".to_owned(),
            ));
        }

        // Fail fast when the concurrency cap is reached; a closed semaphore
        // signals shutdown.
        let permit = match self.stream_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded(
                    "stream_ops",
                    self.config.max_concurrent_stream_ops,
                ));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                return Err(ServerError::OperationFailed(
                    "stream semaphore closed (server shutting down)".into(),
                ));
            }
        };

        let partition = execution_partition(execution_id, &self.config.partition_config);
        let ctx = ExecKeyContext::new(&partition, execution_id);
        let stream_key = ctx.stream(attempt_index);
        let stream_meta_key = ctx.stream_meta(attempt_index);

        // Only one blocking XREAD at a time.
        let _xread_guard = self.xread_block_lock.lock().await;

        let result = ff_script::stream_tail::xread_block(
            &self.tail_client,
            &stream_key,
            &stream_meta_key,
            last_id,
            block_ms,
            count_limit,
        )
        .await
        .map_err(script_error_to_server);

        // Release the XREAD lock first, then the semaphore permit, regardless
        // of success or failure.
        drop(_xread_guard);
        drop(permit);
        result
    }
2474
    /// Gracefully shuts the server down.
    ///
    /// Order: (1) close the stream semaphore so new read/tail calls are
    /// rejected, (2) drain in-flight background tasks with a 15 s timeout,
    /// aborting whatever remains on expiry, then (3) shut down the engine.
    pub async fn shutdown(self) {
        tracing::info!("shutting down FlowFabric server");

        self.stream_semaphore.close();
        tracing::info!(
            "stream semaphore closed; no new read/tail attempts will be accepted"
        );

        let drain_timeout = Duration::from_secs(15);
        let background = self.background_tasks.clone();
        let drain = async move {
            let mut guard = background.lock().await;
            while guard.join_next().await.is_some() {}
        };
        match tokio::time::timeout(drain_timeout, drain).await {
            Ok(()) => {}
            Err(_) => {
                // The timed-out drain future is dropped here, releasing its
                // mutex guard, so re-locking below cannot deadlock.
                tracing::warn!(
                    timeout_s = drain_timeout.as_secs(),
                    "shutdown: background tasks did not finish in time, aborting"
                );
                self.background_tasks.lock().await.abort_all();
            }
        }

        self.engine.shutdown().await;
        tracing::info!("FlowFabric server shutdown complete");
    }
2532}
2533
/// Minimum Valkey major version this server accepts at boot; enforced by
/// `verify_valkey_version`.
const REQUIRED_VALKEY_MAJOR: u32 = 8;

/// Total wall-clock budget `verify_valkey_version` spends retrying before it
/// surfaces the last probe error.
const VERSION_CHECK_RETRY_BUDGET: Duration = Duration::from_secs(60);
2545
/// Verifies at startup that the connected server is at least
/// [`REQUIRED_VALKEY_MAJOR`], retrying with exponential backoff (200 ms
/// doubling, capped at 5 s) within [`VERSION_CHECK_RETRY_BUDGET`].
///
/// A too-low version is also retried until the budget expires —
/// NOTE(review): presumably to ride out rolling upgrades where old nodes may
/// still answer briefly; confirm that intent. Non-retryable Valkey errors
/// fail immediately.
async fn verify_valkey_version(client: &Client) -> Result<(), ServerError> {
    let deadline = tokio::time::Instant::now() + VERSION_CHECK_RETRY_BUDGET;
    let mut backoff = Duration::from_millis(200);
    loop {
        // Each iteration classifies the probe outcome into: whether to retry,
        // the error to surface if the budget runs out, and a detail string
        // for the retry log.
        let (should_retry, err_for_budget_exhaust, log_detail): (bool, ServerError, String) =
            match query_valkey_version(client).await {
                Ok(detected_major) if detected_major >= REQUIRED_VALKEY_MAJOR => {
                    tracing::info!(
                        detected_major,
                        required = REQUIRED_VALKEY_MAJOR,
                        "Valkey version accepted"
                    );
                    return Ok(());
                }
                Ok(detected_major) => (
                    true,
                    ServerError::ValkeyVersionTooLow {
                        detected: detected_major.to_string(),
                        required: format!("{REQUIRED_VALKEY_MAJOR}.0"),
                    },
                    format!("detected_major={detected_major} < required={REQUIRED_VALKEY_MAJOR}"),
                ),
                Err(e) => {
                    // Errors with no underlying Valkey error kind are treated
                    // as retryable.
                    let retryable = e
                        .valkey_kind()
                        .map(ff_script::retry::is_retryable_kind)
                        .unwrap_or(true);
                    let detail = e.to_string();
                    (retryable, e, detail)
                }
            };

        if !should_retry {
            return Err(err_for_budget_exhaust);
        }
        if tokio::time::Instant::now() >= deadline {
            return Err(err_for_budget_exhaust);
        }
        tracing::warn!(
            backoff_ms = backoff.as_millis() as u64,
            detail = %log_detail,
            "valkey version check transient failure; retrying"
        );
        tokio::time::sleep(backoff).await;
        backoff = (backoff * 2).min(Duration::from_secs(5));
    }
}
2627
2628async fn query_valkey_version(client: &Client) -> Result<u32, ServerError> {
2640 let raw: Value = client
2641 .cmd("INFO")
2642 .arg("server")
2643 .execute()
2644 .await
2645 .map_err(|e| ServerError::ValkeyContext {
2646 source: e,
2647 context: "INFO server".into(),
2648 })?;
2649 let info_body = extract_info_body(&raw)?;
2650 parse_valkey_major_version(&info_body)
2651}
2652
2653fn extract_info_body(raw: &Value) -> Result<String, ServerError> {
2660 match raw {
2661 Value::BulkString(bytes) => Ok(String::from_utf8_lossy(bytes).into_owned()),
2662 Value::VerbatimString { text, .. } => Ok(text.clone()),
2663 Value::SimpleString(s) => Ok(s.clone()),
2664 Value::Map(entries) => {
2665 let (_, body) = entries.first().ok_or_else(|| {
2669 ServerError::OperationFailed(
2670 "valkey version check: cluster INFO returned empty map".into(),
2671 )
2672 })?;
2673 extract_info_body(body)
2674 }
2675 other => Err(ServerError::OperationFailed(format!(
2676 "valkey version check: unexpected INFO shape: {other:?}"
2677 ))),
2678 }
2679}
2680
2681fn parse_valkey_major_version(info: &str) -> Result<u32, ServerError> {
2690 let extract_major = |line: &str| -> Result<u32, ServerError> {
2691 let trimmed = line.trim();
2692 let major_str = trimmed.split('.').next().unwrap_or("").trim();
2693 if major_str.is_empty() {
2694 return Err(ServerError::OperationFailed(format!(
2695 "valkey version check: empty version field in '{trimmed}'"
2696 )));
2697 }
2698 major_str.parse::<u32>().map_err(|_| {
2699 ServerError::OperationFailed(format!(
2700 "valkey version check: non-numeric major in '{trimmed}'"
2701 ))
2702 })
2703 };
2704 if let Some(valkey_line) = info
2706 .lines()
2707 .find_map(|line| line.strip_prefix("valkey_version:"))
2708 {
2709 return extract_major(valkey_line);
2710 }
2711 if let Some(redis_line) = info
2714 .lines()
2715 .find_map(|line| line.strip_prefix("redis_version:"))
2716 {
2717 return extract_major(redis_line);
2718 }
2719 Err(ServerError::OperationFailed(
2720 "valkey version check: INFO missing valkey_version and redis_version".into(),
2721 ))
2722}
2723
2724async fn validate_or_create_partition_config(
2731 client: &Client,
2732 config: &PartitionConfig,
2733) -> Result<(), ServerError> {
2734 let key = keys::global_config_partitions();
2735
2736 let existing: HashMap<String, String> = client
2737 .hgetall(&key)
2738 .await
2739 .map_err(|e| ServerError::ValkeyContext { source: e, context: format!("HGETALL {key}") })?;
2740
2741 if existing.is_empty() {
2742 tracing::info!("first boot: creating {key}");
2744 client
2745 .hset(&key, "num_flow_partitions", &config.num_flow_partitions.to_string())
2746 .await
2747 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_flow_partitions".into() })?;
2748 client
2749 .hset(&key, "num_budget_partitions", &config.num_budget_partitions.to_string())
2750 .await
2751 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_budget_partitions".into() })?;
2752 client
2753 .hset(&key, "num_quota_partitions", &config.num_quota_partitions.to_string())
2754 .await
2755 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HSET num_quota_partitions".into() })?;
2756 return Ok(());
2757 }
2758
2759 let check = |field: &str, expected: u16| -> Result<(), ServerError> {
2761 let stored: u16 = existing
2762 .get(field)
2763 .and_then(|v| v.parse().ok())
2764 .unwrap_or(0);
2765 if stored != expected {
2766 return Err(ServerError::PartitionMismatch(format!(
2767 "{field}: stored={stored}, config={expected}. \
2768 Partition counts are fixed at deployment time. \
2769 Either fix your config or migrate the data."
2770 )));
2771 }
2772 Ok(())
2773 };
2774
2775 check("num_flow_partitions", config.num_flow_partitions)?;
2776 check("num_budget_partitions", config.num_budget_partitions)?;
2777 check("num_quota_partitions", config.num_quota_partitions)?;
2778
2779 tracing::info!("partition config validated against stored {key}");
2780 Ok(())
2781}
2782
2783const WAITPOINT_HMAC_INITIAL_KID: &str = "k1";
2789
/// Outcome of probing/initializing one partition's waitpoint-HMAC secret hash
/// in `init_one_partition`.
enum PartitionBootOutcome {
    /// The stored secret equals the configured one.
    Match,
    /// A secret is stored under the current kid but differs from the
    /// configured one.
    Mismatch,
    /// `current_kid` existed without its secret field; the secret was
    /// re-written (torn-write repair).
    Repaired,
    /// No prior state existed; kid and secret were installed atomically.
    Installed,
}
2802
2803const BOOT_INIT_CONCURRENCY: usize = 16;
2808
/// Install or validate the waitpoint HMAC secret on a single execution
/// partition, reporting which `PartitionBootOutcome` applied.
///
/// Probe order matters: `current_kid` is read first; when present, the
/// matching `secret:<kid>` field is checked. A present kid with a missing
/// secret is a torn write from a crashed prior boot and is repaired in
/// place with `secret_hex`.
///
/// # Errors
/// `ServerError::ValkeyContext` on any transport failure, with the failing
/// command named in the context string.
async fn init_one_partition(
    client: &Client,
    partition: Partition,
    secret_hex: &str,
) -> Result<PartitionBootOutcome, ServerError> {
    let key = ff_core::keys::IndexKeys::new(&partition).waitpoint_hmac_secrets();

    // Probe: which kid (if any) is currently installed on this partition?
    let stored_kid: Option<String> = client
        .cmd("HGET")
        .arg(&key)
        .arg("current_kid")
        .execute()
        .await
        .map_err(|e| ServerError::ValkeyContext {
            source: e,
            context: format!("HGET {key} current_kid (init probe)"),
        })?;

    if let Some(stored_kid) = stored_kid {
        let field = format!("secret:{stored_kid}");
        let stored_secret: Option<String> = client
            .hget(&key, &field)
            .await
            .map_err(|e| ServerError::ValkeyContext {
                source: e,
                context: format!("HGET {key} secret:<kid> (init check)"),
            })?;
        if stored_secret.is_none() {
            // Torn write: kid recorded but its secret field is absent —
            // backfill the env-provided secret under the recorded kid.
            client
                .hset(&key, &field, secret_hex)
                .await
                .map_err(|e| ServerError::ValkeyContext {
                    source: e,
                    context: format!("HSET {key} secret:<kid> (repair torn write)"),
                })?;
            return Ok(PartitionBootOutcome::Repaired);
        }
        if stored_secret.as_deref() != Some(secret_hex) {
            // Stored value wins; the caller aggregates and logs mismatches.
            return Ok(PartitionBootOutcome::Mismatch);
        }
        return Ok(PartitionBootOutcome::Match);
    }

    // Fresh partition: install kid and secret in a single HSET so a crash
    // cannot leave `current_kid` without its matching `secret:<kid>` field.
    let secret_field = format!("secret:{WAITPOINT_HMAC_INITIAL_KID}");
    let _: i64 = client
        .cmd("HSET")
        .arg(&key)
        .arg("current_kid")
        .arg(WAITPOINT_HMAC_INITIAL_KID)
        .arg(&secret_field)
        .arg(secret_hex)
        .execute()
        .await
        .map_err(|e| ServerError::ValkeyContext {
            source: e,
            context: format!("HSET {key} (init waitpoint HMAC atomic)"),
        })?;
    Ok(PartitionBootOutcome::Installed)
}
2886
/// Install (or validate) the waitpoint HMAC secret across every execution
/// partition at boot, running `init_one_partition` with bounded concurrency.
///
/// Mismatches and torn-write repairs are tallied and logged but never fail
/// the boot — stored secrets win over the env-provided one. Any transport
/// error from a partition aborts the whole boot.
async fn initialize_waitpoint_hmac_secret(
    client: &Client,
    partition_config: &PartitionConfig,
    secret_hex: &str,
) -> Result<(), ServerError> {
    use futures::stream::{FuturesUnordered, StreamExt};

    // NOTE(review): execution partitions are counted via
    // `num_flow_partitions` here (family below is Execution) — presumably
    // the two families share a count; confirm against PartitionConfig.
    let n = partition_config.num_flow_partitions;
    tracing::info!(
        partitions = n,
        concurrency = BOOT_INIT_CONCURRENCY,
        "installing waitpoint HMAC secret across {n} execution partitions"
    );

    let mut mismatch_count: u16 = 0;
    let mut repaired_count: u16 = 0;
    let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
    let mut next_index: u16 = 0;

    // Pump loop: top the in-flight set up to BOOT_INIT_CONCURRENCY, then
    // drain one completion per iteration until every partition is done.
    loop {
        while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
            let partition = Partition {
                family: PartitionFamily::Execution,
                index: next_index,
            };
            let client = client.clone();
            let secret_hex = secret_hex.to_owned();
            pending.push(async move {
                init_one_partition(&client, partition, &secret_hex).await
            });
            next_index += 1;
        }
        match pending.next().await {
            // `res?` propagates a hard error from any partition immediately.
            Some(res) => match res? {
                PartitionBootOutcome::Match | PartitionBootOutcome::Installed => {}
                PartitionBootOutcome::Mismatch => mismatch_count += 1,
                PartitionBootOutcome::Repaired => repaired_count += 1,
            },
            None => break,
        }
    }

    if repaired_count > 0 {
        tracing::warn!(
            repaired_partitions = repaired_count,
            total_partitions = n,
            "repaired {repaired_count} partitions with torn waitpoint HMAC writes \
             (current_kid present but secret:<kid> missing, likely crash during prior boot)"
        );
    }

    if mismatch_count > 0 {
        tracing::warn!(
            mismatched_partitions = mismatch_count,
            total_partitions = n,
            "stored/env secret mismatch on {mismatch_count} partitions — \
             env FF_WAITPOINT_HMAC_SECRET ignored in favor of stored values; \
             run POST /v1/admin/rotate-waitpoint-secret to sync"
        );
    }

    tracing::info!(partitions = n, "waitpoint HMAC secret install complete");
    Ok(())
}
2962
/// Summary returned by `Server::rotate_waitpoint_secret`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RotateWaitpointSecretResult {
    /// Number of partitions rotated successfully.
    pub rotated: u16,
    /// Partition indices where rotation failed; these can be retried by
    /// calling the rotation again with the same kid/secret.
    pub failed: Vec<u16>,
    /// The key id that was installed.
    pub new_kid: String,
}
2976
impl Server {
    /// Rotate the waitpoint HMAC secret across every execution partition.
    ///
    /// Validates `new_kid` (non-empty, no ':' — it is embedded in a
    /// `secret:<kid>` hash field) and `new_secret_hex` (non-empty,
    /// even-length hex), then fans out `rotate_single_partition` with
    /// bounded concurrency. Per-partition failures are collected into the
    /// result rather than aborting the rotation.
    ///
    /// Guarded by the one-permit `admin_rotate_semaphore`: a concurrent
    /// call fails fast instead of queueing behind a running rotation.
    pub async fn rotate_waitpoint_secret(
        &self,
        new_kid: &str,
        new_secret_hex: &str,
    ) -> Result<RotateWaitpointSecretResult, ServerError> {
        if new_kid.is_empty() || new_kid.contains(':') {
            return Err(ServerError::OperationFailed(
                "new_kid must be non-empty and must not contain ':'".into(),
            ));
        }
        if new_secret_hex.is_empty()
            || !new_secret_hex.len().is_multiple_of(2)
            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
        {
            return Err(ServerError::OperationFailed(
                "new_secret_hex must be a non-empty even-length hex string".into(),
            ));
        }

        // Non-blocking acquire; the owned permit is held in `_permit` for
        // the duration of the whole fan-out.
        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                return Err(ServerError::OperationFailed(
                    "admin rotate semaphore closed (server shutting down)".into(),
                ));
            }
        };

        // NOTE(review): execution partitions are counted via
        // `num_flow_partitions` (family below is Execution) — presumably the
        // families share a count; confirm against PartitionConfig.
        let n = self.config.partition_config.num_flow_partitions;
        let grace_ms = self.config.waitpoint_hmac_grace_ms;

        use futures::stream::{FuturesUnordered, StreamExt};

        let mut rotated = 0u16;
        let mut failed = Vec::new();
        let mut pending: FuturesUnordered<_> = FuturesUnordered::new();
        let mut next_index: u16 = 0;

        // Same bounded-concurrency pump as the boot-time init: keep up to
        // BOOT_INIT_CONCURRENCY rotations in flight, drain one per turn.
        loop {
            while pending.len() < BOOT_INIT_CONCURRENCY && next_index < n {
                let partition = Partition {
                    family: PartitionFamily::Execution,
                    index: next_index,
                };
                let idx = next_index;
                let new_kid_owned = new_kid.to_owned();
                let new_secret_owned = new_secret_hex.to_owned();
                let partition_owned = partition;
                // The future borrows `&self`; FuturesUnordered keeps them
                // local to this call, so the borrow is fine.
                let fut = async move {
                    let outcome = self
                        .rotate_single_partition(
                            &partition_owned,
                            &new_kid_owned,
                            &new_secret_owned,
                            grace_ms,
                        )
                        .await;
                    (idx, partition_owned, outcome)
                };
                pending.push(fut);
                next_index += 1;
            }
            match pending.next().await {
                Some((idx, partition, outcome)) => match outcome {
                    Ok(()) => {
                        rotated += 1;
                        tracing::debug!(
                            partition = %partition,
                            new_kid = %new_kid,
                            "waitpoint_hmac_rotated"
                        );
                    }
                    Err(e) => {
                        // Audit-logged; the failed index is surfaced to the
                        // caller so rotation can be retried per partition.
                        tracing::error!(
                            target: "audit",
                            partition = %partition,
                            err = %e,
                            "waitpoint_hmac_rotation_failed"
                        );
                        failed.push(idx);
                    }
                },
                None => break,
            }
        }

        tracing::info!(
            target: "audit",
            new_kid = %new_kid,
            total_partitions = n,
            rotated,
            failed_count = failed.len(),
            "waitpoint_hmac_rotation_complete"
        );

        Ok(RotateWaitpointSecretResult {
            rotated,
            failed,
            new_kid: new_kid.to_owned(),
        })
    }

    /// Rotate the secret on one partition by invoking the
    /// `ff_rotate_waitpoint_hmac_secret` Lua function, mapping script-layer
    /// errors to `ServerError`s with actionable messages.
    async fn rotate_single_partition(
        &self,
        partition: &Partition,
        new_kid: &str,
        new_secret_hex: &str,
        grace_ms: u64,
    ) -> Result<(), ServerError> {
        let idx = IndexKeys::new(partition);
        let args = RotateWaitpointHmacSecretArgs {
            new_kid: new_kid.to_owned(),
            new_secret_hex: new_secret_hex.to_owned(),
            grace_ms,
        };
        let outcome = ff_script::functions::suspension::ff_rotate_waitpoint_hmac_secret(
            &self.client,
            &idx,
            &args,
        )
        .await
        .map_err(|e| match e {
            // Same kid, different secret: refuse rather than silently
            // overwriting a live secret.
            ff_script::ScriptError::RotationConflict(kid) => {
                ServerError::OperationFailed(format!(
                    "rotation conflict: kid {kid} already installed with a \
                     different secret. Either use a fresh kid or restore the \
                     original secret for this kid before retrying."
                ))
            }
            ff_script::ScriptError::Valkey(v) => ServerError::ValkeyContext {
                source: v,
                context: format!("FCALL ff_rotate_waitpoint_hmac_secret partition={partition}"),
            },
            other => ServerError::OperationFailed(format!(
                "rotation failed on partition {partition}: {other}"
            )),
        })?;
        // The script's detailed outcome is not needed here; success suffices.
        let _ = outcome;
        Ok(())
    }
}
3177
3178fn parse_create_result(
3181 raw: &Value,
3182 execution_id: &ExecutionId,
3183) -> Result<CreateExecutionResult, ServerError> {
3184 let arr = match raw {
3185 Value::Array(arr) => arr,
3186 _ => return Err(ServerError::Script("ff_create_execution: expected Array".into())),
3187 };
3188
3189 let status = match arr.first() {
3190 Some(Ok(Value::Int(n))) => *n,
3191 _ => return Err(ServerError::Script("ff_create_execution: bad status code".into())),
3192 };
3193
3194 if status == 1 {
3195 let sub = arr
3197 .get(1)
3198 .and_then(|v| match v {
3199 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3200 Ok(Value::SimpleString(s)) => Some(s.clone()),
3201 _ => None,
3202 })
3203 .unwrap_or_default();
3204
3205 if sub == "DUPLICATE" {
3206 Ok(CreateExecutionResult::Duplicate {
3207 execution_id: execution_id.clone(),
3208 })
3209 } else {
3210 Ok(CreateExecutionResult::Created {
3211 execution_id: execution_id.clone(),
3212 public_state: PublicState::Waiting,
3213 })
3214 }
3215 } else {
3216 let error_code = arr
3217 .get(1)
3218 .and_then(|v| match v {
3219 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3220 Ok(Value::SimpleString(s)) => Some(s.clone()),
3221 _ => None,
3222 })
3223 .unwrap_or_else(|| "unknown".to_owned());
3224 Err(ServerError::OperationFailed(format!(
3225 "ff_create_execution failed: {error_code}"
3226 )))
3227 }
3228}
3229
3230fn parse_cancel_result(
3231 raw: &Value,
3232 execution_id: &ExecutionId,
3233) -> Result<CancelExecutionResult, ServerError> {
3234 let arr = match raw {
3235 Value::Array(arr) => arr,
3236 _ => return Err(ServerError::Script("ff_cancel_execution: expected Array".into())),
3237 };
3238
3239 let status = match arr.first() {
3240 Some(Ok(Value::Int(n))) => *n,
3241 _ => return Err(ServerError::Script("ff_cancel_execution: bad status code".into())),
3242 };
3243
3244 if status == 1 {
3245 Ok(CancelExecutionResult::Cancelled {
3246 execution_id: execution_id.clone(),
3247 public_state: PublicState::Cancelled,
3248 })
3249 } else {
3250 let error_code = arr
3251 .get(1)
3252 .and_then(|v| match v {
3253 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3254 Ok(Value::SimpleString(s)) => Some(s.clone()),
3255 _ => None,
3256 })
3257 .unwrap_or_else(|| "unknown".to_owned());
3258 Err(ServerError::OperationFailed(format!(
3259 "ff_cancel_execution failed: {error_code}"
3260 )))
3261 }
3262}
3263
3264fn parse_budget_create_result(
3265 raw: &Value,
3266 budget_id: &BudgetId,
3267) -> Result<CreateBudgetResult, ServerError> {
3268 let arr = match raw {
3269 Value::Array(arr) => arr,
3270 _ => return Err(ServerError::Script("ff_create_budget: expected Array".into())),
3271 };
3272
3273 let status = match arr.first() {
3274 Some(Ok(Value::Int(n))) => *n,
3275 _ => return Err(ServerError::Script("ff_create_budget: bad status code".into())),
3276 };
3277
3278 if status == 1 {
3279 let sub = arr
3280 .get(1)
3281 .and_then(|v| match v {
3282 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3283 Ok(Value::SimpleString(s)) => Some(s.clone()),
3284 _ => None,
3285 })
3286 .unwrap_or_default();
3287
3288 if sub == "ALREADY_SATISFIED" {
3289 Ok(CreateBudgetResult::AlreadySatisfied {
3290 budget_id: budget_id.clone(),
3291 })
3292 } else {
3293 Ok(CreateBudgetResult::Created {
3294 budget_id: budget_id.clone(),
3295 })
3296 }
3297 } else {
3298 let error_code = arr
3299 .get(1)
3300 .and_then(|v| match v {
3301 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3302 Ok(Value::SimpleString(s)) => Some(s.clone()),
3303 _ => None,
3304 })
3305 .unwrap_or_else(|| "unknown".to_owned());
3306 Err(ServerError::OperationFailed(format!(
3307 "ff_create_budget failed: {error_code}"
3308 )))
3309 }
3310}
3311
3312fn parse_quota_create_result(
3313 raw: &Value,
3314 quota_policy_id: &QuotaPolicyId,
3315) -> Result<CreateQuotaPolicyResult, ServerError> {
3316 let arr = match raw {
3317 Value::Array(arr) => arr,
3318 _ => return Err(ServerError::Script("ff_create_quota_policy: expected Array".into())),
3319 };
3320
3321 let status = match arr.first() {
3322 Some(Ok(Value::Int(n))) => *n,
3323 _ => return Err(ServerError::Script("ff_create_quota_policy: bad status code".into())),
3324 };
3325
3326 if status == 1 {
3327 let sub = arr
3328 .get(1)
3329 .and_then(|v| match v {
3330 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3331 Ok(Value::SimpleString(s)) => Some(s.clone()),
3332 _ => None,
3333 })
3334 .unwrap_or_default();
3335
3336 if sub == "ALREADY_SATISFIED" {
3337 Ok(CreateQuotaPolicyResult::AlreadySatisfied {
3338 quota_policy_id: quota_policy_id.clone(),
3339 })
3340 } else {
3341 Ok(CreateQuotaPolicyResult::Created {
3342 quota_policy_id: quota_policy_id.clone(),
3343 })
3344 }
3345 } else {
3346 let error_code = arr
3347 .get(1)
3348 .and_then(|v| match v {
3349 Ok(Value::BulkString(b)) => Some(String::from_utf8_lossy(b).into_owned()),
3350 Ok(Value::SimpleString(s)) => Some(s.clone()),
3351 _ => None,
3352 })
3353 .unwrap_or_else(|| "unknown".to_owned());
3354 Err(ServerError::OperationFailed(format!(
3355 "ff_create_quota_policy failed: {error_code}"
3356 )))
3357 }
3358}
3359
3360fn parse_create_flow_result(
3363 raw: &Value,
3364 flow_id: &FlowId,
3365) -> Result<CreateFlowResult, ServerError> {
3366 let arr = match raw {
3367 Value::Array(arr) => arr,
3368 _ => return Err(ServerError::Script("ff_create_flow: expected Array".into())),
3369 };
3370 let status = match arr.first() {
3371 Some(Ok(Value::Int(n))) => *n,
3372 _ => return Err(ServerError::Script("ff_create_flow: bad status code".into())),
3373 };
3374 if status == 1 {
3375 let sub = fcall_field_str(arr, 1);
3376 if sub == "ALREADY_SATISFIED" {
3377 Ok(CreateFlowResult::AlreadySatisfied {
3378 flow_id: flow_id.clone(),
3379 })
3380 } else {
3381 Ok(CreateFlowResult::Created {
3382 flow_id: flow_id.clone(),
3383 })
3384 }
3385 } else {
3386 let error_code = fcall_field_str(arr, 1);
3387 Err(ServerError::OperationFailed(format!(
3388 "ff_create_flow failed: {error_code}"
3389 )))
3390 }
3391}
3392
3393fn parse_add_execution_to_flow_result(
3394 raw: &Value,
3395) -> Result<AddExecutionToFlowResult, ServerError> {
3396 let arr = match raw {
3397 Value::Array(arr) => arr,
3398 _ => {
3399 return Err(ServerError::Script(
3400 "ff_add_execution_to_flow: expected Array".into(),
3401 ))
3402 }
3403 };
3404 let status = match arr.first() {
3405 Some(Ok(Value::Int(n))) => *n,
3406 _ => {
3407 return Err(ServerError::Script(
3408 "ff_add_execution_to_flow: bad status code".into(),
3409 ))
3410 }
3411 };
3412 if status == 1 {
3413 let sub = fcall_field_str(arr, 1);
3414 let eid_str = fcall_field_str(arr, 2);
3415 let nc_str = fcall_field_str(arr, 3);
3416 let eid = ExecutionId::parse(&eid_str)
3417 .map_err(|e| ServerError::Script(format!("bad execution_id: {e}")))?;
3418 let nc: u32 = nc_str.parse().unwrap_or(0);
3419 if sub == "ALREADY_SATISFIED" {
3420 Ok(AddExecutionToFlowResult::AlreadyMember {
3421 execution_id: eid,
3422 node_count: nc,
3423 })
3424 } else {
3425 Ok(AddExecutionToFlowResult::Added {
3426 execution_id: eid,
3427 new_node_count: nc,
3428 })
3429 }
3430 } else {
3431 let error_code = fcall_field_str(arr, 1);
3432 Err(ServerError::OperationFailed(format!(
3433 "ff_add_execution_to_flow failed: {error_code}"
3434 )))
3435 }
3436}
3437
/// Intermediate decoding of an `ff_cancel_flow` reply.
enum ParsedCancelFlow {
    /// The flow transitioned to cancelled; carries the cancellation policy
    /// and the ids of member executions the caller may need to cancel too.
    Cancelled {
        policy: String,
        member_execution_ids: Vec<String>,
    },
    /// The flow was already in a terminal state (treated as an idempotent
    /// no-op rather than an error).
    AlreadyTerminal,
}
3450
/// Parse the raw FCALL reply from `ff_cancel_flow` into `ParsedCancelFlow`.
///
/// The error code "flow_already_terminal" maps to the non-error
/// `AlreadyTerminal` outcome so callers can treat it as idempotent success.
/// On success, index 2 carries the cancellation policy and indices 3..
/// carry member execution ids; index 1 is skipped on the success path.
fn parse_cancel_flow_raw(raw: &Value) -> Result<ParsedCancelFlow, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_cancel_flow: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_cancel_flow: bad status code".into())),
    };
    if status != 1 {
        let error_code = fcall_field_str(arr, 1);
        if error_code == "flow_already_terminal" {
            return Ok(ParsedCancelFlow::AlreadyTerminal);
        }
        return Err(ServerError::OperationFailed(format!(
            "ff_cancel_flow failed: {error_code}"
        )));
    }
    let policy = fcall_field_str(arr, 2);
    // Members occupy the tail of the reply, starting at index 3.
    let mut members = Vec::with_capacity(arr.len().saturating_sub(3));
    for i in 3..arr.len() {
        members.push(fcall_field_str(arr, i));
    }
    Ok(ParsedCancelFlow::Cancelled { policy, member_execution_ids: members })
}
3484
3485fn parse_stage_dependency_edge_result(
3486 raw: &Value,
3487) -> Result<StageDependencyEdgeResult, ServerError> {
3488 let arr = match raw {
3489 Value::Array(arr) => arr,
3490 _ => return Err(ServerError::Script("ff_stage_dependency_edge: expected Array".into())),
3491 };
3492 let status = match arr.first() {
3493 Some(Ok(Value::Int(n))) => *n,
3494 _ => return Err(ServerError::Script("ff_stage_dependency_edge: bad status code".into())),
3495 };
3496 if status == 1 {
3497 let edge_id_str = fcall_field_str(arr, 2);
3498 let rev_str = fcall_field_str(arr, 3);
3499 let edge_id = EdgeId::parse(&edge_id_str)
3500 .map_err(|e| ServerError::Script(format!("bad edge_id: {e}")))?;
3501 let rev: u64 = rev_str.parse().unwrap_or(0);
3502 Ok(StageDependencyEdgeResult::Staged {
3503 edge_id,
3504 new_graph_revision: rev,
3505 })
3506 } else {
3507 let error_code = fcall_field_str(arr, 1);
3508 Err(ServerError::OperationFailed(format!(
3509 "ff_stage_dependency_edge failed: {error_code}"
3510 )))
3511 }
3512}
3513
3514fn parse_apply_dependency_result(
3515 raw: &Value,
3516) -> Result<ApplyDependencyToChildResult, ServerError> {
3517 let arr = match raw {
3518 Value::Array(arr) => arr,
3519 _ => return Err(ServerError::Script("ff_apply_dependency_to_child: expected Array".into())),
3520 };
3521 let status = match arr.first() {
3522 Some(Ok(Value::Int(n))) => *n,
3523 _ => return Err(ServerError::Script("ff_apply_dependency_to_child: bad status code".into())),
3524 };
3525 if status == 1 {
3526 let sub = fcall_field_str(arr, 1);
3527 if sub == "ALREADY_APPLIED" || sub == "already_applied" {
3528 Ok(ApplyDependencyToChildResult::AlreadyApplied)
3529 } else {
3530 let count_str = fcall_field_str(arr, 2);
3532 let count: u32 = count_str.parse().unwrap_or(0);
3533 Ok(ApplyDependencyToChildResult::Applied {
3534 unsatisfied_count: count,
3535 })
3536 }
3537 } else {
3538 let error_code = fcall_field_str(arr, 1);
3539 Err(ServerError::OperationFailed(format!(
3540 "ff_apply_dependency_to_child failed: {error_code}"
3541 )))
3542 }
3543}
3544
/// Parse the raw FCALL reply from `ff_deliver_signal`.
///
/// Duplicate deliveries return the pre-existing signal id from index 2;
/// accepted deliveries return the effect string from index 3.
fn parse_deliver_signal_result(
    raw: &Value,
    signal_id: &SignalId,
) -> Result<DeliverSignalResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_deliver_signal: expected Array".into())),
    };
    let status = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => return Err(ServerError::Script("ff_deliver_signal: bad status code".into())),
    };
    if status == 1 {
        let sub = fcall_field_str(arr, 1);
        if sub == "DUPLICATE" {
            // If the stored id fails to parse, fall back to the
            // caller-supplied id rather than erroring.
            let existing_str = fcall_field_str(arr, 2);
            let existing_id = SignalId::parse(&existing_str).unwrap_or_else(|_| signal_id.clone());
            Ok(DeliverSignalResult::Duplicate {
                existing_signal_id: existing_id,
            })
        } else {
            // Effect string lives at index 3.
            // NOTE(review): index 2 is skipped on this path — presumably a
            // field this caller does not need; confirm against the Lua reply.
            let effect = fcall_field_str(arr, 3);
            Ok(DeliverSignalResult::Accepted {
                signal_id: signal_id.clone(),
                effect,
            })
        }
    } else {
        let error_code = fcall_field_str(arr, 1);
        Err(ServerError::OperationFailed(format!(
            "ff_deliver_signal failed: {error_code}"
        )))
    }
}
3581
3582fn parse_change_priority_result(
3583 raw: &Value,
3584 execution_id: &ExecutionId,
3585) -> Result<ChangePriorityResult, ServerError> {
3586 let arr = match raw {
3587 Value::Array(arr) => arr,
3588 _ => return Err(ServerError::Script("ff_change_priority: expected Array".into())),
3589 };
3590 let status = match arr.first() {
3591 Some(Ok(Value::Int(n))) => *n,
3592 _ => return Err(ServerError::Script("ff_change_priority: bad status code".into())),
3593 };
3594 if status == 1 {
3595 Ok(ChangePriorityResult::Changed {
3596 execution_id: execution_id.clone(),
3597 })
3598 } else {
3599 let error_code = fcall_field_str(arr, 1);
3600 Err(ServerError::OperationFailed(format!(
3601 "ff_change_priority failed: {error_code}"
3602 )))
3603 }
3604}
3605
3606fn parse_replay_result(raw: &Value) -> Result<ReplayExecutionResult, ServerError> {
3607 let arr = match raw {
3608 Value::Array(arr) => arr,
3609 _ => return Err(ServerError::Script("ff_replay_execution: expected Array".into())),
3610 };
3611 let status = match arr.first() {
3612 Some(Ok(Value::Int(n))) => *n,
3613 _ => return Err(ServerError::Script("ff_replay_execution: bad status code".into())),
3614 };
3615 if status == 1 {
3616 let unsatisfied = fcall_field_str(arr, 2);
3618 let ps = if unsatisfied == "0" {
3619 PublicState::Waiting
3620 } else {
3621 PublicState::WaitingChildren
3622 };
3623 Ok(ReplayExecutionResult::Replayed { public_state: ps })
3624 } else {
3625 let error_code = fcall_field_str(arr, 1);
3626 Err(ServerError::OperationFailed(format!(
3627 "ff_replay_execution failed: {error_code}"
3628 )))
3629 }
3630}
3631
3632fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
3643 match e {
3644 ff_script::error::ScriptError::Valkey(valkey_err) => ServerError::ValkeyContext {
3645 source: valkey_err,
3646 context: "stream FCALL transport".into(),
3647 },
3648 other => ServerError::Script(other.to_string()),
3649 }
3650}
3651
/// Best-effort extraction of FCALL reply element `index` as a string.
///
/// Bulk and simple strings are returned as-is (bulk bytes lossily decoded
/// as UTF-8), integers are stringified, and anything else — missing index,
/// nested value, or a per-element transport error — yields an empty string.
/// Callers throughout this file treat "" as "field absent".
fn fcall_field_str(arr: &[Result<Value, ferriskey::Error>], index: usize) -> String {
    match arr.get(index) {
        Some(Ok(Value::BulkString(b))) => String::from_utf8_lossy(b).into_owned(),
        Some(Ok(Value::SimpleString(s))) => s.clone(),
        Some(Ok(Value::Int(n))) => n.to_string(),
        _ => String::new(),
    }
}
3660
/// Parse the raw FCALL reply from `ff_report_usage_and_check`.
///
/// On success, index 1 carries a sub-status; breach sub-statuses
/// ("SOFT_BREACH"/"HARD_BREACH") additionally carry the breached dimension
/// (index 2), current usage (index 3), and the relevant limit (index 4).
fn parse_report_usage_result(raw: &Value) -> Result<ReportUsageResult, ServerError> {
    let arr = match raw {
        Value::Array(arr) => arr,
        _ => return Err(ServerError::Script("ff_report_usage_and_check: expected Array".into())),
    };
    let status_code = match arr.first() {
        Some(Ok(Value::Int(n))) => *n,
        _ => {
            return Err(ServerError::Script(
                "ff_report_usage_and_check: expected Int status code".into(),
            ))
        }
    };
    if status_code != 1 {
        let error_code = fcall_field_str(arr, 1);
        return Err(ServerError::OperationFailed(format!(
            "ff_report_usage_and_check failed: {error_code}"
        )));
    }
    // Index 1 doubles as the error code above and the sub-status here.
    let sub_status = fcall_field_str(arr, 1);
    match sub_status.as_str() {
        "OK" => Ok(ReportUsageResult::Ok),
        "ALREADY_APPLIED" => Ok(ReportUsageResult::AlreadyApplied),
        "SOFT_BREACH" => {
            // Malformed numeric fields degrade to 0 rather than erroring.
            let dim = fcall_field_str(arr, 2);
            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
            Ok(ReportUsageResult::SoftBreach { dimension: dim, current_usage: current, soft_limit: limit })
        }
        "HARD_BREACH" => {
            let dim = fcall_field_str(arr, 2);
            let current: u64 = fcall_field_str(arr, 3).parse().unwrap_or(0);
            let limit: u64 = fcall_field_str(arr, 4).parse().unwrap_or(0);
            Ok(ReportUsageResult::HardBreach {
                dimension: dim,
                current_usage: current,
                hard_limit: limit,
            })
        }
        _ => Err(ServerError::OperationFailed(format!(
            "ff_report_usage_and_check: unknown sub-status: {sub_status}"
        ))),
    }
}
3708
3709fn parse_revoke_lease_result(raw: &Value) -> Result<RevokeLeaseResult, ServerError> {
3710 let arr = match raw {
3711 Value::Array(arr) => arr,
3712 _ => return Err(ServerError::Script("ff_revoke_lease: expected Array".into())),
3713 };
3714 let status = match arr.first() {
3715 Some(Ok(Value::Int(n))) => *n,
3716 _ => return Err(ServerError::Script("ff_revoke_lease: bad status code".into())),
3717 };
3718 if status == 1 {
3719 let sub = fcall_field_str(arr, 1);
3720 if sub == "ALREADY_SATISFIED" {
3721 let reason = fcall_field_str(arr, 2);
3722 Ok(RevokeLeaseResult::AlreadySatisfied { reason })
3723 } else {
3724 let lid = fcall_field_str(arr, 2);
3725 let epoch = fcall_field_str(arr, 3);
3726 Ok(RevokeLeaseResult::Revoked {
3727 lease_id: lid,
3728 lease_epoch: epoch,
3729 })
3730 }
3731 } else {
3732 let error_code = fcall_field_str(arr, 1);
3733 Err(ServerError::OperationFailed(format!(
3734 "ff_revoke_lease failed: {error_code}"
3735 )))
3736 }
3737}
3738
3739fn is_function_not_loaded(e: &ferriskey::Error) -> bool {
3745 if matches!(e.kind(), ferriskey::ErrorKind::NoScriptError) {
3746 return true;
3747 }
3748 e.detail()
3749 .map(|d| {
3750 d.contains("Function not loaded")
3751 || d.contains("No matching function")
3752 || d.contains("function not found")
3753 })
3754 .unwrap_or(false)
3755 || e.to_string().contains("Function not loaded")
3756}
3757
/// Invoke a Lua library function via FCALL, transparently reloading the
/// library and retrying exactly once when the server reports the function
/// is not loaded (see `is_function_not_loaded`).
///
/// # Errors
/// `ServerError::LibraryLoad` when the reload itself fails;
/// `ServerError::Valkey` for any other FCALL transport failure.
async fn fcall_with_reload_on_client(
    client: &Client,
    function: &str,
    keys: &[&str],
    args: &[&str],
) -> Result<Value, ServerError> {
    match client.fcall(function, keys, args).await {
        Ok(v) => Ok(v),
        Err(e) if is_function_not_loaded(&e) => {
            tracing::warn!(function, "Lua library not found on server, reloading");
            ff_script::loader::ensure_library(client)
                .await
                .map_err(ServerError::LibraryLoad)?;
            // A second failure is surfaced as-is; no further retries.
            client
                .fcall(function, keys, args)
                .await
                .map_err(ServerError::Valkey)
        }
        Err(e) => Err(ServerError::Valkey(e)),
    }
}
3781
3782async fn build_cancel_execution_fcall(
3786 client: &Client,
3787 partition_config: &PartitionConfig,
3788 args: &CancelExecutionArgs,
3789) -> Result<(Vec<String>, Vec<String>), ServerError> {
3790 let partition = execution_partition(&args.execution_id, partition_config);
3791 let ctx = ExecKeyContext::new(&partition, &args.execution_id);
3792 let idx = IndexKeys::new(&partition);
3793
3794 let lane_str: Option<String> = client
3795 .hget(&ctx.core(), "lane_id")
3796 .await
3797 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HGET lane_id".into() })?;
3798 let lane = LaneId::new(lane_str.unwrap_or_else(|| "default".to_owned()));
3799
3800 let dyn_fields: Vec<Option<String>> = client
3801 .cmd("HMGET")
3802 .arg(ctx.core())
3803 .arg("current_attempt_index")
3804 .arg("current_waitpoint_id")
3805 .arg("current_worker_instance_id")
3806 .execute()
3807 .await
3808 .map_err(|e| ServerError::ValkeyContext { source: e, context: "HMGET cancel pre-read".into() })?;
3809
3810 let att_idx_val = dyn_fields.first()
3811 .and_then(|v| v.as_ref())
3812 .and_then(|s| s.parse::<u32>().ok())
3813 .unwrap_or(0);
3814 let att_idx = AttemptIndex::new(att_idx_val);
3815 let wp_id_str = dyn_fields.get(1).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
3816 let wp_id = if wp_id_str.is_empty() {
3817 WaitpointId::new()
3818 } else {
3819 WaitpointId::parse(&wp_id_str).unwrap_or_else(|_| WaitpointId::new())
3820 };
3821 let wiid_str = dyn_fields.get(2).and_then(|v| v.as_ref()).cloned().unwrap_or_default();
3822 let wiid = WorkerInstanceId::new(&wiid_str);
3823
3824 let keys: Vec<String> = vec![
3825 ctx.core(), ctx.attempt_hash(att_idx), ctx.stream_meta(att_idx), ctx.lease_current(), ctx.lease_history(), idx.lease_expiry(), idx.worker_leases(&wiid), ctx.suspension_current(), ctx.waitpoint(&wp_id), ctx.waitpoint_condition(&wp_id), idx.suspension_timeout(), idx.lane_terminal(&lane), idx.attempt_timeout(), idx.execution_deadline(), idx.lane_eligible(&lane), idx.lane_delayed(&lane), idx.lane_blocked_dependencies(&lane), idx.lane_blocked_budget(&lane), idx.lane_blocked_quota(&lane), idx.lane_blocked_route(&lane), idx.lane_blocked_operator(&lane), ];
3847 let argv: Vec<String> = vec![
3848 args.execution_id.to_string(),
3849 args.reason.clone(),
3850 args.source.to_string(),
3851 args.lease_id.as_ref().map(|l| l.to_string()).unwrap_or_default(),
3852 args.lease_epoch.as_ref().map(|e| e.to_string()).unwrap_or_default(),
3853 ];
3854 Ok((keys, argv))
3855}
3856
/// Backoff schedule (milliseconds) for retrying member-execution cancels on
/// transient Valkey errors; the array length bounds the attempt count.
const CANCEL_MEMBER_RETRY_DELAYS_MS: [u64; 3] = [100, 500, 2_000];
3861
/// Extract the underlying Valkey `ErrorKind` from a `ServerError`, if the
/// error wraps one (directly, with context, or via a library-load failure).
/// Returns `None` for business-logic errors, which are never retryable.
fn extract_valkey_kind(e: &ServerError) -> Option<ferriskey::ErrorKind> {
    match e {
        ServerError::Valkey(err) | ServerError::ValkeyContext { source: err, .. } => {
            Some(err.kind())
        }
        ServerError::LibraryLoad(load_err) => load_err.valkey_kind(),
        _ => None,
    }
}
3875
3876async fn cancel_member_execution(
3887 client: &Client,
3888 partition_config: &PartitionConfig,
3889 eid_str: &str,
3890 reason: &str,
3891 now: TimestampMs,
3892) -> Result<(), ServerError> {
3893 let execution_id = ExecutionId::parse(eid_str)
3894 .map_err(|e| ServerError::InvalidInput(format!("bad execution_id '{eid_str}': {e}")))?;
3895 let args = CancelExecutionArgs {
3896 execution_id: execution_id.clone(),
3897 reason: reason.to_owned(),
3898 source: CancelSource::OperatorOverride,
3899 lease_id: None,
3900 lease_epoch: None,
3901 attempt_id: None,
3902 now,
3903 };
3904
3905 let attempts = CANCEL_MEMBER_RETRY_DELAYS_MS.len();
3906 for (attempt_idx, delay_ms) in CANCEL_MEMBER_RETRY_DELAYS_MS.iter().enumerate() {
3907 let is_last = attempt_idx + 1 == attempts;
3908 match try_cancel_member_once(client, partition_config, &args).await {
3909 Ok(()) => return Ok(()),
3910 Err(e) => {
3911 let retryable = extract_valkey_kind(&e)
3915 .map(ff_script::retry::is_retryable_kind)
3916 .unwrap_or(false);
3917 if !retryable || is_last {
3918 return Err(e);
3919 }
3920 tracing::debug!(
3921 execution_id = %execution_id,
3922 attempt = attempt_idx + 1,
3923 delay_ms = *delay_ms,
3924 error = %e,
3925 "cancel_member_execution: transient error, retrying"
3926 );
3927 tokio::time::sleep(Duration::from_millis(*delay_ms)).await;
3928 }
3929 }
3930 }
3931 Err(ServerError::OperationFailed(format!(
3935 "cancel_member_execution: retries exhausted for {execution_id}"
3936 )))
3937}
3938
/// Single cancel attempt for a member execution: build the FCALL key/arg
/// vectors, invoke `ff_cancel_execution` (reloading the Lua library if
/// needed), and discard the parsed success payload.
async fn try_cancel_member_once(
    client: &Client,
    partition_config: &PartitionConfig,
    args: &CancelExecutionArgs,
) -> Result<(), ServerError> {
    let (keys, argv) = build_cancel_execution_fcall(client, partition_config, args).await?;
    // fcall_with_reload_on_client takes &str slices; borrow the owned vecs.
    let key_refs: Vec<&str> = keys.iter().map(|s| s.as_str()).collect();
    let arg_refs: Vec<&str> = argv.iter().map(|s| s.as_str()).collect();
    let raw =
        fcall_with_reload_on_client(client, "ff_cancel_execution", &key_refs, &arg_refs).await?;
    parse_cancel_result(&raw, &args.execution_id).map(|_| ())
}
3953
3954fn parse_reset_budget_result(raw: &Value) -> Result<ResetBudgetResult, ServerError> {
3955 let arr = match raw {
3956 Value::Array(arr) => arr,
3957 _ => return Err(ServerError::Script("ff_reset_budget: expected Array".into())),
3958 };
3959 let status = match arr.first() {
3960 Some(Ok(Value::Int(n))) => *n,
3961 _ => return Err(ServerError::Script("ff_reset_budget: bad status code".into())),
3962 };
3963 if status == 1 {
3964 let next_str = fcall_field_str(arr, 2);
3965 let next_ms: i64 = next_str.parse().unwrap_or(0);
3966 Ok(ResetBudgetResult::Reset {
3967 next_reset_at: TimestampMs::from_millis(next_ms),
3968 })
3969 } else {
3970 let error_code = fcall_field_str(arr, 1);
3971 Err(ServerError::OperationFailed(format!(
3972 "ff_reset_budget failed: {error_code}"
3973 )))
3974 }
3975}
3976
#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    /// Builds a synthetic ferriskey error of the given kind for retryability tests.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn is_retryable_valkey_variant_uses_kind_table() {
        // Transient transport/cluster conditions must be retryable.
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());
        assert!(ServerError::Valkey(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());

        // Permanent or topology-level conditions must not be retried blindly.
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Moved)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::Ask)).is_retryable());
        assert!(!ServerError::Valkey(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
    }

    #[test]
    fn is_retryable_valkey_context_uses_kind_table() {
        // The context wrapper must delegate to the same kind table as the bare variant.
        let err = ServerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::IoError),
            context: "HGET test".into(),
        };
        assert!(err.is_retryable());

        let err = ServerError::ValkeyContext {
            source: mk_fk_err(ErrorKind::AuthenticationFailed),
            context: "auth".into(),
        };
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_library_load_delegates_to_inner_kind() {
        // A load failure caused by a transient Valkey error is retryable.
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::IoError),
        ));
        assert!(err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::AuthenticationFailed),
        ));
        assert!(!err.is_retryable());

        // Version mismatches are deterministic; retrying cannot help.
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_business_logic_variants_are_false() {
        // Domain-level failures are never transient.
        assert!(!ServerError::NotFound("x".into()).is_retryable());
        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
        assert!(!ServerError::Script("x".into()).is_retryable());
        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
    }

    #[test]
    fn valkey_kind_delegates_through_library_load() {
        // valkey_kind() must surface the inner kind through the LibraryLoad wrapper.
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::ClusterDown),
        ));
        assert_eq!(err.valkey_kind(), Some(ErrorKind::ClusterDown));

        // Non-Valkey load failures carry no kind.
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert_eq!(err.valkey_kind(), None);
    }

    #[test]
    fn parse_valkey_major_prefers_valkey_version_over_redis_version() {
        // Valkey servers report both fields; valkey_version is authoritative.
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n\
server_mode:cluster\r\n\
os:Linux\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 9);
    }

    // Renamed from parse_valkey_major_real_valkey_8_cluster_body: the fixture
    // reports valkey_version:9.0.3 and the assertion expects major 9, not 8.
    #[test]
    fn parse_valkey_major_real_valkey_9_cluster_body() {
        let info = "\
# Server\r\n\
redis_version:7.2.4\r\n\
server_name:valkey\r\n\
valkey_version:9.0.3\r\n\
valkey_release_stage:ga\r\n\
redis_git_sha1:00000000\r\n\
server_mode:cluster\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 9);
    }

    #[test]
    fn parse_valkey_major_falls_back_to_redis_version_on_valkey_7() {
        // Without a valkey_version field, redis_version is the fallback.
        let info = "# Server\r\nredis_version:7.2.4\r\nfoo:bar\r\n";
        assert_eq!(parse_valkey_major_version(info).unwrap(), 7);
    }

    #[test]
    fn parse_valkey_major_errors_when_no_version_field() {
        let info = "# Server\r\nfoo:bar\r\n";
        let err = parse_valkey_major_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(
            err.to_string().contains("missing"),
            "expected 'missing' in message, got: {err}"
        );
    }

    #[test]
    fn parse_valkey_major_errors_on_non_numeric() {
        let info = "valkey_version:invalid.x.y\n";
        let err = parse_valkey_major_version(info).unwrap_err();
        assert!(matches!(err, ServerError::OperationFailed(_)));
        assert!(err.to_string().contains("non-numeric major"));
    }

    #[test]
    fn extract_info_body_unwraps_cluster_map() {
        // In cluster mode, INFO replies arrive as a node-address -> body map;
        // extract_info_body must unwrap to the body text.
        let body_text = "\
# Server\r\n\
redis_version:7.2.4\r\n\
valkey_version:9.0.3\r\n";
        let node_entry = (
            Value::SimpleString("127.0.0.1:7000".to_string()),
            Value::VerbatimString {
                format: ferriskey::value::VerbatimFormat::Text,
                text: body_text.to_string(),
            },
        );
        let map = Value::Map(vec![node_entry]);
        let body = extract_info_body(&map).unwrap();
        assert_eq!(body, body_text);
        assert_eq!(parse_valkey_major_version(&body).unwrap(), 9);
    }

    #[test]
    fn extract_info_body_handles_simple_string() {
        // Single-node replies come back as a plain string body.
        let body_text = "redis_version:7.2.4\r\nvalkey_version:9.0.3\r\n";
        let v = Value::SimpleString(body_text.to_string());
        let body = extract_info_body(&v).unwrap();
        assert_eq!(body, body_text);
    }

    #[test]
    fn valkey_version_too_low_is_not_retryable() {
        // A too-old server is a deterministic condition: no retry, no Valkey kind.
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7".into(),
            required: "8.0".into(),
        };
        assert!(!err.is_retryable());
        assert_eq!(err.valkey_kind(), None);
    }

    #[test]
    fn valkey_version_too_low_error_message_includes_both_versions() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7".into(),
            required: "8.0".into(),
        };
        let msg = err.to_string();
        assert!(msg.contains("7"), "detected version in message: {msg}");
        assert!(msg.contains("8.0"), "required version in message: {msg}");
        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
    }
}