ff_server/server.rs
use std::sync::Arc;
use std::time::Duration;

use ferriskey::ClientBuilder;
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinSet;
use ff_core::engine_backend::EngineBackend;
use ff_core::engine_error::EngineError;
use ff_core::contracts::{
    AddExecutionToFlowArgs, AddExecutionToFlowResult, BudgetStatus, CancelExecutionArgs,
    CancelExecutionResult, CancelFlowArgs, CancelFlowResult, ChangePriorityResult,
    CreateBudgetArgs, CreateBudgetResult, CreateExecutionArgs, CreateExecutionResult,
    CreateFlowArgs, CreateFlowResult, CreateQuotaPolicyArgs, CreateQuotaPolicyResult,
    ApplyDependencyToChildArgs, ApplyDependencyToChildResult,
    DeliverSignalArgs, DeliverSignalResult, ExecutionInfo,
    ListExecutionsPage, ReplayExecutionResult,
    ReportUsageArgs, ReportUsageResult, ResetBudgetResult,
    RevokeLeaseResult,
    StageDependencyEdgeArgs, StageDependencyEdgeResult,
};
use ff_core::partition::{execution_partition, flow_partition, PartitionConfig};
use ff_core::state::PublicState;
use ff_core::types::*;
use ff_engine::Engine;
use ff_script::retry::is_retryable_kind;

use crate::config::{BackendKind, ServerConfig};

/// RFC-017 §9.0: backends that may boot as of this stage. Postgres
/// joined the list at Stage E (v0.8.0). Compiled into the binary by
/// design — see RFC-017 §9.0 "Fleet-wide cutover requirement" for the
/// rolling-upgrade implication.
const BACKEND_STAGE_READY: &[&str] = &["valkey", "postgres"];

/// Upper bound on `member_execution_ids` returned in the
/// [`CancelFlowResult::Cancelled`] response when the flow was already in a
/// terminal state (idempotent retry). The first (non-idempotent) cancel call
/// returns the full list; retries only need a sample.
const ALREADY_TERMINAL_MEMBER_CAP: usize = 1000;

/// FlowFabric server — connects everything together.
///
/// Manages the Valkey connection, Lua library loading, background scanners,
/// and provides a minimal API for Phase 1.
pub struct Server {
    /// Server-wide Semaphore(1) gating admin rotate calls. Legitimate
    /// operators rotate ~monthly and can afford to serialize; concurrent
    /// rotate requests are an attack or misbehaving script. Holding the
    /// permit also guards against interleaved partial rotations on the
    /// Server side of the per-partition locks, surfacing contention as
    /// HTTP 429 instead of silently queueing and blowing past the 120s
    /// HTTP timeout. See `rotate_waitpoint_secret` handler.
    admin_rotate_semaphore: Arc<tokio::sync::Semaphore>,
    /// Valkey engine (14 scanners). `None` on the Postgres boot path
    /// — engine scanners are Valkey-only (RFC-017 Wave 8 Stage E1;
    /// Postgres reconcilers run out-of-process via
    /// `ff-backend-postgres::reconcilers` + a separate scanner
    /// supervisor that Stage E2/E3 will wire).
    engine: Option<Engine>,
    config: ServerConfig,
    // RFC-017 Wave 8 Stage E3 (§9.3 / F8+Q4): `Server::scheduler` field
    // removed. The Valkey `ff_scheduler::Scheduler` is owned by
    // `ValkeyBackend` (installed via `with_scheduler` during boot);
    // the Postgres twin is constructed per-call inside
    // `PostgresBackend::claim_for_worker`. Server-side dispatch of
    // `claim_for_worker` now flows through `self.backend` only —
    // trait cutover per §4 row 9.
    /// Background tasks spawned by async handlers (e.g. cancel_flow member
    /// dispatch). Drained on shutdown with a bounded timeout.
    background_tasks: Arc<AsyncMutex<JoinSet<()>>>,
    /// PR-94: observability registry. Always present; the no-op shim
    /// takes zero runtime cost when the `observability` feature is
    /// off, and the real OTEL-backed registry is passed in via
    /// [`Server::start_with_metrics`] when on. The same `Arc` is shared
    /// with [`Engine::start_with_metrics`] and
    /// [`ff_scheduler::Scheduler::with_metrics`] so a single scrape
    /// sees everything the process produces.
    metrics: Arc<ff_observability::Metrics>,
    /// RFC-017 Stage A: backend trait object for the data-plane
    /// migration. Dual-field posture — the existing `client` /
    /// `tail_client` / `engine` / `scheduler` fields still serve the
    /// unmigrated handlers during Stages A-D; migrated handlers
    /// dispatch here. Stages B-D progressively retire the Client
    /// fields per RFC-017 §9.
    backend: Arc<dyn EngineBackend>,
}

/// Server error type.
#[derive(Debug, thiserror::Error)]
pub enum ServerError {
    /// Backend transport error. Previously wrapped `ferriskey::Error`
    /// directly (#88); now carries a backend-agnostic
    /// [`ff_core::BackendError`] so consumers match on
    /// [`ff_core::BackendErrorKind`] instead of ferriskey's native
    /// taxonomy. The ferriskey → [`ff_core::BackendError`] mapping
    /// lives in `ff_backend_valkey::backend_error_from_ferriskey`.
    #[error("backend: {0}")]
    Backend(#[from] ff_core::BackendError),
    /// Backend error with additional context. Previously
    /// `ValkeyContext { source: ferriskey::Error }` (#88).
    #[error("backend ({context}): {source}")]
    BackendContext {
        #[source]
        source: ff_core::BackendError,
        context: String,
    },
    #[error("config: {0}")]
    Config(#[from] crate::config::ConfigError),
    #[error("library load: {0}")]
    LibraryLoad(#[from] ff_script::loader::LoadError),
    #[error("partition mismatch: {0}")]
    PartitionMismatch(String),
    #[error("not found: {0}")]
    NotFound(String),
    #[error("invalid input: {0}")]
    InvalidInput(String),
    #[error("operation failed: {0}")]
    OperationFailed(String),
    #[error("script: {0}")]
    Script(String),
    /// Server-wide concurrency limit reached on a labelled pool. Surfaces
    /// as HTTP 429 at the REST boundary so load balancers and clients can
    /// retry with backoff. The `source` label ("stream_ops", "admin_rotate",
    /// etc.) identifies WHICH pool is exhausted so operators aren't
    /// misled by a single "tail unavailable" string when the real fault
    /// is rotation contention.
    ///
    /// Fields: (source_label, max_permits).
    #[error("too many concurrent {0} calls (max: {1})")]
    ConcurrencyLimitExceeded(&'static str, u32),
    /// Detected Valkey version is below the RFC-011 §13 minimum. The engine
    /// depends on Valkey Functions (stabilized in 7.2), RESP3 (7.2+), and
    /// hash-tag routing; older versions do not implement the required
    /// primitives. Operator action: upgrade Valkey.
    #[error(
        "valkey version too low: detected {detected}, required >= {required} (RFC-011 §13)"
    )]
    ValkeyVersionTooLow {
        detected: String,
        required: String,
    },
    /// RFC-017 §9.0 hard-gate: selected backend is not yet permitted
    /// to boot in this `ff-server` binary. Operator action is to
    /// either (a) select `FF_BACKEND=valkey`, or (b) upgrade to a
    /// Stage E binary once v0.8.0 ships.
    ///
    /// `stage` names the current stage ("A" through "E4") so operator
    /// tooling can correlate the refusal with the migration plan.
    #[error(
        "backend not ready: {backend} (not in BACKEND_STAGE_READY; current stage {stage}). \
         Set FF_BACKEND=valkey or FF_BACKEND=postgres."
    )]
    BackendNotReady {
        backend: &'static str,
        stage: &'static str,
    },
    /// RFC-017 Stage A: an `EngineBackend` trait method returned a
    /// typed error that is not one of the specific business outcomes
    /// existing `ServerError` variants model. Includes transport
    /// faults, validation/corruption, and `Unavailable` for methods
    /// the backend has not implemented yet.
    ///
    /// Stage B/C migrations may refine individual arms into richer
    /// `ServerError` variants as more handlers route through the
    /// trait; Stage A keeps this catch-all so the migration lands
    /// additively. Boxed to keep `ServerError` small (clippy
    /// `result_large_err` — `EngineError` is ~200 bytes).
    #[error("engine: {0}")]
    Engine(#[from] Box<ff_core::engine_error::EngineError>),
}

/// Lift a native `ferriskey::Error` into [`ServerError::Backend`] via
/// [`ff_backend_valkey::backend_error_from_ferriskey`] (#88). Keeps
/// `?`-propagation ergonomic at ferriskey call sites while the
/// public variant stays backend-agnostic.
impl From<ferriskey::Error> for ServerError {
    fn from(err: ferriskey::Error) -> Self {
        Self::Backend(ff_backend_valkey::backend_error_from_ferriskey(&err))
    }
}

/// Lift an unboxed `EngineError` into [`ServerError::Engine`]. The
/// variant stores a `Box<EngineError>` to keep `ServerError` small
/// (clippy `result_large_err`); this `From` restores `?`-propagation
/// from trait-dispatched handler paths.
impl From<ff_core::engine_error::EngineError> for ServerError {
    fn from(err: ff_core::engine_error::EngineError) -> Self {
        Self::Engine(Box::new(err))
    }
}

/// Build a [`ServerError::BackendContext`] from a native
/// `ferriskey::Error` and a call-site label (#88).
pub(crate) fn backend_context(
    err: ferriskey::Error,
    context: impl Into<String>,
) -> ServerError {
    ServerError::BackendContext {
        source: ff_backend_valkey::backend_error_from_ferriskey(&err),
        context: context.into(),
    }
}

impl ServerError {
    /// Returns the classified [`ff_core::BackendErrorKind`] if this
    /// error carries a backend transport fault. Covers direct
    /// Backend variants and library-load failures.
    ///
    /// Renamed from `valkey_kind` in #88 — the previous return type
    /// `Option<ferriskey::ErrorKind>` leaked ferriskey into every
    /// consumer doing retry/error classification.
    pub fn backend_kind(&self) -> Option<ff_core::BackendErrorKind> {
        match self {
            Self::Backend(be) | Self::BackendContext { source: be, .. } => Some(be.kind()),
            Self::LibraryLoad(e) => e
                .valkey_kind()
                .map(ff_backend_valkey::classify_ferriskey_kind),
            // RFC-017 Stage A: the Engine(EngineError) arm is intentionally
            // lumped in with the rest (no backend_kind — variants like
            // Validation / NotFound are business-logic, and Transport
            // variants already surface under Backend upstream in most
            // paths). Stage B/C will revisit when more handlers route
            // through the trait.
            _ => None,
        }
    }

    /// Whether this error is safely retryable by a caller. For
    /// backend transport variants, delegates to
    /// [`ff_core::BackendErrorKind::is_retryable`]. Business-logic
    /// rejections (NotFound, InvalidInput, OperationFailed, Script,
    /// Config, PartitionMismatch) return false — those won't change
    /// on retry.
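    ///
    /// A caller-side retry sketch (illustrative only; `call` stands in
    /// for any fallible `Server` operation and the attempt cap is made
    /// up):
    ///
    /// ```ignore
    /// let mut attempts = 0;
    /// let value = loop {
    ///     match call().await {
    ///         Ok(v) => break v,
    ///         Err(e) if e.is_retryable() && attempts < 3 => {
    ///             attempts += 1;
    ///             // exponential backoff: 200ms, 400ms, 800ms
    ///             tokio::time::sleep(Duration::from_millis(100 << attempts)).await;
    ///         }
    ///         Err(e) => return Err(e),
    ///     }
    /// };
    /// ```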
    pub fn is_retryable(&self) -> bool {
        match self {
            Self::Backend(be) | Self::BackendContext { source: be, .. } => {
                be.kind().is_retryable()
            }
            Self::LibraryLoad(load_err) => load_err
                .valkey_kind()
                .map(is_retryable_kind)
                .unwrap_or(false),
            Self::Config(_)
            | Self::PartitionMismatch(_)
            | Self::NotFound(_)
            | Self::InvalidInput(_)
            | Self::OperationFailed(_)
            | Self::Script(_) => false,
            // Back off and retry — the bound is a server-side permit pool,
            // so the retry will succeed once a permit frees up. Applies
            // equally to stream ops, admin rotate, etc.
            Self::ConcurrencyLimitExceeded(_, _) => true,
            // Operator must upgrade Valkey; a retry at the caller won't help.
            Self::ValkeyVersionTooLow { .. } => false,
            // Operator must change FF_BACKEND; not retryable.
            Self::BackendNotReady { .. } => false,
            // EngineError's classification helpers handle transport
            // retry semantics; mirror them so trait-dispatched handlers
            // keep the same retry policy as the Client path.
            Self::Engine(e) => matches!(
                e.as_ref(),
                EngineError::Transport { .. } | EngineError::Contextual { .. }
            ),
        }
    }
}

impl Server {
    /// Start the FlowFabric server.
    ///
    /// Boot sequence:
    /// 1. Connect to Valkey
    /// 2. Validate or create partition config key
    /// 3. Load the FlowFabric Lua library
    /// 4. Start engine (14 background scanners)
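    ///
    /// A minimal boot sketch (illustrative; `load_config` is a
    /// hypothetical helper, not this crate's API):
    ///
    /// ```ignore
    /// let config: ServerConfig = load_config()?; // hypothetical
    /// let server = Server::start(config).await?;
    /// ```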
    pub async fn start(config: ServerConfig) -> Result<Self, ServerError> {
        Self::start_with_metrics(config, Arc::new(ff_observability::Metrics::new())).await
    }

    /// PR-94: boot the server with a shared observability registry.
    ///
    /// Scanner cycle + scheduler metrics record into this registry;
    /// `main.rs` threads the same handle into the router so `/metrics`
    /// exposes what the engine produces. The no-arg [`Server::start`]
    /// forwards here with a fresh `Metrics::new()` — under the default
    /// build that's the shim, under `observability` it's a real
    /// registry not shared with any HTTP route (useful for tests
    /// exercising the engine in isolation).
    ///
    /// **RFC-017:** this path gates `config.backend` against
    /// [`BACKEND_STAGE_READY`] (which, as of Stage E4, admits both
    /// `valkey` and `postgres`; Postgres boots route through
    /// [`Server::start_postgres_branch`]). On the Valkey path it dials
    /// the cluster through the legacy path, then synthesises an
    /// `Arc<ValkeyBackend>` around the dialed client and populates
    /// `Server.backend`. The dual-field posture was explicit through
    /// Stage D; Stage E retired the legacy Client fields. See
    /// [`Server::start_with_backend`] for the test-injection entry
    /// point that takes a caller-supplied backend.
    pub async fn start_with_metrics(
        config: ServerConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Self, ServerError> {
        // RFC-017 §9.0 hard-gate. At v0.8.0 (Stage E4) both `valkey` and
        // `postgres` are ready; this check remains as defence-in-depth
        // so future backend additions must explicitly opt into the list.
        // The `FF_BACKEND_ACCEPT_UNREADY` / `FF_ENV=development` dev-mode
        // override was retired at Stage E4 because it is no longer
        // needed — both supported backends now boot without override.
        let label = config.backend.as_str();
        if !BACKEND_STAGE_READY.contains(&label) {
            return Err(ServerError::BackendNotReady {
                backend: match config.backend {
                    BackendKind::Postgres => "postgres",
                    BackendKind::Valkey => "valkey",
                },
                stage: "E4",
            });
        }

        // RFC-017 Wave 8 Stage E1: Postgres dial branch. Stage E4 flipped
        // `BACKEND_STAGE_READY` to include "postgres", so this branch is
        // now reachable without the (retired) dev-mode override. On the
        // Postgres path we skip the Valkey-specific engine/scanner +
        // scheduler construction entirely (E3 wired the scheduler twin).
        if matches!(config.backend, BackendKind::Postgres) {
            return Self::start_postgres_branch(config, metrics).await;
        }
        // Step 1: Connect to Valkey via ClientBuilder
        tracing::info!(
            host = %config.valkey.host, port = config.valkey.port,
            tls = config.valkey.tls, cluster = config.valkey.cluster,
            "connecting to Valkey"
        );
        let mut builder = ClientBuilder::new()
            .host(&config.valkey.host, config.valkey.port)
            .connect_timeout(Duration::from_secs(10))
            .request_timeout(Duration::from_millis(5000));
        if config.valkey.tls {
            builder = builder.tls();
        }
        if config.valkey.cluster {
            builder = builder.cluster();
        }
        let client = builder
            .build()
            .await
            .map_err(|e| crate::server::backend_context(e, "connect"))?;

        // Verify connectivity
        let pong: String = client
            .cmd("PING")
            .execute()
            .await
            .map_err(|e| crate::server::backend_context(e, "PING"))?;
        if pong != "PONG" {
            return Err(ServerError::OperationFailed(format!(
                "unexpected PING response: {pong}"
            )));
        }
        tracing::info!("Valkey connection established");

        // RFC-017 Wave 8 Stage D (§4 row 12): the five Valkey-specific
        // deployment-initialisation steps (version verify, partition
        // config, HMAC install, FUNCTION LOAD, lanes seed) now live
        // behind `ValkeyBackend::initialize_deployment`. Load-bearing
        // ordering contract preserved in that method's doc-comment.
        // The call is sequenced after the connection but before the
        // engine + backend wiring below so the pre-relocation boot
        // ordering is byte-for-byte identical.
        let init_backend = ff_backend_valkey::ValkeyBackend::from_client_and_partitions(
            client.clone(),
            config.partition_config,
        );
        init_backend
            .initialize_deployment(
                &config.waitpoint_hmac_secret,
                &config.lanes,
                config.valkey.skip_library_load,
            )
            .await
            .map_err(|e| ServerError::Engine(Box::new(e)))?;

        // Step 4: Start engine with scanners
        // Build a fresh EngineConfig rather than cloning (EngineConfig doesn't derive Clone).
        let engine_cfg = ff_engine::EngineConfig {
            partition_config: config.partition_config,
            lanes: config.lanes.clone(),
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            delayed_promoter_interval: config.engine_config.delayed_promoter_interval,
            index_reconciler_interval: config.engine_config.index_reconciler_interval,
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            pending_wp_expiry_interval: config.engine_config.pending_wp_expiry_interval,
            retention_trimmer_interval: config.engine_config.retention_trimmer_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            budget_reconciler_interval: config.engine_config.budget_reconciler_interval,
            quota_reconciler_interval: config.engine_config.quota_reconciler_interval,
            unblock_interval: config.engine_config.unblock_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            flow_projector_interval: config.engine_config.flow_projector_interval,
            execution_deadline_interval: config.engine_config.execution_deadline_interval,
            cancel_reconciler_interval: config.engine_config.cancel_reconciler_interval,
            edge_cancel_dispatcher_interval: config.engine_config.edge_cancel_dispatcher_interval,
            edge_cancel_reconciler_interval: config.engine_config.edge_cancel_reconciler_interval,
            scanner_filter: config.engine_config.scanner_filter.clone(),
        };
        // Engine scanners keep running on the MAIN `client` mux — NOT on
        // `tail_client`. Scanner cadence is foreground-latency-coupled by
        // design (an incident that blocks all FCALLs should also visibly
        // block scanners), and keeping scanners off the tail client means a
        // long-poll tail can never starve lease-expiry, retention-trim,
        // etc. Do not change this without revisiting RFC-006 Impl Notes.
        // Build the Valkey completion backend (issue #90) and subscribe.
        // This replaces the pre-#90 `CompletionListenerConfig` path:
        // the wire subscription now lives in ff-backend-valkey, the
        // engine just consumes the resulting stream.
        let mut valkey_conn = ff_core::backend::ValkeyConnection::new(
            config.valkey.host.clone(),
            config.valkey.port,
        );
        valkey_conn.tls = config.valkey.tls;
        valkey_conn.cluster = config.valkey.cluster;
        let completion_backend =
            ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
                client.clone(),
                config.partition_config,
                valkey_conn,
            );
        let completion_stream =
            <ff_backend_valkey::ValkeyBackend as ff_core::completion_backend::CompletionBackend>::subscribe_completions(&completion_backend)
                .await
                .map_err(|e| {
                    ServerError::OperationFailed(format!("subscribe_completions: {e}"))
                })?;

        let engine = Engine::start_with_completions(
            engine_cfg,
            client.clone(),
            metrics.clone(),
            completion_stream,
        );

        // RFC-017 Stage B: the dedicated `tail_client`, the
        // `stream_semaphore`, and the `xread_block_lock` moved into
        // `ValkeyBackend` (§6). After issue #204's switch to per-call
        // `duplicate_connection()` inside `tail_stream_impl`, the
        // dedicated tail mux + serialising mutex are no longer
        // required — each `XREAD BLOCK` gets its own socket — so
        // the encapsulated impl only needs the bounded semaphore to
        // preserve the existing 429-on-contention contract at the
        // REST boundary.
        tracing::info!(
            max_concurrent_stream_ops = config.max_concurrent_stream_ops,
            "stream-op semaphore lives inside ValkeyBackend (RFC-017 Stage B)"
        );

        // Admin surface warning. /v1/admin/* endpoints (waitpoint HMAC
        // rotation, etc.) are only protected by the global Bearer
        // middleware in api.rs — which is only installed when
        // config.api_token is set. Without FF_API_TOKEN, a public
        // deployment exposes secret rotation to anyone that can reach
        // the listen_addr. Warn loudly so operators can't miss it; we
        // don't fail-start because single-tenant dev uses this path.
        if config.api_token.is_none() {
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — /v1/admin/* endpoints (including \
                 rotate-waitpoint-secret) are UNAUTHENTICATED. Set \
                 FF_API_TOKEN for any deployment reachable from untrusted \
                 networks."
            );
            // Explicit callout for the credential-bearing read endpoints.
            // Auditors grep for these on a per-endpoint basis; lumping
            // them into the admin warning alone hides the fact that
            // /pending-waitpoints returns HMAC tokens and /result
            // returns completion payload bytes.
            tracing::warn!(
                listen_addr = %config.listen_addr,
                "FF_API_TOKEN is unset — GET /v1/executions/{{id}}/pending-waitpoints \
                 returns HMAC waitpoint_tokens (bearer credentials for signal delivery) \
                 and GET /v1/executions/{{id}}/result returns raw completion payload \
                 bytes (may contain PII). Both are UNAUTHENTICATED in this \
                 configuration."
            );
        }

        // Partition counts — post-RFC-011 there are three physical families.
        // Execution keys co-locate with their parent flow's partition (§2 of
        // the RFC), so `num_flow_partitions` governs both exec and flow
        // routing; no separate `num_execution_partitions` count exists.
        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            budget_partitions = config.partition_config.num_budget_partitions,
            quota_partitions = config.partition_config.num_quota_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            listen_addr = %config.listen_addr,
            "FlowFabric server started. Partitions (flow/budget/quota): {}/{}/{}. Scanners: 14 active.",
            config.partition_config.num_flow_partitions,
            config.partition_config.num_budget_partitions,
            config.partition_config.num_quota_partitions,
        );

        let scheduler = Arc::new(ff_scheduler::Scheduler::with_metrics(
            client.clone(),
            config.partition_config,
            metrics.clone(),
        ));

        // RFC-017 Stage A: synthesise an `Arc<ValkeyBackend>` around the
        // already-dialed client. Zero additional round-trips; the
        // backend shares the same ferriskey connection as the legacy
        // path so migrated + legacy handlers observe identical state.
        // Stage B relocates `tail_client` / `stream_semaphore` into
        // the backend; Stage E retires the Client fields entirely.
        let mut valkey_backend =
            ff_backend_valkey::ValkeyBackend::from_client_partitions_and_connection(
                client.clone(),
                config.partition_config,
                {
                    let mut c = ff_core::backend::ValkeyConnection::new(
                        config.valkey.host.clone(),
                        config.valkey.port,
                    );
                    c.tls = config.valkey.tls;
                    c.cluster = config.valkey.cluster;
                    c
                },
            );
        // RFC-017 Stage B: size the backend's stream-op semaphore
        // before handing out the `Arc`. `get_mut` succeeds here
        // because `valkey_backend` is the sole `Arc` owner at this
        // point.
        if !valkey_backend.with_stream_semaphore_permits(config.max_concurrent_stream_ops) {
            return Err(ServerError::OperationFailed(
                "ValkeyBackend stream semaphore sizing failed (unexpected Arc sharing)".into(),
            ));
        }
        // RFC-017 Stage C: install the scheduler handle so the
        // backend's `claim_for_worker` trait impl dispatches through
        // it. Stage E3 removed the redundant `Server::scheduler`
        // field — the backend is the sole owner of the scheduler
        // after this install.
        if !valkey_backend.with_scheduler(scheduler) {
            return Err(ServerError::OperationFailed(
                "ValkeyBackend scheduler wiring failed (unexpected Arc sharing)".into(),
            ));
        }
        // Unsize via coercion: an `as` cast cannot turn an
        // `Arc<ValkeyBackend>` into an `Arc<dyn EngineBackend>`.
        let backend: Arc<dyn EngineBackend> = valkey_backend;

        Ok(Self {
            // Single-permit semaphore: only one rotate-waitpoint-secret can
            // be mid-flight server-wide. Attackers or misbehaving scripts
            // firing parallel rotations fail fast with 429 instead of
            // queueing HTTP handlers.
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine: Some(engine),
            config,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
            metrics,
            backend,
        })
    }

    /// RFC-017 Wave 8 Stage E1/E2: Postgres boot branch for
    /// [`Server::start_with_metrics`]. Called after the §9.0 hard-gate
    /// admitted the boot (`postgres` is in [`BACKEND_STAGE_READY`] as
    /// of Stage E4; the earlier dev-mode override is retired).
    ///
    /// **Scope (Stage E2):**
    /// * Dial Postgres + wrap an `Arc<PostgresBackend>` as the
    /// `backend` field (all migrated HTTP handlers dispatch through
    /// the trait — create_execution / create_flow / add member /
    /// stage edge / apply dep / describe_flow / cancel_flow / etc.).
    /// * **No Valkey dial.** The E1 residual ambient-Valkey client is
    /// retired in E2 — `Server::client` is gone, `cancel_flow_header`
    /// / `ack_cancel_member` / `read_execution_info` /
    /// `read_execution_state` / `fetch_waitpoint_token_v07` all flow
    /// through the trait. Legacy reads that Postgres does not yet
    /// implement surface as `EngineError::Unavailable` (Wave 9).
    /// * Skip the Valkey engine + scheduler. Stage E3 routes the
    /// Postgres-specific `claim_for_worker` through the trait; the
    /// `engine` field stays `None` on this path.
    /// * Run `apply_migrations` against the Postgres pool so an empty
    /// database becomes usable without operator out-of-band steps.
    async fn start_postgres_branch(
        config: ServerConfig,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Self, ServerError> {
        if config.postgres.url.is_empty() {
            return Err(ServerError::InvalidInput(
                "FF_BACKEND=postgres requires FF_POSTGRES_URL".into(),
            ));
        }
        tracing::info!(
            pool_size = config.postgres.pool_size,
            "dialing Postgres backend (RFC-017 Stage E3)"
        );
        let mut pg_backend_arc = ff_backend_postgres::PostgresBackend::connect_with_metrics(
            config.postgres_config(),
            config.partition_config,
            metrics.clone(),
        )
        .await
        .map_err(|e| ServerError::Engine(Box::new(e)))?;

        // Apply schema migrations idempotently so an empty target
        // database becomes usable. The underlying pool is shared with
        // the backend — one pool, one migration run.
        ff_backend_postgres::apply_migrations(pg_backend_arc.pool())
            .await
            .map_err(|e| {
                ServerError::OperationFailed(format!("postgres apply_migrations: {e}"))
            })?;

        // RFC-017 Wave 8 Stage E3: spawn the six Postgres reconcilers
        // (attempt_timeout, lease_expiry, suspension_timeout,
        // dependency, edge_cancel_dispatcher, edge_cancel_reconciler)
        // as background tick loops owned by the backend. Drained on
        // `Server::shutdown` via `backend.shutdown_prepare(grace)`.
        let scanner_cfg = ff_backend_postgres::PostgresScannerConfig {
            attempt_timeout_interval: config.engine_config.attempt_timeout_interval,
            lease_expiry_interval: config.engine_config.lease_expiry_interval,
            suspension_timeout_interval: config.engine_config.suspension_timeout_interval,
            dependency_reconciler_interval: config.engine_config.dependency_reconciler_interval,
            edge_cancel_dispatcher_interval: config.engine_config.edge_cancel_dispatcher_interval,
            edge_cancel_reconciler_interval: config.engine_config.edge_cancel_reconciler_interval,
            budget_reset_interval: config.engine_config.budget_reset_interval,
            dependency_stale_threshold_ms:
                ff_backend_postgres::PostgresScannerConfig::DEFAULT_DEP_STALE_MS,
            scanner_filter: config.engine_config.scanner_filter.clone(),
            partition_config: config.partition_config,
        };
        if !pg_backend_arc.with_scanners(scanner_cfg) {
            return Err(ServerError::OperationFailed(
                "PostgresBackend scanner install failed (unexpected Arc sharing)".into(),
            ));
        }

        let backend: Arc<dyn EngineBackend> = pg_backend_arc;

        tracing::info!(
            flow_partitions = config.partition_config.num_flow_partitions,
            lanes = ?config.lanes.iter().map(|l| l.as_str()).collect::<Vec<_>>(),
            "FlowFabric server started (Postgres backend, Stage E3). \
             6 Postgres reconcilers active; claim_for_worker routed to \
             PostgresScheduler. No ambient Valkey client."
        );

        Ok(Self {
            admin_rotate_semaphore: Arc::new(tokio::sync::Semaphore::new(1)),
            engine: None,
            config,
            background_tasks: Arc::new(AsyncMutex::new(JoinSet::new())),
            metrics,
            backend,
        })
    }

    /// RFC-017 Stage A: test-injection + future-embedded-user entry
    /// point. Takes a caller-constructed `Arc<dyn EngineBackend>` plus
    /// the Valkey connection/engine scaffolding that
    /// [`Server::start_with_metrics`] normally dials for itself.
    ///
    /// **Stage A scope:** Stage A is still dual-field — the legacy
    /// `client` / `tail_client` / `engine` / `scheduler` fields are
    /// constructed here exactly as in the main boot path, because
    /// unmigrated handlers still need them. The caller-supplied
    /// `backend` populates the new trait-object field and services
    /// the handlers migrated in this stage (see the RFC-017 §4
    /// migration table).
    ///
    /// **Stage D evolution:** once the boot path relocates into each
    /// backend's `connect_with_metrics` (RFC-017 §9 Stage D), this
    /// entry point becomes the sole constructor — `Server::start` and
    /// `Server::start_with_metrics` are thin shims that build the
    /// backend first, then forward here.
    ///
    /// Today (Stage A) this path is exercised by `MockBackend` in
    /// `tests/parity_stage_a.rs`; it does NOT replace the Valkey
    /// dial under the main binary.
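    ///
    /// A test-injection sketch (illustrative; it assumes `MockBackend`
    /// implements `Default`, which the parity tests may not actually
    /// rely on):
    ///
    /// ```ignore
    /// let mock: Arc<dyn EngineBackend> = Arc::new(MockBackend::default());
    /// let server =
    ///     Server::start_with_backend(config, mock.clone(), metrics).await?;
    /// // Migrated handlers now dispatch to the mock, not to Valkey.
    /// assert!(Arc::ptr_eq(server.backend(), &mock));
    /// ```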
    pub async fn start_with_backend(
        config: ServerConfig,
        backend: Arc<dyn EngineBackend>,
        metrics: Arc<ff_observability::Metrics>,
    ) -> Result<Self, ServerError> {
        // Stage A: forward through the legacy dial so unmigrated
        // handlers keep working, then overwrite `backend` with the
        // caller-supplied handle. Stage D rewires this so the
        // caller's backend drives the whole boot.
        let mut server = Self::start_with_metrics(config, metrics).await?;
        server.backend = backend;
        Ok(server)
    }

    /// RFC-017 Stage A: access the backend trait-object driving
    /// migrated handlers. Stable surface for tests that need to
    /// inspect the backend directly (e.g. `backend_label()`
    /// assertions). The Server will dispatch more handlers through
    /// this handle as Stages B-D land.
    pub fn backend(&self) -> &Arc<dyn EngineBackend> {
        &self.backend
    }

    /// PR-94: access the shared observability registry.
    pub fn metrics(&self) -> &Arc<ff_observability::Metrics> {
        &self.metrics
    }

    // RFC-017 Stage E2 (§9 Stage D bullet completed): `Server::client()`
    // accessor + underlying `Client` field both removed. External
    // callers route ping / healthz through the backend trait
    // (`self.backend.ping()` → `ValkeyBackend::ping`). The
    // `ff_cancel_flow` header FCALL, its `flow_already_terminal`
    // HMGET/SMEMBERS fallback, the per-member `ff_ack_cancel_member`
    // backlog drain, and the `get_execution*` / `fetch_waitpoint_token_v07`
    // reads are all reachable through the Stage E2 trait additions
    // (`cancel_flow_header`, `ack_cancel_member`, `read_execution_info`,
    // `read_execution_state`, `fetch_waitpoint_token_v07`).

    /// Get the server config.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }

    /// Get the partition config.
    pub fn partition_config(&self) -> &PartitionConfig {
        &self.config.partition_config
    }

    // ── Minimal Phase 1 API ──

    /// Create a new execution. RFC-017 Stage D2: delegates through the
    /// backend trait. The KEYS/ARGV build + FCALL dispatch + result parse
    /// live verbatim in `ValkeyBackend::create_execution`.
    pub async fn create_execution(
        &self,
        args: &CreateExecutionArgs,
    ) -> Result<CreateExecutionResult, ServerError> {
        Ok(self.backend.create_execution(args.clone()).await?)
    }

    /// Cancel an execution. RFC-017 Stage D2: delegates through the
    /// backend trait.
    pub async fn cancel_execution(
        &self,
        args: &CancelExecutionArgs,
    ) -> Result<CancelExecutionResult, ServerError> {
        Ok(self.backend.cancel_execution(args.clone()).await?)
    }

    /// Get the public state of an execution.
    ///
    /// Reads `public_state` from the exec_core hash. Returns the parsed
    /// [`PublicState`] enum. If the execution is not found, returns an error.
    pub async fn get_execution_state(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<PublicState, ServerError> {
        // RFC-017 Stage E2: routed through the backend trait.
        match self.backend.read_execution_state(execution_id).await? {
            Some(s) => Ok(s),
            None => Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            ))),
        }
    }

    /// Read the raw result payload written by `ff_complete_execution`.
    ///
    /// The Lua side stores the payload at `ctx.result()` via plain `SET`.
    /// No FCALL — this is a direct GET; returns `Ok(None)` when the
    /// execution is missing, not yet complete, or (in a future
    /// retention-policy world) when the result was trimmed.
    ///
    /// # Contract vs `get_execution_state`
    ///
    /// `get_execution_state` is the authoritative completion signal. If
    /// a caller observes `state == completed` but `get_execution_result`
    /// returns `None`, the result bytes are unavailable — not a caller
    /// bug and not a server bug, just the retention policy trimming the
    /// blob. V1 sets no retention, so callers on v1 can treat
    /// `state == completed` + `Ok(None)` as a server bug.
    ///
    /// # Ordering
    ///
    /// Callers MUST wait for `state == completed` before calling this
    /// method; polls issued before the state transition may hit a
    /// narrow window where the completion Lua has written
    /// `public_state = completed` but the `result` key SET is still
    /// on-wire. The current Lua `ff_complete_execution` writes both in
    /// the same atomic script, so the window is effectively zero for
    /// direct callers — but retries via `ff_replay_execution` open it
    /// briefly.
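    ///
    /// A poll-then-read sketch honouring the ordering contract
    /// (illustrative; the `Completed` variant name and a `PartialEq`
    /// impl on [`PublicState`] are assumed):
    ///
    /// ```ignore
    /// while server.get_execution_state(&id).await? != PublicState::Completed {
    ///     tokio::time::sleep(Duration::from_millis(50)).await;
    /// }
    /// // `None` here means the result blob was trimmed (post-v1 retention).
    /// let payload: Option<Vec<u8>> = server.get_execution_result(&id).await?;
    /// ```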
    pub async fn get_execution_result(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<Option<Vec<u8>>, ServerError> {
        // RFC-017 Stage E2: routed through the backend trait. The
        // Valkey impl preserves binary-safe semantics via ferriskey's
        // `Vec<u8>` FromValue; Postgres returns Unavailable until the
        // result-store migration lands.
        Ok(self.backend.get_execution_result(execution_id).await?)
    }

    // ── Budget / Quota API ──

    /// Create a new budget policy. RFC-017 Stage D2: delegates through
    /// the backend trait.
    pub async fn create_budget(
        &self,
        args: &CreateBudgetArgs,
    ) -> Result<CreateBudgetResult, ServerError> {
        Ok(self.backend.create_budget(args.clone()).await?)
    }

    /// Create a new quota/rate-limit policy. RFC-017 Stage D2: delegates
    /// through the backend trait.
    pub async fn create_quota_policy(
        &self,
        args: &CreateQuotaPolicyArgs,
    ) -> Result<CreateQuotaPolicyResult, ServerError> {
        Ok(self.backend.create_quota_policy(args.clone()).await?)
    }

    /// Read-only budget status for operator visibility. RFC-017 Stage
    /// D2: delegates through the backend trait.
    pub async fn get_budget_status(
        &self,
        budget_id: &BudgetId,
    ) -> Result<BudgetStatus, ServerError> {
        Ok(self.backend.get_budget_status(budget_id).await?)
    }

    /// Report usage against a budget and check limits. RFC-017 Stage D2:
    /// delegates through the backend trait's admin variant
    /// (`report_usage_admin` — no worker handle required on the admin
    /// path).
    pub async fn report_usage(
        &self,
        budget_id: &BudgetId,
        args: &ReportUsageArgs,
    ) -> Result<ReportUsageResult, ServerError> {
        let mut admin_args = ff_core::contracts::ReportUsageAdminArgs::new(
            args.dimensions.clone(),
            args.deltas.clone(),
            args.now,
        );
        if let Some(key) = args.dedup_key.as_ref() {
            admin_args = admin_args.with_dedup_key(key.clone());
        }
        Ok(self.backend.report_usage_admin(budget_id, admin_args).await?)
    }

    /// Reset a budget's usage counters and schedule the next reset.
    /// RFC-017 Stage D2: delegates through the backend trait.
    pub async fn reset_budget(
        &self,
        budget_id: &BudgetId,
    ) -> Result<ResetBudgetResult, ServerError> {
        let args = ff_core::contracts::ResetBudgetArgs {
            budget_id: budget_id.clone(),
            now: TimestampMs::now(),
        };
        Ok(self.backend.reset_budget(args).await?)
    }

    // ── Flow API ──

    /// Create a new flow container. RFC-017 Stage D2: delegates through
    /// the backend trait.
    pub async fn create_flow(
        &self,
        args: &CreateFlowArgs,
    ) -> Result<CreateFlowResult, ServerError> {
        Ok(self.backend.create_flow(args.clone()).await?)
    }

    /// Add an execution to a flow.
    ///
    /// # Atomic single-FCALL commit (RFC-011 §7.3)
    ///
    /// Post-RFC-011 phase-3, exec_core co-locates with flow_core under
    /// hash-tag routing (both hash to `{fp:N}` via the exec id's
    /// embedded partition). A single atomic FCALL writes:
    ///
    /// - `members_set` SADD (flow membership)
    /// - `exec_core.flow_id` HSET (back-pointer)
    /// - `flow_index` SADD (self-heal)
    /// - `flow_core` HINCRBY node_count / graph_revision +
    ///   HSET last_mutation_at
    ///
    /// All four writes commit atomically or none do (Valkey scripting
    /// contract: validates-before-writing in the Lua body means
    /// `flow_not_found` / `flow_already_terminal` early-returns fire
    /// BEFORE any `redis.call()` mutation, and a mid-body error after
    /// writes is not expected because all writes are on the same slot).
    ///
    /// The pre-RFC-011 two-phase contract (membership FCALL on `{fp:N}`
    /// plus separate HSET on `{p:N}`), its orphan window, and the
    /// reconciliation-scanner plan (issue #21, now superseded) are all
    /// retired.
    ///
    /// # Consumer contract
    ///
    /// The caller's `args.execution_id` **must** be co-located with
    /// `args.flow_id`'s partition — i.e. minted via
    /// `ExecutionId::for_flow(&args.flow_id, config)`, as sketched
    /// below. Passing a `solo`-minted id (or any exec id hashing to a
    /// different `{fp:N}` than the flow's) will fail at the Valkey
    /// level with `CROSSSLOT` on a clustered deploy.
    ///
    /// Callers with a flow context in scope always use `for_flow`;
    /// this is the only supported mint path for flow-member execs
    /// post-RFC-011. Test fixtures that pre-date the co-location
    /// contract use `TestCluster::new_execution_id_on_partition` to
    /// pin to a specific hash-tag index for `fcall_create_flow`-style
    /// helpers that hard-code their flow partition.
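    ///
    /// A mint-and-add sketch (illustrative; the args-struct literal is
    /// abbreviated and its remaining fields are assumed):
    ///
    /// ```ignore
    /// // Mint the member id on the flow's partition; never `solo`-minted.
    /// let execution_id = ExecutionId::for_flow(&flow_id, server.partition_config());
    /// // ... create the execution under that id, then attach it:
    /// let args = AddExecutionToFlowArgs { flow_id, execution_id, /* ... */ };
    /// server.add_execution_to_flow(&args).await?;
    /// ```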
    pub async fn add_execution_to_flow(
        &self,
        args: &AddExecutionToFlowArgs,
    ) -> Result<AddExecutionToFlowResult, ServerError> {
        // Preserve the typed `ServerError::PartitionMismatch` pre-flight
        // check — the backend trait's implementation returns an
        // `EngineError` on CROSSSLOT, which would surface as
        // `ServerError::Engine(_)` and hide the consumer-contract
        // violation. Keep the explicit check at the facade boundary.
        let flow_part = flow_partition(&args.flow_id, &self.config.partition_config);
        let exec_part = execution_partition(&args.execution_id, &self.config.partition_config);
        if exec_part.index != flow_part.index {
            return Err(ServerError::PartitionMismatch(format!(
                "add_execution_to_flow: execution_id's partition {exec_p} != flow_id's partition {flow_p}. \
                 Post-RFC-011 §7.3 co-location requires mint via `ExecutionId::for_flow(&flow_id, config)` \
                 so the exec's hash-tag matches the flow's `{{fp:N}}`.",
                exec_p = exec_part.index,
                flow_p = flow_part.index,
            )));
        }
        Ok(self.backend.add_execution_to_flow(args.clone()).await?)
    }

    /// Cancel a flow.
    ///
    /// Flips `public_flow_state` to `cancelled` atomically via
    /// `ff_cancel_flow` on `{fp:N}`. For `cancel_all` policy, member
    /// executions must be cancelled cross-partition; this dispatch runs in
    /// the background and the call returns [`CancelFlowResult::CancellationScheduled`]
    /// immediately. For all other policies (or flows with no members), or
    /// when the flow was already in a terminal state (idempotent retry),
    /// the call returns [`CancelFlowResult::Cancelled`].
    ///
    /// Clients that need synchronous completion can call [`Self::cancel_flow_wait`].
    ///
    /// # Backpressure
    ///
    /// Each call that hits the async dispatch path spawns a new task into
    /// the shared background `JoinSet`. Rapid repeated calls against the
    /// same flow will spawn *multiple* overlapping dispatch tasks. This is
    /// not a correctness issue — each member cancel is idempotent and
    /// terminal flows short-circuit via [`ff_core::contracts::CancelFlowHeader::AlreadyTerminal`]
    /// — but heavy burst callers should either use `?wait=true` (serialises
    /// the dispatch on the HTTP thread, giving natural backpressure) or
    /// implement client-side deduplication on `flow_id`. The `JoinSet` is
    /// drained with a 15s timeout on [`Self::shutdown`], so very long
    /// dispatch tails may be aborted during graceful shutdown.
    ///
    /// # Orphan-member semantics on shutdown abort
    ///
    /// If shutdown fires `JoinSet::abort_all()` after its drain timeout
    /// while a dispatch loop is mid-iteration, the already-issued
    /// `ff_cancel_execution` FCALLs (atomic Lua) complete cleanly with
    /// `terminal_outcome = cancelled` and the caller-supplied reason. The
    /// members not yet visited are abandoned mid-loop. They remain in
    /// whichever state they were in (active/eligible/suspended) until the
    /// natural lifecycle scanners reach them: active leases expire
    /// (`lease_expiry`) and attempt-timeout them to `expired`, suspended
    /// members time out to `skipped`, eligible ones sit until retention
    /// trim. So no orphan state — but the terminal_outcome for the
    /// abandoned members will be `expired`/`skipped` rather than
    /// `cancelled`, and the operator-supplied `reason` is lost for them.
    /// Audit tooling that requires reason fidelity across shutdowns should
    /// use `?wait=true`.
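    ///
    /// A dispatch-mode sketch (illustrative; the HTTP `?wait=true` flag
    /// maps to [`Self::cancel_flow_wait`]):
    ///
    /// ```ignore
    /// match server.cancel_flow(&args).await? {
    ///     CancelFlowResult::CancellationScheduled { member_count, .. } => {
    ///         // cancel_all: members cancel in a background task; poll
    ///         // member state, or use cancel_flow_wait for sync semantics.
    ///     }
    ///     CancelFlowResult::Cancelled { .. } => { /* no dispatch needed */ }
    ///     _ => { /* PartiallyCancelled only occurs on the wait path */ }
    /// }
    /// ```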
    pub async fn cancel_flow(
        &self,
        args: &CancelFlowArgs,
    ) -> Result<CancelFlowResult, ServerError> {
        self.cancel_flow_inner(args, false).await
    }

    /// Cancel a flow and wait for all member cancellations to complete
    /// inline. Slower than [`Self::cancel_flow`] for large flows, but
    /// guarantees every member is in a terminal state on return.
    pub async fn cancel_flow_wait(
        &self,
        args: &CancelFlowArgs,
    ) -> Result<CancelFlowResult, ServerError> {
        self.cancel_flow_inner(args, true).await
    }

    async fn cancel_flow_inner(
        &self,
        args: &CancelFlowArgs,
        wait: bool,
    ) -> Result<CancelFlowResult, ServerError> {
        // RFC-017 Stage E2: the header FCALL + AlreadyTerminal fetch
        // now dispatch through the backend trait. The Server no longer
        // owns a ferriskey `Client`; `self.backend.cancel_flow_header`
        // encapsulates the Valkey-specific FCALL + reload-on-failover
        // + HMGET/SMEMBERS-for-AlreadyTerminal work previously inlined
        // here.
        let header = self.backend.cancel_flow_header(args.clone()).await?;

        let (policy, members) = match header {
            ff_core::contracts::CancelFlowHeader::Cancelled {
                cancellation_policy,
                member_execution_ids,
            } => (cancellation_policy, member_execution_ids),
            // Idempotent retry: flow was already cancelled/completed/failed.
            // Return Cancelled with the *stored* policy + (capped) member
            // list so observability tooling gets the real historical state
            // rather than echoing the caller's retry intent. The backend
            // has already done the HMGET + SMEMBERS; the Server just caps
            // the member list to bound wire bandwidth.
            ff_core::contracts::CancelFlowHeader::AlreadyTerminal {
                stored_cancellation_policy,
                stored_cancel_reason,
                member_execution_ids,
            } => {
                let total_members = member_execution_ids.len();
                let stored_members: Vec<String> = member_execution_ids
                    .into_iter()
                    .take(ALREADY_TERMINAL_MEMBER_CAP)
                    .collect();
                tracing::debug!(
                    flow_id = %args.flow_id,
                    stored_policy = stored_cancellation_policy.as_deref().unwrap_or(""),
                    stored_reason = stored_cancel_reason.as_deref().unwrap_or(""),
                    total_members,
                    returned_members = stored_members.len(),
                    "cancel_flow: flow already terminal, returning idempotent Cancelled"
                );
                return Ok(CancelFlowResult::Cancelled {
                    cancellation_policy: stored_cancellation_policy
                        .unwrap_or_else(|| args.cancellation_policy.clone()),
                    member_execution_ids: stored_members,
                });
            }
            // `CancelFlowHeader` is `#[non_exhaustive]`. Any future
            // variant must be reviewed at this match site before it
            // reaches the wire; fall closed with a typed server error.
            other => {
                return Err(ServerError::OperationFailed(format!(
                    "cancel_flow_header: unknown CancelFlowHeader variant: {other:?}"
                )));
            }
        };
        let needs_dispatch = policy == "cancel_all" && !members.is_empty();

        if !needs_dispatch {
            return Ok(CancelFlowResult::Cancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
            });
        }

        if wait {
            // Synchronous dispatch — cancel every member inline before returning.
            // Collect per-member failures so the caller sees a
            // PartiallyCancelled outcome instead of a false-positive
            // Cancelled when any member cancel faulted. The
            // cancel-backlog reconciler still retries the unacked
            // members; surfacing the partial state lets operator
            // tooling alert without polling per-member state.
            // RFC-017 Stage E2: ack drain dispatches via the backend
            // trait's `ack_cancel_member` — the Server no longer owns
            // a raw ferriskey `Client`.
            let mut failed: Vec<String> = Vec::new();
            for eid_str in &members {
                let Ok(eid) = ExecutionId::parse(eid_str) else {
                    failed.push(eid_str.clone());
                    continue;
                };
                let cancel_args = ff_core::contracts::CancelExecutionArgs {
                    execution_id: eid.clone(),
                    reason: args.reason.clone(),
                    source: ff_core::types::CancelSource::OperatorOverride,
                    lease_id: None,
                    lease_epoch: None,
                    attempt_id: None,
                    now: args.now,
                };
                match self.backend.cancel_execution(cancel_args).await {
                    Ok(_) => {
                        ack_cancel_member_via_backend(
                            self.backend.as_ref(),
                            &args.flow_id,
                            &eid,
                            eid_str,
                        )
                        .await;
                    }
                    Err(e) => {
                        if is_terminal_ack_engine_error(&e) {
                            ack_cancel_member_via_backend(
                                self.backend.as_ref(),
                                &args.flow_id,
                                &eid,
                                eid_str,
                            )
                            .await;
                            continue;
                        }
                        tracing::warn!(
                            execution_id = %eid_str,
                            error = %e,
                            "cancel_flow(wait): individual execution cancel failed \
                             (transport/contract fault; reconciler will retry if transient)"
                        );
                        failed.push(eid_str.clone());
                    }
                }
            }
            if failed.is_empty() {
                return Ok(CancelFlowResult::Cancelled {
                    cancellation_policy: policy,
                    member_execution_ids: members,
                });
            }
            return Ok(CancelFlowResult::PartiallyCancelled {
                cancellation_policy: policy,
                member_execution_ids: members,
                failed_member_execution_ids: failed,
            });
        }

        // Asynchronous dispatch — spawn into the shared JoinSet so
        // Server::shutdown can wait for pending cancellations (bounded
        // by a shutdown timeout). RFC-017 Stage E2: both the
        // per-member cancel and the backlog ack dispatch through the
        // backend trait (the Server no longer holds a ferriskey handle).
        let backend = self.backend.clone();
        let reason = args.reason.clone();
        let now = args.now;
        let dispatch_members = members.clone();
        let flow_id = args.flow_id.clone();
        // Every async cancel_flow contends on this lock, but the
        // critical section is tiny: try_join_next drain + spawn.
        let mut guard = self.background_tasks.lock().await;

        // Reap completed background dispatches before spawning the next.
        while let Some(joined) = guard.try_join_next() {
            if let Err(e) = joined {
                tracing::warn!(
                    error = %e,
                    "cancel_flow: background dispatch task panicked or was aborted"
                );
            }
        }

        guard.spawn(async move {
            // Bounded parallel dispatch via futures::stream::buffer_unordered.
            use futures::stream::StreamExt;
            const CONCURRENCY: usize = 16;

            let member_count = dispatch_members.len();
            let flow_id_for_log = flow_id.clone();
            futures::stream::iter(dispatch_members)
                .map(|eid_str| {
                    let backend = backend.clone();
                    let reason = reason.clone();
                    let flow_id = flow_id.clone();
                    async move {
                        let Ok(eid) = ExecutionId::parse(&eid_str) else {
                            tracing::warn!(
                                flow_id = %flow_id,
                                execution_id = %eid_str,
                                "cancel_flow(async): member id failed to parse; skipping"
                            );
                            return;
                        };
                        let cancel_args = ff_core::contracts::CancelExecutionArgs {
                            execution_id: eid.clone(),
                            reason: reason.clone(),
                            source: ff_core::types::CancelSource::OperatorOverride,
                            lease_id: None,
                            lease_epoch: None,
                            attempt_id: None,
                            now,
                        };
                        match backend.cancel_execution(cancel_args).await {
                            Ok(_) => {
                                ack_cancel_member_via_backend(
                                    backend.as_ref(),
                                    &flow_id,
                                    &eid,
                                    &eid_str,
                                )
                                .await;
                            }
                            Err(e) => {
                                if is_terminal_ack_engine_error(&e) {
                                    ack_cancel_member_via_backend(
                                        backend.as_ref(),
                                        &flow_id,
                                        &eid,
                                        &eid_str,
                                    )
                                    .await;
                                } else {
                                    tracing::warn!(
                                        flow_id = %flow_id,
                                        execution_id = %eid_str,
                                        error = %e,
                                        "cancel_flow(async): individual execution cancel failed \
                                         (transport/contract fault; reconciler will retry if transient)"
                                    );
                                }
                            }
                        }
                    }
                })
                .buffer_unordered(CONCURRENCY)
                .for_each(|()| async {})
                .await;

            tracing::debug!(
                flow_id = %flow_id_for_log,
                member_count,
                concurrency = CONCURRENCY,
                "cancel_flow: background member dispatch complete"
            );
        });
        drop(guard);

        let member_count = u32::try_from(members.len()).unwrap_or(u32::MAX);
        Ok(CancelFlowResult::CancellationScheduled {
            cancellation_policy: policy,
            member_count,
            member_execution_ids: members,
        })
    }

    /// Stage a dependency edge between two executions in a flow.
    ///
    /// Runs on the flow partition `{fp:N}`.
    /// KEYS (6), ARGV (8) — matches lua/flow.lua `ff_stage_dependency_edge`.
    pub async fn stage_dependency_edge(
        &self,
        args: &StageDependencyEdgeArgs,
    ) -> Result<StageDependencyEdgeResult, ServerError> {
        Ok(self.backend.stage_dependency_edge(args.clone()).await?)
    }

    /// Apply a staged dependency edge to the child execution. RFC-017
    /// Stage D2: delegates through the backend trait.
    pub async fn apply_dependency_to_child(
        &self,
        args: &ApplyDependencyToChildArgs,
    ) -> Result<ApplyDependencyToChildResult, ServerError> {
        Ok(self.backend.apply_dependency_to_child(args.clone()).await?)
    }

    // ── Execution operations API ──

    /// Deliver a signal to a suspended (or pending-waitpoint) execution.
    ///
    /// Pre-reads exec_core for waitpoint/suspension fields needed for KEYS.
    /// KEYS (13), ARGV (17) — matches lua/signal.lua `ff_deliver_signal`.
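    ///
    /// A delivery sketch (illustrative; the args-struct literal is
    /// abbreviated and the token field name is assumed). The waitpoint
    /// token is the HMAC bearer credential surfaced by
    /// `GET /v1/executions/{id}/pending-waitpoints`:
    ///
    /// ```ignore
    /// let args = DeliverSignalArgs {
    ///     execution_id,
    ///     waitpoint_token, // assumed field name
    ///     /* payload, now, ... */
    /// };
    /// server.deliver_signal(&args).await?;
    /// ```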
    pub async fn deliver_signal(
        &self,
        args: &DeliverSignalArgs,
    ) -> Result<DeliverSignalResult, ServerError> {
        // RFC-017 Stage A migration: dispatch through the backend
        // trait. The previous body (lane pre-read + KEYS(14) + ARGV(18)
        // FCALL dispatch + `parse_deliver_signal_result`) lives
        // verbatim inside `ValkeyBackend::deliver_signal` →
        // `deliver_signal_impl`. Clone required because the trait
        // method takes `DeliverSignalArgs` by value (see
        // `ff_core::engine_backend::EngineBackend::deliver_signal`).
        Ok(self.backend.deliver_signal(args.clone()).await?)
    }

    /// Change an execution's priority. RFC-017 Stage D2: delegates
    /// through the backend trait. Empty `lane_id` triggers the backend-
    /// internal HGET pre-read (matches legacy inherent behaviour).
    pub async fn change_priority(
        &self,
        execution_id: &ExecutionId,
        new_priority: i32,
    ) -> Result<ChangePriorityResult, ServerError> {
        let args = ff_core::contracts::ChangePriorityArgs {
            execution_id: execution_id.clone(),
            new_priority,
            lane_id: LaneId::new(""),
            now: TimestampMs::now(),
        };
        Ok(self.backend.change_priority(args).await?)
    }

    /// Scheduler-routed claim entry point.
    ///
    /// RFC-017 Wave 8 Stage E3 (§7): dispatches through the backend
    /// trait. The Valkey backend forwards to its wired
    /// [`ff_scheduler::Scheduler`]; the Postgres backend forwards to
    /// [`ff_backend_postgres::scheduler::PostgresScheduler`]'s
    /// `FOR UPDATE SKIP LOCKED` admission pipeline. Returns
    /// `Ok(None)` when no eligible execution exists on the lane at
    /// this scan cycle — the enum-typed trait outcome
    /// (`ClaimForWorkerOutcome::NoWork`) is collapsed to `Option::None`
    /// for the inherent-call contract pre-existing Stage E.
    ///
    /// Error mapping: scheduler-class errors arrive as
    /// [`EngineError`] via the trait boundary and thread through
    /// `ServerError::Engine`'s HTTP arm
    /// (budget / capability / unavailable classes land on the
    /// documented 400/409/503 response codes — see `api::ApiError::into_response`).
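    ///
    /// A worker poll-loop sketch (illustrative; `run_attempt` and the
    /// 30s grant TTL are hypothetical):
    ///
    /// ```ignore
    /// loop {
    ///     match server
    ///         .claim_for_worker(&lane, &worker_id, &instance_id, &caps, 30_000)
    ///         .await?
    ///     {
    ///         Some(grant) => run_attempt(grant).await, // got work
    ///         None => tokio::time::sleep(Duration::from_millis(250)).await,
    ///     }
    /// }
    /// ```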
    pub async fn claim_for_worker(
        &self,
        lane: &LaneId,
        worker_id: &WorkerId,
        worker_instance_id: &WorkerInstanceId,
        worker_capabilities: &std::collections::BTreeSet<String>,
        grant_ttl_ms: u64,
    ) -> Result<Option<ff_core::contracts::ClaimGrant>, ServerError> {
        let args = ff_core::contracts::ClaimForWorkerArgs::new(
            lane.clone(),
            worker_id.clone(),
            worker_instance_id.clone(),
            worker_capabilities.clone(),
            grant_ttl_ms,
        );
        match self.backend.claim_for_worker(args).await? {
            ff_core::contracts::ClaimForWorkerOutcome::Granted(g) => Ok(Some(g)),
            ff_core::contracts::ClaimForWorkerOutcome::NoWork => Ok(None),
            // `#[non_exhaustive]` — any future additive variant
            // surfaces as a typed Engine error so callers see a
            // loud miss instead of a silent `None`.
            _ => Err(ServerError::Engine(Box::new(
                ff_core::engine_error::EngineError::Unavailable {
                    op: "claim_for_worker: unknown ClaimForWorkerOutcome variant",
                },
            ))),
        }
    }

1340 /// Revoke an active lease (operator-initiated). RFC-017 Stage D2:
1341 /// delegates through the backend trait. The backend's trait impl
1342 /// returns `RevokeLeaseResult::AlreadySatisfied` when no active
1343 /// lease is present; the Server facade preserves its pre-migration
1344 /// `ServerError::NotFound` behaviour by re-mapping that variant.
1345 pub async fn revoke_lease(
1346 &self,
1347 execution_id: &ExecutionId,
1348 ) -> Result<RevokeLeaseResult, ServerError> {
1349 let args = ff_core::contracts::RevokeLeaseArgs {
1350 execution_id: execution_id.clone(),
1351 expected_lease_id: None,
1352 worker_instance_id: WorkerInstanceId::new(""),
1353 reason: "operator_revoke".to_owned(),
1354 };
1355 match self.backend.revoke_lease(args).await? {
1356 RevokeLeaseResult::AlreadySatisfied { reason } if reason == "no_active_lease" => {
1357 Err(ServerError::NotFound(format!(
1358 "no active lease for execution {execution_id} (no current_worker_instance_id)"
1359 )))
1360 }
1361 other => Ok(other),
1362 }
1363 }

    /// Get full execution info (HGETALL-shape on Valkey; SELECT-shape on
    /// Postgres once Wave 9 wires it). RFC-017 Stage E2: routed through
    /// the backend trait's
    /// [`ff_core::engine_backend::EngineBackend::read_execution_info`].
    pub async fn get_execution(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ExecutionInfo, ServerError> {
        match self.backend.read_execution_info(execution_id).await? {
            Some(info) => Ok(info),
            None => Err(ServerError::NotFound(format!(
                "execution not found: {execution_id}"
            ))),
        }
    }

    /// Partition-scoped, forward-only cursor listing of executions.
    ///
    /// Parity wrapper around the Valkey body of
    /// [`ff_core::engine_backend::EngineBackend::list_executions`].
    /// Issue #182 replaced the previous offset + lane + state-filter
    /// shape with this cursor-based API (per owner adjudication:
    /// cursor-everywhere, HTTP surface unreleased). Reads
    /// `ff:idx:{p:N}:all_executions`, sorts lexicographically on
    /// `ExecutionId`, filters `> cursor`, and trims to `limit`. A
    /// caller-side pager sketch appears below, after
    /// `wire_str_to_stream_cursor`.
    pub async fn list_executions_page(
        &self,
        partition_id: u16,
        cursor: Option<ExecutionId>,
        limit: usize,
    ) -> Result<ListExecutionsPage, ServerError> {
        // RFC-017 Stage A migration: dispatch through the backend
        // trait. The previous body (SMEMBERS + parse + lex-sort +
        // filter + cap) is preserved verbatim inside
        // `ValkeyBackend::list_executions`. One deliberate behaviour
        // change: corrupt members now surface as
        // `EngineError::Validation { kind: Corruption, .. }` (→
        // `ServerError::Engine`), where the legacy path warn-logged
        // and skipped them. This matches RFC-012's fail-loud contract
        // for read-surface corruption.
        let partition = ff_core::partition::Partition {
            family: ff_core::partition::PartitionFamily::Execution,
            index: partition_id,
        };
        let partition_key = ff_core::partition::PartitionKey::from(&partition);
        Ok(self
            .backend
            .list_executions(partition_key, cursor, limit)
            .await?)
    }

    /// Replay a terminal execution. RFC-017 Stage D2: delegates through
    /// the backend trait; the variadic-KEYS pre-read (HMGET + SMEMBERS
    /// for inbound edges on skipped flow members) now lives inside
    /// `ValkeyBackend::replay_execution`.
    pub async fn replay_execution(
        &self,
        execution_id: &ExecutionId,
    ) -> Result<ReplayExecutionResult, ServerError> {
        let args = ff_core::contracts::ReplayExecutionArgs {
            execution_id: execution_id.clone(),
            now: TimestampMs::now(),
        };
        Ok(self.backend.replay_execution(args).await?)
    }

    /// Read frames from an attempt's stream (XRANGE wrapper) plus terminal
    /// markers (`closed_at`, `closed_reason`) so consumers can stop polling
    /// when the producer finalizes. A consumer sketch follows
    /// `wire_str_to_stream_cursor` below.
    ///
    /// `from_id` and `to_id` accept the XRANGE special markers: `"-"` for
    /// earliest, `"+"` for latest. `count_limit` MUST be `>= 1` —
    /// `0` returns a `ServerError::InvalidInput` (matching the REST boundary
    /// and the Lua-side reject).
    ///
    /// Cluster-safe: the attempt's `{p:N}` partition is derived from the
    /// execution id, so all KEYS share the same slot.
    pub async fn read_attempt_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        from_id: &str,
        to_id: &str,
        count_limit: u64,
    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
        if count_limit == 0 {
            return Err(ServerError::InvalidInput(
                "count_limit must be >= 1".to_owned(),
            ));
        }
        // RFC-017 Stage B row 10: delegate through the trait. The
        // backend owns the stream-op semaphore + XRANGE dispatch; the
        // 429-on-contention semantics round-trip as
        // `EngineError::ResourceExhausted → ServerError::Engine →
        // HTTP 429` (see `ServerError::from` below).
        let from = wire_str_to_stream_cursor(from_id);
        let to = wire_str_to_stream_cursor(to_id);
        Ok(self
            .backend
            .read_stream(execution_id, attempt_index, from, to, count_limit)
            .await?)
    }

    /// Tail a live attempt's stream (XREAD BLOCK wrapper). Returns frames
    /// plus the terminal signal so a polling consumer can exit when the
    /// producer closes the stream. See the consumer sketch after
    /// `wire_str_to_stream_cursor` below.
    ///
    /// `last_id` is exclusive — XREAD returns entries with id > `last_id`.
    /// Pass `"0-0"` to read from the beginning.
    ///
    /// `block_ms == 0` → non-blocking peek (returns immediately).
    /// `block_ms > 0` → blocks up to that many ms. Empty `frames` +
    /// `closed_at = None` → timeout: no new data, stream still open.
    ///
    /// `count_limit` MUST be `>= 1`; `0` returns `InvalidInput`.
    ///
    /// Implemented as a direct XREAD command (not FCALL) because blocking
    /// commands are rejected inside Valkey Functions. The terminal
    /// markers come from a companion HMGET on `stream_meta` — see the
    /// `ff_script::stream_tail` module docs.
    pub async fn tail_attempt_stream(
        &self,
        execution_id: &ExecutionId,
        attempt_index: AttemptIndex,
        last_id: &str,
        block_ms: u64,
        count_limit: u64,
    ) -> Result<ff_core::contracts::StreamFrames, ServerError> {
        if count_limit == 0 {
            return Err(ServerError::InvalidInput(
                "count_limit must be >= 1".to_owned(),
            ));
        }
        // RFC-017 Stage B row 10: delegate through the trait. The
        // backend owns the stream-op semaphore + XREAD BLOCK dispatch
        // (via `duplicate_connection()` per #204, so neither a shared
        // tail client nor a serialising mutex is needed). Saturation
        // round-trips as `EngineError::ResourceExhausted → HTTP 429`;
        // a post-shutdown arrival round-trips as
        // `EngineError::Unavailable → HTTP 503`.
        let after = wire_str_to_stream_cursor(last_id);
        Ok(self
            .backend
            .tail_stream(
                execution_id,
                attempt_index,
                after,
                block_ms,
                count_limit,
                ff_core::backend::TailVisibility::All,
            )
            .await?)
    }

    /// Graceful shutdown — stops scanners, drains background handler tasks
    /// (e.g. cancel_flow member dispatch) with a bounded timeout, then waits
    /// for scanners to finish.
    ///
    /// Shutdown order is chosen so in-flight stream ops (read/tail) drain
    /// cleanly without new arrivals piling up:
    ///
    /// 1. `stream_semaphore.close()` (now owned by the backend's
    ///    `shutdown_prepare`) — new read/tail attempts fail fast with
    ///    `ServerError::OperationFailed("stream semaphore closed …")`,
    ///    which the REST layer surfaces as a 500 with `retryable=false`
    ///    (ops tooling may choose to wait + retry on 503-class responses;
    ///    the body clearly names the shutdown reason).
    /// 2. Drain handler-spawned background tasks with a 15s ceiling.
    /// 3. `engine.shutdown()` stops scanners.
    ///
    /// Existing in-flight tails finish on their natural `block_ms`
    /// boundary (up to ~30s); the `tail_client` is dropped when `Server`
    /// is dropped after this function returns. We do NOT wait for tails
    /// to drain explicitly — the semaphore-close + natural-timeout
    /// combination bounds shutdown to roughly `block_ms + 15s` in the
    /// worst case. Callers observing a dropped connection retry against
    /// whatever replacement is coming up.
    pub async fn shutdown(self) {
        tracing::info!("shutting down FlowFabric server");

        // Step 1: RFC-017 Stage B — delegate stream-op pool closure
        // + drain to the backend's `shutdown_prepare` hook. The
        // Valkey impl closes its semaphore (no new read/tail starts)
        // and awaits in-flight permits up to `grace`. A timeout here
        // is logged + counted on `ff_shutdown_timeout_total`; we
        // continue with best-effort drain of the server's own
        // background tasks rather than blocking shutdown behind a
        // single slow tail.
        let drain_timeout = Duration::from_secs(15);
        match self.backend.shutdown_prepare(drain_timeout).await {
            Ok(()) => tracing::info!(
                "backend shutdown_prepare complete (stream-op pool drained)"
            ),
            Err(ff_core::engine_error::EngineError::Timeout { elapsed, .. }) => {
                self.metrics.inc_shutdown_timeout();
                tracing::warn!(
                    elapsed_ms = elapsed.as_millis() as u64,
                    "shutdown_prepare exceeded grace; proceeding best-effort"
                );
            }
            Err(e) => {
                // Non-timeout errors don't block shutdown either, but
                // they're unexpected — log at warn so operators see
                // the signal without tripping an alert.
                tracing::warn!(
                    err = %e,
                    "shutdown_prepare returned error; proceeding best-effort"
                );
            }
        }

        // Step 2: Drain handler-spawned background tasks with the same
        // ceiling as Engine::shutdown. If dispatch is still running at
        // the deadline, abort the JoinSet's remaining tasks.
        let background = self.background_tasks.clone();
        let drain = async move {
            let mut guard = background.lock().await;
            while guard.join_next().await.is_some() {}
        };
        match tokio::time::timeout(drain_timeout, drain).await {
            Ok(()) => {}
            Err(_) => {
                tracing::warn!(
                    timeout_s = drain_timeout.as_secs(),
                    "shutdown: background tasks did not finish in time, aborting"
                );
                self.background_tasks.lock().await.abort_all();
            }
        }

        if let Some(engine) = self.engine {
            engine.shutdown().await;
        }
        tracing::info!("FlowFabric server shutdown complete");
    }
}
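
// ── Usage sketches (illustrative only) ──
//
// A minimal caller-side polling loop for `Server::claim_for_worker`,
// referenced from its doc comment above. This is a sketch, not part of
// the server API: the lane/worker literals, the 250ms idle backoff, and
// the assumption that `WorkerId::new` / `WorkerInstanceId::new` accept
// `&str` (as `LaneId::new("")` does above) are all illustrative.
#[allow(dead_code)]
async fn claim_loop_sketch(server: &Server) -> Result<(), ServerError> {
    let lane = LaneId::new("default");
    let worker_id = WorkerId::new("worker-1");
    let worker_instance_id = WorkerInstanceId::new("worker-1/boot-0");
    let capabilities: std::collections::BTreeSet<String> =
        ["gpu".to_owned()].into_iter().collect();
    loop {
        match server
            .claim_for_worker(&lane, &worker_id, &worker_instance_id, &capabilities, 30_000)
            .await?
        {
            // `Granted` arrives collapsed to `Some(grant)` by the facade.
            Some(_grant) => {
                // ... run the attempt; heartbeat within grant_ttl_ms ...
            }
            // `NoWork` arrives as `None`: idle lane, back off briefly.
            None => tokio::time::sleep(Duration::from_millis(250)).await,
        }
    }
}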

/// RFC-017 Stage B: lift the wire string (`"-"`, `"+"`, or a concrete
/// entry id) used by the REST boundary into the typed
/// [`ff_core::contracts::StreamCursor`] the trait method expects.
/// Keeps the `read_attempt_stream` / `tail_attempt_stream`
/// public-function signatures byte-identical while dispatching
/// through the backend.
fn wire_str_to_stream_cursor(s: &str) -> ff_core::contracts::StreamCursor {
    match s {
        "-" => ff_core::contracts::StreamCursor::Start,
        "+" => ff_core::contracts::StreamCursor::End,
        other => ff_core::contracts::StreamCursor::At(other.to_owned()),
    }
}
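
// Illustrative consumer for the stream surface above, referenced from the
// `read_attempt_stream` / `tail_attempt_stream` doc comments. A sketch
// under assumptions: `StreamFrames` is taken to expose the fetched entries
// as `frames` (each carrying its stream entry `id`) plus the `closed_at`
// terminal marker documented above, and `AttemptIndex` is taken to be
// `Clone` — check the real contract types in `ff_core` before copying.
#[allow(dead_code)]
async fn tail_until_closed_sketch(
    server: &Server,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
) -> Result<(), ServerError> {
    // One-shot history read: "-" / "+" are the XRANGE markers accepted
    // by `from_id` / `to_id`.
    let history = server
        .read_attempt_stream(execution_id, attempt_index.clone(), "-", "+", 1_000)
        .await?;
    tracing::debug!(frames = history.frames.len(), "attempt history");

    // Live tail: `last_id` is exclusive, so "0-0" starts from the top.
    let mut last_id = "0-0".to_owned();
    loop {
        let batch = server
            .tail_attempt_stream(execution_id, attempt_index.clone(), &last_id, 5_000, 100)
            .await?;
        if let Some(frame) = batch.frames.last() {
            last_id = frame.id.clone(); // advance the exclusive cursor
        }
        if batch.closed_at.is_some() {
            return Ok(()); // producer finalized — stop polling
        }
        // Empty frames + `closed_at == None` is just a `block_ms` timeout.
    }
}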
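
// Minimal round-trip check for `wire_str_to_stream_cursor`; only the
// three shapes visible in the function above are exercised.
#[cfg(test)]
mod stream_cursor_tests {
    use super::*;
    use ff_core::contracts::StreamCursor;

    #[test]
    fn wire_markers_map_to_typed_cursors() {
        assert!(matches!(wire_str_to_stream_cursor("-"), StreamCursor::Start));
        assert!(matches!(wire_str_to_stream_cursor("+"), StreamCursor::End));
        assert!(matches!(
            wire_str_to_stream_cursor("1700000000000-0"),
            StreamCursor::At(id) if id == "1700000000000-0"
        ));
    }
}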
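
// Illustrative pager for `Server::list_executions_page`, referenced from
// its doc comment above. A sketch under one assumption: the page type is
// taken to expose the returned ids as `executions: Vec<ExecutionId>` —
// check `ListExecutionsPage` in `ff_core::contracts` for the real field
// names before copying.
#[allow(dead_code)]
async fn list_all_executions_sketch(
    server: &Server,
    partition_id: u16,
) -> Result<Vec<ExecutionId>, ServerError> {
    const PAGE_LIMIT: usize = 500;
    let mut cursor: Option<ExecutionId> = None;
    let mut all = Vec::new();
    loop {
        let page = server
            .list_executions_page(partition_id, cursor.clone(), PAGE_LIMIT)
            .await?;
        let fetched = page.executions.len();
        // Ids are lex-sorted and filtered `> cursor`, so the last id of
        // this page is the exclusive cursor for the next call.
        cursor = page.executions.last().cloned();
        all.extend(page.executions);
        if fetched < PAGE_LIMIT {
            return Ok(all); // short page — no more members
        }
    }
}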

/// Result of a waitpoint HMAC secret rotation across all execution partitions.
#[derive(Debug, Clone, serde::Serialize)]
pub struct RotateWaitpointSecretResult {
    /// Count of partitions that accepted the rotation.
    pub rotated: u16,
    /// Partition indices that failed — operator should investigate (Valkey
    /// outage, auth failure, cluster split). Rotation is idempotent, so a
    /// re-run after the underlying fault clears converges to the correct
    /// state.
    pub failed: Vec<u16>,
    /// New kid installed as current.
    pub new_kid: String,
}

impl Server {
    /// Rotate the waitpoint HMAC secret. Promotes the current kid to previous
    /// (accepted within `FF_WAITPOINT_HMAC_GRACE_MS`), installs `new_secret_hex`
    /// as the new current kid. Idempotent: re-running with the same `new_kid`
    /// and `new_secret_hex` converges partitions to the same state.
    ///
    /// Returns a structured result so operators can see which partitions
    /// failed. The HTTP layer returns 200 if any partition succeeded, 500
    /// only if all fail. An operator-side sketch follows this `impl` block.
    pub async fn rotate_waitpoint_secret(
        &self,
        new_kid: &str,
        new_secret_hex: &str,
    ) -> Result<RotateWaitpointSecretResult, ServerError> {
        if new_kid.is_empty() || new_kid.contains(':') {
            return Err(ServerError::OperationFailed(
                "new_kid must be non-empty and must not contain ':'".into(),
            ));
        }
        if new_secret_hex.is_empty()
            || !new_secret_hex.len().is_multiple_of(2)
            || !new_secret_hex.chars().all(|c| c.is_ascii_hexdigit())
        {
            return Err(ServerError::OperationFailed(
                "new_secret_hex must be a non-empty even-length hex string".into(),
            ));
        }

        // Single-writer gate — admin semaphore + audit log stay on
        // Server per RFC-017 §4 row 11. The per-partition fan-out
        // moved inside `ValkeyBackend::rotate_waitpoint_hmac_secret_all`.
        let _permit = match self.admin_rotate_semaphore.clone().try_acquire_owned() {
            Ok(p) => p,
            Err(tokio::sync::TryAcquireError::NoPermits) => {
                return Err(ServerError::ConcurrencyLimitExceeded("admin_rotate", 1));
            }
            Err(tokio::sync::TryAcquireError::Closed) => {
                return Err(ServerError::OperationFailed(
                    "admin rotate semaphore closed (server shutting down)".into(),
                ));
            }
        };

        let grace_ms = self.config.waitpoint_hmac_grace_ms;

        // RFC-017 Stage B row 11: delegate the per-partition fan-out
        // to the backend. The trait method returns one entry per
        // partition with an inner `Result` so partial success is
        // observable — matching the pre-migration Server body.
        let args = ff_core::contracts::RotateWaitpointHmacSecretAllArgs::new(
            new_kid.to_owned(),
            new_secret_hex.to_owned(),
            grace_ms,
        );
        let result = self
            .backend
            .rotate_waitpoint_hmac_secret_all(args)
            .await?;

        // Partition count comes from the backend's fan-out result so the
        // audit log cannot drift from the rotation surface actually
        // touched (execution partitions — see the struct doc above).
        let total_partitions = result.entries.len() as u16;

        let mut rotated = 0u16;
        let mut failed: Vec<u16> = Vec::new();
        for entry in &result.entries {
            match &entry.result {
                Ok(_) => {
                    rotated += 1;
                    tracing::debug!(
                        partition = entry.partition,
                        new_kid = %new_kid,
                        "waitpoint_hmac_rotated"
                    );
                }
                Err(e) => {
                    tracing::error!(
                        target: "audit",
                        partition = entry.partition,
                        err = %e,
                        "waitpoint_hmac_rotation_failed"
                    );
                    failed.push(entry.partition);
                }
            }
        }

        // Single aggregated audit event (RFC-017 row 11: audit emit
        // stays on Server).
        tracing::info!(
            target: "audit",
            new_kid = %new_kid,
            total_partitions,
            rotated,
            failed_count = failed.len(),
            "waitpoint_hmac_rotation_complete"
        );

        Ok(RotateWaitpointSecretResult {
            rotated,
            failed,
            new_kid: new_kid.to_owned(),
        })
    }
}
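
// Operator-side sketch of the partial-success contract documented on
// `rotate_waitpoint_secret` above: treat "no partition rotated" as a hard
// failure and re-run (idempotently) while any partition is left behind.
// The kid and hex secret below are placeholders, not real key material.
#[allow(dead_code)]
async fn rotate_and_report_sketch(server: &Server) -> Result<(), ServerError> {
    let result = server
        .rotate_waitpoint_secret("kid-2", "a1b2c3d4e5f60718")
        .await?;
    if result.rotated == 0 {
        // All partitions failed — the HTTP layer maps this to a 500.
        return Err(ServerError::OperationFailed(format!(
            "waitpoint rotation failed on all partitions: {:?}",
            result.failed
        )));
    }
    if !result.failed.is_empty() {
        // Partial success: rotation is idempotent, so re-running with the
        // same kid + secret converges the stragglers once the underlying
        // fault (outage, auth, split) clears.
        tracing::warn!(failed = ?result.failed, "partial waitpoint rotation");
    }
    Ok(())
}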

/// Convert a `ScriptError` into a `ServerError`, preserving `ferriskey::ErrorKind`
/// for transport-level variants. Business-logic variants keep their code as
/// `ServerError::Script(String)` so HTTP clients see a stable message.
///
/// Why this exists: before R2, the stream handlers did
/// `ScriptError → format!() → ServerError::Script(String)`, which erased
/// the ErrorKind and made `ServerError::is_retryable()` always return
/// false. Retry-capable clients (cairn-fabric) would then not retry a
/// genuinely transient error like `IoError`.
#[allow(dead_code)] // retained for non-stream FCALL paths that still route via raw ScriptError; stream handlers moved to the trait in RFC-017 Stage B
fn script_error_to_server(e: ff_script::error::ScriptError) -> ServerError {
    match e {
        ff_script::error::ScriptError::Valkey(valkey_err) => {
            crate::server::backend_context(valkey_err, "stream FCALL transport")
        }
        other => ServerError::Script(other.to_string()),
    }
}

/// Acknowledge that a member cancel has committed. Delegates to
/// [`EngineBackend::ack_cancel_member`] (Valkey: `ff_ack_cancel_member`
/// FCALL on `{fp:N}` — SREM the execution from the flow's
/// `pending_cancels` set and, if empty, ZREM the flow from the
/// partition-level `cancel_backlog`). Best-effort — failures are
/// logged but not propagated, since the reconciler drains any
/// leftovers on its next pass.
async fn ack_cancel_member_via_backend(
    backend: &dyn EngineBackend,
    flow_id: &FlowId,
    execution_id: &ExecutionId,
    eid_str: &str,
) {
    if let Err(e) = backend.ack_cancel_member(flow_id, execution_id).await {
        tracing::warn!(
            flow_id = %flow_id,
            execution_id = %eid_str,
            error = %e,
            "ack_cancel_member failed; reconciler will drain on next pass"
        );
    }
}

/// Engine-error counterpart of the legacy Lua-code check: inspects an
/// `EngineError` returned from `self.backend.cancel_execution(...)`
/// and returns `true` when the member is already terminal. Matches the
/// Lua-code semantics of `execution_not_active` / `execution_not_found`
/// via the typed `State` / `NotFound` classifications the backend
/// trait impl maps them to.
fn is_terminal_ack_engine_error(err: &EngineError) -> bool {
    match err {
        // Already terminal (Lua's `execution_not_active`) or missing
        // entirely (`execution_not_found`) — both treated as ack-worthy
        // so the cancel backlog doesn't poison on a member already in
        // the intended terminal state.
        EngineError::State(kind) => matches!(
            kind,
            ff_core::engine_error::StateKind::Terminal
        ),
        EngineError::NotFound { .. } => true,
        EngineError::Contextual { source, .. } => is_terminal_ack_engine_error(source),
        _ => false,
    }
}
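
// Sketch of a unit check for `is_terminal_ack_engine_error`. Only the
// variant whose shape is fully visible in the match above is constructed;
// the `NotFound` / `Contextual` field layouts are not shown here, so those
// arms are left to the real test suite.
#[cfg(test)]
mod terminal_ack_tests {
    use super::*;
    use ff_core::engine_error::StateKind;

    #[test]
    fn terminal_state_is_ack_worthy() {
        assert!(is_terminal_ack_engine_error(&EngineError::State(
            StateKind::Terminal
        )));
    }
}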

#[cfg(test)]
mod tests {
    use super::*;
    use ferriskey::ErrorKind;

    fn mk_fk_err(kind: ErrorKind) -> ferriskey::Error {
        ferriskey::Error::from((kind, "synthetic"))
    }

    #[test]
    fn is_retryable_backend_variant_uses_kind_table() {
        // Transport-bucketed: retryable.
        assert!(ServerError::from(mk_fk_err(ErrorKind::IoError)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalSendError)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::FatalReceiveError)).is_retryable());
        // Cluster-bucketed (Moved / Ask / TryAgain / ClusterDown): retryable
        // after topology settles — the #88 BackendErrorKind classifier
        // treats these as transient cluster churn, a semantic refinement
        // over the previous ff-script retry table.
        assert!(ServerError::from(mk_fk_err(ErrorKind::TryAgain)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::ClusterDown)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::Moved)).is_retryable());
        assert!(ServerError::from(mk_fk_err(ErrorKind::Ask)).is_retryable());
        // BusyLoading: retryable.
        assert!(ServerError::from(mk_fk_err(ErrorKind::BusyLoadingError)).is_retryable());

        // Auth / script-not-loaded / read-only: terminal.
        assert!(!ServerError::from(mk_fk_err(ErrorKind::AuthenticationFailed)).is_retryable());
        assert!(!ServerError::from(mk_fk_err(ErrorKind::NoScriptError)).is_retryable());
        assert!(!ServerError::from(mk_fk_err(ErrorKind::ReadOnly)).is_retryable());
    }

    #[test]
    fn is_retryable_backend_context_uses_kind_table() {
        let err = crate::server::backend_context(mk_fk_err(ErrorKind::IoError), "HGET test");
        assert!(err.is_retryable());

        let err =
            crate::server::backend_context(mk_fk_err(ErrorKind::AuthenticationFailed), "auth");
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_library_load_delegates_to_inner_kind() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::IoError),
        ));
        assert!(err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::AuthenticationFailed),
        ));
        assert!(!err.is_retryable());

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert!(!err.is_retryable());
    }

    #[test]
    fn is_retryable_business_logic_variants_are_false() {
        assert!(!ServerError::NotFound("x".into()).is_retryable());
        assert!(!ServerError::InvalidInput("x".into()).is_retryable());
        assert!(!ServerError::OperationFailed("x".into()).is_retryable());
        assert!(!ServerError::Script("x".into()).is_retryable());
        assert!(!ServerError::PartitionMismatch("x".into()).is_retryable());
    }

    #[test]
    fn backend_kind_delegates_through_library_load() {
        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::Valkey(
            mk_fk_err(ErrorKind::ClusterDown),
        ));
        assert_eq!(err.backend_kind(), Some(ff_core::BackendErrorKind::Cluster));

        let err = ServerError::LibraryLoad(ff_script::loader::LoadError::VersionMismatch {
            expected: "1".into(),
            got: "2".into(),
        });
        assert_eq!(err.backend_kind(), None);
    }

    #[test]
    fn valkey_version_too_low_is_not_retryable() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7.0".into(),
            required: "7.2".into(),
        };
        assert!(!err.is_retryable());
        assert_eq!(err.backend_kind(), None);
    }

    #[test]
    fn valkey_version_too_low_error_message_includes_both_versions() {
        let err = ServerError::ValkeyVersionTooLow {
            detected: "7.0".into(),
            required: "7.2".into(),
        };
        let msg = err.to_string();
        assert!(msg.contains("7.0"), "detected version in message: {msg}");
        assert!(msg.contains("7.2"), "required version in message: {msg}");
        assert!(msg.contains("RFC-011"), "RFC pointer in message: {msg}");
    }
}