// ff_server/server.rs
1use std::sync::Arc;
2use std::time::Duration;
3
4use ferriskey::ClientBuilder;
5use tokio::sync::Mutex as AsyncMutex;
6use tokio::task::JoinSet;
7use ff_core::engine_backend::EngineBackend;
8use ff_core::engine_error::EngineError;
9use ff_core::contracts::{
10 AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
11 CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
12 CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
13 CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
14 ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
15 DeliverSignalArgs, DeliverSignalResult, ExecutionInfo,
16 ListExecutionsPage, ReplayExecutionResult,
17 ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
18 RevokeLeaseResult,
19 StageDependencyEdgeArgs, StageDependencyEdgeResult,
20};
21use ff_core::partition::{execution_partition, flow_partition, PartitionConfig};
22use ff_core::state::PublicState;
23use ff_core::types::*;
24use ff_engine::Engine;
25use ff_script::retry::is_retryable_kind;
26
27use crate::config::{BackendKind, ServerConfig};
28
/// RFC-017 §9.0: backends that may boot in this binary. Valkey has been
/// bootable since Stage A; Postgres joined the list at Stage E (v0.8.0).
/// Compiled into the binary by design — see RFC-017 §9.0 "Fleet-wide
/// cutover requirement" for the rolling-upgrade implication. Any future
/// backend must be added here explicitly before the §9.0 hard-gate in
/// `Server::start_with_metrics` will admit it.
const BACKEND_STAGE_READY: &[&str] = &["valkey", "postgres"];
34
/// Upper bound on `member_execution_ids` returned in the
/// [`CancelFlowResult::Cancelled`] response when the flow was already in a
/// terminal state (idempotent retry). The first (non-idempotent) cancel call
/// returns the full list; retries only need a sample, so the cap keeps the
/// idempotent-retry response bounded regardless of flow size.
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;
40
/// FlowFabric server — connects everything together.
///
/// Manages the Valkey connection, Lua library loading, background scanners,
/// and provides a minimal API for Phase 1.
pub struct Server {
    /// Server-wide Semaphore(1) gating admin rotate calls. Legitimate
    /// operators rotate ~monthly and can afford to serialize; concurrent
    /// rotate requests are an attack or misbehaving script. Holding the
    /// permit also guards against interleaved partial rotations on the
    /// Server side of the per-partition locks, surfacing contention as
    /// HTTP 429 instead of silently queueing and blowing past the 120s
    /// HTTP timeout. See `rotate_waitpoint_secret` handler.
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    /// Valkey engine (14 scanners). `None` on the Postgres boot path
    /// — engine scanners are Valkey-only (RFC-017 Wave 8 Stage E1;
    /// Postgres reconcilers run out-of-process via
    /// `ff-backend-postgres::reconcilers` + a separate scanner
    /// supervisor that Stage E2/E3 will wire).
    engine: Option<Engine>,
    /// Parsed server configuration, kept for the lifetime of the
    /// process; exposed read-only via [`Server::config`] and
    /// [`Server::partition_config`].
    config: ServerConfig,
    // RFC-017 Wave 8 Stage E3 (§9.3 / F8+Q4): `Server::scheduler` field
    // removed. The Valkey `ff_scheduler::Scheduler` is owned by
    // `ValkeyBackend` (installed via `with_scheduler` during boot);
    // the Postgres twin is constructed per-call inside
    // `PostgresBackend::claim_for_worker`. Server-side dispatch of
    // `claim_for_worker` now flows through `self.backend` only —
    // trait cutover per §4 row 9.
    /// Background tasks spawned by async handlers (e.g. cancel_flow member
    /// dispatch). Drained on shutdown with a bounded timeout.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
    /// PR-94: observability registry. Always present; the no-op shim
    /// takes zero runtime cost when the `observability` feature is
    /// off, and the real OTEL-backed registry is passed in via
    /// [`Server::start_with_metrics`] when on. Same `Arc` is shared
    /// with [`Engine::start_with_metrics`] and
    /// [`ff_scheduler::Scheduler::with_metrics`] so a single scrape
    /// sees everything the process produces.
    metrics: Arc<ff_observability::Metrics>,
    /// RFC-017 Stage A: backend trait object for the data-plane
    /// migration. Dual-field posture — the existing `client` /
    /// `tail_client` / `engine` / `scheduler` fields still serve the
    /// unmigrated handlers during Stages A-D; migrated handlers
    /// dispatch here. Stages B-D progressively retire the Client
    /// fields per RFC-017 §9.
    backend: Arc<dyn EngineBackend>,
}
87
/// Server error type.
///
/// Each variant maps to a distinct HTTP status / retry posture at the
/// REST boundary; see [`ServerError::is_retryable`] for the caller-side
/// retry classification.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Backend transport error. Previously wrapped `ferriskey::Error`
    /// directly (#88); now carries a backend-agnostic
    /// [`ff_core::BackendError`] so consumers match on
    /// [`ff_core::BackendErrorKind`] instead of ferriskey's native
    /// taxonomy. The ferriskey → [`ff_core::BackendError`] mapping
    /// lives in `ff_backend_valkey::backend_error_from_ferriskey`.
    #[error("backend: {0}")]
    Backend(#[from] ff_core::BackendError),
    /// Backend error with additional context. Previously
    /// `ValkeyContext { source: ferriskey::Error }` (#88).
    #[error("backend ({context}): {source}")]
    BackendContext {
        #[source]
        source: ff_core::BackendError,
        /// Call-site label (e.g. "connect", "PING") identifying which
        /// operation hit the transport fault.
        context: String,
    },
    /// Configuration parsing/validation failure at boot.
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    /// Lua library load failure (FUNCTION LOAD path).
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    /// Requested entity is routed to a partition that disagrees with
    /// the deployment's partition config.
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    /// Entity lookup failed; maps to HTTP 404.
    #[error("not found: {0}")]
    NotFound(String),
    /// Caller-supplied arguments failed validation; maps to HTTP 400.
    #[error("invalid input: {0}")]
    InvalidInput(String),
    /// Operation was dispatched but the backend reported failure.
    #[error("operation failed: {0}")]
    OperationFailed(String),
    /// Server-side Lua script raised an error.
    #[error("script: {0}")]
    Script(String),
    /// Server-wide concurrency limit reached on a labelled pool. Surfaces
    /// as HTTP 429 at the REST boundary so load balancers and clients can
    /// retry with backoff. The `source` label ("stream_ops", "admin_rotate",
    /// etc.) identifies WHICH pool is exhausted so operators aren't
    /// misled by a single "tail unavailable" string when the real fault
    /// is rotation contention.
    ///
    /// Fields: (source_label, max_permits).
    #[error("too many concurrent {0} calls (max: {1})")]
    ConcurrencyLimitExceeded(&'static str, u32),
    /// Detected Valkey version is below the RFC-011 §13 minimum. The engine
    /// depends on Valkey Functions (stabilized in 7.2), RESP3 (7.2+), and
    /// hash-tag routing; older versions do not implement the required
    /// primitives. Operator action: upgrade Valkey.
    #[error(
        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
    )]
    ValkeyVersionTooLow {
        detected: String,
        required: String,
    },
    /// RFC-017 §9.0 hard-gate: selected backend is not yet permitted
    /// to boot in this `ff-server` binary. Operator action is to
    /// either (a) select `FF_BACKEND=valkey`, or (b) upgrade to a
    /// Stage E binary once v0.8.0 ships.
    ///
    /// `stage` names the current stage ("A"/"B"/"C"/"D") so operator
    /// tooling can correlate the refusal with the migration plan.
    #[error(
        "backend not ready: {backend} (not in BACKEND_STAGE_READY; current stage {stage}). \
         Set FF_BACKEND=valkey or FF_BACKEND=postgres."
    )]
    BackendNotReady {
        backend: &'static str,
        stage: &'static str,
    },
    /// RFC-017 Stage A: an `EngineBackend` trait method returned a
    /// typed error that is not one of the specific business outcomes
    /// existing `ServerError` variants model. Includes transport
    /// faults, validation/corruption, and `Unavailable` for methods
    /// the backend has not implemented yet.
    ///
    /// Stage B/C migrations may refine individual arms into richer
    /// `ServerError` variants as more handlers route through the
    /// trait; Stage A keeps this catch-all so the migration lands
    /// additively. Boxed to keep `ServerError` small (clippy
    /// `result_large_err` — `EngineError` is ~200 bytes).
    #[error("engine: {0}")]
    Engine(#[from] Box<ff_core::engine_error::EngineError>),
}
171
172/// Lift a native `ferriskey::Error` into [`ServerError::Backend`] via
173/// [`ff_backend_valkey::backend_error_from_ferriskey`] (#88). Keeps
174/// `?`-propagation ergonomic at ferriskey call sites while the
175/// public variant stays backend-agnostic.
176impl From<ferriskey::Error> for ServerError {
177 fn from(err: ferriskey::Error) -> Self {
178 Self::Backend(ff_backend_valkey::backend_error_from_ferriskey(&err))
179 }
180}
181
182/// Lift an unboxed `EngineError` into [`ServerError::Engine`]. The
183/// variant stores a `Box<EngineError>` to keep `ServerError` small
184/// (clippy `result_large_err`); this `From` restores `?`-propagation
185/// from trait-dispatched handler paths.
186impl From<ff_core::engine_error::EngineError> for ServerError {
187 fn from(err: ff_core::engine_error::EngineError) -> Self {
188 Self::Engine(Box::new(err))
189 }
190}
191
192/// Build a [`ServerError::BackendContext`] from a native
193/// `ferriskey::Error` and a call-site label (#88).
194pub(crate) fn backend_context(
195 err: ferriskey::Error,
196 context: impl Into<String>,
197) -> ServerError {
198 ServerError::BackendContext {
199 source: ff_backend_valkey::backend_error_from_ferriskey(&err),
200 context: context.into(),
201 }
202}
203
204impl ServerError {
205 /// Returns the classified [`ff_core::BackendErrorKind`] if this
206 /// error carries a backend transport fault. Covers direct
207 /// Backend variants and library-load failures.
208 ///
209 /// Renamed from `valkey_kind` in #88 — the previous return type
210 /// `Option<ferriskey::ErrorKind>` leaked ferriskey into every
211 /// consumer doing retry/error classification.
212 pub fn backend_kind(&self) -> Option<ff_core::BackendErrorKind> {
213 match self {
214 Self::Backend(be) | Self::BackendContext { source: be, .. } => Some(be.kind()),
215 Self::LibraryLoad(e) => e
216 .valkey_kind()
217 .map(ff_backend_valkey::classify_ferriskey_kind),
218 // RFC-017 Stage A: Engine(EngineError) arm is intentionally
219 // lumped in with the rest (no backend_kind — variants like
220 // Validation / NotFound are business-logic, and Transport
221 // variants already surface under Backend upstream in most
222 // paths). Stage B/C will revisit when more handlers route
223 // through the trait.
224 _ => None,
225 }
226 }
227
228 /// Whether this error is safely retryable by a caller. For
229 /// backend transport variants, delegates to
230 /// [`ff_core::BackendErrorKind::is_retryable`]. Business-logic
231 /// rejections (NotFound, InvalidInput, OperationFailed, Script,
232 /// Config, PartitionMismatch) return false — those won't change
233 /// on retry.
234 pub fn is_retryable(&self) -> bool {
235 match self {
236 Self::Backend(be) | Self::BackendContext { source: be, .. } => {
237 be.kind().is_retryable()
238 }
239 Self::LibraryLoad(load_err) => load_err
240 .valkey_kind()
241 .map(is_retryable_kind)
242 .unwrap_or(false),
243 Self::Config(_)
244 | Self::PartitionMismatch(_)
245 | Self::NotFound(_)
246 | Self::InvalidInput(_)
247 | Self::OperationFailed(_)
248 | Self::Script(_) => false,
249 // Back off and retry — the bound is a server-side permit pool,
250 // so the retry will succeed once a permit frees up. Applies
251 // equally to stream ops, admin rotate, etc.
252 Self::ConcurrencyLimitExceeded(_, _) => true,
253 // Operator must upgrade Valkey; a retry at the caller won't help.
254 Self::ValkeyVersionTooLow { .. } => false,
255 // Operator must change FF_BACKEND; not retryable.
256 Self::BackendNotReady { .. } => false,
257 // EngineError's classification helpers handle transport
258 // retry semantics; mirror them so trait-dispatched handlers
259 // keep the same retry policy as the Client path.
260 Self::Engine(e) => matches!(
261 e.as_ref(),
262 EngineError::Transport { .. } | EngineError::Contextual { .. }
263 ),
264 }
265 }
266}
267
268impl Server {
269 /// Start the FlowFabric server.
270 ///
271 /// Boot sequence:
272 /// 1. Connect to Valkey
273 /// 2. Validate or create partition config key
274 /// 3. Load the FlowFabric Lua library
275 /// 4. Start engine (14 background scanners)
276 pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
277 Self::start_with_metrics(config, Arc::new(ff_observability::Metrics::new())).await
278 }
279
280 /// PR-94: boot the server with a shared observability registry.
281 ///
282 /// Scanner cycle + scheduler metrics record into this registry;
283 /// `main.rs` threads the same handle into the router so `/metrics`
284 /// exposes what the engine produces. The no-arg [`Server::start`]
285 /// forwards here with a fresh `Metrics::new()` — under the default
286 /// build that's the shim, under `observability` it's a real
287 /// registry not shared with any HTTP route (useful for tests
288 /// exercising the engine in isolation).
289 ///
290 /// **RFC-017 Stage A:** this path gates `config.backend` against
291 /// [`BACKEND_STAGE_READY`] (refuses `BackendKind::Postgres` at
292 /// boot per §9.0), dials the Valkey cluster through the legacy
293 /// path, then synthesises an `Arc<ValkeyBackend>` around the
294 /// dialed client and populates `Server.backend`. The dual-field
295 /// posture is explicit through Stage D; Stage E retires the
296 /// legacy Client fields. See [`Server::start_with_backend`] for
297 /// the test-injection entry point that takes a caller-supplied
298 /// backend.
299 pub async fn start_with_metrics(
300 config: ServerConfig,
301 metrics: Arc<ff_observability::Metrics>,
302 ) -> Result<Self, ServerError> {
303 // RFC-017 §9.0 hard-gate. At v0.8.0 (Stage E4) both `valkey` and
304 // `postgres` are ready; this check remains as defence-in-depth
305 // so future backend additions must explicitly opt into the list.
306 // The `FF_BACKEND_ACCEPT_UNREADY` / `FF_ENV=development` dev-mode
307 // override was retired at Stage E4 because it is no longer
308 // needed — both supported backends now boot without override.
309 let label = config.backend.as_str();
310 if !BACKEND_STAGE_READY.contains(&label) {
311 return Err(ServerError::BackendNotReady {
312 backend: match config.backend {
313 BackendKind::Postgres => "postgres",
314 BackendKind::Valkey => "valkey",
315 },
316 stage: "E4",
317 });
318 }
319
320 // RFC-017 Wave 8 Stage E1: Postgres dial branch. Stage E4 flips
321 // `BACKEND_STAGE_READY` to include "postgres"; until then the
322 // dev-mode override above is the only way in. On the Postgres
323 // path we skip the Valkey-specific engine/scanner + scheduler
324 // construction entirely (E3 wires the scheduler twin).
325 if matches!(config.backend, BackendKind::Postgres) {
326 return Self::start_postgres_branch(config, metrics).await;
327 }
328 // Step 1: Connect to Valkey via ClientBuilder
329 tracing::info!(
330 host = %config.valkey.host, port = config.valkey.port,
331 tls = config.valkey.tls, cluster = config.valkey.cluster,
332 "connecting to Valkey"
333 );
334 let mut builder = ClientBuilder::new()
335 .host(&config.valkey.host, config.valkey.port)
336 .connect_timeout(Duration::from_secs(10))
337 .request_timeout(Duration::from_millis(5000));
338 if config.valkey.tls {
339 builder = builder.tls();
340 }
341 if config.valkey.cluster {
342 builder = builder.cluster();
343 }
344 let client = builder
345 .build()
346 .await
347 .map_err(|e| crate::server::backend_context(e, "connect"))?;
348
349 // Verify connectivity
350 let pong: String = client
351 .cmd("PING")
352 .execute()
353 .await
354 .map_err(|e| crate::server::backend_context(e, "PING"))?;
355 if pong != "PONG" {
356 return Err(ServerError::OperationFailed(format!(
357 "unexpected PING response: {pong}"
358 )));
359 }
360 tracing::info!("Valkey connection established");
361
362 // RFC-017 Wave 8 Stage D (§4 row 12): the five Valkey-specific
363 // deployment-initialisation steps (version verify, partition
364 // config, HMAC install, FUNCTION LOAD, lanes seed) now live
365 // behind `ValkeyBackend::initialize_deployment`. Load-bearing
366 // ordering contract preserved in that method's doc-comment.
367 // The call is sequenced after the connection but before the
368 // engine + backend wiring below so the pre-relocation boot
369 // ordering is byte-for-byte identical.
370 let init_backend = ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
371 client.clone(),
372 config.partition_config,
373 );
374 init_backend
375 .initialize_deployment(
376 &config.waitpoint_hmac_secret,
377 &config.lanes,
378 config.valkey.skip_library_load,
379 )
380 .await
381 .map_err(|e| ServerError::Engine(Box::new(e)))?;
382
383 // Step 4: Start engine with scanners
384 // Build a fresh EngineConfig rather than cloning (EngineConfig doesn't derive Clone).
385 let engine_cfg = ff_engine::EngineConfig {
386 partition_config: config.partition_config,
387 lanes: config.lanes.clone(),
388 lease_expiry_interval: config.engine_config.lease_expiry_interval,
389 delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
390 index_reconciler_interval: config.engine_config.index_reconciler_interval,
391 attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
392 suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
393 pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
394 retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
395 budget_reset_interval: config.engine_config.budget_reset_interval,
396 budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
397 quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
398 unblock_interval: config.engine_config.unblock_interval,
399 dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
400 flow_projector_interval: config.engine_config.flow_projector_interval,
401 execution_deadline_interval: config.engine_config.execution_deadline_interval,
402 cancel_reconciler_interval: config.engine_config.cancel_reconciler_interval,
403 edge_cancel_dispatcher_interval: config.engine_config.edge_cancel_dispatcher_interval,
404 edge_cancel_reconciler_interval: config.engine_config.edge_cancel_reconciler_interval,
405 scanner_filter: config.engine_config.scanner_filter.clone(),
406 };
407 // Engine scanners keep running on the MAIN `client` mux — NOT on
408 // `tail_client`. Scanner cadence is foreground-latency-coupled by
409 // design (an incident that blocks all FCALLs should also visibly
410 // block scanners), and keeping scanners off the tail client means a
411 // long-poll tail can never starve lease-expiry, retention-trim,
412 // etc. Do not change this without revisiting RFC-006 Impl Notes.
413 // Build the Valkey completion backend (issue #90) and subscribe.
414 // This replaces the pre-#90 `CompletionListenerConfig` path:
415 // the wire subscription now lives in ff-backend-valkey, the
416 // engine just consumes the resulting stream.
417 let mut valkey_conn = ff_core::backend::ValkeyConnection::new(
418 config.valkey.host.clone(),
419 config.valkey.port,
420 );
421 valkey_conn.tls = config.valkey.tls;
422 valkey_conn.cluster = config.valkey.cluster;
423 let completion_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
424 client.clone(),
425 config.partition_config,
426 valkey_conn,
427 );
428 let completion_stream = <ff_backend_valkey::ValkeyBackend as ff_core::completion_backend::CompletionBackend>::subscribe_completions(&completion_backend)
429 .await
430 .map_err(|e| ServerError::OperationFailed(format!(
431 "subscribe_completions: {e}"
432 )))?;
433
434 let engine = Engine::start_with_completions(
435 engine_cfg,
436 client.clone(),
437 metrics.clone(),
438 completion_stream,
439 );
440
441 // RFC-017 Stage B: the dedicated `tail_client`, the
442 // `stream_semaphore`, and the `xread_block_lock` moved into
443 // `ValkeyBackend` (§6). After issue #204's switch to per-call
444 // `duplicate_connection()` inside `tail_stream_impl`, the
445 // dedicated tail mux + serialising mutex are no longer
446 // required — each `XREAD BLOCK` gets its own socket — so
447 // the encapsulated impl only needs the bounded semaphore to
448 // preserve the existing 429-on-contention contract at the
449 // REST boundary.
450 tracing::info!(
451 max_concurrent_stream_ops = config.max_concurrent_stream_ops,
452 "stream-op semaphore lives inside ValkeyBackend (RFC-017 Stage B)"
453 );
454
455 // Admin surface warning. /v1/admin/* endpoints (waitpoint HMAC
456 // rotation, etc.) are only protected by the global Bearer
457 // middleware in api.rs — which is only installed when
458 // config.api_token is set. Without FF_API_TOKEN, a public
459 // deployment exposes secret rotation to anyone that can reach
460 // the listen_addr. Warn loudly so operators can't miss it; we
461 // don't fail-start because single-tenant dev uses this path.
462 if config.api_token.is_none() {
463 tracing::warn!(
464 listen_addr = %config.listen_addr,
465 "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
466 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
467 FF_API_TOKEN for any deployment reachable from untrusted \
468 networks."
469 );
470 // Explicit callout for the credential-bearing read endpoints.
471 // Auditors grep for these on a per-endpoint basis; lumping
472 // into the admin warning alone hides the fact that
473 // /pending-waitpoints returns HMAC tokens and /result
474 // returns completion payload bytes.
475 tracing::warn!(
476 listen_addr = %config.listen_addr,
477 "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
478 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
479 and GET /v1/executions/{{id}}/result returns raw completion payload \
480 bytes (may contain PII). Both are UNAUTHENTICATED in this \
481 configuration."
482 );
483 }
484
485 // Partition counts — post-RFC-011 there are three physical families.
486 // Execution keys co-locate with their parent flow's partition (§2 of
487 // the RFC), so `num_flow_partitions` governs both exec and flow
488 // routing; no separate `num_execution_partitions` count exists.
489 tracing::info!(
490 flow_partitions = config.partition_config.num_flow_partitions,
491 budget_partitions = config.partition_config.num_budget_partitions,
492 quota_partitions = config.partition_config.num_quota_partitions,
493 lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
494 listen_addr = %config.listen_addr,
495 "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
496 config.partition_config.num_flow_partitions,
497 config.partition_config.num_budget_partitions,
498 config.partition_config.num_quota_partitions,
499 );
500
501 let scheduler = Arc::new(ff_scheduler::Scheduler::with_metrics(
502 client.clone(),
503 config.partition_config,
504 metrics.clone(),
505 ));
506
507 // RFC-017 Stage A: synthesise an `Arc<ValkeyBackend>` around the
508 // already-dialed client. Zero additional round-trips; the
509 // backend shares the same ferriskey connection as the legacy
510 // path so migrated + legacy handlers observe identical state.
511 // Stage B relocates `tail_client` / `stream_semaphore` into
512 // the backend; Stage E retires the Client fields entirely.
513 let mut valkey_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
514 client.clone(),
515 config.partition_config,
516 {
517 let mut c = ff_core::backend::ValkeyConnection::new(
518 config.valkey.host.clone(),
519 config.valkey.port,
520 );
521 c.tls = config.valkey.tls;
522 c.cluster = config.valkey.cluster;
523 c
524 },
525 );
526 // RFC-017 Stage B: size the backend's stream-op semaphore
527 // before handing out the `Arc`. `get_mut` succeeds here
528 // because `valkey_backend` is the sole `Arc` owner at this
529 // point.
530 if !valkey_backend.with_stream_semaphore_permits(config.max_concurrent_stream_ops) {
531 return Err(ServerError::OperationFailed(
532 "ValkeyBackend stream semaphore sizing failed (unexpected Arc sharing)".into(),
533 ));
534 }
535 // RFC-017 Stage C: install the scheduler handle so the
536 // backend's `claim_for_worker` trait impl dispatches through
537 // it. Stage E3 removed the redundant `Server::scheduler`
538 // field — the backend is the sole owner of the scheduler
539 // after this install.
540 if !valkey_backend.with_scheduler(scheduler) {
541 return Err(ServerError::OperationFailed(
542 "ValkeyBackend scheduler wiring failed (unexpected Arc sharing)".into(),
543 ));
544 }
545 let backend: Arc<dyn EngineBackend> = valkey_backend as Arc<dyn EngineBackend>;
546
547 Ok(Self {
548 // Single-permit semaphore: only one rotate-waitpoint-secret can
549 // be mid-flight server-wide. Attackers or misbehaving scripts
550 // firing parallel rotations fail fast with 429 instead of
551 // queueing HTTP handlers.
552 admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
553 engine: Some(engine),
554 config,
555 background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
556 metrics,
557 backend,
558 })
559 }
560
    /// RFC-017 Wave 8 Stage E1/E2: Postgres boot branch for
    /// [`Server::start_with_metrics`]. Called after the §9.0 hard-gate
    /// admitted the boot.
    ///
    /// **Scope (Stage E2):**
    /// * Dial Postgres + wrap an `Arc<PostgresBackend>` as the
    ///   `backend` field (all migrated HTTP handlers dispatch through
    ///   the trait — create_execution / create_flow / add member /
    ///   stage edge / apply dep / describe_flow / cancel_flow / etc.).
    /// * **No Valkey dial.** The E1 residual ambient-Valkey client is
    ///   retired in E2 — `Server::client` is gone, `cancel_flow_header`
    ///   / `ack_cancel_member` / `read_execution_info` /
    ///   `read_execution_state` / `fetch_waitpoint_token_v07` all flow
    ///   through the trait. Legacy reads that Postgres does not yet
    ///   implement surface as `EngineError::Unavailable` (Wave 9).
    /// * Skip engine + scheduler. Stage E3 wires the Postgres-specific
    ///   `claim_for_worker` via the trait; until then the scheduler
    ///   field stays `None`.
    /// * Run `apply_migrations` against the Postgres pool so an empty
    ///   database becomes usable without operator out-of-band steps.
    async fn start_postgres_branch(
        config: ServerConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Self, ServerError> {
        // Fail fast with an actionable message rather than letting the
        // driver produce an opaque connect error on an empty URL.
        if config.postgres.url.is_empty() {
            return Err(ServerError::InvalidInput(
                "FF_BACKEND=postgres requires FF_POSTGRES_URL".into(),
            ));
        }
        tracing::info!(
            pool_size = config.postgres.pool_size,
            "dialing Postgres backend (RFC-017 Stage E3)"
        );
        let mut pg_backend_arc = ff_backend_postgres::PostgresBackend::connect_with_metrics(
            config.postgres_config(),
            config.partition_config,
            metrics.clone(),
        )
        .await
        .map_err(|e| ServerError::Engine(Box::new(e)))?;

        // Apply schema migrations idempotently so an empty target
        // database becomes usable. The underlying pool is shared with
        // the backend — one pool, one migration run. Ordering matters:
        // migrations MUST complete before the reconcilers below start
        // ticking against the schema.
        ff_backend_postgres::apply_migrations(pg_backend_arc.pool())
            .await
            .map_err(|e| {
                ServerError::OperationFailed(format!("postgres apply_migrations: {e}"))
            })?;

        // RFC-017 Wave 8 Stage E3: spawn the six Postgres reconcilers
        // (attempt_timeout, lease_expiry, suspension_timeout,
        // dependency, edge_cancel_dispatcher, edge_cancel_reconciler)
        // as background tick loops owned by the backend. Drained on
        // `Server::shutdown` via `backend.shutdown_prepare(grace)`.
        let scanner_cfg = ff_backend_postgres::PostgresScannerConfig {
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            edge_cancel_dispatcher_interval: config.engine_config.edge_cancel_dispatcher_interval,
            edge_cancel_reconciler_interval: config.engine_config.edge_cancel_reconciler_interval,
            dependency_stale_threshold_ms:
                ff_backend_postgres::PostgresScannerConfig::DEFAULT_DEP_STALE_MS,
            scanner_filter: config.engine_config.scanner_filter.clone(),
            partition_config: config.partition_config,
        };
        // `with_scanners` uses Arc::get_mut-style installation — it can
        // only fail if another clone of the Arc exists, which would be a
        // boot-sequence bug, not an operational condition.
        if !pg_backend_arc.with_scanners(scanner_cfg) {
            return Err(ServerError::OperationFailed(
                "PostgresBackend scanner install failed (unexpected Arc sharing)".into(),
            ));
        }

        // Plain unsized coercion to the trait object at the annotated
        // binding.
        let backend: Arc<dyn EngineBackend> = pg_backend_arc;

        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            "FlowFabric server started (Postgres backend, Stage E3). \
             6 Postgres reconcilers active; claim_for_worker routed to \
             PostgresScheduler. No ambient Valkey client."
        );

        Ok(Self {
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            // Valkey-only engine scanners do not run on this path.
            engine: None,
            config,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
            metrics,
            backend,
        })
    }
653
654 /// RFC-017 Stage A: test-injection + future-embedded-user entry
655 /// point. Takes a caller-constructed `Arc<dyn EngineBackend>` +
656 /// the Valkey connection/engine scaffolding
657 /// [`Server::start_with_metrics`] normally dials for itself.
658 ///
659 /// **Stage A scope:** Stage A is still dual-field — the legacy
660 /// `client` / `tail_client` / `engine` / `scheduler` fields are
661 /// constructed here exactly as in the main boot path, because
662 /// unmigrated handlers still need them. The caller-supplied
663 /// `backend` populates the new trait-object field and services
664 /// the handlers migrated in this stage (see RFC-017 §4
665 /// migration table).
666 ///
667 /// **Stage D evolution:** once the boot path relocates into each
668 /// backend's `connect_with_metrics` (RFC-017 §9 Stage D), this
669 /// entry point becomes the sole constructor — `Server::start` and
670 /// `Server::start_with_metrics` are thin shims that build the
671 /// backend first, then forward here.
672 ///
673 /// Today (Stage A) this path is exercised by `MockBackend` in
674 /// `tests/parity_stage_a.rs`; it does NOT replace the Valkey
675 /// dial under the main binary.
676 pub async fn start_with_backend(
677 config: ServerConfig,
678 backend: Arc<dyn EngineBackend>,
679 metrics: Arc<ff_observability::Metrics>,
680 ) -> Result<Self, ServerError> {
681 // Stage A: forward through the legacy dial so unmigrated
682 // handlers keep working, then overwrite `backend` with the
683 // caller-supplied handle. Stage D rewires this so the
684 // caller's backend drives the whole boot.
685 let mut server = Self::start_with_metrics(config, metrics).await?;
686 server.backend = backend;
687 Ok(server)
688 }
689
    /// RFC-017 Stage A: access the backend trait-object driving
    /// migrated handlers. Stable surface for tests that need to
    /// inspect the backend directly (e.g. `backend_label()`
    /// assertions). The Server will dispatch more handlers through
    /// this handle as Stages B-D land.
    ///
    /// Returns a borrow of the shared handle; clone the `Arc` if you
    /// need an owned reference.
    pub fn backend(&self) -> &Arc<dyn EngineBackend> {
        &self.backend
    }
698
    /// PR-94: access the shared observability registry.
    ///
    /// This is the same `Arc` handed to the engine and scheduler at
    /// boot (see [`Server::start_with_metrics`]), so one scrape of it
    /// covers everything the process records.
    pub fn metrics(&self) -> &Arc<ff_observability::Metrics> {
        &self.metrics
    }
703
704 // RFC-017 Stage E2 (§9 Stage D bullet completed): `Server::client()`
705 // accessor + underlying `Client` field both removed. External
706 // callers route ping / healthz through the backend trait
707 // (`self.backend.ping()` → `ValkeyBackend::ping`). The
708 // `ff_cancel_flow` header FCALL, its `flow_already_terminal`
709 // HMGET/SMEMBERS fallback, the per-member `ff_ack_cancel_member`
710 // backlog drain, and the `get_execution*` / `fetch_waitpoint_token_v07`
711 // reads are all reachable through the Stage E2 trait additions
712 // (`cancel_flow_header`, `ack_cancel_member`, `read_execution_info`,
713 // `read_execution_state`, `fetch_waitpoint_token_v07`).
714
715 /// Get the server config.
716 pub fn config(&self) -> &ServerConfig {
717 &self.config
718 }
719
720 /// Get the partition config.
721 pub fn partition_config(&self) -> &PartitionConfig {
722 &self.config.partition_config
723 }
724
725 // ── Minimal Phase 1 API ──
726
727 /// Create a new execution. RFC-017 Stage D2: delegates through the
728 /// backend trait. The KEYS/ARGV build + FCALL dispatch + result parse
729 /// live verbatim in `ValkeyBackend::create_execution`.
730 pub async fn create_execution(
731 &self,
732 args: &CreateExecutionArgs,
733 ) -> Result<CreateExecutionResult, ServerError> {
734 Ok(self.backend.create_execution(args.clone()).await?)
735 }
736
737 /// Cancel an execution. RFC-017 Stage D2: delegates through the
738 /// backend trait.
739 pub async fn cancel_execution(
740 &self,
741 args: &CancelExecutionArgs,
742 ) -> Result<CancelExecutionResult, ServerError> {
743 Ok(self.backend.cancel_execution(args.clone()).await?)
744 }
745
746 /// Get the public state of an execution.
747 ///
748 /// Reads `public_state` from the exec_core hash. Returns the parsed
749 /// PublicState enum. If the execution is not found, returns an error.
750 pub async fn get_execution_state(
751 &self,
752 execution_id: &ExecutionId,
753 ) -> Result<PublicState, ServerError> {
754 // RFC-017 Stage E2: routed through the backend trait.
755 match self.backend.read_execution_state(execution_id).await? {
756 Some(s) => Ok(s),
757 None => Err(ServerError::NotFound(format!(
758 "execution not found: {execution_id}"
759 ))),
760 }
761 }
762
763 /// Read the raw result payload written by `ff_complete_execution`.
764 ///
765 /// The Lua side stores the payload at `ctx.result()` via plain `SET`.
766 /// No FCALL — this is a direct GET; returns `Ok(None)` when the
767 /// execution is missing, not yet complete, or (in a future
768 /// retention-policy world) when the result was trimmed.
769 ///
770 /// # Contract vs `get_execution_state`
771 ///
772 /// `get_execution_state` is the authoritative completion signal. If
773 /// a caller observes `state == completed` but `get_execution_result`
774 /// returns `None`, the result bytes are unavailable — not a caller
775 /// bug and not a server bug, just the retention policy trimming the
776 /// blob. V1 sets no retention, so callers on v1 can treat
777 /// `state == completed` + `Ok(None)` as a server bug.
778 ///
779 /// # Ordering
780 ///
781 /// Callers MUST wait for `state == completed` before calling this
782 /// method; polls issued before the state transition may hit a
783 /// narrow window where the completion Lua has written
784 /// `public_state = completed` but the `result` key SET is still
785 /// on-wire. The current Lua `ff_complete_execution` writes both in
786 /// the same atomic script, so the window is effectively zero for
787 /// direct callers — but retries via `ff_replay_execution` open it
788 /// briefly.
789 pub async fn get_execution_result(
790 &self,
791 execution_id: &ExecutionId,
792 ) -> Result<Option<Vec<u8>>, ServerError> {
793 // RFC-017 Stage E2: routed through the backend trait. The
794 // Valkey impl preserves binary-safe semantics via ferriskey's
795 // `Vec<u8>` FromValue; Postgres returns Unavailable until the
796 // result-store migration lands.
797 Ok(self.backend.get_execution_result(execution_id).await?)
798 }
799
800
801 // ── Budget / Quota API ──
802
803 /// Create a new budget policy.
804 /// Create a new budget policy. RFC-017 Stage D2: delegates through
805 /// the backend trait.
806 pub async fn create_budget(
807 &self,
808 args: &CreateBudgetArgs,
809 ) -> Result<CreateBudgetResult, ServerError> {
810 Ok(self.backend.create_budget(args.clone()).await?)
811 }
812
813 /// Create a new quota/rate-limit policy. RFC-017 Stage D2: delegates
814 /// through the backend trait.
815 pub async fn create_quota_policy(
816 &self,
817 args: &CreateQuotaPolicyArgs,
818 ) -> Result<CreateQuotaPolicyResult, ServerError> {
819 Ok(self.backend.create_quota_policy(args.clone()).await?)
820 }
821
822 /// Read-only budget status for operator visibility. RFC-017 Stage
823 /// D2: delegates through the backend trait.
824 pub async fn get_budget_status(
825 &self,
826 budget_id: &BudgetId,
827 ) -> Result<BudgetStatus, ServerError> {
828 Ok(self.backend.get_budget_status(budget_id).await?)
829 }
830
831 /// Report usage against a budget and check limits. RFC-017 Stage D2:
832 /// delegates through the backend trait's admin variant
833 /// (`report_usage_admin` — no worker handle required on the admin
834 /// path).
835 pub async fn report_usage(
836 &self,
837 budget_id: &BudgetId,
838 args: &ReportUsageArgs,
839 ) -> Result<ReportUsageResult, ServerError> {
840 let mut admin_args = ff_core::contracts::ReportUsageAdminArgs::new(
841 args.dimensions.clone(),
842 args.deltas.clone(),
843 args.now,
844 );
845 if let Some(key) = args.dedup_key.as_ref() {
846 admin_args = admin_args.with_dedup_key(key.clone());
847 }
848 Ok(self.backend.report_usage_admin(budget_id, admin_args).await?)
849 }
850
851 /// Reset a budget's usage counters and schedule the next reset.
852 /// RFC-017 Stage D2: delegates through the backend trait.
853 pub async fn reset_budget(
854 &self,
855 budget_id: &BudgetId,
856 ) -> Result<ResetBudgetResult, ServerError> {
857 let args = ff_core::contracts::ResetBudgetArgs {
858 budget_id: budget_id.clone(),
859 now: TimestampMs::now(),
860 };
861 Ok(self.backend.reset_budget(args).await?)
862 }
863
864 // ── Flow API ──
865
866 /// Create a new flow container. RFC-017 Stage D2: delegates through
867 /// the backend trait.
868 pub async fn create_flow(
869 &self,
870 args: &CreateFlowArgs,
871 ) -> Result<CreateFlowResult, ServerError> {
872 Ok(self.backend.create_flow(args.clone()).await?)
873 }
874
875 /// Add an execution to a flow.
876 ///
877 /// # Atomic single-FCALL commit (RFC-011 §7.3)
878 ///
879 /// Post-RFC-011 phase-3, exec_core co-locates with flow_core under
880 /// hash-tag routing (both hash to `{fp:N}` via the exec id's
881 /// embedded partition). A single atomic FCALL writes:
882 ///
883 /// - `members_set` SADD (flow membership)
884 /// - `exec_core.flow_id` HSET (back-pointer)
885 /// - `flow_index` SADD (self-heal)
886 /// - `flow_core` HINCRBY node_count / graph_revision +
887 /// HSET last_mutation_at
888 ///
889 /// All four writes commit atomically or none do (Valkey scripting
890 /// contract: validates-before-writing in the Lua body means
891 /// `flow_not_found` / `flow_already_terminal` early-returns fire
892 /// BEFORE any `redis.call()` mutation, and a mid-body error after
893 /// writes is not expected because all writes are on the same slot).
894 ///
895 /// The pre-RFC-011 two-phase contract (membership FCALL on `{fp:N}` plus separate HSET on `{p:N}`), orphan window, and reconciliation-scanner plan (issue #21, now superseded) are all retired.
896 ///
897 /// # Consumer contract
898 ///
899 /// The caller's `args.execution_id` **must** be co-located with
900 /// `args.flow_id`'s partition — i.e. minted via
901 /// `ExecutionId::for_flow(&args.flow_id, config)`. Passing a
902 /// `solo`-minted id (or any exec id hashing to a different
903 /// `{fp:N}` than the flow's) will fail at the Valkey level with
904 /// `CROSSSLOT` on a clustered deploy.
905 ///
906 /// Callers with a flow context in scope always use `for_flow`;
907 /// this is the only supported mint path for flow-member execs
908 /// post-RFC-011. Test fixtures that pre-date the co-location
909 /// contract use `TestCluster::new_execution_id_on_partition` to
910 /// pin to a specific hash-tag index for `fcall_create_flow`-style
911 /// helpers that hard-code their flow partition.
912 pub async fn add_execution_to_flow(
913 &self,
914 args: &AddExecutionToFlowArgs,
915 ) -> Result<AddExecutionToFlowResult, ServerError> {
916 // Preserve the typed `ServerError::PartitionMismatch` pre-flight
917 // check — the backend trait's implementation returns an
918 // `EngineError` on CROSSSLOT, which would surface as
919 // `ServerError::Engine(_)` and hide the consumer-contract
920 // violation. Keep the explicit check at the facade boundary.
921 let flow_part = flow_partition(&args.flow_id, &self.config.partition_config);
922 let exec_part = execution_partition(&args.execution_id, &self.config.partition_config);
923 if exec_part.index != flow_part.index {
924 return Err(ServerError::PartitionMismatch(format!(
925 "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
926 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
927 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
928 exec_p = exec_part.index,
929 flow_p = flow_part.index,
930 )));
931 }
932 Ok(self.backend.add_execution_to_flow(args.clone()).await?)
933 }
934
935 /// Cancel a flow.
936 ///
937 /// Flips `public_flow_state` to `cancelled` atomically via
938 /// `ff_cancel_flow` on `{fp:N}`. For `cancel_all` policy, member
939 /// executions must be cancelled cross-partition; this dispatch runs in
940 /// the background and the call returns [`CancelFlowResult::CancellationScheduled`]
941 /// immediately. For all other policies (or flows with no members), or
942 /// when the flow was already in a terminal state (idempotent retry),
943 /// the call returns [`CancelFlowResult::Cancelled`].
944 ///
945 /// Clients that need synchronous completion can call [`Self::cancel_flow_wait`].
946 ///
947 /// # Backpressure
948 ///
949 /// Each call that hits the async dispatch path spawns a new task into
950 /// the shared background `JoinSet`. Rapid repeated calls against the
951 /// same flow will spawn *multiple* overlapping dispatch tasks. This is
952 /// not a correctness issue — each member cancel is idempotent and
953 /// terminal flows short-circuit via [`ff_core::contracts::CancelFlowHeader::AlreadyTerminal`]
954 /// — but heavy burst callers should either use `?wait=true` (serialises
955 /// the dispatch on the HTTP thread, giving natural backpressure) or
956 /// implement client-side deduplication on `flow_id`. The `JoinSet` is
957 /// drained with a 15s timeout on [`Self::shutdown`], so very long
958 /// dispatch tails may be aborted during graceful shutdown.
959 ///
960 /// # Orphan-member semantics on shutdown abort
961 ///
962 /// If shutdown fires `JoinSet::abort_all()` after its drain timeout
963 /// while a dispatch loop is mid-iteration, the already-issued
964 /// `ff_cancel_execution` FCALLs (atomic Lua) complete cleanly with
965 /// `terminal_outcome = cancelled` and the caller-supplied reason. The
966 /// members not yet visited are abandoned mid-loop. They remain in
967 /// whichever state they were in (active/eligible/suspended) until the
968 /// natural lifecycle scanners reach them: active leases expire
969 /// (`lease_expiry`) and attempt-timeout them to `expired`, suspended
970 /// members time out to `skipped`, eligible ones sit until retention
971 /// trim. So no orphan state — but the terminal_outcome for the
972 /// abandoned members will be `expired`/`skipped` rather than
973 /// `cancelled`, and the operator-supplied `reason` is lost for them.
974 /// Audit tooling that requires reason fidelity across shutdowns should
975 /// use `?wait=true`.
976 pub async fn cancel_flow(
977 &self,
978 args: &CancelFlowArgs,
979 ) -> Result<CancelFlowResult, ServerError> {
980 self.cancel_flow_inner(args, false).await
981 }
982
983 /// Cancel a flow and wait for all member cancellations to complete
984 /// inline. Slower than [`Self::cancel_flow`] for large flows, but
985 /// guarantees every member is in a terminal state on return.
986 pub async fn cancel_flow_wait(
987 &self,
988 args: &CancelFlowArgs,
989 ) -> Result<CancelFlowResult, ServerError> {
990 self.cancel_flow_inner(args, true).await
991 }
992
993 async fn cancel_flow_inner(
994 &self,
995 args: &CancelFlowArgs,
996 wait: bool,
997 ) -> Result<CancelFlowResult, ServerError> {
998 // RFC-017 Stage E2: the header FCALL + AlreadyTerminal fetch
999 // now dispatch through the backend trait. The Server no longer
1000 // owns a ferriskey `Client`; `self.backend.cancel_flow_header`
1001 // encapsulates the Valkey-specific FCALL + reload-on-failover
1002 // + HMGET/SMEMBERS-for-AlreadyTerminal work previously inlined
1003 // here.
1004 let header = self.backend.cancel_flow_header(args.clone()).await?;
1005
1006 let (policy, members) = match header {
1007 ff_core::contracts::CancelFlowHeader::Cancelled {
1008 cancellation_policy,
1009 member_execution_ids,
1010 } => (cancellation_policy, member_execution_ids),
1011 // Idempotent retry: flow was already cancelled/completed/failed.
1012 // Return Cancelled with the *stored* policy + (capped) member
1013 // list so observability tooling gets the real historical state
1014 // rather than echoing the caller's retry intent. The backend
1015 // has already done the HMGET + SMEMBERS; the Server just caps
1016 // the member list to bound wire bandwidth.
1017 ff_core::contracts::CancelFlowHeader::AlreadyTerminal {
1018 stored_cancellation_policy,
1019 stored_cancel_reason,
1020 member_execution_ids,
1021 } => {
1022 let total_members = member_execution_ids.len();
1023 let stored_members: Vec<String> = member_execution_ids
1024 .into_iter()
1025 .take(ALREADY_TERMINAL_MEMBER_CAP)
1026 .collect();
1027 tracing::debug!(
1028 flow_id = %args.flow_id,
1029 stored_policy = stored_cancellation_policy.as_deref().unwrap_or(""),
1030 stored_reason = stored_cancel_reason.as_deref().unwrap_or(""),
1031 total_members,
1032 returned_members = stored_members.len(),
1033 "cancel_flow: flow already terminal, returning idempotent Cancelled"
1034 );
1035 return Ok(CancelFlowResult::Cancelled {
1036 cancellation_policy: stored_cancellation_policy
1037 .unwrap_or_else(|| args.cancellation_policy.clone()),
1038 member_execution_ids: stored_members,
1039 });
1040 }
1041 // `CancelFlowHeader` is `#[non_exhaustive]`. Any future
1042 // variant must be reviewed at this match site before it
1043 // reaches the wire; fall closed with a typed server error.
1044 other => {
1045 return Err(ServerError::OperationFailed(format!(
1046 "cancel_flow_header: unknown CancelFlowHeader variant: {other:?}"
1047 )));
1048 }
1049 };
1050 let needs_dispatch = policy == "cancel_all" && !members.is_empty();
1051
1052 if !needs_dispatch {
1053 return Ok(CancelFlowResult::Cancelled {
1054 cancellation_policy: policy,
1055 member_execution_ids: members,
1056 });
1057 }
1058
1059 if wait {
1060 // Synchronous dispatch — cancel every member inline before returning.
1061 // Collect per-member failures so the caller sees a
1062 // PartiallyCancelled outcome instead of a false-positive
1063 // Cancelled when any member cancel faulted. The
1064 // cancel-backlog reconciler still retries the unacked
1065 // members; surfacing the partial state lets operator
1066 // tooling alert without polling per-member state.
1067 // RFC-017 Stage E2: ack drain dispatches via the backend
1068 // trait's `ack_cancel_member` — the Server no longer owns
1069 // a raw ferriskey `Client`.
1070 let mut failed: Vec<String> = Vec::new();
1071 for eid_str in &members {
1072 let Ok(eid) = ExecutionId::parse(eid_str) else {
1073 failed.push(eid_str.clone());
1074 continue;
1075 };
1076 let cancel_args = ff_core::contracts::CancelExecutionArgs {
1077 execution_id: eid.clone(),
1078 reason: args.reason.clone(),
1079 source: ff_core::types::CancelSource::OperatorOverride,
1080 lease_id: None,
1081 lease_epoch: None,
1082 attempt_id: None,
1083 now: args.now,
1084 };
1085 match self.backend.cancel_execution(cancel_args).await {
1086 Ok(_) => {
1087 ack_cancel_member_via_backend(
1088 self.backend.as_ref(),
1089 &args.flow_id,
1090 &eid,
1091 eid_str,
1092 )
1093 .await;
1094 }
1095 Err(e) => {
1096 if is_terminal_ack_engine_error(&e) {
1097 ack_cancel_member_via_backend(
1098 self.backend.as_ref(),
1099 &args.flow_id,
1100 &eid,
1101 eid_str,
1102 )
1103 .await;
1104 continue;
1105 }
1106 tracing::warn!(
1107 execution_id = %eid_str,
1108 error = %e,
1109 "cancel_flow(wait): individual execution cancel failed \
1110 (transport/contract fault; reconciler will retry if transient)"
1111 );
1112 failed.push(eid_str.clone());
1113 }
1114 }
1115 }
1116 if failed.is_empty() {
1117 return Ok(CancelFlowResult::Cancelled {
1118 cancellation_policy: policy,
1119 member_execution_ids: members,
1120 });
1121 }
1122 return Ok(CancelFlowResult::PartiallyCancelled {
1123 cancellation_policy: policy,
1124 member_execution_ids: members,
1125 failed_member_execution_ids: failed,
1126 });
1127 }
1128
1129 // Asynchronous dispatch — spawn into the shared JoinSet so
1130 // Server::shutdown can wait for pending cancellations (bounded
1131 // by a shutdown timeout). RFC-017 Stage E2: both the
1132 // per-member cancel and the backlog ack dispatch through the
1133 // backend trait (the Server no longer holds a ferriskey handle).
1134 let backend = self.backend.clone();
1135 let reason = args.reason.clone();
1136 let now = args.now;
1137 let dispatch_members = members.clone();
1138 let flow_id = args.flow_id.clone();
1139 // Every async cancel_flow contends on this lock, but the
1140 // critical section is tiny: try_join_next drain + spawn.
1141 let mut guard = self.background_tasks.lock().await;
1142
1143 // Reap completed background dispatches before spawning the next.
1144 while let Some(joined) = guard.try_join_next() {
1145 if let Err(e) = joined {
1146 tracing::warn!(
1147 error = %e,
1148 "cancel_flow: background dispatch task panicked or was aborted"
1149 );
1150 }
1151 }
1152
1153 guard.spawn(async move {
1154 // Bounded parallel dispatch via futures::stream::buffer_unordered.
1155 use futures::stream::StreamExt;
1156 const CONCURRENCY: usize = 16;
1157
1158 let member_count = dispatch_members.len();
1159 let flow_id_for_log = flow_id.clone();
1160 futures::stream::iter(dispatch_members)
1161 .map(|eid_str| {
1162 let backend = backend.clone();
1163 let reason = reason.clone();
1164 let flow_id = flow_id.clone();
1165 async move {
1166 let Ok(eid) = ExecutionId::parse(&eid_str) else {
1167 tracing::warn!(
1168 flow_id = %flow_id,
1169 execution_id = %eid_str,
1170 "cancel_flow(async): member id failed to parse; skipping"
1171 );
1172 return;
1173 };
1174 let cancel_args = ff_core::contracts::CancelExecutionArgs {
1175 execution_id: eid.clone(),
1176 reason: reason.clone(),
1177 source: ff_core::types::CancelSource::OperatorOverride,
1178 lease_id: None,
1179 lease_epoch: None,
1180 attempt_id: None,
1181 now,
1182 };
1183 match backend.cancel_execution(cancel_args).await {
1184 Ok(_) => {
1185 ack_cancel_member_via_backend(
1186 backend.as_ref(),
1187 &flow_id,
1188 &eid,
1189 &eid_str,
1190 )
1191 .await;
1192 }
1193 Err(e) => {
1194 if is_terminal_ack_engine_error(&e) {
1195 ack_cancel_member_via_backend(
1196 backend.as_ref(),
1197 &flow_id,
1198 &eid,
1199 &eid_str,
1200 )
1201 .await;
1202 } else {
1203 tracing::warn!(
1204 flow_id = %flow_id,
1205 execution_id = %eid_str,
1206 error = %e,
1207 "cancel_flow(async): individual execution cancel failed \
1208 (transport/contract fault; reconciler will retry if transient)"
1209 );
1210 }
1211 }
1212 }
1213 }
1214 })
1215 .buffer_unordered(CONCURRENCY)
1216 .for_each(|()| async {})
1217 .await;
1218
1219 tracing::debug!(
1220 flow_id = %flow_id_for_log,
1221 member_count,
1222 concurrency = CONCURRENCY,
1223 "cancel_flow: background member dispatch complete"
1224 );
1225 });
1226 drop(guard);
1227
1228 let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
1229 Ok(CancelFlowResult::CancellationScheduled {
1230 cancellation_policy: policy,
1231 member_count,
1232 member_execution_ids: members,
1233 })
1234 }
1235
1236 /// Stage a dependency edge between two executions in a flow.
1237 ///
1238 /// Runs on the flow partition {fp:N}.
1239 /// KEYS (6), ARGV (8) — matches lua/flow.lua ff_stage_dependency_edge.
1240 pub async fn stage_dependency_edge(
1241 &self,
1242 args: &StageDependencyEdgeArgs,
1243 ) -> Result<StageDependencyEdgeResult, ServerError> {
1244 Ok(self.backend.stage_dependency_edge(args.clone()).await?)
1245 }
1246
1247 /// Apply a staged dependency edge to the child execution. RFC-017
1248 /// Stage D2: delegates through the backend trait.
1249 pub async fn apply_dependency_to_child(
1250 &self,
1251 args: &ApplyDependencyToChildArgs,
1252 ) -> Result<ApplyDependencyToChildResult, ServerError> {
1253 Ok(self.backend.apply_dependency_to_child(args.clone()).await?)
1254 }
1255
1256 // ── Execution operations API ──
1257
1258 /// Deliver a signal to a suspended (or pending-waitpoint) execution.
1259 ///
1260 /// Pre-reads exec_core for waitpoint/suspension fields needed for KEYS.
1261 /// KEYS (13), ARGV (17) — matches lua/signal.lua ff_deliver_signal.
1262 pub async fn deliver_signal(
1263 &self,
1264 args: &DeliverSignalArgs,
1265 ) -> Result<DeliverSignalResult, ServerError> {
1266 // RFC-017 Stage A migration: dispatch through the backend
1267 // trait. The previous body (lane pre-read + KEYS(14) + ARGV(18)
1268 // FCALL dispatch + `parse_deliver_signal_result`) lives
1269 // verbatim inside `ValkeyBackend::deliver_signal` →
1270 // `deliver_signal_impl`. Clone required because the trait
1271 // method takes `DeliverSignalArgs` by value (see
1272 // `ff_core::engine_backend::EngineBackend::deliver_signal`).
1273 Ok(self.backend.deliver_signal(args.clone()).await?)
1274 }
1275
1276 /// Change an execution's priority. RFC-017 Stage D2: delegates
1277 /// through the backend trait. Empty `lane_id` triggers the backend-
1278 /// internal HGET pre-read (matches legacy inherent behaviour).
1279 pub async fn change_priority(
1280 &self,
1281 execution_id: &ExecutionId,
1282 new_priority: i32,
1283 ) -> Result<ChangePriorityResult, ServerError> {
1284 let args = ff_core::contracts::ChangePriorityArgs {
1285 execution_id: execution_id.clone(),
1286 new_priority,
1287 lane_id: LaneId::new(""),
1288 now: TimestampMs::now(),
1289 };
1290 Ok(self.backend.change_priority(args).await?)
1291 }
1292
1293 /// Scheduler-routed claim entry point.
1294 ///
1295 /// RFC-017 Wave 8 Stage E3 (§7): dispatches through the backend
1296 /// trait. The Valkey backend forwards to its wired
1297 /// [`ff_scheduler::Scheduler`]; the Postgres backend forwards to
1298 /// [`ff_backend_postgres::scheduler::PostgresScheduler`]'s
1299 /// `FOR UPDATE SKIP LOCKED` admission pipeline. Returns
1300 /// `Ok(None)` when no eligible execution exists on the lane at
1301 /// this scan cycle — the enum-typed trait outcome
1302 /// (`ClaimForWorkerOutcome::NoWork`) is collapsed to `Option::None`
1303 /// for the inherent-call contract pre-existing Stage E.
1304 ///
1305 /// Error mapping: scheduler-class errors arrive as
1306 /// [`EngineError`] via the trait boundary and thread through
1307 /// `ServerError::Engine`'s HTTP arm
1308 /// (budget / capability / unavailable classes land on the
1309 /// documented 400/409/503 response codes — see `api::ApiError::into_response`).
1310 pub async fn claim_for_worker(
1311 &self,
1312 lane: &LaneId,
1313 worker_id: &WorkerId,
1314 worker_instance_id: &WorkerInstanceId,
1315 worker_capabilities: &std::collections::BTreeSet<String>,
1316 grant_ttl_ms: u64,
1317 ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
1318 let args = ff_core::contracts::ClaimForWorkerArgs::new(
1319 lane.clone(),
1320 worker_id.clone(),
1321 worker_instance_id.clone(),
1322 worker_capabilities.clone(),
1323 grant_ttl_ms,
1324 );
1325 match self.backend.claim_for_worker(args).await? {
1326 ff_core::contracts::ClaimForWorkerOutcome::Granted(g) => Ok(Some(g)),
1327 ff_core::contracts::ClaimForWorkerOutcome::NoWork => Ok(None),
1328 // `#[non_exhaustive]` — any future additive variant
1329 // surfaces as a typed Engine error so callers see a
1330 // loud miss instead of a silent `None`.
1331 _ => Err(ServerError::Engine(Box::new(
1332 ff_core::engine_error::EngineError::Unavailable {
1333 op: "claim_for_worker: unknown ClaimForWorkerOutcome variant",
1334 },
1335 ))),
1336 }
1337 }
1338
1339 /// Revoke an active lease (operator-initiated). RFC-017 Stage D2:
1340 /// delegates through the backend trait. The backend's trait impl
1341 /// returns `RevokeLeaseResult::AlreadySatisfied` when no active
1342 /// lease is present; the Server facade preserves its pre-migration
1343 /// `ServerError::NotFound` behaviour by re-mapping that variant.
1344 pub async fn revoke_lease(
1345 &self,
1346 execution_id: &ExecutionId,
1347 ) -> Result<RevokeLeaseResult, ServerError> {
1348 let args = ff_core::contracts::RevokeLeaseArgs {
1349 execution_id: execution_id.clone(),
1350 expected_lease_id: None,
1351 worker_instance_id: WorkerInstanceId::new(""),
1352 reason: "operator_revoke".to_owned(),
1353 };
1354 match self.backend.revoke_lease(args).await? {
1355 RevokeLeaseResult::AlreadySatisfied { reason } if reason == "no_active_lease" => {
1356 Err(ServerError::NotFound(format!(
1357 "no active lease for execution {execution_id} (no current_worker_instance_id)"
1358 )))
1359 }
1360 other => Ok(other),
1361 }
1362 }
1363
1364 /// Get full execution info (HGETALL-shape on Valkey; SELECT-shape on
1365 /// Postgres once Wave 9 wires it). RFC-017 Stage E2: routed through
1366 /// the backend trait's [`ff_core::engine_backend::EngineBackend::read_execution_info`].
1367 pub async fn get_execution(
1368 &self,
1369 execution_id: &ExecutionId,
1370 ) -> Result<ExecutionInfo, ServerError> {
1371 match self.backend.read_execution_info(execution_id).await? {
1372 Some(info) => Ok(info),
1373 None => Err(ServerError::NotFound(format!(
1374 "execution not found: {execution_id}"
1375 ))),
1376 }
1377 }
1378
1379 /// Partition-scoped forward-only cursor listing of executions.
1380 ///
1381 /// Parity-wrapper around the Valkey body of
1382 /// [`ff_core::engine_backend::EngineBackend::list_executions`].
1383 /// Issue #182 replaced the previous offset + lane + state-filter
1384 /// shape with this cursor-based API (per owner adjudication:
1385 /// cursor-everywhere, HTTP surface unreleased). Reads
1386 /// `ff:idx:{p:N}:all_executions`, sorts lexicographically on
1387 /// `ExecutionId`, filters `> cursor`, and trims to `limit`.
1388 pub async fn list_executions_page(
1389 &self,
1390 partition_id: u16,
1391 cursor: Option<ExecutionId>,
1392 limit: usize,
1393 ) -> Result<ListExecutionsPage, ServerError> {
1394 // RFC-017 Stage A migration: dispatch through the backend
1395 // trait. The previous body (SMEMBERS + parse + lex-sort +
1396 // filter + cap) is preserved verbatim inside
1397 // `ValkeyBackend::list_executions`. One deliberate behaviour
1398 // change: corrupt members now surface as
1399 // `EngineError::Validation { kind: Corruption, .. }` (→
1400 // `ServerError::Engine`), where the legacy path warn-logged
1401 // and skipped them. This matches RFC-012's fail-loud contract
1402 // for read-surface corruption.
1403 let partition = ff_core::partition::Partition {
1404 family: ff_core::partition::PartitionFamily::Execution,
1405 index: partition_id,
1406 };
1407 let partition_key = ff_core::partition::PartitionKey::from(&partition);
1408 Ok(self
1409 .backend
1410 .list_executions(partition_key, cursor, limit)
1411 .await?)
1412 }
1413
1414 /// Replay a terminal execution. RFC-017 Stage D2: delegates through
1415 /// the backend trait; the variadic-KEYS pre-read (HMGET + SMEMBERS
1416 /// for inbound edges on skipped flow members) now lives inside
1417 /// `ValkeyBackend::replay_execution`.
1418 pub async fn replay_execution(
1419 &self,
1420 execution_id: &ExecutionId,
1421 ) -> Result<ReplayExecutionResult, ServerError> {
1422 let args = ff_core::contracts::ReplayExecutionArgs {
1423 execution_id: execution_id.clone(),
1424 now: TimestampMs::now(),
1425 };
1426 Ok(self.backend.replay_execution(args).await?)
1427 }
1428
1429 /// Read frames from an attempt's stream (XRANGE wrapper) plus terminal
1430 /// markers (`closed_at`, `closed_reason`) so consumers can stop polling
1431 /// when the producer finalizes.
1432 ///
1433 /// `from_id` and `to_id` accept XRANGE special markers: `"-"` for
1434 /// earliest, `"+"` for latest. `count_limit` MUST be `>= 1` —
1435 /// `0` returns a `ServerError::InvalidInput` (matches the REST boundary
1436 /// and the Lua-side reject).
1437 ///
1438 /// Cluster-safe: the attempt's `{p:N}` partition is derived from the
1439 /// execution id, so all KEYS share the same slot.
1440 pub async fn read_attempt_stream(
1441 &self,
1442 execution_id: &ExecutionId,
1443 attempt_index: AttemptIndex,
1444 from_id: &str,
1445 to_id: &str,
1446 count_limit: u64,
1447 ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
1448 if count_limit == 0 {
1449 return Err(ServerError::InvalidInput(
1450 "count_limit must be >= 1".to_owned(),
1451 ));
1452 }
1453 // RFC-017 Stage B row 10: delegate through the trait. The
1454 // backend owns the stream-op semaphore + XRANGE dispatch; the
1455 // 429-on-contention semantics round-trip as
1456 // `EngineError::ResourceExhausted → ServerError::Engine →
1457 // HTTP 429` (see `ServerError::from` below).
1458 let from = wire_str_to_stream_cursor(from_id);
1459 let to = wire_str_to_stream_cursor(to_id);
1460 Ok(self
1461 .backend
1462 .read_stream(execution_id, attempt_index, from, to, count_limit)
1463 .await?)
1464 }
1465
1466 /// Tail a live attempt's stream (XREAD BLOCK wrapper). Returns frames
1467 /// plus the terminal signal so a polling consumer can exit when the
1468 /// producer closes the stream.
1469 ///
1470 /// `last_id` is exclusive — XREAD returns entries with id > last_id.
1471 /// Pass `"0-0"` to read from the beginning.
1472 ///
1473 /// `block_ms == 0` → non-blocking peek (returns immediately).
1474 /// `block_ms > 0` → blocks up to that many ms. Empty `frames` +
1475 /// `closed_at=None` → timeout, no new data, still open.
1476 ///
1477 /// `count_limit` MUST be `>= 1`; `0` returns `InvalidInput`.
1478 ///
1479 /// Implemented as a direct XREAD command (not FCALL) because blocking
1480 /// commands are rejected inside Valkey Functions. The terminal
1481 /// markers come from a companion HMGET on `stream_meta` — see
1482 /// `ff_script::stream_tail` module docs.
1483 pub async fn tail_attempt_stream(
1484 &self,
1485 execution_id: &ExecutionId,
1486 attempt_index: AttemptIndex,
1487 last_id: &str,
1488 block_ms: u64,
1489 count_limit: u64,
1490 ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
1491 if count_limit == 0 {
1492 return Err(ServerError::InvalidInput(
1493 "count_limit must be >= 1".to_owned(),
1494 ));
1495 }
1496 // RFC-017 Stage B row 10: delegate through the trait. The
1497 // backend owns the stream-op semaphore + XREAD BLOCK dispatch
1498 // (via `duplicate_connection()` per #204, so neither a shared
1499 // tail client nor a serialising mutex is needed). Saturation
1500 // round-trips as `EngineError::ResourceExhausted → HTTP 429`;
1501 // a post-shutdown arrival round-trips as
1502 // `EngineError::Unavailable → HTTP 503`.
1503 let after = wire_str_to_stream_cursor(last_id);
1504 Ok(self
1505 .backend
1506 .tail_stream(
1507 execution_id,
1508 attempt_index,
1509 after,
1510 block_ms,
1511 count_limit,
1512 ff_core::backend::TailVisibility::All,
1513 )
1514 .await?)
1515 }
1516
1517 /// Graceful shutdown — stops scanners, drains background handler tasks
1518 /// (e.g. cancel_flow member dispatch) with a bounded timeout, then waits
1519 /// for scanners to finish.
1520 ///
1521 /// Shutdown order is chosen so in-flight stream ops (read/tail) drain
1522 /// cleanly without new arrivals piling up:
1523 ///
    /// 1. `backend.shutdown_prepare()` — the backend closes its stream-op
    ///    semaphore so new read/tail attempts fail fast, surfaced as
    ///    `EngineError::Unavailable → HTTP 503` (ops tooling may choose
    ///    to wait + retry on 503-class responses; the body clearly names
    ///    the shutdown reason).
1529 /// 2. Drain handler-spawned background tasks with a 15s ceiling.
1530 /// 3. `engine.shutdown()` stops scanners.
1531 ///
    /// Existing in-flight tails finish on their natural `block_ms`
    /// boundary (up to ~30s); per #204 each tail runs on its own
    /// duplicated connection, so there is no shared tail client to tear
    /// down here. We do NOT wait for tails
1535 /// to drain explicitly — the semaphore-close + natural-timeout
1536 /// combination bounds shutdown to roughly `block_ms + 15s` in the
1537 /// worst case. Callers observing a dropped connection retry against
1538 /// whatever replacement is coming up.
1539 pub async fn shutdown(self) {
1540 tracing::info!("shutting down FlowFabric server");
1541
1542 // Step 1: RFC-017 Stage B — delegate stream-op pool closure
1543 // + drain to the backend's `shutdown_prepare` hook. The
1544 // Valkey impl closes its semaphore (no new read/tail starts)
1545 // and awaits in-flight permits up to `grace`. A timeout here
1546 // is logged + counted on `ff_shutdown_timeout_total`; we
1547 // continue with best-effort drain of the server's own
1548 // background tasks rather than blocking shutdown behind a
1549 // single slow tail.
1550 let drain_timeout = Duration::from_secs(15);
1551 match self.backend.shutdown_prepare(drain_timeout).await {
1552 Ok(()) => tracing::info!(
1553 "backend shutdown_prepare complete (stream-op pool drained)"
1554 ),
1555 Err(ff_core::engine_error::EngineError::Timeout { elapsed, .. }) => {
1556 self.metrics.inc_shutdown_timeout();
1557 tracing::warn!(
1558 elapsed_ms = elapsed.as_millis() as u64,
1559 "shutdown_prepare exceeded grace; proceeding best-effort"
1560 );
1561 }
1562 Err(e) => {
1563 // Non-timeout errors don't block shutdown either, but
1564 // they're unexpected — log at warn so operators see
1565 // the signal without tripping an alert.
1566 tracing::warn!(
1567 err = %e,
1568 "shutdown_prepare returned error; proceeding best-effort"
1569 );
1570 }
1571 }
1572
1573 // Step 2: Drain handler-spawned background tasks with the same
1574 // ceiling as Engine::shutdown. If dispatch is still running at
1575 // the deadline, drop the JoinSet to abort remaining tasks.
1576 let background = self.background_tasks.clone();
1577 let drain = async move {
1578 let mut guard = background.lock().await;
1579 while guard.join_next().await.is_some() {}
1580 };
1581 match tokio::time::timeout(drain_timeout, drain).await {
1582 Ok(()) => {}
1583 Err(_) => {
1584 tracing::warn!(
1585 timeout_s = drain_timeout.as_secs(),
1586 "shutdown: background tasks did not finish in time, aborting"
1587 );
1588 self.background_tasks.lock().await.abort_all();
1589 }
1590 }
1591
1592 if let Some(engine) = self.engine {
1593 engine.shutdown().await;
1594 }
1595 tracing::info!("FlowFabric server shutdown complete");
1596 }
1597}
1598
1599/// RFC-017 Stage B: lift the wire string (`"-"`, `"+"`, or a concrete
1600/// entry id) used by the REST boundary into the typed
1601/// [`ff_core::contracts::StreamCursor`] the trait method expects.
1602/// Keeps the `read_attempt_stream` / `tail_attempt_stream`
1603/// public-function signatures byte-identical while dispatching
1604/// through the backend.
1605fn wire_str_to_stream_cursor(s: &str) -> ff_core::contracts::StreamCursor {
1606 match s {
1607 "-" => ff_core::contracts::StreamCursor::Start,
1608 "+" => ff_core::contracts::StreamCursor::End,
1609 other => ff_core::contracts::StreamCursor::At(other.to_owned()),
1610 }
1611}
1612
1613
/// Result of a waitpoint HMAC secret rotation across all execution partitions.
///
/// Derives `serde::Serialize` so the HTTP layer can return it to the
/// operator verbatim (200 if any partition succeeded — see
/// [`Server::rotate_waitpoint_secret`]).
#[derive(Debug, Clone, serde::Serialize)]
pub struct RotateWaitpointSecretResult {
    /// Count of partitions that accepted the rotation.
    pub rotated: u16,
    /// Partition indices that failed — operator should investigate (Valkey
    /// outage, auth failure, cluster split). Rotation is idempotent, so a
    /// re-run after the underlying fault clears converges to the correct
    /// state.
    pub failed: Vec<u16>,
    /// New kid installed as current.
    pub new_kid: String,
}
1627
1628impl Server {
1629 /// Rotate the waitpoint HMAC secret. Promotes the current kid to previous
1630 /// (accepted within `FF_WAITPOINT_HMAC_GRACE_MS`), installs `new_secret_hex`
1631 /// as the new current kid. Idempotent: re-running with the same `new_kid`
1632 /// and `new_secret_hex` converges partitions to the same state.
1633 ///
1634 /// Returns a structured result so operators can see which partitions failed.
1635 /// HTTP layer returns 200 if any partition succeeded, 500 only if all fail.
1636 pub async fn rotate_waitpoint_secret(
1637 &self,
1638 new_kid: &str,
1639 new_secret_hex: &str,
1640 ) -> Result<RotateWaitpointSecretResult, ServerError> {
1641 if new_kid.is_empty() || new_kid.contains(':') {
1642 return Err(ServerError::OperationFailed(
1643 "new_kid must be non-empty and must not contain ':'".into(),
1644 ));
1645 }
1646 if new_secret_hex.is_empty()
1647 || !new_secret_hex.len().is_multiple_of(2)
1648 || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
1649 {
1650 return Err(ServerError::OperationFailed(
1651 "new_secret_hex must be a non-empty even-length hex string".into(),
1652 ));
1653 }
1654
1655 // Single-writer gate — admin semaphore + audit log stay on
1656 // Server per RFC-017 §4 row 11. The per-partition fan-out
1657 // moved inside `ValkeyBackend::rotate_waitpoint_hmac_secret_all`.
1658 let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
1659 Ok(p) => p,
1660 Err(tokio::sync::TryAcquireError::NoPermits) => {
1661 return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
1662 }
1663 Err(tokio::sync::TryAcquireError::Closed) => {
1664 return Err(ServerError::OperationFailed(
1665 "admin rotate semaphore closed (server shutting down)".into(),
1666 ));
1667 }
1668 };
1669
1670 let n = self.config.partition_config.num_flow_partitions;
1671 let grace_ms = self.config.waitpoint_hmac_grace_ms;
1672
1673 // RFC-017 Stage B row 11: delegate the per-partition fan-out
1674 // to the backend. The trait method returns one entry per
1675 // partition with an inner `Result` so partial success is
1676 // observable — matching the pre-migration Server body.
1677 let args = ff_core::contracts::RotateWaitpointHmacSecretAllArgs::new(
1678 new_kid.to_owned(),
1679 new_secret_hex.to_owned(),
1680 grace_ms,
1681 );
1682 let result = self
1683 .backend
1684 .rotate_waitpoint_hmac_secret_all(args)
1685 .await?;
1686
1687 let mut rotated = 0u16;
1688 let mut failed: Vec<u16> = Vec::new();
1689 for entry in &result.entries {
1690 match &entry.result {
1691 Ok(_) => {
1692 rotated += 1;
1693 tracing::debug!(
1694 partition = entry.partition,
1695 new_kid = %new_kid,
1696 "waitpoint_hmac_rotated"
1697 );
1698 }
1699 Err(e) => {
1700 tracing::error!(
1701 target: "audit",
1702 partition = entry.partition,
1703 err = %e,
1704 "waitpoint_hmac_rotation_failed"
1705 );
1706 failed.push(entry.partition);
1707 }
1708 }
1709 }
1710
1711 // Single aggregated audit event (RFC-017 row 11: audit emit
1712 // stays on Server).
1713 tracing::info!(
1714 target: "audit",
1715 new_kid = %new_kid,
1716 total_partitions = n,
1717 rotated,
1718 failed_count = failed.len(),
1719 "waitpoint_hmac_rotation_complete"
1720 );
1721
1722 Ok(RotateWaitpointSecretResult {
1723 rotated,
1724 failed,
1725 new_kid: new_kid.to_owned(),
1726 })
1727 }
1728}
1729
1730// ── FCALL result parsing ──
1731
1732
1733
1734
1735
1736// ── Flow FCALL result parsing ──
1737
1738
1739
1740
/// Convert a `ScriptError` into a `ServerError` preserving `ferriskey::ErrorKind`
/// for transport-level variants. Business-logic variants keep their code as
/// `ServerError::Script(String)` so HTTP clients see a stable message.
///
/// Why this exists: before R2, the stream handlers did
/// `ScriptError → format!() → ServerError::Script(String)`, which erased
/// the ErrorKind and made `ServerError::is_retryable()` always return
/// false. Retry-capable clients (cairn-fabric) would not retry a legit
/// transient error like `IoError`.
#[allow(dead_code)] // retained for non-stream FCALL paths that still route via raw ScriptError; stream handlers moved to trait in RFC-017 Stage B
fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
    match e {
        ff_script::error::ScriptError::Valkey(valkey_err) => {
            crate::server::backend_context(valkey_err, "stream FCALL transport")
        }
        other => ServerError::Script(other.to_string()),
    }
}
1760
1761/// Acknowledge that a member cancel has committed. Delegates to
1762/// [`EngineBackend::ack_cancel_member`] (Valkey: `ff_ack_cancel_member`
1763/// FCALL on `{fp:N}` — SREM the execution from the flow's
1764/// `pending_cancels` set and, if empty, ZREM the flow from the
1765/// partition-level `cancel_backlog`). Best-effort — failures are
1766/// logged but not propagated, since the reconciler drains any
1767/// leftovers on its next pass.
1768async fn ack_cancel_member_via_backend(
1769 backend: &dyn EngineBackend,
1770 flow_id: &FlowId,
1771 execution_id: &ExecutionId,
1772 eid_str: &str,
1773) {
1774 if let Err(e) = backend.ack_cancel_member(flow_id, execution_id).await {
1775 tracing::warn!(
1776 flow_id = %flow_id,
1777 execution_id = %eid_str,
1778 error = %e,
1779 "ack_cancel_member failed; reconciler will drain on next pass"
1780 );
1781 }
1782}
1783
1784/// Engine-error variant: inspects an
1785/// `EngineError` returned from `self.backend.cancel_execution(...)`
1786/// and returns `true` when the member is already terminal. Matches the
1787/// Lua-code semantics of `execution_not_active` / `execution_not_found`
1788/// via the typed `State` / `Validation` classifications the backend
1789/// trait impl maps them to.
1790fn is_terminal_ack_engine_error(err: &EngineError) -> bool {
1791 match err {
1792 // Already terminal (Lua's `execution_not_active`) or missing
1793 // entirely (`execution_not_found`) — both treated as ack-worthy
1794 // so the cancel-backlog doesn't poison on a member already in
1795 // the intended terminal state.
1796 EngineError::State(kind) => matches!(
1797 kind,
1798 ff_core::engine_error::StateKind::Terminal
1799 ),
1800 EngineError::NotFound { .. } => true,
1801 EngineError::Contextual { source, .. } => is_terminal_ack_engine_error(source),
1802 _ => false,
1803 }
1804}
1805
1806
// A single-cancel-attempt helper (pre-read + FCALL + parse, factored out
// for [`cancel_member_execution`]'s retry loop) previously lived here;
// plain `//` so this note does not attach to the `tests` module below.
1809
1810
#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    /// Build a synthetic ferriskey error of the given kind.
    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn is_retryable_backend_variant_uses_kind_table() {
        // Transport bucket, cluster bucket (Moved / Ask / TryAgain /
        // ClusterDown — the #88 BackendErrorKind classifier treats these
        // as transient cluster-churn, a semantic refinement over the
        // previous ff-script retry table), and BusyLoading: retryable.
        for kind in [
            ErrorKind::IoError,
            ErrorKind::FatalSendError,
            ErrorKind::FatalReceiveError,
            ErrorKind::TryAgain,
            ErrorKind::ClusterDown,
            ErrorKind::Moved,
            ErrorKind::Ask,
            ErrorKind::BusyLoadingError,
        ] {
            assert!(ServerError::from(mk_fk_err(kind)).is_retryable());
        }

        // Auth / Protocol / ScriptNotLoaded: terminal.
        for kind in [
            ErrorKind::AuthenticationFailed,
            ErrorKind::NoScriptError,
            ErrorKind::ReadOnly,
        ] {
            assert!(!ServerError::from(mk_fk_err(kind)).is_retryable());
        }
    }

    #[test]
    fn is_retryable_backend_context_uses_kind_table() {
        let transport =
            crate::server::backend_context(mk_fk_err(ErrorKind::IoError), "HGET test");
        assert!(transport.is_retryable());

        let auth =
            crate::server::backend_context(mk_fk_err(ErrorKind::AuthenticationFailed), "auth");
        assert!(!auth.is_retryable());
    }

    #[test]
    fn is_retryable_library_load_delegates_to_inner_kind() {
        let io = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(mk_fk_err(
            ErrorKind::IoError,
        )));
        assert!(io.is_retryable());

        let auth = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(mk_fk_err(
            ErrorKind::AuthenticationFailed,
        )));
        assert!(!auth.is_retryable());

        // A version mismatch is a deploy problem, not a transient fault.
        let mismatch = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert!(!mismatch.is_retryable());
    }

    #[test]
    fn is_retryable_business_logic_variants_are_false() {
        for err in [
            ServerError::NotFound("x".into()),
            ServerError::InvalidInput("x".into()),
            ServerError::OperationFailed("x".into()),
            ServerError::Script("x".into()),
            ServerError::PartitionMismatch("x".into()),
        ] {
            assert!(!err.is_retryable());
        }
    }

    #[test]
    fn backend_kind_delegates_through_library_load() {
        let clustered = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::ClusterDown),
        ));
        assert_eq!(
            clustered.backend_kind(),
            Some(ff_core::BackendErrorKind::Cluster)
        );

        let mismatch = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert_eq!(mismatch.backend_kind(), None);
    }

    #[test]
    fn valkey_version_too_low_is_not_retryable() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7.0".into(),
            required: "7.2".into(),
        };
        assert!(!err.is_retryable());
        assert_eq!(err.backend_kind(), None);
    }

    #[test]
    fn valkey_version_too_low_error_message_includes_both_versions() {
        let msg = ServerError::ValkeyVersionTooLow {
            detected: "7.0".into(),
            required: "7.2".into(),
        }
        .to_string();
        assert!(msg.contains("7.0"), "detected version in message: {msg}");
        assert!(msg.contains("7.2"), "required version in message: {msg}");
        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
    }
}
1917}