pub struct Server { /* private fields */ }
FlowFabric server — connects everything together.
Manages the Valkey connection, Lua library loading, background scanners, and provides a minimal API for Phase 1.
Implementations

impl Server
pub async fn start(config: ServerConfig) -> Result<Self, ServerError>
Start the FlowFabric server.
Boot sequence:
- Connect to Valkey
- Validate or create partition config key
- Load the FlowFabric Lua library
- Start engine (14 background scanners)
pub async fn start_with_metrics(
    config: ServerConfig,
    metrics: Arc<Metrics>,
) -> Result<Self, ServerError>
PR-94: boot the server with a shared observability registry.
Scanner cycle + scheduler metrics record into this registry;
main.rs threads the same handle into the router so /metrics
exposes what the engine produces. The no-arg Server::start
forwards here with a fresh Metrics::new() — under the default
build that’s the shim, under observability it’s a real
registry not shared with any HTTP route (useful for tests
exercising the engine in isolation).
RFC-017 Stage A: this path gates config.backend against
[BACKEND_STAGE_READY] (refuses BackendKind::Postgres at
boot per §9.0), dials the Valkey cluster through the legacy
path, then synthesises an Arc<ValkeyBackend> around the
dialed client and populates Server.backend. The dual-field
posture is explicit through Stage D; Stage E retires the
legacy Client fields. See Server::start_with_backend for
the test-injection entry point that takes a caller-supplied
backend.
pub async fn start_with_backend(
    config: ServerConfig,
    backend: Arc<dyn EngineBackend>,
    metrics: Arc<Metrics>,
) -> Result<Self, ServerError>
RFC-017 Stage A: the test-injection + future-embedded-user entry
point. Takes a caller-constructed Arc<dyn EngineBackend>, and still
builds the Valkey connection/engine scaffolding that
Server::start_with_metrics normally dials for itself.
Stage A scope: Stage A is still dual-field — the legacy
client / tail_client / engine / scheduler fields are
constructed here exactly as in the main boot path, because
unmigrated handlers still need them. The caller-supplied
backend populates the new trait-object field and services
the handlers migrated in this stage (see RFC-017 §4
migration table).
Stage D evolution: once the boot path relocates into each
backend’s connect_with_metrics (RFC-017 §9 Stage D), this
entry point becomes the sole constructor — Server::start and
Server::start_with_metrics are thin shims that build the
backend first, then forward here.
Today (Stage A) this path is exercised by MockBackend in
tests/parity_stage_a.rs; it does NOT replace the Valkey
dial under the main binary.
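The injection pattern above can be sketched in miniature. The trait, MockBackend, and Server shape below are simplified stand-ins for illustration, not the crate's real definitions (the real EngineBackend carries the migrated handler methods, and the real Server keeps its legacy dual fields):

```rust
use std::sync::Arc;

// Simplified stand-in for the engine backend trait.
trait EngineBackend: Send + Sync {
    fn backend_label(&self) -> &'static str;
}

// Stand-in for a test backend like the one in tests/parity_stage_a.rs.
struct MockBackend;
impl EngineBackend for MockBackend {
    fn backend_label(&self) -> &'static str {
        "mock"
    }
}

// Only the trait-object field is modelled; the legacy Valkey
// scaffolding fields are elided from this sketch.
struct Server {
    backend: Arc<dyn EngineBackend>,
}

impl Server {
    // Caller supplies the backend instead of the server dialing one.
    fn start_with_backend(backend: Arc<dyn EngineBackend>) -> Self {
        Server { backend }
    }

    // Stable accessor for tests that inspect the backend directly.
    fn backend(&self) -> &Arc<dyn EngineBackend> {
        &self.backend
    }
}
```

This is the same shape the Stage D evolution converges on: build the backend first, then hand it to the one real constructor.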
pub fn backend(&self) -> &Arc<dyn EngineBackend>
RFC-017 Stage A: access the backend trait-object driving
migrated handlers. Stable surface for tests that need to
inspect the backend directly (e.g. backend_label()
assertions). The Server will dispatch more handlers through
this handle as Stages B-D land.
pub fn config(&self) -> &ServerConfig
Get the server config.
pub fn partition_config(&self) -> &PartitionConfig
Get the partition config.
pub async fn create_execution(
    &self,
    args: &CreateExecutionArgs,
) -> Result<CreateExecutionResult, ServerError>
Create a new execution. RFC-017 Stage D2: delegates through the
backend trait. The KEYS/ARGV build + FCALL dispatch + result parse
live verbatim in ValkeyBackend::create_execution.
pub async fn cancel_execution(
    &self,
    args: &CancelExecutionArgs,
) -> Result<CancelExecutionResult, ServerError>
Cancel an execution. RFC-017 Stage D2: delegates through the backend trait.
pub async fn get_execution_state(
    &self,
    execution_id: &ExecutionId,
) -> Result<PublicState, ServerError>
Get the public state of an execution.
Reads public_state from the exec_core hash. Returns the parsed
PublicState enum. If the execution is not found, returns an error.
pub async fn get_execution_result(
    &self,
    execution_id: &ExecutionId,
) -> Result<Option<Vec<u8>>, ServerError>
Read the raw result payload written by ff_complete_execution.
The Lua side stores the payload at ctx.result() via plain SET.
No FCALL — this is a direct GET; returns Ok(None) when the
execution is missing, not yet complete, or (in a future
retention-policy world) when the result was trimmed.
Contract vs get_execution_state
get_execution_state is the authoritative completion signal. If
a caller observes state == completed but get_execution_result
returns None, the result bytes are unavailable — not a caller
bug and not a server bug, just the retention policy trimming the
blob. V1 sets no retention, so callers on v1 can treat
state == completed + Ok(None) as a server bug.
Ordering
Callers MUST wait for state == completed before calling this
method; polls issued before the state transition may hit a
narrow window where the completion Lua has written
public_state = completed but the result key SET is still
on-wire. The current Lua ff_complete_execution writes both in
the same atomic script, so the window is effectively zero for
direct callers — but retries via ff_replay_execution open it
briefly.
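The ordering contract above can be encoded in a small polling helper. The ExecutionReader trait and Done stub below are hypothetical stand-ins for the Server surface, used only to make the sketch self-contained:

```rust
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PublicState {
    Pending,
    Completed,
}

// Hypothetical read surface standing in for get_execution_state /
// get_execution_result.
trait ExecutionReader {
    fn state(&self) -> PublicState;
    fn result(&self) -> Option<Vec<u8>>;
}

/// Honour the ordering contract: only read the result after observing
/// state == Completed. On v1 (no retention policy), a missing result
/// after completion is treated as a server-side fault.
fn fetch_result<R: ExecutionReader>(
    r: &R,
    max_polls: usize,
) -> Result<Vec<u8>, &'static str> {
    for _ in 0..max_polls {
        if r.state() == PublicState::Completed {
            return r.result().ok_or("completed but result missing");
        }
        // A real caller would sleep / back off between polls.
    }
    Err("timed out before completion")
}

// Stub for testing the helper: always completed, result present.
struct Done(Vec<u8>);
impl ExecutionReader for Done {
    fn state(&self) -> PublicState {
        PublicState::Completed
    }
    fn result(&self) -> Option<Vec<u8>> {
        Some(self.0.clone())
    }
}
```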
pub async fn create_budget(
    &self,
    args: &CreateBudgetArgs,
) -> Result<CreateBudgetResult, ServerError>
Create a new budget policy. RFC-017 Stage D2: delegates through the backend trait.
pub async fn create_quota_policy(
    &self,
    args: &CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, ServerError>
Create a new quota/rate-limit policy. RFC-017 Stage D2: delegates through the backend trait.
pub async fn get_budget_status(
    &self,
    budget_id: &BudgetId,
) -> Result<BudgetStatus, ServerError>
Read-only budget status for operator visibility. RFC-017 Stage D2: delegates through the backend trait.
pub async fn report_usage(
    &self,
    budget_id: &BudgetId,
    args: &ReportUsageArgs,
) -> Result<ReportUsageResult, ServerError>
Report usage against a budget and check limits. RFC-017 Stage D2:
delegates through the backend trait’s admin variant
(report_usage_admin — no worker handle required on the admin
path).
pub async fn reset_budget(
    &self,
    budget_id: &BudgetId,
) -> Result<ResetBudgetResult, ServerError>
Reset a budget’s usage counters and schedule the next reset. RFC-017 Stage D2: delegates through the backend trait.
pub async fn create_flow(
    &self,
    args: &CreateFlowArgs,
) -> Result<CreateFlowResult, ServerError>
Create a new flow container. RFC-017 Stage D2: delegates through the backend trait.
pub async fn add_execution_to_flow(
    &self,
    args: &AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, ServerError>
Add an execution to a flow.
Atomic single-FCALL commit (RFC-011 §7.3)
Post-RFC-011 phase-3, exec_core co-locates with flow_core under
hash-tag routing (both hash to {fp:N} via the exec id’s
embedded partition). A single atomic FCALL writes:
- members_set: SADD (flow membership)
- exec_core.flow_id: HSET (back-pointer)
- flow_index: SADD (self-heal)
- flow_core: HINCRBY node_count / graph_revision + HSET last_mutation_at
All four writes commit atomically or none do (Valkey scripting
contract: validates-before-writing in the Lua body means
flow_not_found / flow_already_terminal early-returns fire
BEFORE any redis.call() mutation, and a mid-body error after
writes is not expected because all writes are on the same slot).
The pre-RFC-011 two-phase contract (membership FCALL on {fp:N} plus separate HSET on {p:N}), orphan window, and reconciliation-scanner plan (issue #21, now superseded) are all retired.
Consumer contract
The caller’s args.execution_id must be co-located with
args.flow_id’s partition — i.e. minted via
ExecutionId::for_flow(&args.flow_id, config). Passing a
solo-minted id (or any exec id hashing to a different
{fp:N} than the flow’s) will fail at the Valkey level with
CROSSSLOT on a clustered deploy.
Callers with a flow context in scope always use for_flow;
this is the only supported mint path for flow-member execs
post-RFC-011. Test fixtures that pre-date the co-location
contract use TestCluster::new_execution_id_on_partition to
pin to a specific hash-tag index for fcall_create_flow-style
helpers that hard-code their flow partition.
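The CROSSSLOT failure mode above follows from Redis/Valkey cluster hash-tag routing: when a key contains a non-empty section between its first `{` and the next `}`, only that substring is hashed for slot assignment, so any two keys sharing a `{fp:N}` tag land on the same slot. A minimal extraction sketch (the key shapes are illustrative, not the crate's exact key schema):

```rust
/// Return the cluster hash tag of a key per the Redis/Valkey rule:
/// a non-empty "{...}" section restricts slot hashing to its
/// contents; otherwise the whole key is hashed.
fn hash_tag(key: &str) -> &str {
    if let Some(open) = key.find('{') {
        let rest = &key[open + 1..];
        if let Some(close) = rest.find('}') {
            if close > 0 {
                return &rest[..close];
            }
        }
    }
    key
}
```

An exec id minted via ExecutionId::for_flow embeds the flow's partition precisely so that both keys produce the same tag here.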
pub async fn cancel_flow(
    &self,
    args: &CancelFlowArgs,
) -> Result<CancelFlowResult, ServerError>
Cancel a flow.
Flips public_flow_state to cancelled atomically via
ff_cancel_flow on {fp:N}. For cancel_all policy, member
executions must be cancelled cross-partition; this dispatch runs in
the background and the call returns CancelFlowResult::CancellationScheduled
immediately. For all other policies (or flows with no members), or
when the flow was already in a terminal state (idempotent retry),
the call returns CancelFlowResult::Cancelled.
Clients that need synchronous completion can call Self::cancel_flow_wait.
Backpressure
Each call that hits the async dispatch path spawns a new task into
the shared background JoinSet. Rapid repeated calls against the
same flow will spawn multiple overlapping dispatch tasks. This is
not a correctness issue — each member cancel is idempotent and
terminal flows short-circuit via ff_core::contracts::CancelFlowHeader::AlreadyTerminal
— but heavy burst callers should either use ?wait=true (serialises
the dispatch on the HTTP thread, giving natural backpressure) or
implement client-side deduplication on flow_id. The JoinSet is
drained with a 15s timeout on Self::shutdown, so very long
dispatch tails may be aborted during graceful shutdown.
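Client-side deduplication on flow_id, as suggested above, can be as small as an in-flight guard. This is a sketch under the assumption that FlowId can be represented as a String; server-side idempotency remains the backstop:

```rust
use std::collections::HashSet;

/// In-flight guard: the first cancel for a flow_id wins; repeats are
/// suppressed until `finish` releases the id. This only bounds the
/// caller's own burst, not other clients'.
struct CancelDeduper {
    in_flight: HashSet<String>,
}

impl CancelDeduper {
    fn new() -> Self {
        CancelDeduper {
            in_flight: HashSet::new(),
        }
    }

    /// Returns true if the caller should issue the cancel now.
    fn try_begin(&mut self, flow_id: &str) -> bool {
        // HashSet::insert returns false when the id is already present.
        self.in_flight.insert(flow_id.to_string())
    }

    /// Release the id once the cancel call has returned.
    fn finish(&mut self, flow_id: &str) {
        self.in_flight.remove(flow_id);
    }
}
```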
Orphan-member semantics on shutdown abort
If shutdown fires JoinSet::abort_all() after its drain timeout
while a dispatch loop is mid-iteration, the already-issued
ff_cancel_execution FCALLs (atomic Lua) complete cleanly with
terminal_outcome = cancelled and the caller-supplied reason. The
members not yet visited are abandoned mid-loop. They remain in
whichever state they were in (active/eligible/suspended) until the
natural lifecycle scanners reach them: active leases expire
(lease_expiry) and attempt-timeout them to expired, suspended
members time out to skipped, eligible ones sit until retention
trim. So no orphan state — but the terminal_outcome for the
abandoned members will be expired/skipped rather than
cancelled, and the operator-supplied reason is lost for them.
Audit tooling that requires reason fidelity across shutdowns should
use ?wait=true.
pub async fn cancel_flow_wait(
    &self,
    args: &CancelFlowArgs,
) -> Result<CancelFlowResult, ServerError>
Cancel a flow and wait for all member cancellations to complete
inline. Slower than Self::cancel_flow for large flows, but
guarantees every member is in a terminal state on return.
pub async fn stage_dependency_edge(
    &self,
    args: &StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, ServerError>
Stage a dependency edge between two executions in a flow.
Runs on the flow partition {fp:N}. KEYS (6), ARGV (8) — matches lua/flow.lua ff_stage_dependency_edge.
pub async fn apply_dependency_to_child(
    &self,
    args: &ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, ServerError>
Apply a staged dependency edge to the child execution. RFC-017 Stage D2: delegates through the backend trait.
pub async fn deliver_signal(
    &self,
    args: &DeliverSignalArgs,
) -> Result<DeliverSignalResult, ServerError>
Deliver a signal to a suspended (or pending-waitpoint) execution.
Pre-reads exec_core for waitpoint/suspension fields needed for KEYS. KEYS (13), ARGV (17) — matches lua/signal.lua ff_deliver_signal.
pub async fn change_priority(
    &self,
    execution_id: &ExecutionId,
    new_priority: i32,
) -> Result<ChangePriorityResult, ServerError>
Change an execution’s priority. RFC-017 Stage D2: delegates through
the backend trait. Empty lane_id triggers the backend-internal HGET
pre-read (matches legacy inherent behaviour).
pub async fn claim_for_worker(
    &self,
    lane: &LaneId,
    worker_id: &WorkerId,
    worker_instance_id: &WorkerInstanceId,
    worker_capabilities: &BTreeSet<String>,
    grant_ttl_ms: u64,
) -> Result<Option<ClaimGrant>, ServerError>
Scheduler-routed claim entry point.
RFC-017 Wave 8 Stage E3 (§7): dispatches through the backend
trait. The Valkey backend forwards to its wired
ff_scheduler::Scheduler; the Postgres backend forwards to
ff_backend_postgres::scheduler::PostgresScheduler’s
FOR UPDATE SKIP LOCKED admission pipeline. Returns
Ok(None) when no eligible execution exists on the lane at
this scan cycle — the enum-typed trait outcome
(ClaimForWorkerOutcome::NoWork) is collapsed to Option::None
for the inherent-call contract pre-existing Stage E.
Error mapping: scheduler-class errors arrive as
EngineError via the trait boundary and thread through
ServerError::Engine’s HTTP arm
(budget / capability / unavailable classes land on the
documented 400/409/503 response codes — see api::ApiError::into_response).
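The enum-to-Option collapse described above is mechanical; a sketch with placeholder types (the real ClaimGrant and outcome enum carry more fields than shown):

```rust
#[derive(Debug, PartialEq)]
struct ClaimGrant {
    execution_id: String, // placeholder shape, not the real struct
}

#[derive(Debug, PartialEq)]
enum ClaimForWorkerOutcome {
    Claimed(ClaimGrant),
    NoWork,
}

/// Collapse the trait-level outcome to the inherent-call contract
/// that pre-dates Stage E: NoWork becomes None, a grant passes
/// through as Some.
fn collapse(outcome: ClaimForWorkerOutcome) -> Option<ClaimGrant> {
    match outcome {
        ClaimForWorkerOutcome::Claimed(g) => Some(g),
        ClaimForWorkerOutcome::NoWork => None,
    }
}
```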
pub async fn revoke_lease(
    &self,
    execution_id: &ExecutionId,
) -> Result<RevokeLeaseResult, ServerError>
Revoke an active lease (operator-initiated). RFC-017 Stage D2:
delegates through the backend trait. The backend’s trait impl
returns RevokeLeaseResult::AlreadySatisfied when no active
lease is present; the Server facade preserves its pre-migration
ServerError::NotFound behaviour by re-mapping that variant.
pub async fn get_execution(
    &self,
    execution_id: &ExecutionId,
) -> Result<ExecutionInfo, ServerError>
Get full execution info (HGETALL-shape on Valkey; SELECT-shape on
Postgres once Wave 9 wires it). RFC-017 Stage E2: routed through
the backend trait’s ff_core::engine_backend::EngineBackend::read_execution_info.
pub async fn list_executions_page(
    &self,
    partition_id: u16,
    cursor: Option<ExecutionId>,
    limit: usize,
) -> Result<ListExecutionsPage, ServerError>
Partition-scoped forward-only cursor listing of executions.
Parity-wrapper around the Valkey body of
ff_core::engine_backend::EngineBackend::list_executions.
Issue #182 replaced the previous offset + lane + state-filter
shape with this cursor-based API (per owner adjudication:
cursor-everywhere, HTTP surface unreleased). Reads
ff:idx:{p:N}:all_executions, sorts lexicographically on
ExecutionId, filters > cursor, and trims to limit.
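The cursor discipline (lexicographic sort, strict greater-than filter, trim to limit) can be sketched over plain string ids. The next-cursor derivation here (last id of a full page) is an assumption for the sketch, not the crate's documented shape:

```rust
/// One forward-only page: sort ids lexicographically, keep ids
/// strictly greater than the cursor, trim to `limit`. Returns the
/// page plus a cursor for the next call (None when exhausted).
fn list_page(
    ids: &[&str],
    cursor: Option<&str>,
    limit: usize,
) -> (Vec<String>, Option<String>) {
    let mut sorted: Vec<&str> = ids.to_vec();
    sorted.sort_unstable();
    let page: Vec<String> = sorted
        .into_iter()
        .filter(|id| cursor.map_or(true, |c| *id > c))
        .take(limit)
        .map(str::to_string)
        .collect();
    // Assumed convention: a full page hands back its last id as the
    // next cursor; a short page signals the end of the partition.
    let next = if page.len() == limit {
        page.last().cloned()
    } else {
        None
    };
    (page, next)
}
```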
pub async fn replay_execution(
    &self,
    execution_id: &ExecutionId,
) -> Result<ReplayExecutionResult, ServerError>
Replay a terminal execution. RFC-017 Stage D2: delegates through
the backend trait; the variadic-KEYS pre-read (HMGET + SMEMBERS
for inbound edges on skipped flow members) now lives inside
ValkeyBackend::replay_execution.
pub async fn read_attempt_stream(
    &self,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from_id: &str,
    to_id: &str,
    count_limit: u64,
) -> Result<StreamFrames, ServerError>
Read frames from an attempt’s stream (XRANGE wrapper) plus terminal
markers (closed_at, closed_reason) so consumers can stop polling
when the producer finalizes.
from_id and to_id accept XRANGE special markers: "-" for
earliest, "+" for latest. count_limit MUST be >= 1 —
0 returns a ServerError::InvalidInput (matches the REST boundary
and the Lua-side reject).
Cluster-safe: the attempt’s {p:N} partition is derived from the
execution id, so all KEYS share the same slot.
pub async fn tail_attempt_stream(
    &self,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    last_id: &str,
    block_ms: u64,
    count_limit: u64,
) -> Result<StreamFrames, ServerError>
Tail a live attempt’s stream (XREAD BLOCK wrapper). Returns frames plus the terminal signal so a polling consumer can exit when the producer closes the stream.
last_id is exclusive — XREAD returns entries with id > last_id.
Pass "0-0" to read from the beginning.
block_ms == 0 → non-blocking peek (returns immediately).
block_ms > 0 → blocks up to that many ms. Empty frames +
closed_at=None → timeout, no new data, still open.
count_limit MUST be >= 1; 0 returns InvalidInput.
Implemented as a direct XREAD command (not FCALL) because blocking
commands are rejected inside Valkey Functions. The terminal
markers come from a companion HMGET on stream_meta — see
ff_script::stream_tail module docs.
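A polling consumer can classify each tail response from the frames + closed_at pair per the contract above. This is a hedged sketch (Frame is a placeholder, and the choice to surface frames before reporting closure is a design assumption, not the crate's stated behaviour):

```rust
#[derive(Debug, PartialEq)]
struct Frame(String); // placeholder for a stream entry

#[derive(Debug, PartialEq)]
enum TailOutcome {
    NewFrames(Vec<Frame>),
    TimedOutStillOpen, // empty frames + closed_at = None
    Closed,            // producer finalised the stream
}

/// Decode one tail_attempt_stream response. When frames arrive
/// alongside a closed marker, this sketch delivers the frames first;
/// the next poll then observes Closed.
fn classify(frames: Vec<Frame>, closed_at: Option<u64>) -> TailOutcome {
    if frames.is_empty() {
        if closed_at.is_some() {
            TailOutcome::Closed
        } else {
            TailOutcome::TimedOutStillOpen
        }
    } else {
        TailOutcome::NewFrames(frames)
    }
}
```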
pub async fn shutdown(self)
Graceful shutdown — stops scanners, drains background handler tasks (e.g. cancel_flow member dispatch) with a bounded timeout, then waits for scanners to finish.
Shutdown order is chosen so in-flight stream ops (read/tail) drain cleanly without new arrivals piling up:
1. stream_semaphore.close() — new read/tail attempts fail fast with ServerError::OperationFailed("stream semaphore closed …"), which the REST layer surfaces as a 500 with retryable=false (ops tooling may choose to wait + retry on 503-class responses; the body clearly names the shutdown reason).
2. Drain handler-spawned background tasks with a 15s ceiling.
3. engine.shutdown() stops scanners.
Existing in-flight tails finish on their natural block_ms
boundary (up to ~30s); the tail_client is dropped when Server
is dropped after this function returns. We do NOT wait for tails
to drain explicitly — the semaphore-close + natural-timeout
combination bounds shutdown to roughly block_ms + 15s in the
worst case. Callers observing a dropped connection retry against
whatever replacement is coming up.
impl Server
pub async fn rotate_waitpoint_secret(
    &self,
    new_kid: &str,
    new_secret_hex: &str,
) -> Result<RotateWaitpointSecretResult, ServerError>
Rotate the waitpoint HMAC secret. Promotes the current kid to previous
(accepted within FF_WAITPOINT_HMAC_GRACE_MS), installs new_secret_hex
as the new current kid. Idempotent: re-running with the same new_kid
and new_secret_hex converges partitions to the same state.
Returns a structured result so operators can see which partitions failed. HTTP layer returns 200 if any partition succeeded, 500 only if all fail.
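The promotion + idempotency rule can be modelled on a single partition's key state. Field names and the String-typed secrets below are illustrative, not the crate's storage schema:

```rust
#[derive(Clone, Debug, PartialEq)]
struct WaitpointKeys {
    current: (String, String),          // (kid, secret_hex)
    previous: Option<(String, String)>, // accepted within the grace window
}

/// Promote current -> previous and install the new current kid.
/// Re-running with the same (kid, secret) pair is a no-op, so
/// partitions converge even if some observe the rotation twice.
fn rotate(state: &mut WaitpointKeys, new_kid: &str, new_secret_hex: &str) {
    if state.current.0 == new_kid && state.current.1 == new_secret_hex {
        return; // idempotent retry: nothing to promote
    }
    state.previous = Some(state.current.clone());
    state.current = (new_kid.to_string(), new_secret_hex.to_string());
}
```

The no-op branch is what makes the partial-failure story workable: retrying the HTTP call after a mixed 200 re-applies the rotation only on the partitions that missed it.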
Auto Trait Implementations
impl Freeze for Server
impl !RefUnwindSafe for Server
impl Send for Server
impl Sync for Server
impl Unpin for Server
impl UnsafeUnpin for Server
impl !UnwindSafe for Server
Blanket Implementations

impl<T> BorrowMut<T> for T
where
    T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true.
Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self> otherwise.