ff_server/server.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use ferriskey::ClientBuilder;
5use tokio::sync::Mutex as AsyncMutex;
6use tokio::task::JoinSet;
7use ff_core::engine_backend::EngineBackend;
8use ff_core::engine_error::EngineError;
9use ff_core::contracts::{
10    AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
11    CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
12    CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
13    CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
14    ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
15    DeliverSignalArgs, DeliverSignalResult, ExecutionInfo,
16    ListExecutionsPage, ReplayExecutionResult,
17    ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
18    RevokeLeaseResult,
19    StageDependencyEdgeArgs, StageDependencyEdgeResult,
20};
21use ff_core::partition::{execution_partition, flow_partition, PartitionConfig};
22use ff_core::state::PublicState;
23use ff_core::types::*;
24use ff_engine::Engine;
25use ff_script::retry::is_retryable_kind;
26
27use crate::config::{BackendKind, ServerConfig};
28
/// RFC-017 §9.0: backends that may boot in this binary. Postgres
/// joined the list at Stage E (v0.8.0). Compiled into the binary by
/// design — see RFC-017 §9.0 "Fleet-wide cutover requirement" for the
/// rolling-upgrade implication.
const BACKEND_STAGE_READY: &[&str] = &["valkey", "postgres"];
34
35/// Upper bound on `member_execution_ids` returned in the
36/// [`CancelFlowResult::Cancelled`] response when the flow was already in a
37/// terminal state (idempotent retry). The first (non-idempotent) cancel call
38/// returns the full list; retries only need a sample.
39const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;
40
41/// FlowFabric server — connects everything together.
42///
43/// Manages the Valkey connection, Lua library loading, background scanners,
44/// and provides a minimal API for Phase 1.
45pub struct Server {
46    /// Server-wide Semaphore(1) gating admin rotate calls. Legitimate
47    /// operators rotate ~monthly and can afford to serialize; concurrent
48    /// rotate requests are an attack or misbehaving script. Holding the
49    /// permit also guards against interleaved partial rotations on the
50    /// Server side of the per-partition locks, surfacing contention as
51    /// HTTP 429 instead of silently queueing and blowing past the 120s
52    /// HTTP timeout. See `rotate_waitpoint_secret` handler.
53    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
54    /// Valkey engine (14 scanners). `None` on the Postgres boot path
55    /// — engine scanners are Valkey-only (RFC-017 Wave 8 Stage E1;
56    /// Postgres reconcilers run out-of-process via
57    /// `ff-backend-postgres::reconcilers` + a separate scanner
58    /// supervisor that Stage E2/E3 will wire).
59    engine: Option<Engine>,
60    config: ServerConfig,
61    // RFC-017 Wave 8 Stage E3 (§9.3 / F8+Q4): `Server::scheduler` field
62    // removed. The Valkey `ff_scheduler::Scheduler` is owned by
63    // `ValkeyBackend` (installed via `with_scheduler` during boot);
64    // the Postgres twin is constructed per-call inside
65    // `PostgresBackend::claim_for_worker`. Server-side dispatch of
66    // `claim_for_worker` now flows through `self.backend` only —
67    // trait cutover per §4 row 9.
68    /// Background tasks spawned by async handlers (e.g. cancel_flow member
69    /// dispatch). Drained on shutdown with a bounded timeout.
70    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
71    /// PR-94: observability registry. Always present; the no-op shim
72    /// takes zero runtime cost when the `observability` feature is
73    /// off, and the real OTEL-backed registry is passed in via
74    /// [`Server::start_with_metrics`] when on. Same `Arc` is shared
75    /// with [`Engine::start_with_metrics`] and
76    /// [`ff_scheduler::Scheduler::with_metrics`] so a single scrape
77    /// sees everything the process produces.
78    metrics: Arc<ff_observability::Metrics>,
    /// RFC-017: backend trait object for the data-plane migration.
    /// Stage A introduced this alongside the legacy `client` /
    /// `tail_client` / `scheduler` fields (dual-field posture); those
    /// fields were progressively retired through Stages B-E per
    /// RFC-017 §9. Migrated handlers dispatch through this handle;
    /// only the Valkey-only `engine` (scanners) remains as a separate
    /// field.
    backend: Arc<dyn EngineBackend>,
86}
87
88/// Server error type.
89#[derive(Debug, thiserror::Error)]
90pub enum ServerError {
91    /// Backend transport error. Previously wrapped `ferriskey::Error`
92    /// directly (#88); now carries a backend-agnostic
93    /// [`ff_core::BackendError`] so consumers match on
94    /// [`ff_core::BackendErrorKind`] instead of ferriskey's native
95    /// taxonomy. The ferriskey → [`ff_core::BackendError`] mapping
96    /// lives in `ff_backend_valkey::backend_error_from_ferriskey`.
97    #[error("backend: {0}")]
98    Backend(#[from] ff_core::BackendError),
99    /// Backend error with additional context. Previously
100    /// `ValkeyContext { source: ferriskey::Error }` (#88).
101    #[error("backend ({context}): {source}")]
102    BackendContext {
103        #[source]
104        source: ff_core::BackendError,
105        context: String,
106    },
107    #[error("config: {0}")]
108    Config(#[from] crate::config::ConfigError),
109    #[error("library load: {0}")]
110    LibraryLoad(#[from] ff_script::loader::LoadError),
111    #[error("partition mismatch: {0}")]
112    PartitionMismatch(String),
113    #[error("not found: {0}")]
114    NotFound(String),
115    #[error("invalid input: {0}")]
116    InvalidInput(String),
117    #[error("operation failed: {0}")]
118    OperationFailed(String),
119    #[error("script: {0}")]
120    Script(String),
121    /// Server-wide concurrency limit reached on a labelled pool. Surfaces
122    /// as HTTP 429 at the REST boundary so load balancers and clients can
123    /// retry with backoff. The `source` label ("stream_ops", "admin_rotate",
124    /// etc.) identifies WHICH pool is exhausted so operators aren't
125    /// misled by a single "tail unavailable" string when the real fault
126    /// is rotation contention.
127    ///
128    /// Fields: (source_label, max_permits).
129    #[error("too many concurrent {0} calls (max: {1})")]
130    ConcurrencyLimitExceeded(&'static str, u32),
131    /// Detected Valkey version is below the RFC-011 §13 minimum. The engine
132    /// depends on Valkey Functions (stabilized in 7.2), RESP3 (7.2+), and
133    /// hash-tag routing; older versions do not implement the required
134    /// primitives. Operator action: upgrade Valkey.
135    #[error(
136        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
137    )]
138    ValkeyVersionTooLow {
139        detected: String,
140        required: String,
141    },
142    /// RFC-017 §9.0 hard-gate: selected backend is not yet permitted
143    /// to boot in this `ff-server` binary. Operator action is to
144    /// either (a) select `FF_BACKEND=valkey`, or (b) upgrade to a
145    /// Stage E binary once v0.8.0 ships.
146    ///
147    /// `stage` names the current stage ("A"/"B"/"C"/"D") so operator
148    /// tooling can correlate the refusal with the migration plan.
149    #[error(
150        "backend not ready: {backend} (not in BACKEND_STAGE_READY; current stage {stage}). \
151         Set FF_BACKEND=valkey or FF_BACKEND=postgres."
152    )]
153    BackendNotReady {
154        backend: &'static str,
155        stage: &'static str,
156    },
157    /// RFC-017 Stage A: an `EngineBackend` trait method returned a
158    /// typed error that is not one of the specific business outcomes
159    /// existing `ServerError` variants model. Includes transport
160    /// faults, validation/corruption, and `Unavailable` for methods
161    /// the backend has not implemented yet.
162    ///
163    /// Stage B/C migrations may refine individual arms into richer
164    /// `ServerError` variants as more handlers route through the
165    /// trait; Stage A keeps this catch-all so the migration lands
166    /// additively. Boxed to keep `ServerError` small (clippy
167    /// `result_large_err` — `EngineError` is ~200 bytes).
168    #[error("engine: {0}")]
169    Engine(#[from] Box<ff_core::engine_error::EngineError>),
170}
171
172/// Lift a native `ferriskey::Error` into [`ServerError::Backend`] via
173/// [`ff_backend_valkey::backend_error_from_ferriskey`] (#88). Keeps
174/// `?`-propagation ergonomic at ferriskey call sites while the
175/// public variant stays backend-agnostic.
176impl From<ferriskey::Error> for ServerError {
177    fn from(err: ferriskey::Error) -> Self {
178        Self::Backend(ff_backend_valkey::backend_error_from_ferriskey(&err))
179    }
180}
181
182/// Lift an unboxed `EngineError` into [`ServerError::Engine`]. The
183/// variant stores a `Box<EngineError>` to keep `ServerError` small
184/// (clippy `result_large_err`); this `From` restores `?`-propagation
185/// from trait-dispatched handler paths.
186impl From<ff_core::engine_error::EngineError> for ServerError {
187    fn from(err: ff_core::engine_error::EngineError) -> Self {
188        Self::Engine(Box::new(err))
189    }
190}
191
192/// Build a [`ServerError::BackendContext`] from a native
193/// `ferriskey::Error` and a call-site label (#88).
194pub(crate) fn backend_context(
195    err: ferriskey::Error,
196    context: impl Into<String>,
197) -> ServerError {
198    ServerError::BackendContext {
199        source: ff_backend_valkey::backend_error_from_ferriskey(&err),
200        context: context.into(),
201    }
202}
203
204impl ServerError {
205    /// Returns the classified [`ff_core::BackendErrorKind`] if this
206    /// error carries a backend transport fault. Covers direct
207    /// Backend variants and library-load failures.
208    ///
209    /// Renamed from `valkey_kind` in #88 — the previous return type
210    /// `Option<ferriskey::ErrorKind>` leaked ferriskey into every
211    /// consumer doing retry/error classification.
212    pub fn backend_kind(&self) -> Option<ff_core::BackendErrorKind> {
213        match self {
214            Self::Backend(be) | Self::BackendContext { source: be, .. } => Some(be.kind()),
215            Self::LibraryLoad(e) => e
216                .valkey_kind()
217                .map(ff_backend_valkey::classify_ferriskey_kind),
218            // RFC-017 Stage A: Engine(EngineError) arm is intentionally
219            // lumped in with the rest (no backend_kind — variants like
220            // Validation / NotFound are business-logic, and Transport
221            // variants already surface under Backend upstream in most
222            // paths). Stage B/C will revisit when more handlers route
223            // through the trait.
224            _ => None,
225        }
226    }
227
228    /// Whether this error is safely retryable by a caller. For
229    /// backend transport variants, delegates to
230    /// [`ff_core::BackendErrorKind::is_retryable`]. Business-logic
231    /// rejections (NotFound, InvalidInput, OperationFailed, Script,
232    /// Config, PartitionMismatch) return false — those won't change
233    /// on retry.
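    ///
    /// # Example
    ///
    /// A sketch of how a caller might branch on this classification
    /// (illustrative only; the `use` path assumes the crate-root layout
    /// and the handler body is a stand-in):
    ///
    /// ```ignore
    /// use crate::server::ServerError;
    ///
    /// fn handle(err: ServerError) {
    ///     if err.is_retryable() {
    ///         // Transport fault or exhausted permit pool: back off and retry.
    ///     } else {
    ///         // Business-logic rejection (NotFound, InvalidInput, ...):
    ///         // retrying will not change the outcome; surface to the caller.
    ///     }
    /// }
    /// ```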
234    pub fn is_retryable(&self) -> bool {
235        match self {
236            Self::Backend(be) | Self::BackendContext { source: be, .. } => {
237                be.kind().is_retryable()
238            }
239            Self::LibraryLoad(load_err) => load_err
240                .valkey_kind()
241                .map(is_retryable_kind)
242                .unwrap_or(false),
243            Self::Config(_)
244            | Self::PartitionMismatch(_)
245            | Self::NotFound(_)
246            | Self::InvalidInput(_)
247            | Self::OperationFailed(_)
248            | Self::Script(_) => false,
249            // Back off and retry — the bound is a server-side permit pool,
250            // so the retry will succeed once a permit frees up. Applies
251            // equally to stream ops, admin rotate, etc.
252            Self::ConcurrencyLimitExceeded(_, _) => true,
253            // Operator must upgrade Valkey; a retry at the caller won't help.
254            Self::ValkeyVersionTooLow { .. } => false,
255            // Operator must change FF_BACKEND; not retryable.
256            Self::BackendNotReady { .. } => false,
257            // EngineError's classification helpers handle transport
258            // retry semantics; mirror them so trait-dispatched handlers
259            // keep the same retry policy as the Client path.
260            Self::Engine(e) => matches!(
261                e.as_ref(),
262                EngineError::Transport { .. } | EngineError::Contextual { .. }
263            ),
264        }
265    }
266}
267
268impl Server {
269    /// Start the FlowFabric server.
270    ///
271    /// Boot sequence:
272    /// 1. Connect to Valkey
273    /// 2. Validate or create partition config key
274    /// 3. Load the FlowFabric Lua library
275    /// 4. Start engine (14 background scanners)
276    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
277        Self::start_with_metrics(config, Arc::new(ff_observability::Metrics::new())).await
278    }
279
280    /// PR-94: boot the server with a shared observability registry.
281    ///
282    /// Scanner cycle + scheduler metrics record into this registry;
283    /// `main.rs` threads the same handle into the router so `/metrics`
284    /// exposes what the engine produces. The no-arg [`Server::start`]
285    /// forwards here with a fresh `Metrics::new()` — under the default
286    /// build that's the shim, under `observability` it's a real
287    /// registry not shared with any HTTP route (useful for tests
288    /// exercising the engine in isolation).
289    ///
    /// **RFC-017:** this path gates `config.backend` against
    /// [`BACKEND_STAGE_READY`] (§9.0; as of Stage E4 both `valkey` and
    /// `postgres` are listed), routes `BackendKind::Postgres` to the
    /// dedicated Postgres boot branch, and otherwise dials the Valkey
    /// cluster through the legacy path, synthesises an
    /// `Arc<ValkeyBackend>` around the dialed client, and populates
    /// `Server.backend`. The dual-field posture was explicit through
    /// Stage D; Stage E retires the legacy Client fields. See
    /// [`Server::start_with_backend`] for the test-injection entry
    /// point that takes a caller-supplied backend.
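    ///
    /// # Example
    ///
    /// A minimal boot sketch sharing one registry between the server and
    /// a scrape endpoint (assumes a `ServerConfig` was built elsewhere;
    /// router wiring is elided):
    ///
    /// ```ignore
    /// let metrics = Arc::new(ff_observability::Metrics::new());
    /// let server = Server::start_with_metrics(config, metrics.clone()).await?;
    /// // Hand the same `metrics` handle to the HTTP router so /metrics
    /// // exposes what the engine scanners and scheduler record.
    /// ```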
299    pub async fn start_with_metrics(
300        config: ServerConfig,
301        metrics: Arc<ff_observability::Metrics>,
302    ) -> Result<Self, ServerError> {
303        // RFC-017 §9.0 hard-gate. At v0.8.0 (Stage E4) both `valkey` and
304        // `postgres` are ready; this check remains as defence-in-depth
305        // so future backend additions must explicitly opt into the list.
306        // The `FF_BACKEND_ACCEPT_UNREADY` / `FF_ENV=development` dev-mode
307        // override was retired at Stage E4 because it is no longer
308        // needed — both supported backends now boot without override.
309        let label = config.backend.as_str();
310        if !BACKEND_STAGE_READY.contains(&label) {
311            return Err(ServerError::BackendNotReady {
312                backend: match config.backend {
313                    BackendKind::Postgres => "postgres",
314                    BackendKind::Valkey => "valkey",
315                },
316                stage: "E4",
317            });
318        }
319
        // RFC-017 Wave 8 Stage E1: Postgres dial branch. As of Stage E4,
        // "postgres" is listed in `BACKEND_STAGE_READY`, so this branch is
        // reachable without any override (the dev-mode override was retired
        // above). On the Postgres path we skip the Valkey-specific
        // engine/scanner + scheduler construction entirely (E3 wired the
        // scheduler twin into the backend).
325        if matches!(config.backend, BackendKind::Postgres) {
326            return Self::start_postgres_branch(config, metrics).await;
327        }
328        // Step 1: Connect to Valkey via ClientBuilder
329        tracing::info!(
330            host = %config.valkey.host, port = config.valkey.port,
331            tls = config.valkey.tls, cluster = config.valkey.cluster,
332            "connecting to Valkey"
333        );
334        let mut builder = ClientBuilder::new()
335            .host(&config.valkey.host, config.valkey.port)
336            .connect_timeout(Duration::from_secs(10))
337            .request_timeout(Duration::from_millis(5000));
338        if config.valkey.tls {
339            builder = builder.tls();
340        }
341        if config.valkey.cluster {
342            builder = builder.cluster();
343        }
344        let client = builder
345            .build()
346            .await
347            .map_err(|e| crate::server::backend_context(e, "connect"))?;
348
349        // Verify connectivity
350        let pong: String = client
351            .cmd("PING")
352            .execute()
353            .await
354            .map_err(|e| crate::server::backend_context(e, "PING"))?;
355        if pong != "PONG" {
356            return Err(ServerError::OperationFailed(format!(
357                "unexpected PING response: {pong}"
358            )));
359        }
360        tracing::info!("Valkey connection established");
361
362        // RFC-017 Wave 8 Stage D (§4 row 12): the five Valkey-specific
363        // deployment-initialisation steps (version verify, partition
364        // config, HMAC install, FUNCTION LOAD, lanes seed) now live
365        // behind `ValkeyBackend::initialize_deployment`. Load-bearing
366        // ordering contract preserved in that method's doc-comment.
367        // The call is sequenced after the connection but before the
368        // engine + backend wiring below so the pre-relocation boot
369        // ordering is byte-for-byte identical.
370        let init_backend = ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
371            client.clone(),
372            config.partition_config,
373        );
374        init_backend
375            .initialize_deployment(
376                &config.waitpoint_hmac_secret,
377                &config.lanes,
378                config.valkey.skip_library_load,
379            )
380            .await
381            .map_err(|e| ServerError::Engine(Box::new(e)))?;
382
383        // Step 4: Start engine with scanners
384        // Build a fresh EngineConfig rather than cloning (EngineConfig doesn't derive Clone).
385        let engine_cfg = ff_engine::EngineConfig {
386            partition_config: config.partition_config,
387            lanes: config.lanes.clone(),
388            lease_expiry_interval: config.engine_config.lease_expiry_interval,
389            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
390            index_reconciler_interval: config.engine_config.index_reconciler_interval,
391            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
392            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
393            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
394            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
395            budget_reset_interval: config.engine_config.budget_reset_interval,
396            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
397            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
398            unblock_interval: config.engine_config.unblock_interval,
399            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
400            flow_projector_interval: config.engine_config.flow_projector_interval,
401            execution_deadline_interval: config.engine_config.execution_deadline_interval,
402            cancel_reconciler_interval: config.engine_config.cancel_reconciler_interval,
403            edge_cancel_dispatcher_interval: config.engine_config.edge_cancel_dispatcher_interval,
404            edge_cancel_reconciler_interval: config.engine_config.edge_cancel_reconciler_interval,
405            scanner_filter: config.engine_config.scanner_filter.clone(),
406        };
407        // Engine scanners keep running on the MAIN `client` mux — NOT on
408        // `tail_client`. Scanner cadence is foreground-latency-coupled by
409        // design (an incident that blocks all FCALLs should also visibly
410        // block scanners), and keeping scanners off the tail client means a
411        // long-poll tail can never starve lease-expiry, retention-trim,
412        // etc. Do not change this without revisiting RFC-006 Impl Notes.
413        // Build the Valkey completion backend (issue #90) and subscribe.
414        // This replaces the pre-#90 `CompletionListenerConfig` path:
415        // the wire subscription now lives in ff-backend-valkey, the
416        // engine just consumes the resulting stream.
417        let mut valkey_conn = ff_core::backend::ValkeyConnection::new(
418            config.valkey.host.clone(),
419            config.valkey.port,
420        );
421        valkey_conn.tls = config.valkey.tls;
422        valkey_conn.cluster = config.valkey.cluster;
423        let completion_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
424            client.clone(),
425            config.partition_config,
426            valkey_conn,
427        );
428        let completion_stream = <ff_backend_valkey::ValkeyBackend as ff_core::completion_backend::CompletionBackend>::subscribe_completions(&completion_backend)
429            .await
430            .map_err(|e| ServerError::OperationFailed(format!(
431                "subscribe_completions: {e}"
432            )))?;
433
434        let engine = Engine::start_with_completions(
435            engine_cfg,
436            client.clone(),
437            metrics.clone(),
438            completion_stream,
439        );
440
441        // RFC-017 Stage B: the dedicated `tail_client`, the
442        // `stream_semaphore`, and the `xread_block_lock` moved into
443        // `ValkeyBackend` (§6). After issue #204's switch to per-call
444        // `duplicate_connection()` inside `tail_stream_impl`, the
445        // dedicated tail mux + serialising mutex are no longer
446        // required — each `XREAD BLOCK` gets its own socket — so
447        // the encapsulated impl only needs the bounded semaphore to
448        // preserve the existing 429-on-contention contract at the
449        // REST boundary.
450        tracing::info!(
451            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
452            "stream-op semaphore lives inside ValkeyBackend (RFC-017 Stage B)"
453        );
454
455        // Admin surface warning. /v1/admin/* endpoints (waitpoint HMAC
456        // rotation, etc.) are only protected by the global Bearer
457        // middleware in api.rs — which is only installed when
458        // config.api_token is set. Without FF_API_TOKEN, a public
459        // deployment exposes secret rotation to anyone that can reach
460        // the listen_addr. Warn loudly so operators can't miss it; we
461        // don't fail-start because single-tenant dev uses this path.
462        if config.api_token.is_none() {
463            tracing::warn!(
464                listen_addr = %config.listen_addr,
465                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
466                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
467                 FF_API_TOKEN for any deployment reachable from untrusted \
468                 networks."
469            );
470            // Explicit callout for the credential-bearing read endpoints.
471            // Auditors grep for these on a per-endpoint basis; lumping
472            // into the admin warning alone hides the fact that
473            // /pending-waitpoints returns HMAC tokens and /result
474            // returns completion payload bytes.
475            tracing::warn!(
476                listen_addr = %config.listen_addr,
477                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
478                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
479                 and GET /v1/executions/{{id}}/result returns raw completion payload \
480                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
481                 configuration."
482            );
483        }
484
485        // Partition counts — post-RFC-011 there are three physical families.
486        // Execution keys co-locate with their parent flow's partition (§2 of
487        // the RFC), so `num_flow_partitions` governs both exec and flow
488        // routing; no separate `num_execution_partitions` count exists.
489        tracing::info!(
490            flow_partitions = config.partition_config.num_flow_partitions,
491            budget_partitions = config.partition_config.num_budget_partitions,
492            quota_partitions = config.partition_config.num_quota_partitions,
493            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
494            listen_addr = %config.listen_addr,
495            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
496            config.partition_config.num_flow_partitions,
497            config.partition_config.num_budget_partitions,
498            config.partition_config.num_quota_partitions,
499        );
500
501        let scheduler = Arc::new(ff_scheduler::Scheduler::with_metrics(
502            client.clone(),
503            config.partition_config,
504            metrics.clone(),
505        ));
506
507        // RFC-017 Stage A: synthesise an `Arc<ValkeyBackend>` around the
508        // already-dialed client. Zero additional round-trips; the
509        // backend shares the same ferriskey connection as the legacy
510        // path so migrated + legacy handlers observe identical state.
511        // Stage B relocates `tail_client` / `stream_semaphore` into
512        // the backend; Stage E retires the Client fields entirely.
513        let mut valkey_backend = ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
514            client.clone(),
515            config.partition_config,
516            {
517                let mut c = ff_core::backend::ValkeyConnection::new(
518                    config.valkey.host.clone(),
519                    config.valkey.port,
520                );
521                c.tls = config.valkey.tls;
522                c.cluster = config.valkey.cluster;
523                c
524            },
525        );
526        // RFC-017 Stage B: size the backend's stream-op semaphore
527        // before handing out the `Arc`. `get_mut` succeeds here
528        // because `valkey_backend` is the sole `Arc` owner at this
529        // point.
530        if !valkey_backend.with_stream_semaphore_permits(config.max_concurrent_stream_ops) {
531            return Err(ServerError::OperationFailed(
532                "ValkeyBackend stream semaphore sizing failed (unexpected Arc sharing)".into(),
533            ));
534        }
535        // RFC-017 Stage C: install the scheduler handle so the
536        // backend's `claim_for_worker` trait impl dispatches through
537        // it. Stage E3 removed the redundant `Server::scheduler`
538        // field — the backend is the sole owner of the scheduler
539        // after this install.
540        if !valkey_backend.with_scheduler(scheduler) {
541            return Err(ServerError::OperationFailed(
542                "ValkeyBackend scheduler wiring failed (unexpected Arc sharing)".into(),
543            ));
544        }
        let backend: Arc<dyn EngineBackend> = valkey_backend;
546
547        Ok(Self {
548            // Single-permit semaphore: only one rotate-waitpoint-secret can
549            // be mid-flight server-wide. Attackers or misbehaving scripts
550            // firing parallel rotations fail fast with 429 instead of
551            // queueing HTTP handlers.
552            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
553            engine: Some(engine),
554            config,
555            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
556            metrics,
557            backend,
558        })
559    }
560
    /// RFC-017 Wave 8 Stage E1/E2: Postgres boot branch for
    /// [`Server::start_with_metrics`]. Called after the §9.0 hard-gate
    /// admitted the boot (as of Stage E4, `postgres` is listed in
    /// [`BACKEND_STAGE_READY`], so no override is involved).
564    ///
565    /// **Scope (Stage E2):**
566    /// * Dial Postgres + wrap an `Arc<PostgresBackend>` as the
567    ///   `backend` field (all migrated HTTP handlers dispatch through
568    ///   the trait — create_execution / create_flow / add member /
569    ///   stage edge / apply dep / describe_flow / cancel_flow / etc.).
570    /// * **No Valkey dial.** The E1 residual ambient-Valkey client is
571    ///   retired in E2 — `Server::client` is gone, `cancel_flow_header`
572    ///   / `ack_cancel_member` / `read_execution_info` /
573    ///   `read_execution_state` / `fetch_waitpoint_token_v07` all flow
574    ///   through the trait. Legacy reads that Postgres does not yet
575    ///   implement surface as `EngineError::Unavailable` (Wave 9).
    /// * Skip the Valkey engine and scheduler. Stage E3 wired the
    ///   Postgres-specific `claim_for_worker` through the trait (the
    ///   `PostgresScheduler` twin is constructed per-call inside
    ///   `PostgresBackend::claim_for_worker`); there is no
    ///   Server-owned scheduler field.
579    /// * Run `apply_migrations` against the Postgres pool so an empty
580    ///   database becomes usable without operator out-of-band steps.
581    async fn start_postgres_branch(
582        config: ServerConfig,
583        metrics: Arc<ff_observability::Metrics>,
584    ) -> Result<Self, ServerError> {
585        if config.postgres.url.is_empty() {
586            return Err(ServerError::InvalidInput(
587                "FF_BACKEND=postgres requires FF_POSTGRES_URL".into(),
588            ));
589        }
590        tracing::info!(
591            pool_size = config.postgres.pool_size,
592            "dialing Postgres backend (RFC-017 Stage E3)"
593        );
594        let mut pg_backend_arc = ff_backend_postgres::PostgresBackend::connect_with_metrics(
595            config.postgres_config(),
596            config.partition_config,
597            metrics.clone(),
598        )
599        .await
600        .map_err(|e| ServerError::Engine(Box::new(e)))?;
601
602        // Apply schema migrations idempotently so an empty target
603        // database becomes usable. The underlying pool is shared with
604        // the backend — one pool, one migration run.
605        ff_backend_postgres::apply_migrations(pg_backend_arc.pool())
606            .await
607            .map_err(|e| {
608                ServerError::OperationFailed(format!("postgres apply_migrations: {e}"))
609            })?;
610
611        // RFC-017 Wave 8 Stage E3: spawn the six Postgres reconcilers
612        // (attempt_timeout, lease_expiry, suspension_timeout,
613        // dependency, edge_cancel_dispatcher, edge_cancel_reconciler)
614        // as background tick loops owned by the backend. Drained on
615        // `Server::shutdown` via `backend.shutdown_prepare(grace)`.
616        let scanner_cfg = ff_backend_postgres::PostgresScannerConfig {
617            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
618            lease_expiry_interval: config.engine_config.lease_expiry_interval,
619            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
620            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
621            edge_cancel_dispatcher_interval: config.engine_config.edge_cancel_dispatcher_interval,
622            edge_cancel_reconciler_interval: config.engine_config.edge_cancel_reconciler_interval,
623            budget_reset_interval: config.engine_config.budget_reset_interval,
624            dependency_stale_threshold_ms:
625                ff_backend_postgres::PostgresScannerConfig::DEFAULT_DEP_STALE_MS,
626            scanner_filter: config.engine_config.scanner_filter.clone(),
627            partition_config: config.partition_config,
628        };
629        if !pg_backend_arc.with_scanners(scanner_cfg) {
630            return Err(ServerError::OperationFailed(
631                "PostgresBackend scanner install failed (unexpected Arc sharing)".into(),
632            ));
633        }
634
635        let backend: Arc<dyn EngineBackend> = pg_backend_arc;
636
637        tracing::info!(
638            flow_partitions = config.partition_config.num_flow_partitions,
639            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
640            "FlowFabric server started (Postgres backend, Stage E3). \
641             6 Postgres reconcilers active; claim_for_worker routed to \
642             PostgresScheduler. No ambient Valkey client."
643        );
644
645        Ok(Self {
646            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
647            engine: None,
648            config,
649            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
650            metrics,
651            backend,
652        })
653    }
654
655    /// RFC-017 Stage A: test-injection + future-embedded-user entry
656    /// point. Takes a caller-constructed `Arc<dyn EngineBackend>` +
657    /// the Valkey connection/engine scaffolding
658    /// [`Server::start_with_metrics`] normally dials for itself.
659    ///
660    /// **Stage A scope:** Stage A is still dual-field — the legacy
661    /// `client` / `tail_client` / `engine` / `scheduler` fields are
662    /// constructed here exactly as in the main boot path, because
663    /// unmigrated handlers still need them. The caller-supplied
664    /// `backend` populates the new trait-object field and services
665    /// the handlers migrated in this stage (see RFC-017 §4
666    /// migration table).
667    ///
668    /// **Stage D evolution:** once the boot path relocates into each
669    /// backend's `connect_with_metrics` (RFC-017 §9 Stage D), this
670    /// entry point becomes the sole constructor — `Server::start` and
671    /// `Server::start_with_metrics` are thin shims that build the
672    /// backend first, then forward here.
673    ///
674    /// Today (Stage A) this path is exercised by `MockBackend` in
675    /// `tests/parity_stage_a.rs`; it does NOT replace the Valkey
676    /// dial under the main binary.
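    ///
    /// # Example
    ///
    /// Test-injection sketch (the `MockBackend` construction shown here
    /// is hypothetical; see `tests/parity_stage_a.rs` for the real
    /// fixture):
    ///
    /// ```ignore
    /// let backend: Arc<dyn EngineBackend> = Arc::new(MockBackend::default());
    /// let metrics = Arc::new(ff_observability::Metrics::new());
    /// let server = Server::start_with_backend(config, backend, metrics).await?;
    /// // Migrated handlers (create_execution, cancel_flow, ...) now hit the
    /// // injected backend; `server.backend()` exposes it for direct assertions.
    /// ```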
677    pub async fn start_with_backend(
678        config: ServerConfig,
679        backend: Arc<dyn EngineBackend>,
680        metrics: Arc<ff_observability::Metrics>,
681    ) -> Result<Self, ServerError> {
682        // Stage A: forward through the legacy dial so unmigrated
683        // handlers keep working, then overwrite `backend` with the
684        // caller-supplied handle. Stage D rewires this so the
685        // caller's backend drives the whole boot.
686        let mut server = Self::start_with_metrics(config, metrics).await?;
687        server.backend = backend;
688        Ok(server)
689    }
690
691    /// RFC-017 Stage A: access the backend trait-object driving
692    /// migrated handlers. Stable surface for tests that need to
693    /// inspect the backend directly (e.g. `backend_label()`
694    /// assertions). The Server will dispatch more handlers through
695    /// this handle as Stages B-D land.
696    pub fn backend(&self) -> &Arc<dyn EngineBackend> {
697        &self.backend
698    }
699
700    /// PR-94: access the shared observability registry.
701    pub fn metrics(&self) -> &Arc<ff_observability::Metrics> {
702        &self.metrics
703    }
704
705    // RFC-017 Stage E2 (§9 Stage D bullet completed): `Server::client()`
706    // accessor + underlying `Client` field both removed. External
707    // callers route ping / healthz through the backend trait
708    // (`self.backend.ping()` → `ValkeyBackend::ping`). The
709    // `ff_cancel_flow` header FCALL, its `flow_already_terminal`
710    // HMGET/SMEMBERS fallback, the per-member `ff_ack_cancel_member`
711    // backlog drain, and the `get_execution*` / `fetch_waitpoint_token_v07`
712    // reads are all reachable through the Stage E2 trait additions
713    // (`cancel_flow_header`, `ack_cancel_member`, `read_execution_info`,
714    // `read_execution_state`, `fetch_waitpoint_token_v07`).
715
716    /// Get the server config.
717    pub fn config(&self) -> &ServerConfig {
718        &self.config
719    }
720
721    /// Get the partition config.
722    pub fn partition_config(&self) -> &PartitionConfig {
723        &self.config.partition_config
724    }
725
726    // ── Minimal Phase 1 API ──
727
728    /// Create a new execution. RFC-017 Stage D2: delegates through the
729    /// backend trait. The KEYS/ARGV build + FCALL dispatch + result parse
730    /// live verbatim in `ValkeyBackend::create_execution`.
731    pub async fn create_execution(
732        &self,
733        args: &CreateExecutionArgs,
734    ) -> Result<CreateExecutionResult, ServerError> {
735        Ok(self.backend.create_execution(args.clone()).await?)
736    }
737
738    /// Cancel an execution. RFC-017 Stage D2: delegates through the
739    /// backend trait.
740    pub async fn cancel_execution(
741        &self,
742        args: &CancelExecutionArgs,
743    ) -> Result<CancelExecutionResult, ServerError> {
744        Ok(self.backend.cancel_execution(args.clone()).await?)
745    }
746
747    /// Get the public state of an execution.
748    ///
749    /// Reads `public_state` from the exec_core hash. Returns the parsed
750    /// PublicState enum. If the execution is not found, returns an error.
751    pub async fn get_execution_state(
752        &self,
753        execution_id: &ExecutionId,
754    ) -> Result<PublicState, ServerError> {
755        // RFC-017 Stage E2: routed through the backend trait.
756        match self.backend.read_execution_state(execution_id).await? {
757            Some(s) => Ok(s),
758            None => Err(ServerError::NotFound(format!(
759                "execution not found: {execution_id}"
760            ))),
761        }
762    }
763
764    /// Read the raw result payload written by `ff_complete_execution`.
765    ///
766    /// The Lua side stores the payload at `ctx.result()` via plain `SET`.
767    /// No FCALL — this is a direct GET; returns `Ok(None)` when the
768    /// execution is missing, not yet complete, or (in a future
769    /// retention-policy world) when the result was trimmed.
770    ///
771    /// # Contract vs `get_execution_state`
772    ///
773    /// `get_execution_state` is the authoritative completion signal. If
774    /// a caller observes `state == completed` but `get_execution_result`
775    /// returns `None`, the result bytes are unavailable — not a caller
776    /// bug and not a server bug, just the retention policy trimming the
777    /// blob. V1 sets no retention, so callers on v1 can treat
778    /// `state == completed` + `Ok(None)` as a server bug.
779    ///
780    /// # Ordering
781    ///
782    /// Callers MUST wait for `state == completed` before calling this
783    /// method; polls issued before the state transition may hit a
784    /// narrow window where the completion Lua has written
785    /// `public_state = completed` but the `result` key SET is still
786    /// on-wire. The current Lua `ff_complete_execution` writes both in
787    /// the same atomic script, so the window is effectively zero for
788    /// direct callers — but retries via `ff_replay_execution` open it
789    /// briefly.
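    ///
    /// # Example
    ///
    /// A polling sketch honouring the ordering contract (illustrative;
    /// assumes `PublicState` exposes a `Completed` variant and that the
    /// caller adds its own timeout/backoff policy):
    ///
    /// ```ignore
    /// loop {
    ///     // `get_execution_state` is the authoritative completion signal.
    ///     let state = server.get_execution_state(&execution_id).await?;
    ///     if matches!(state, PublicState::Completed) {
    ///         break;
    ///     }
    ///     tokio::time::sleep(std::time::Duration::from_millis(200)).await;
    /// }
    /// // Only read the payload after observing `completed`.
    /// let payload: Option<Vec<u8>> = server.get_execution_result(&execution_id).await?;
    /// ```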
790    pub async fn get_execution_result(
791        &self,
792        execution_id: &ExecutionId,
793    ) -> Result<Option<Vec<u8>>, ServerError> {
794        // RFC-017 Stage E2: routed through the backend trait. The
795        // Valkey impl preserves binary-safe semantics via ferriskey's
796        // `Vec<u8>` FromValue; Postgres returns Unavailable until the
797        // result-store migration lands.
798        Ok(self.backend.get_execution_result(execution_id).await?)
799    }
800
801
802    // ── Budget / Quota API ──
803
805    /// Create a new budget policy. RFC-017 Stage D2: delegates through
806    /// the backend trait.
807    pub async fn create_budget(
808        &self,
809        args: &CreateBudgetArgs,
810    ) -> Result<CreateBudgetResult, ServerError> {
811        Ok(self.backend.create_budget(args.clone()).await?)
812    }
813
814    /// Create a new quota/rate-limit policy. RFC-017 Stage D2: delegates
815    /// through the backend trait.
816    pub async fn create_quota_policy(
817        &self,
818        args: &CreateQuotaPolicyArgs,
819    ) -> Result<CreateQuotaPolicyResult, ServerError> {
820        Ok(self.backend.create_quota_policy(args.clone()).await?)
821    }
822
823    /// Read-only budget status for operator visibility. RFC-017 Stage
824    /// D2: delegates through the backend trait.
825    pub async fn get_budget_status(
826        &self,
827        budget_id: &BudgetId,
828    ) -> Result<BudgetStatus, ServerError> {
829        Ok(self.backend.get_budget_status(budget_id).await?)
830    }
831
832    /// Report usage against a budget and check limits. RFC-017 Stage D2:
833    /// delegates through the backend trait's admin variant
834    /// (`report_usage_admin` — no worker handle required on the admin
835    /// path).
836    pub async fn report_usage(
837        &self,
838        budget_id: &BudgetId,
839        args: &ReportUsageArgs,
840    ) -> Result<ReportUsageResult, ServerError> {
841        let mut admin_args = ff_core::contracts::ReportUsageAdminArgs::new(
842            args.dimensions.clone(),
843            args.deltas.clone(),
844            args.now,
845        );
846        if let Some(key) = args.dedup_key.as_ref() {
847            admin_args = admin_args.with_dedup_key(key.clone());
848        }
849        Ok(self.backend.report_usage_admin(budget_id, admin_args).await?)
850    }
851
852    /// Reset a budget's usage counters and schedule the next reset.
853    /// RFC-017 Stage D2: delegates through the backend trait.
854    pub async fn reset_budget(
855        &self,
856        budget_id: &BudgetId,
857    ) -> Result<ResetBudgetResult, ServerError> {
858        let args = ff_core::contracts::ResetBudgetArgs {
859            budget_id: budget_id.clone(),
860            now: TimestampMs::now(),
861        };
862        Ok(self.backend.reset_budget(args).await?)
863    }
864
865    // ── Flow API ──
866
867    /// Create a new flow container. RFC-017 Stage D2: delegates through
868    /// the backend trait.
869    pub async fn create_flow(
870        &self,
871        args: &CreateFlowArgs,
872    ) -> Result<CreateFlowResult, ServerError> {
873        Ok(self.backend.create_flow(args.clone()).await?)
874    }
875
876    /// Add an execution to a flow.
877    ///
878    /// # Atomic single-FCALL commit (RFC-011 §7.3)
879    ///
880    /// Post-RFC-011 phase-3, exec_core co-locates with flow_core under
881    /// hash-tag routing (both hash to `{fp:N}` via the exec id's
882    /// embedded partition). A single atomic FCALL writes:
883    ///
884    ///   - `members_set` SADD (flow membership)
885    ///   - `exec_core.flow_id` HSET (back-pointer)
886    ///   - `flow_index` SADD (self-heal)
887    ///   - `flow_core` HINCRBY node_count / graph_revision +
888    ///     HSET last_mutation_at
889    ///
890    /// All four writes commit atomically or none do (Valkey scripting
891    /// contract: validates-before-writing in the Lua body means
892    /// `flow_not_found` / `flow_already_terminal` early-returns fire
893    /// BEFORE any `redis.call()` mutation, and a mid-body error after
894    /// writes is not expected because all writes are on the same slot).
895    ///
    /// The pre-RFC-011 two-phase contract (membership FCALL on `{fp:N}`
    /// plus separate HSET on `{p:N}`), the orphan window it opened, and
    /// the reconciliation-scanner plan (issue #21, now superseded) are
    /// all retired.
897    ///
898    /// # Consumer contract
899    ///
900    /// The caller's `args.execution_id` **must** be co-located with
901    /// `args.flow_id`'s partition — i.e. minted via
902    /// `ExecutionId::for_flow(&args.flow_id, config)`. Passing a
903    /// `solo`-minted id (or any exec id hashing to a different
904    /// `{fp:N}` than the flow's) will fail at the Valkey level with
905    /// `CROSSSLOT` on a clustered deploy.
906    ///
907    /// Callers with a flow context in scope always use `for_flow`;
908    /// this is the only supported mint path for flow-member execs
909    /// post-RFC-011. Test fixtures that pre-date the co-location
910    /// contract use `TestCluster::new_execution_id_on_partition` to
911    /// pin to a specific hash-tag index for `fcall_create_flow`-style
912    /// helpers that hard-code their flow partition.
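    ///
    /// # Example
    ///
    /// The supported mint-then-add sequence (sketch; the remaining
    /// `AddExecutionToFlowArgs` fields are assumed to have sensible
    /// defaults and are elided here):
    ///
    /// ```ignore
    /// // Mint the member id against the flow so both hash to the same `{fp:N}`.
    /// let execution_id = ExecutionId::for_flow(&flow_id, server.partition_config());
    /// let args = AddExecutionToFlowArgs {
    ///     flow_id,
    ///     execution_id,
    ///     ..Default::default()
    /// };
    /// server.add_execution_to_flow(&args).await?;
    /// ```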
913    pub async fn add_execution_to_flow(
914        &self,
915        args: &AddExecutionToFlowArgs,
916    ) -> Result<AddExecutionToFlowResult, ServerError> {
917        // Preserve the typed `ServerError::PartitionMismatch` pre-flight
918        // check — the backend trait's implementation returns an
919        // `EngineError` on CROSSSLOT, which would surface as
920        // `ServerError::Engine(_)` and hide the consumer-contract
921        // violation. Keep the explicit check at the facade boundary.
922        let flow_part = flow_partition(&args.flow_id, &self.config.partition_config);
923        let exec_part = execution_partition(&args.execution_id, &self.config.partition_config);
924        if exec_part.index != flow_part.index {
925            return Err(ServerError::PartitionMismatch(format!(
926                "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
927                 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
928                 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
929                exec_p = exec_part.index,
930                flow_p = flow_part.index,
931            )));
932        }
933        Ok(self.backend.add_execution_to_flow(args.clone()).await?)
934    }
935
936    /// Cancel a flow.
937    ///
938    /// Flips `public_flow_state` to `cancelled` atomically via
939    /// `ff_cancel_flow` on `{fp:N}`. For `cancel_all` policy, member
940    /// executions must be cancelled cross-partition; this dispatch runs in
941    /// the background and the call returns [`CancelFlowResult::CancellationScheduled`]
942    /// immediately. For all other policies (or flows with no members), or
943    /// when the flow was already in a terminal state (idempotent retry),
944    /// the call returns [`CancelFlowResult::Cancelled`].
945    ///
946    /// Clients that need synchronous completion can call [`Self::cancel_flow_wait`].
947    ///
948    /// # Backpressure
949    ///
950    /// Each call that hits the async dispatch path spawns a new task into
951    /// the shared background `JoinSet`. Rapid repeated calls against the
952    /// same flow will spawn *multiple* overlapping dispatch tasks. This is
953    /// not a correctness issue — each member cancel is idempotent and
954    /// terminal flows short-circuit via [`ff_core::contracts::CancelFlowHeader::AlreadyTerminal`]
955    /// — but heavy burst callers should either use `?wait=true` (serialises
956    /// the dispatch on the HTTP thread, giving natural backpressure) or
957    /// implement client-side deduplication on `flow_id`. The `JoinSet` is
958    /// drained with a 15s timeout on [`Self::shutdown`], so very long
959    /// dispatch tails may be aborted during graceful shutdown.
960    ///
961    /// # Orphan-member semantics on shutdown abort
962    ///
963    /// If shutdown fires `JoinSet::abort_all()` after its drain timeout
964    /// while a dispatch loop is mid-iteration, the already-issued
965    /// `ff_cancel_execution` FCALLs (atomic Lua) complete cleanly with
966    /// `terminal_outcome = cancelled` and the caller-supplied reason. The
967    /// members not yet visited are abandoned mid-loop. They remain in
968    /// whichever state they were in (active/eligible/suspended) until the
969    /// natural lifecycle scanners reach them: active leases expire
970    /// (`lease_expiry`) and attempt-timeout them to `expired`, suspended
971    /// members time out to `skipped`, eligible ones sit until retention
972    /// trim. So no orphan state — but the terminal_outcome for the
973    /// abandoned members will be `expired`/`skipped` rather than
974    /// `cancelled`, and the operator-supplied `reason` is lost for them.
975    /// Audit tooling that requires reason fidelity across shutdowns should
976    /// use `?wait=true`.
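    ///
    /// # Example
    ///
    /// Choosing between the asynchronous and synchronous paths (sketch;
    /// assumes the caller has already built `args: CancelFlowArgs`):
    ///
    /// ```ignore
    /// // Fire-and-forget: member cancels for a `cancel_all` flow run in the
    /// // background JoinSet; the call may return CancellationScheduled.
    /// let _ = server.cancel_flow(&args).await?;
    ///
    /// // Audit-grade path: every member is terminal (or reported failed)
    /// // before this returns.
    /// match server.cancel_flow_wait(&args).await? {
    ///     CancelFlowResult::PartiallyCancelled { failed_member_execution_ids, .. } => {
    ///         // The cancel-backlog reconciler retries transient failures;
    ///         // alert on anything that persists.
    ///     }
    ///     _ => { /* Cancelled (all members) or no dispatch was needed */ }
    /// }
    /// ```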
977    pub async fn cancel_flow(
978        &self,
979        args: &CancelFlowArgs,
980    ) -> Result<CancelFlowResult, ServerError> {
981        self.cancel_flow_inner(args, false).await
982    }
983
984    /// Cancel a flow and wait for all member cancellations to complete
985    /// inline. Slower than [`Self::cancel_flow`] for large flows, but
986    /// guarantees every member is in a terminal state on return.
987    pub async fn cancel_flow_wait(
988        &self,
989        args: &CancelFlowArgs,
990    ) -> Result<CancelFlowResult, ServerError> {
991        self.cancel_flow_inner(args, true).await
992    }
993
994    async fn cancel_flow_inner(
995        &self,
996        args: &CancelFlowArgs,
997        wait: bool,
998    ) -> Result<CancelFlowResult, ServerError> {
999        // RFC-017 Stage E2: the header FCALL + AlreadyTerminal fetch
1000        // now dispatch through the backend trait. The Server no longer
1001        // owns a ferriskey `Client`; `self.backend.cancel_flow_header`
1002        // encapsulates the Valkey-specific FCALL + reload-on-failover
1003        // + HMGET/SMEMBERS-for-AlreadyTerminal work previously inlined
1004        // here.
1005        let header = self.backend.cancel_flow_header(args.clone()).await?;
1006
1007        let (policy, members) = match header {
1008            ff_core::contracts::CancelFlowHeader::Cancelled {
1009                cancellation_policy,
1010                member_execution_ids,
1011            } => (cancellation_policy, member_execution_ids),
1012            // Idempotent retry: flow was already cancelled/completed/failed.
1013            // Return Cancelled with the *stored* policy + (capped) member
1014            // list so observability tooling gets the real historical state
1015            // rather than echoing the caller's retry intent. The backend
1016            // has already done the HMGET + SMEMBERS; the Server just caps
1017            // the member list to bound wire bandwidth.
1018            ff_core::contracts::CancelFlowHeader::AlreadyTerminal {
1019                stored_cancellation_policy,
1020                stored_cancel_reason,
1021                member_execution_ids,
1022            } => {
1023                let total_members = member_execution_ids.len();
1024                let stored_members: Vec<String> = member_execution_ids
1025                    .into_iter()
1026                    .take(ALREADY_TERMINAL_MEMBER_CAP)
1027                    .collect();
1028                tracing::debug!(
1029                    flow_id = %args.flow_id,
1030                    stored_policy = stored_cancellation_policy.as_deref().unwrap_or(""),
1031                    stored_reason = stored_cancel_reason.as_deref().unwrap_or(""),
1032                    total_members,
1033                    returned_members = stored_members.len(),
1034                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
1035                );
1036                return Ok(CancelFlowResult::Cancelled {
1037                    cancellation_policy: stored_cancellation_policy
1038                        .unwrap_or_else(|| args.cancellation_policy.clone()),
1039                    member_execution_ids: stored_members,
1040                });
1041            }
1042            // `CancelFlowHeader` is `#[non_exhaustive]`. Any future
1043            // variant must be reviewed at this match site before it
1044            // reaches the wire; fall closed with a typed server error.
1045            other => {
1046                return Err(ServerError::OperationFailed(format!(
1047                    "cancel_flow_header: unknown CancelFlowHeader variant: {other:?}"
1048                )));
1049            }
1050        };
1051        let needs_dispatch = policy == "cancel_all" && !members.is_empty();
1052
1053        if !needs_dispatch {
1054            return Ok(CancelFlowResult::Cancelled {
1055                cancellation_policy: policy,
1056                member_execution_ids: members,
1057            });
1058        }
1059
1060        if wait {
1061            // Synchronous dispatch — cancel every member inline before returning.
1062            // Collect per-member failures so the caller sees a
1063            // PartiallyCancelled outcome instead of a false-positive
1064            // Cancelled when any member cancel faulted. The
1065            // cancel-backlog reconciler still retries the unacked
1066            // members; surfacing the partial state lets operator
1067            // tooling alert without polling per-member state.
1068            // RFC-017 Stage E2: ack drain dispatches via the backend
1069            // trait's `ack_cancel_member` — the Server no longer owns
1070            // a raw ferriskey `Client`.
1071            let mut failed: Vec<String> = Vec::new();
1072            for eid_str in &members {
1073                let Ok(eid) = ExecutionId::parse(eid_str) else {
1074                    failed.push(eid_str.clone());
1075                    continue;
1076                };
1077                let cancel_args = ff_core::contracts::CancelExecutionArgs {
1078                    execution_id: eid.clone(),
1079                    reason: args.reason.clone(),
1080                    source: ff_core::types::CancelSource::OperatorOverride,
1081                    lease_id: None,
1082                    lease_epoch: None,
1083                    attempt_id: None,
1084                    now: args.now,
1085                };
1086                match self.backend.cancel_execution(cancel_args).await {
1087                    Ok(_) => {
1088                        ack_cancel_member_via_backend(
1089                            self.backend.as_ref(),
1090                            &args.flow_id,
1091                            &eid,
1092                            eid_str,
1093                        )
1094                        .await;
1095                    }
1096                    Err(e) => {
1097                        if is_terminal_ack_engine_error(&e) {
1098                            ack_cancel_member_via_backend(
1099                                self.backend.as_ref(),
1100                                &args.flow_id,
1101                                &eid,
1102                                eid_str,
1103                            )
1104                            .await;
1105                            continue;
1106                        }
1107                        tracing::warn!(
1108                            execution_id = %eid_str,
1109                            error = %e,
1110                            "cancel_flow(wait): individual execution cancel failed \
1111                             (transport/contract fault; reconciler will retry if transient)"
1112                        );
1113                        failed.push(eid_str.clone());
1114                    }
1115                }
1116            }
1117            if failed.is_empty() {
1118                return Ok(CancelFlowResult::Cancelled {
1119                    cancellation_policy: policy,
1120                    member_execution_ids: members,
1121                });
1122            }
1123            return Ok(CancelFlowResult::PartiallyCancelled {
1124                cancellation_policy: policy,
1125                member_execution_ids: members,
1126                failed_member_execution_ids: failed,
1127            });
1128        }
1129
1130        // Asynchronous dispatch — spawn into the shared JoinSet so
1131        // Server::shutdown can wait for pending cancellations (bounded
1132        // by a shutdown timeout). RFC-017 Stage E2: both the
1133        // per-member cancel and the backlog ack dispatch through the
1134        // backend trait (the Server no longer holds a ferriskey handle).
1135        let backend = self.backend.clone();
1136        let reason = args.reason.clone();
1137        let now = args.now;
1138        let dispatch_members = members.clone();
1139        let flow_id = args.flow_id.clone();
1140        // Every async cancel_flow contends on this lock, but the
1141        // critical section is tiny: try_join_next drain + spawn.
1142        let mut guard = self.background_tasks.lock().await;
1143
1144        // Reap completed background dispatches before spawning the next.
1145        while let Some(joined) = guard.try_join_next() {
1146            if let Err(e) = joined {
1147                tracing::warn!(
1148                    error = %e,
1149                    "cancel_flow: background dispatch task panicked or was aborted"
1150                );
1151            }
1152        }
1153
1154        guard.spawn(async move {
1155            // Bounded parallel dispatch via futures::stream::buffer_unordered.
1156            use futures::stream::StreamExt;
1157            const CONCURRENCY: usize = 16;
1158
1159            let member_count = dispatch_members.len();
1160            let flow_id_for_log = flow_id.clone();
1161            futures::stream::iter(dispatch_members)
1162                .map(|eid_str| {
1163                    let backend = backend.clone();
1164                    let reason = reason.clone();
1165                    let flow_id = flow_id.clone();
1166                    async move {
1167                        let Ok(eid) = ExecutionId::parse(&eid_str) else {
1168                            tracing::warn!(
1169                                flow_id = %flow_id,
1170                                execution_id = %eid_str,
1171                                "cancel_flow(async): member id failed to parse; skipping"
1172                            );
1173                            return;
1174                        };
1175                        let cancel_args = ff_core::contracts::CancelExecutionArgs {
1176                            execution_id: eid.clone(),
1177                            reason: reason.clone(),
1178                            source: ff_core::types::CancelSource::OperatorOverride,
1179                            lease_id: None,
1180                            lease_epoch: None,
1181                            attempt_id: None,
1182                            now,
1183                        };
1184                        match backend.cancel_execution(cancel_args).await {
1185                            Ok(_) => {
1186                                ack_cancel_member_via_backend(
1187                                    backend.as_ref(),
1188                                    &flow_id,
1189                                    &eid,
1190                                    &eid_str,
1191                                )
1192                                .await;
1193                            }
1194                            Err(e) => {
1195                                if is_terminal_ack_engine_error(&e) {
1196                                    ack_cancel_member_via_backend(
1197                                        backend.as_ref(),
1198                                        &flow_id,
1199                                        &eid,
1200                                        &eid_str,
1201                                    )
1202                                    .await;
1203                                } else {
1204                                    tracing::warn!(
1205                                        flow_id = %flow_id,
1206                                        execution_id = %eid_str,
1207                                        error = %e,
1208                                        "cancel_flow(async): individual execution cancel failed \
1209                                         (transport/contract fault; reconciler will retry if transient)"
1210                                    );
1211                                }
1212                            }
1213                        }
1214                    }
1215                })
1216                .buffer_unordered(CONCURRENCY)
1217                .for_each(|()| async {})
1218                .await;
1219
1220            tracing::debug!(
1221                flow_id = %flow_id_for_log,
1222                member_count,
1223                concurrency = CONCURRENCY,
1224                "cancel_flow: background member dispatch complete"
1225            );
1226        });
1227        drop(guard);
1228
1229        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
1230        Ok(CancelFlowResult::CancellationScheduled {
1231            cancellation_policy: policy,
1232            member_count,
1233            member_execution_ids: members,
1234        })
1235    }
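
    // Illustrative only — a minimal sketch of how a caller might branch on the
    // three outcomes this method returns. Variant and field names match the
    // `CancelFlowResult` values constructed above; the `alert` / `poll_members`
    // hooks are hypothetical placeholders.
    //
    //     match server.cancel_flow(&args).await? {
    //         CancelFlowResult::Cancelled { .. } => {
    //             // every member reached its terminal state inline
    //         }
    //         CancelFlowResult::PartiallyCancelled { failed_member_execution_ids, .. } => {
    //             alert(&failed_member_execution_ids); // reconciler retries transient faults
    //         }
    //         CancelFlowResult::CancellationScheduled { member_count, .. } => {
    //             poll_members(member_count); // background dispatch still in flight
    //         }
    //         _ => { /* future additive variants: treat as a loud miss, not a no-op */ }
    //     }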
1236
1237    /// Stage a dependency edge between two executions in a flow.
1238    ///
1239    /// Runs on the flow partition {fp:N}.
1240    /// KEYS (6), ARGV (8) — matches lua/flow.lua ff_stage_dependency_edge.
1241    pub async fn stage_dependency_edge(
1242        &self,
1243        args: &StageDependencyEdgeArgs,
1244    ) -> Result<StageDependencyEdgeResult, ServerError> {
1245        Ok(self.backend.stage_dependency_edge(args.clone()).await?)
1246    }
1247
1248    /// Apply a staged dependency edge to the child execution. RFC-017
1249    /// Stage D2: delegates through the backend trait.
1250    pub async fn apply_dependency_to_child(
1251        &self,
1252        args: &ApplyDependencyToChildArgs,
1253    ) -> Result<ApplyDependencyToChildResult, ServerError> {
1254        Ok(self.backend.apply_dependency_to_child(args.clone()).await?)
1255    }
1256
1257    // ── Execution operations API ──
1258
1259    /// Deliver a signal to a suspended (or pending-waitpoint) execution.
1260    ///
1261    /// Pre-reads exec_core for waitpoint/suspension fields needed for KEYS.
1262    /// KEYS (14), ARGV (18) — matches lua/signal.lua ff_deliver_signal.
1263    pub async fn deliver_signal(
1264        &self,
1265        args: &DeliverSignalArgs,
1266    ) -> Result<DeliverSignalResult, ServerError> {
1267        // RFC-017 Stage A migration: dispatch through the backend
1268        // trait. The previous body (lane pre-read + KEYS(14) + ARGV(18)
1269        // FCALL dispatch + `parse_deliver_signal_result`) lives
1270        // verbatim inside `ValkeyBackend::deliver_signal` →
1271        // `deliver_signal_impl`. Clone required because the trait
1272        // method takes `DeliverSignalArgs` by value (see
1273        // `ff_core::engine_backend::EngineBackend::deliver_signal`).
1274        Ok(self.backend.deliver_signal(args.clone()).await?)
1275    }
1276
1277    /// Change an execution's priority. RFC-017 Stage D2: delegates
1278    /// through the backend trait. Empty `lane_id` triggers the backend-
1279    /// internal HGET pre-read (matches legacy inherent behaviour).
1280    pub async fn change_priority(
1281        &self,
1282        execution_id: &ExecutionId,
1283        new_priority: i32,
1284    ) -> Result<ChangePriorityResult, ServerError> {
1285        let args = ff_core::contracts::ChangePriorityArgs {
1286            execution_id: execution_id.clone(),
1287            new_priority,
1288            lane_id: LaneId::new(""),
1289            now: TimestampMs::now(),
1290        };
1291        Ok(self.backend.change_priority(args).await?)
1292    }
1293
1294    /// Scheduler-routed claim entry point.
1295    ///
1296    /// RFC-017 Wave 8 Stage E3 (§7): dispatches through the backend
1297    /// trait. The Valkey backend forwards to its wired
1298    /// [`ff_scheduler::Scheduler`]; the Postgres backend forwards to
1299    /// [`ff_backend_postgres::scheduler::PostgresScheduler`]'s
1300    /// `FOR UPDATE SKIP LOCKED` admission pipeline. Returns
1301    /// `Ok(None)` when no eligible execution exists on the lane at
1302    /// this scan cycle — the enum-typed trait outcome
1303    /// (`ClaimForWorkerOutcome::NoWork`) is collapsed to `Option::None`
1304    /// to preserve the inherent-call contract that predates Stage E.
1305    ///
1306    /// Error mapping: scheduler-class errors arrive as
1307    /// [`EngineError`] via the trait boundary and thread through
1308    /// `ServerError::Engine`'s HTTP arm
1309    /// (budget / capability / unavailable classes land on the
1310    /// documented 400/409/503 response codes — see `api::ApiError::into_response`).
1311    pub async fn claim_for_worker(
1312        &self,
1313        lane: &LaneId,
1314        worker_id: &WorkerId,
1315        worker_instance_id: &WorkerInstanceId,
1316        worker_capabilities: &std::collections::BTreeSet<String>,
1317        grant_ttl_ms: u64,
1318    ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
1319        let args = ff_core::contracts::ClaimForWorkerArgs::new(
1320            lane.clone(),
1321            worker_id.clone(),
1322            worker_instance_id.clone(),
1323            worker_capabilities.clone(),
1324            grant_ttl_ms,
1325        );
1326        match self.backend.claim_for_worker(args).await? {
1327            ff_core::contracts::ClaimForWorkerOutcome::Granted(g) => Ok(Some(g)),
1328            ff_core::contracts::ClaimForWorkerOutcome::NoWork => Ok(None),
1329            // `#[non_exhaustive]` — any future additive variant
1330            // surfaces as a typed Engine error so callers see a
1331            // loud miss instead of a silent `None`.
1332            _ => Err(ServerError::Engine(Box::new(
1333                ff_core::engine_error::EngineError::Unavailable {
1334                    op: "claim_for_worker: unknown ClaimForWorkerOutcome variant",
1335                },
1336            ))),
1337        }
1338    }
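
    // Illustrative only — a minimal polling loop over this entry point, under
    // the assumption that `lane`, `worker_id`, `instance_id`, and `caps` are
    // already in scope; `run_attempt` and the 250ms back-off are hypothetical.
    //
    //     loop {
    //         match server
    //             .claim_for_worker(&lane, &worker_id, &instance_id, &caps, 30_000)
    //             .await?
    //         {
    //             Some(grant) => run_attempt(grant).await,
    //             // `None` means ClaimForWorkerOutcome::NoWork: back off and re-poll.
    //             None => tokio::time::sleep(Duration::from_millis(250)).await,
    //         }
    //     }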
1339
1340    /// Revoke an active lease (operator-initiated). RFC-017 Stage D2:
1341    /// delegates through the backend trait. The backend's trait impl
1342    /// returns `RevokeLeaseResult::AlreadySatisfied` when no active
1343    /// lease is present; the Server facade preserves its pre-migration
1344    /// `ServerError::NotFound` behaviour by re-mapping that variant.
1345    pub async fn revoke_lease(
1346        &self,
1347        execution_id: &ExecutionId,
1348    ) -> Result<RevokeLeaseResult, ServerError> {
1349        let args = ff_core::contracts::RevokeLeaseArgs {
1350            execution_id: execution_id.clone(),
1351            expected_lease_id: None,
1352            worker_instance_id: WorkerInstanceId::new(""),
1353            reason: "operator_revoke".to_owned(),
1354        };
1355        match self.backend.revoke_lease(args).await? {
1356            RevokeLeaseResult::AlreadySatisfied { reason } if reason == "no_active_lease" => {
1357                Err(ServerError::NotFound(format!(
1358                    "no active lease for execution {execution_id} (no current_worker_instance_id)"
1359                )))
1360            }
1361            other => Ok(other),
1362        }
1363    }
1364
1365    /// Get full execution info (HGETALL-shape on Valkey; SELECT-shape on
1366    /// Postgres once Wave 9 wires it). RFC-017 Stage E2: routed through
1367    /// the backend trait's [`ff_core::engine_backend::EngineBackend::read_execution_info`].
1368    pub async fn get_execution(
1369        &self,
1370        execution_id: &ExecutionId,
1371    ) -> Result<ExecutionInfo, ServerError> {
1372        match self.backend.read_execution_info(execution_id).await? {
1373            Some(info) => Ok(info),
1374            None => Err(ServerError::NotFound(format!(
1375                "execution not found: {execution_id}"
1376            ))),
1377        }
1378    }
1379
1380    /// Partition-scoped forward-only cursor listing of executions.
1381    ///
1382    /// Parity-wrapper around the Valkey body of
1383    /// [`ff_core::engine_backend::EngineBackend::list_executions`].
1384    /// Issue #182 replaced the previous offset + lane + state-filter
1385    /// shape with this cursor-based API (per owner adjudication:
1386    /// cursor-everywhere, HTTP surface unreleased). Reads
1387    /// `ff:idx:{p:N}:all_executions`, sorts lexicographically on
1388    /// `ExecutionId`, filters `> cursor`, and trims to `limit`.
1389    pub async fn list_executions_page(
1390        &self,
1391        partition_id: u16,
1392        cursor: Option<ExecutionId>,
1393        limit: usize,
1394    ) -> Result<ListExecutionsPage, ServerError> {
1395        // RFC-017 Stage A migration: dispatch through the backend
1396        // trait. The previous body (SMEMBERS + parse + lex-sort +
1397        // filter + cap) is preserved verbatim inside
1398        // `ValkeyBackend::list_executions`. One deliberate behaviour
1399        // change: corrupt members now surface as
1400        // `EngineError::Validation { kind: Corruption, .. }` (→
1401        // `ServerError::Engine`), where the legacy path warn-logged
1402        // and skipped them. This matches RFC-012's fail-loud contract
1403        // for read-surface corruption.
1404        let partition = ff_core::partition::Partition {
1405            family: ff_core::partition::PartitionFamily::Execution,
1406            index: partition_id,
1407        };
1408        let partition_key = ff_core::partition::PartitionKey::from(&partition);
1409        Ok(self
1410            .backend
1411            .list_executions(partition_key, cursor, limit)
1412            .await?)
1413    }
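
    // Illustrative only — draining a partition with the forward-only cursor.
    // The loop shape follows the doc above (filter `> cursor`, trim to `limit`);
    // the `ids` accessor on `ListExecutionsPage` is a stand-in, since that
    // struct's field names are defined in ff_core, not in this file.
    //
    //     let mut cursor: Option<ExecutionId> = None;
    //     loop {
    //         let page = server.list_executions_page(0, cursor.clone(), 500).await?;
    //         if page.ids.is_empty() {
    //             break; // past the last id, no further pages
    //         }
    //         cursor = page.ids.last().cloned(); // next call returns ids > cursor
    //     }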
1414
1415    /// Replay a terminal execution. RFC-017 Stage D2: delegates through
1416    /// the backend trait; the variadic-KEYS pre-read (HMGET + SMEMBERS
1417    /// for inbound edges on skipped flow members) now lives inside
1418    /// `ValkeyBackend::replay_execution`.
1419    pub async fn replay_execution(
1420        &self,
1421        execution_id: &ExecutionId,
1422    ) -> Result<ReplayExecutionResult, ServerError> {
1423        let args = ff_core::contracts::ReplayExecutionArgs {
1424            execution_id: execution_id.clone(),
1425            now: TimestampMs::now(),
1426        };
1427        Ok(self.backend.replay_execution(args).await?)
1428    }
1429
1430    /// Read frames from an attempt's stream (XRANGE wrapper) plus terminal
1431    /// markers (`closed_at`, `closed_reason`) so consumers can stop polling
1432    /// when the producer finalizes.
1433    ///
1434    /// `from_id` and `to_id` accept XRANGE special markers: `"-"` for
1435    /// earliest, `"+"` for latest. `count_limit` MUST be `>= 1` —
1436    /// `0` returns a `ServerError::InvalidInput` (matches the REST boundary
1437    /// and the Lua-side reject).
1438    ///
1439    /// Cluster-safe: the attempt's `{p:N}` partition is derived from the
1440    /// execution id, so all KEYS share the same slot.
1441    pub async fn read_attempt_stream(
1442        &self,
1443        execution_id: &ExecutionId,
1444        attempt_index: AttemptIndex,
1445        from_id: &str,
1446        to_id: &str,
1447        count_limit: u64,
1448    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
1449        if count_limit == 0 {
1450            return Err(ServerError::InvalidInput(
1451                "count_limit must be >= 1".to_owned(),
1452            ));
1453        }
1454        // RFC-017 Stage B row 10: delegate through the trait. The
1455        // backend owns the stream-op semaphore + XRANGE dispatch; the
1456        // 429-on-contention semantics round-trip as
1457        // `EngineError::ResourceExhausted → ServerError::Engine →
1458        // HTTP 429` (see `ServerError::from` below).
1459        let from = wire_str_to_stream_cursor(from_id);
1460        let to = wire_str_to_stream_cursor(to_id);
1461        Ok(self
1462            .backend
1463            .read_stream(execution_id, attempt_index, from, to, count_limit)
1464            .await?)
1465    }
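
    // Illustrative only — a single bounded page over the whole stream using the
    // XRANGE special markers documented above ("-" = earliest, "+" = latest),
    // assuming `execution_id` and `attempt_index` are in scope.
    //
    //     let frames = server
    //         .read_attempt_stream(&execution_id, attempt_index, "-", "+", 1_000)
    //         .await?;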
1466
1467    /// Tail a live attempt's stream (XREAD BLOCK wrapper). Returns frames
1468    /// plus the terminal signal so a polling consumer can exit when the
1469    /// producer closes the stream.
1470    ///
1471    /// `last_id` is exclusive — XREAD returns entries with id > last_id.
1472    /// Pass `"0-0"` to read from the beginning.
1473    ///
1474    /// `block_ms == 0` → non-blocking peek (returns immediately).
1475    /// `block_ms > 0`  → blocks up to that many ms. Empty `frames` +
1476    /// `closed_at=None` → timeout, no new data, still open.
1477    ///
1478    /// `count_limit` MUST be `>= 1`; `0` returns `InvalidInput`.
1479    ///
1480    /// Implemented as a direct XREAD command (not FCALL) because blocking
1481    /// commands are rejected inside Valkey Functions. The terminal
1482    /// markers come from a companion HMGET on `stream_meta` — see
1483    /// `ff_script::stream_tail` module docs.
1484    pub async fn tail_attempt_stream(
1485        &self,
1486        execution_id: &ExecutionId,
1487        attempt_index: AttemptIndex,
1488        last_id: &str,
1489        block_ms: u64,
1490        count_limit: u64,
1491    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
1492        if count_limit == 0 {
1493            return Err(ServerError::InvalidInput(
1494                "count_limit must be >= 1".to_owned(),
1495            ));
1496        }
1497        // RFC-017 Stage B row 10: delegate through the trait. The
1498        // backend owns the stream-op semaphore + XREAD BLOCK dispatch
1499        // (via `duplicate_connection()` per #204, so neither a shared
1500        // tail client nor a serialising mutex is needed). Saturation
1501        // round-trips as `EngineError::ResourceExhausted → HTTP 429`;
1502        // a post-shutdown arrival round-trips as
1503        // `EngineError::Unavailable → HTTP 503`.
1504        let after = wire_str_to_stream_cursor(last_id);
1505        Ok(self
1506            .backend
1507            .tail_stream(
1508                execution_id,
1509                attempt_index,
1510                after,
1511                block_ms,
1512                count_limit,
1513                ff_core::backend::TailVisibility::All,
1514            )
1515            .await?)
1516    }
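
    // Illustrative only — a polling consumer that exits when the producer
    // closes the stream. `frames` / `closed_at` follow the doc above; the
    // per-frame `id` accessor and the `consume` hook are assumptions.
    //
    //     let mut last_id = "0-0".to_owned();
    //     loop {
    //         let chunk = server
    //             .tail_attempt_stream(&execution_id, attempt_index, &last_id, 5_000, 256)
    //             .await?;
    //         for frame in &chunk.frames {
    //             consume(frame);
    //             last_id = frame.id.clone(); // XREAD is exclusive of last_id
    //         }
    //         if chunk.closed_at.is_some() {
    //             break; // producer finalized, stop polling
    //         }
    //     }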
1517
1518    /// Graceful shutdown — stops scanners, drains background handler tasks
1519    /// (e.g. cancel_flow member dispatch) with a bounded timeout, then waits
1520    /// for scanners to finish.
1521    ///
1522    /// Shutdown order is chosen so in-flight stream ops (read/tail) drain
1523    /// cleanly without new arrivals piling up:
1524    ///
1525    /// 1. `backend.shutdown_prepare()` closes the stream-op pool, so new
1526    ///    read/tail attempts fail fast (post-shutdown arrivals surface as
1527    ///    `EngineError::Unavailable` → HTTP 503, per the tail docs above),
1528    ///    then drains in-flight permits up to the grace window; a timeout
1529    ///    here is logged and counted, and shutdown proceeds best-effort.
1530    /// 2. Drain handler-spawned background tasks with a 15s ceiling.
1531    /// 3. `engine.shutdown()` stops scanners.
1532    ///
1533    /// Existing in-flight tails finish on their natural `block_ms`
1534    /// boundary (up to ~30s); each tail runs on its own duplicated
1535    /// connection (#204), released when the tail returns. We do NOT wait for tails
1536    /// to drain explicitly — the semaphore-close + natural-timeout
1537    /// combination bounds shutdown to roughly `block_ms + 15s` in the
1538    /// worst case. Callers observing a dropped connection retry against
1539    /// whatever replacement is coming up.
1540    pub async fn shutdown(self) {
1541        tracing::info!("shutting down FlowFabric server");
1542
1543        // Step 1: RFC-017 Stage B — delegate stream-op pool closure
1544        // + drain to the backend's `shutdown_prepare` hook. The
1545        // Valkey impl closes its semaphore (no new read/tail starts)
1546        // and awaits in-flight permits up to `grace`. A timeout here
1547        // is logged + counted on `ff_shutdown_timeout_total`; we
1548        // continue with best-effort drain of the server's own
1549        // background tasks rather than blocking shutdown behind a
1550        // single slow tail.
1551        let drain_timeout = Duration::from_secs(15);
1552        match self.backend.shutdown_prepare(drain_timeout).await {
1553            Ok(()) => tracing::info!(
1554                "backend shutdown_prepare complete (stream-op pool drained)"
1555            ),
1556            Err(ff_core::engine_error::EngineError::Timeout { elapsed, .. }) => {
1557                self.metrics.inc_shutdown_timeout();
1558                tracing::warn!(
1559                    elapsed_ms = elapsed.as_millis() as u64,
1560                    "shutdown_prepare exceeded grace; proceeding best-effort"
1561                );
1562            }
1563            Err(e) => {
1564                // Non-timeout errors don't block shutdown either, but
1565                // they're unexpected — log at warn so operators see
1566                // the signal without tripping an alert.
1567                tracing::warn!(
1568                    err = %e,
1569                    "shutdown_prepare returned error; proceeding best-effort"
1570                );
1571            }
1572        }
1573
1574        // Step 2: Drain handler-spawned background tasks with the same
1575        // ceiling as Engine::shutdown. If dispatch is still running at
1576        // the deadline, drop the JoinSet to abort remaining tasks.
1577        let background = self.background_tasks.clone();
1578        let drain = async move {
1579            let mut guard = background.lock().await;
1580            while guard.join_next().await.is_some() {}
1581        };
1582        match tokio::time::timeout(drain_timeout, drain).await {
1583            Ok(()) => {}
1584            Err(_) => {
1585                tracing::warn!(
1586                    timeout_s = drain_timeout.as_secs(),
1587                    "shutdown: background tasks did not finish in time, aborting"
1588                );
1589                self.background_tasks.lock().await.abort_all();
1590            }
1591        }
1592
1593        if let Some(engine) = self.engine {
1594            engine.shutdown().await;
1595        }
1596        tracing::info!("FlowFabric server shutdown complete");
1597    }
1598}
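
// Illustrative only — typical caller-side wiring of the shutdown sequence
// documented above (bounded to roughly `block_ms` + 15s in the worst case);
// the ctrl-c trigger is an assumption about the embedding binary.
//
//     tokio::signal::ctrl_c().await.expect("install SIGINT handler");
//     server.shutdown().await;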
1599
1600/// RFC-017 Stage B: lift the wire string (`"-"`, `"+"`, or a concrete
1601/// entry id) used by the REST boundary into the typed
1602/// [`ff_core::contracts::StreamCursor`] the trait method expects.
1603/// Keeps the `read_attempt_stream` / `tail_attempt_stream`
1604/// public-function signatures byte-identical while dispatching
1605/// through the backend.
1606fn wire_str_to_stream_cursor(s: &str) -> ff_core::contracts::StreamCursor {
1607    match s {
1608        "-" => ff_core::contracts::StreamCursor::Start,
1609        "+" => ff_core::contracts::StreamCursor::End,
1610        other => ff_core::contracts::StreamCursor::At(other.to_owned()),
1611    }
1612}
1613
1615/// Result of a waitpoint HMAC secret rotation across all execution partitions.
1616#[derive(Debug, Clone, serde::Serialize)]
1617pub struct RotateWaitpointSecretResult {
1618    /// Count of partitions that accepted the rotation.
1619    pub rotated: u16,
1620    /// Partition indices that failed — operator should investigate (Valkey
1621    /// outage, auth failure, cluster split). Rotation is idempotent, so a
1622    /// re-run after the underlying fault clears converges to the correct
1623    /// state.
1624    pub failed: Vec<u16>,
1625    /// New kid installed as current.
1626    pub new_kid: String,
1627}
1628
1629impl Server {
1630    /// Rotate the waitpoint HMAC secret. Promotes the current kid to previous
1631    /// (accepted within `FF_WAITPOINT_HMAC_GRACE_MS`), installs `new_secret_hex`
1632    /// as the new current kid. Idempotent: re-running with the same `new_kid`
1633    /// and `new_secret_hex` converges partitions to the same state.
1634    ///
1635    /// Returns a structured result so operators can see which partitions failed.
1636    /// HTTP layer returns 200 if any partition succeeded, 500 only if all fail.
1637    pub async fn rotate_waitpoint_secret(
1638        &self,
1639        new_kid: &str,
1640        new_secret_hex: &str,
1641    ) -> Result<RotateWaitpointSecretResult, ServerError> {
1642        if new_kid.is_empty() || new_kid.contains(':') {
1643            return Err(ServerError::OperationFailed(
1644                "new_kid must be non-empty and must not contain ':'".into(),
1645            ));
1646        }
1647        if new_secret_hex.is_empty()
1648            || !new_secret_hex.len().is_multiple_of(2)
1649            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
1650        {
1651            return Err(ServerError::OperationFailed(
1652                "new_secret_hex must be a non-empty even-length hex string".into(),
1653            ));
1654        }
1655
1656        // Single-writer gate — admin semaphore + audit log stay on
1657        // Server per RFC-017 §4 row 11. The per-partition fan-out
1658        // moved inside `ValkeyBackend::rotate_waitpoint_hmac_secret_all`.
1659        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
1660            Ok(p) => p,
1661            Err(tokio::sync::TryAcquireError::NoPermits) => {
1662                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
1663            }
1664            Err(tokio::sync::TryAcquireError::Closed) => {
1665                return Err(ServerError::OperationFailed(
1666                    "admin rotate semaphore closed (server shutting down)".into(),
1667                ));
1668            }
1669        };
1670
1671        let n = self.config.partition_config.num_flow_partitions;
1672        let grace_ms = self.config.waitpoint_hmac_grace_ms;
1673
1674        // RFC-017 Stage B row 11: delegate the per-partition fan-out
1675        // to the backend. The trait method returns one entry per
1676        // partition with an inner `Result` so partial success is
1677        // observable — matching the pre-migration Server body.
1678        let args = ff_core::contracts::RotateWaitpointHmacSecretAllArgs::new(
1679            new_kid.to_owned(),
1680            new_secret_hex.to_owned(),
1681            grace_ms,
1682        );
1683        let result = self
1684            .backend
1685            .rotate_waitpoint_hmac_secret_all(args)
1686            .await?;
1687
1688        let mut rotated = 0u16;
1689        let mut failed: Vec<u16> = Vec::new();
1690        for entry in &result.entries {
1691            match &entry.result {
1692                Ok(_) => {
1693                    rotated += 1;
1694                    tracing::debug!(
1695                        partition = entry.partition,
1696                        new_kid = %new_kid,
1697                        "waitpoint_hmac_rotated"
1698                    );
1699                }
1700                Err(e) => {
1701                    tracing::error!(
1702                        target: "audit",
1703                        partition = entry.partition,
1704                        err = %e,
1705                        "waitpoint_hmac_rotation_failed"
1706                    );
1707                    failed.push(entry.partition);
1708                }
1709            }
1710        }
1711
1712        // Single aggregated audit event (RFC-017 row 11: audit emit
1713        // stays on Server).
1714        tracing::info!(
1715            target: "audit",
1716            new_kid = %new_kid,
1717            total_partitions = n,
1718            rotated,
1719            failed_count = failed.len(),
1720            "waitpoint_hmac_rotation_complete"
1721        );
1722
1723        Ok(RotateWaitpointSecretResult {
1724            rotated,
1725            failed,
1726            new_kid: new_kid.to_owned(),
1727        })
1728    }
1729}
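
// Illustrative only — how an operator-facing caller might act on the rotation
// result per the doc above (200 if any partition rotated, 500 only if all
// failed; idempotent re-run on partial failure). The kid string and
// `hex_secret` are hypothetical.
//
//     let result = server.rotate_waitpoint_secret("kid-2", &hex_secret).await?;
//     if !result.failed.is_empty() {
//         // Idempotent: re-run with the same kid + secret once the partition
//         // fault (outage / auth failure / cluster split) clears.
//     }
//     let http_status = if result.rotated > 0 { 200 } else { 500 };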
1730
1731// ── FCALL error mapping & cancel-ack helpers ──
1732
1743/// Convert a `ScriptError` into a `ServerError` preserving `ferriskey::ErrorKind`
1744/// for transport-level variants. Business-logic variants keep their code as
1745/// `ServerError::Script(String)` so HTTP clients see a stable message.
1746///
1747/// Why this exists: before R2, the stream handlers did
1748/// `ScriptError → format!() → ServerError::Script(String)`, which erased
1749/// the ErrorKind and made `ServerError::is_retryable()` always return
1750/// false. Retry-capable clients (cairn-fabric) would not retry a legit
1751/// transient error like `IoError`.
1752#[allow(dead_code)] // retained for non-stream FCALL paths that still route via raw ScriptError; stream handlers moved to trait in RFC-017 Stage B
1753fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
1754    match e {
1755        ff_script::error::ScriptError::Valkey(valkey_err) => {
1756            crate::server::backend_context(valkey_err, "stream FCALL transport")
1757        }
1758        other => ServerError::Script(other.to_string()),
1759    }
1760}
1761
1762/// Acknowledge that a member cancel has committed. Delegates to
1763/// [`EngineBackend::ack_cancel_member`] (Valkey: `ff_ack_cancel_member`
1764/// FCALL on `{fp:N}` — SREM the execution from the flow's
1765/// `pending_cancels` set and, if empty, ZREM the flow from the
1766/// partition-level `cancel_backlog`). Best-effort — failures are
1767/// logged but not propagated, since the reconciler drains any
1768/// leftovers on its next pass.
1769async fn ack_cancel_member_via_backend(
1770    backend: &dyn EngineBackend,
1771    flow_id: &FlowId,
1772    execution_id: &ExecutionId,
1773    eid_str: &str,
1774) {
1775    if let Err(e) = backend.ack_cancel_member(flow_id, execution_id).await {
1776        tracing::warn!(
1777            flow_id = %flow_id,
1778            execution_id = %eid_str,
1779            error = %e,
1780            "ack_cancel_member failed; reconciler will drain on next pass"
1781        );
1782    }
1783}
1784
1785/// Engine-error variant: inspects an
1786/// `EngineError` returned from `self.backend.cancel_execution(...)`
1787/// and returns `true` when the member is already terminal. Matches the
1788/// Lua-side semantics of `execution_not_active` / `execution_not_found`
1789/// via the typed `State` / `NotFound` classifications the backend
1790/// trait impl maps them to.
1791fn is_terminal_ack_engine_error(err: &EngineError) -> bool {
1792    match err {
1793        // Already terminal (Lua's `execution_not_active`) or missing
1794        // entirely (`execution_not_found`) — both treated as ack-worthy
1795        // so the cancel-backlog doesn't poison on a member already in
1796        // the intended terminal state.
1797        EngineError::State(kind) => matches!(
1798            kind,
1799            ff_core::engine_error::StateKind::Terminal
1800        ),
1801        EngineError::NotFound { .. } => true,
1802        EngineError::Contextual { source, .. } => is_terminal_ack_engine_error(source),
1803        _ => false,
1804    }
1805}
1806
1808#[cfg(test)]
1809mod tests {
1810    use super::*;
1811    use ferriskey::ErrorKind;
1812
1813    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
1814        ferriskey::Error::from((kind, "synthetic"))
1815    }
1816
1817    #[test]
1818    fn is_retryable_backend_variant_uses_kind_table() {
1819        // Transport-bucketed: retryable.
1820        assert!(ServerError::from(mk_fk_err(ErrorKind::IoError)).is_retryable());
1821        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
1822        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
1823        // Cluster-bucketed (Moved / Ask / TryAgain / ClusterDown): retryable
1824        // after topology settles — the #88 BackendErrorKind classifier
1825        // treats these as transient cluster-churn, a semantic refinement
1826        // over the previous ff-script retry table.
1827        assert!(ServerError::from(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
1828        assert!(ServerError::from(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
1829        assert!(ServerError::from(mk_fk_err(ErrorKind::Moved)).is_retryable());
1830        assert!(ServerError::from(mk_fk_err(ErrorKind::Ask)).is_retryable());
1831        // BusyLoading: retryable.
1832        assert!(ServerError::from(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());
1833
1834        // Auth / Protocol / ScriptNotLoaded: terminal.
1835        assert!(!ServerError::from(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
1836        assert!(!ServerError::from(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
1837        assert!(!ServerError::from(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
1838    }
1839
1840    #[test]
1841    fn is_retryable_backend_context_uses_kind_table() {
1842        let err = crate::server::backend_context(mk_fk_err(ErrorKind::IoError), "HGET test");
1843        assert!(err.is_retryable());
1844
1845        let err =
1846            crate::server::backend_context(mk_fk_err(ErrorKind::AuthenticationFailed), "auth");
1847        assert!(!err.is_retryable());
1848    }
1849
1850    #[test]
1851    fn is_retryable_library_load_delegates_to_inner_kind() {
1852        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
1853            mk_fk_err(ErrorKind::IoError),
1854        ));
1855        assert!(err.is_retryable());
1856
1857        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
1858            mk_fk_err(ErrorKind::AuthenticationFailed),
1859        ));
1860        assert!(!err.is_retryable());
1861
1862        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
1863            expected: "1".into(),
1864            got: "2".into(),
1865        });
1866        assert!(!err.is_retryable());
1867    }
1868
1869    #[test]
1870    fn is_retryable_business_logic_variants_are_false() {
1871        assert!(!ServerError::NotFound("x".into()).is_retryable());
1872        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
1873        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
1874        assert!(!ServerError::Script("x".into()).is_retryable());
1875        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
1876    }
1877
1878    #[test]
1879    fn backend_kind_delegates_through_library_load() {
1880        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
1881            mk_fk_err(ErrorKind::ClusterDown),
1882        ));
1883        assert_eq!(err.backend_kind(), Some(ff_core::BackendErrorKind::Cluster));
1884
1885        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
1886            expected: "1".into(),
1887            got: "2".into(),
1888        });
1889        assert_eq!(err.backend_kind(), None);
1890    }
1891
1893    #[test]
1894    fn valkey_version_too_low_is_not_retryable() {
1895        let err = ServerError::ValkeyVersionTooLow {
1896            detected: "7.0".into(),
1897            required: "7.2".into(),
1898        };
1899        assert!(!err.is_retryable());
1900        assert_eq!(err.backend_kind(), None);
1901    }
1902
1903    #[test]
1904    fn valkey_version_too_low_error_message_includes_both_versions() {
1905        let err = ServerError::ValkeyVersionTooLow {
1906            detected: "7.0".into(),
1907            required: "7.2".into(),
1908        };
1909        let msg = err.to_string();
1910        assert!(msg.contains("7.0"), "detected version in message: {msg}");
1911        assert!(msg.contains("7.2"), "required version in message: {msg}");
1912        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
1913    }
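
    // Added sketches: direct coverage of the two pure helpers defined above.
    // The cursor markers mirror `wire_str_to_stream_cursor`; constructing
    // `EngineError::State` / `Unavailable` here assumes those variants are
    // constructible from this crate, as `Unavailable` already is in
    // `claim_for_worker`. The concrete entry id is arbitrary.
    #[test]
    fn wire_str_to_stream_cursor_maps_xrange_markers() {
        use ff_core::contracts::StreamCursor;
        assert!(matches!(wire_str_to_stream_cursor("-"), StreamCursor::Start));
        assert!(matches!(wire_str_to_stream_cursor("+"), StreamCursor::End));
        assert!(matches!(
            wire_str_to_stream_cursor("1700000000000-0"),
            StreamCursor::At(id) if id == "1700000000000-0"
        ));
    }

    #[test]
    fn terminal_ack_classification_matches_state_and_transport_kinds() {
        let terminal = EngineError::State(ff_core::engine_error::StateKind::Terminal);
        assert!(is_terminal_ack_engine_error(&terminal));

        let transient = EngineError::Unavailable { op: "cancel_execution" };
        assert!(!is_terminal_ack_engine_error(&transient));
    }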
1914}