pub struct Server { /* private fields */ }
FlowFabric server — connects everything together.
Manages the Valkey connection, Lua library loading, background scanners, and provides a minimal API for Phase 1.
§Implementations
impl Server
pub async fn start(config: ServerConfig) -> Result<Self, ServerError>
Start the FlowFabric server.
Boot sequence:
- Connect to Valkey
- Validate or create partition config key
- Load the FlowFabric Lua library
- Start engine (14 background scanners)
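A minimal boot-and-shutdown sketch. The crate path (flowfabric), the tokio runtime, and ServerConfig implementing Default are assumptions; only Server::start and Server::shutdown are taken from this page.

```rust
// Assumed crate name and config construction; adjust to the real crate.
use flowfabric::{Server, ServerConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = ServerConfig::default(); // assumes a Default impl
    let server = Server::start(config).await?; // runs the boot sequence above
    // ... drive executions through the methods documented below ...
    server.shutdown().await; // graceful shutdown; see shutdown() below
    Ok(())
}
```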
pub async fn start_with_metrics(
    config: ServerConfig,
    metrics: Arc<Metrics>,
) -> Result<Self, ServerError>
PR-94: boot the server with a shared observability registry.
Scanner cycle + scheduler metrics record into this registry;
main.rs threads the same handle into the router so /metrics
exposes what the engine produces. The no-arg Server::start
forwards here with a fresh Metrics::new() — under the default
build that’s the shim, under observability it’s a real
registry not shared with any HTTP route (useful for tests
exercising the engine in isolation).
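A sketch of the shared-registry wiring described here, inside an async context with config in scope; build_router is a hypothetical stand-in for the real router constructor in main.rs.

```rust
use std::sync::Arc;

// One registry shared by the engine and the /metrics route.
let metrics = Arc::new(Metrics::new());
let server = Server::start_with_metrics(config, Arc::clone(&metrics)).await?;
// main.rs threads the same handle into the HTTP router so /metrics
// exposes what the scanners and scheduler record:
// let router = build_router(Arc::clone(&metrics)); // hypothetical constructor
```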
pub fn config(&self) -> &ServerConfig
Get the server config.
pub fn partition_config(&self) -> &PartitionConfig
Get the partition config.
pub async fn create_execution(
    &self,
    args: &CreateExecutionArgs,
) -> Result<CreateExecutionResult, ServerError>
Create a new execution.
Uses raw FCALL — will migrate to typed ff-script wrappers in Step 1.2.
pub async fn cancel_execution(
    &self,
    args: &CancelExecutionArgs,
) -> Result<CancelExecutionResult, ServerError>
Cancel an execution.
pub async fn get_execution_state(
    &self,
    execution_id: &ExecutionId,
) -> Result<PublicState, ServerError>
Get the public state of an execution.
Reads public_state from the exec_core hash. Returns the parsed
PublicState enum. If the execution is not found, returns an error.
pub async fn get_execution_result(
    &self,
    execution_id: &ExecutionId,
) -> Result<Option<Vec<u8>>, ServerError>
Read the raw result payload written by ff_complete_execution.
The Lua side stores the payload at ctx.result() via plain SET.
No FCALL — this is a direct GET; returns Ok(None) when the
execution is missing, not yet complete, or (in a future
retention-policy world) when the result was trimmed.
§Contract vs get_execution_state
get_execution_state is the authoritative completion signal. If
a caller observes state == completed but get_execution_result
returns None, the result bytes are unavailable — not a caller
bug and not a server bug, just the retention policy trimming the
blob. V1 sets no retention, so callers on v1 can treat
state == completed + Ok(None) as a server bug.
§Ordering
Callers MUST wait for state == completed before calling this
method; polls issued before the state transition may hit a
narrow window where the completion Lua has written
public_state = completed but the result key SET is still
on-wire. The current Lua ff_complete_execution writes both in
the same atomic script, so the window is effectively zero for
direct callers — but retries via ff_replay_execution open it
briefly.
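A polling sketch that honors this contract, assuming an async context with server and execution_id in scope; the PublicState::Completed variant name and shape are assumptions.

```rust
use std::time::Duration;

// Wait for the authoritative completion signal first.
loop {
    let state = server.get_execution_state(&execution_id).await?;
    if matches!(state, PublicState::Completed) { // assumed unit variant
        break;
    }
    tokio::time::sleep(Duration::from_millis(250)).await;
}
// Safe to read now. On v1 (no retention), None here indicates a server
// bug per the contract above; with retention enabled it means trimmed.
let result: Option<Vec<u8>> = server.get_execution_result(&execution_id).await?;
```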
pub async fn list_pending_waitpoints(
    &self,
    execution_id: &ExecutionId,
) -> Result<Vec<PendingWaitpointInfo>, ServerError>
List the open (pending or active) waitpoints for an execution.
Returns one PendingWaitpointInfo per open waitpoint, including the
HMAC-SHA1 waitpoint_token needed to deliver authenticated signals.
closed waitpoints are elided — callers looking at history should
read the stream or lease history instead.
Read plan: SSCAN ctx.waitpoints() with COUNT 100 (bounded page size, matching the unblock / flow-projector / budget-reconciler convention) to enumerate waitpoint IDs, then two pipelines:
- Pass 1 — single round-trip containing, per waitpoint, one HMGET over the documented field set plus one HGET for total_matchers on the condition hash.
- Pass 2 (conditional) — single round-trip containing an HMGET per waitpoint with total_matchers > 0 to read the matcher:N:name fields.
No FCALL — this is a read-only view built from already-persisted state, so skipping Lua keeps the Valkey single-writer path uncontended. HMGET (vs HGETALL) bounds the per-waitpoint read to the documented field set and defends against a poisoned waitpoint hash with unbounded extra fields accumulating response memory.
§Empty result semantics (TOCTOU)
An empty Vec is returned in three cases:
- The execution exists but has never suspended.
- All existing waitpoints are closed (already resolved).
- A narrow teardown race: SSCAN read the waitpoint set after a concurrent ff_close_waitpoint or execution-cleanup script deleted the waitpoint hashes but before it SREM’d the set members. Each HMGET returns all-None and we skip.
Callers that get an unexpected empty list should cross-check
execution state (get_execution_state) to distinguish “pipeline
moved past suspended” from “nothing to review yet”.
A waitpoint hash that’s present but missing its waitpoint_token
field is similarly elided and a server-side WARN is emitted —
this indicates storage corruption (a write that half-populated
the hash) and operators should investigate.
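A sketch of the suggested cross-check, assuming server and execution_id in scope; PublicState::Suspended is an assumed variant name.

```rust
let waitpoints = server.list_pending_waitpoints(&execution_id).await?;
if waitpoints.is_empty() {
    match server.get_execution_state(&execution_id).await? {
        // Assumed variant: still suspended, so the empty list is the
        // narrow SSCAN/SREM race (or an elided half-written hash); retry.
        PublicState::Suspended => { /* back off and re-list */ }
        // Any other state: the pipeline moved past suspended, so there
        // is nothing to review yet (or ever).
        _ => {}
    }
}
```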
pub async fn create_budget(
    &self,
    args: &CreateBudgetArgs,
) -> Result<CreateBudgetResult, ServerError>
Create a new budget policy.
pub async fn create_quota_policy(
    &self,
    args: &CreateQuotaPolicyArgs,
) -> Result<CreateQuotaPolicyResult, ServerError>
Create a new quota/rate-limit policy.
pub async fn get_budget_status(
    &self,
    budget_id: &BudgetId,
) -> Result<BudgetStatus, ServerError>
Read-only budget status for operator visibility.
pub async fn report_usage(
    &self,
    budget_id: &BudgetId,
    args: &ReportUsageArgs,
) -> Result<ReportUsageResult, ServerError>
Report usage against a budget and check limits.
pub async fn reset_budget(
    &self,
    budget_id: &BudgetId,
) -> Result<ResetBudgetResult, ServerError>
Reset a budget’s usage counters and schedule the next reset.
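A lifecycle sketch tying the budget calls above together, assuming the Args values and budget_id are already in scope; their field sets aren't shown on this page.

```rust
// create -> report -> inspect -> reset.
let _created = server.create_budget(&create_args).await?;
let _usage = server.report_usage(&budget_id, &usage_args).await?; // limit check happens here
let _status = server.get_budget_status(&budget_id).await?; // operator visibility
let _reset = server.reset_budget(&budget_id).await?; // zero counters, schedule next reset
```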
pub async fn create_flow(
    &self,
    args: &CreateFlowArgs,
) -> Result<CreateFlowResult, ServerError>
Create a new flow container.
pub async fn add_execution_to_flow(
    &self,
    args: &AddExecutionToFlowArgs,
) -> Result<AddExecutionToFlowResult, ServerError>
Add an execution to a flow.
§Atomic single-FCALL commit (RFC-011 §7.3)
Post-RFC-011 phase-3, exec_core co-locates with flow_core under
hash-tag routing (both hash to {fp:N} via the exec id’s
embedded partition). A single atomic FCALL writes:
- members_set SADD (flow membership)
- exec_core.flow_id HSET (back-pointer)
- flow_index SADD (self-heal)
- flow_core HINCRBY node_count / graph_revision + HSET last_mutation_at
All four writes commit atomically or none do (Valkey scripting
contract: validates-before-writing in the Lua body means
flow_not_found / flow_already_terminal early-returns fire
BEFORE any redis.call() mutation, and a mid-body error after
writes is not expected because all writes are on the same slot).
The pre-RFC-011 two-phase contract (membership FCALL on {fp:N} plus separate HSET on {p:N}), orphan window, and reconciliation-scanner plan (issue #21, now superseded) are all retired.
§Consumer contract
The caller’s args.execution_id must be co-located with
args.flow_id’s partition — i.e. minted via
ExecutionId::for_flow(&args.flow_id, config). Passing a
solo-minted id (or any exec id hashing to a different
{fp:N} than the flow’s) will fail at the Valkey level with
CROSSSLOT on a clustered deploy.
Callers with a flow context in scope always use for_flow;
this is the only supported mint path for flow-member execs
post-RFC-011. Test fixtures that pre-date the co-location
contract use TestCluster::new_execution_id_on_partition to
pin to a specific hash-tag index for fcall_create_flow-style
helpers that hard-code their flow partition.
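A sketch of the mint rule, assuming flow_id is in scope and that the config argument is what Self::config returns; everything else is taken from the contract above.

```rust
// Mint the member id from the flow so both hash to the same {fp:N} slot.
let execution_id = ExecutionId::for_flow(&flow_id, server.config());
// A solo-minted id would hash to a different slot and the single FCALL
// would fail with CROSSSLOT on a clustered deploy.
// ... build AddExecutionToFlowArgs with this id, then call add_execution_to_flow ...
```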
pub async fn cancel_flow(
    &self,
    args: &CancelFlowArgs,
) -> Result<CancelFlowResult, ServerError>
Cancel a flow.
Flips public_flow_state to cancelled atomically via
ff_cancel_flow on {fp:N}. For cancel_all policy, member
executions must be cancelled cross-partition; this dispatch runs in
the background and the call returns CancelFlowResult::CancellationScheduled
immediately. For all other policies (or flows with no members), or
when the flow was already in a terminal state (idempotent retry),
the call returns CancelFlowResult::Cancelled.
Clients that need synchronous completion can call Self::cancel_flow_wait.
§Backpressure
Each call that hits the async dispatch path spawns a new task into
the shared background JoinSet. Rapid repeated calls against the
same flow will spawn multiple overlapping dispatch tasks. This is
not a correctness issue — each member cancel is idempotent and
terminal flows short-circuit via ParsedCancelFlow::AlreadyTerminal
— but heavy burst callers should either use ?wait=true (serialises
the dispatch on the HTTP thread, giving natural backpressure) or
implement client-side deduplication on flow_id. The JoinSet is
drained with a 15s timeout on Self::shutdown, so very long
dispatch tails may be aborted during graceful shutdown.
§Orphan-member semantics on shutdown abort
If shutdown fires JoinSet::abort_all() after its drain timeout
while a dispatch loop is mid-iteration, the already-issued
ff_cancel_execution FCALLs (atomic Lua) complete cleanly with
terminal_outcome = cancelled and the caller-supplied reason. The
members not yet visited are abandoned mid-loop. They remain in
whichever state they were in (active/eligible/suspended) until the
natural lifecycle scanners reach them: active leases expire
(lease_expiry) and attempt-timeout them to expired, suspended
members time out to skipped, eligible ones sit until retention
trim. So no orphan state — but the terminal_outcome for the
abandoned members will be expired/skipped rather than
cancelled, and the operator-supplied reason is lost for them.
Audit tooling that requires reason fidelity across shutdowns should
use ?wait=true.
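A sketch of handling the two documented variants, with args in scope; CancelFlowResult may carry more variants than shown here.

```rust
match server.cancel_flow(&args).await? {
    CancelFlowResult::Cancelled => {
        // Terminal already, no members, or a non-cancel_all policy: done.
    }
    CancelFlowResult::CancellationScheduled => {
        // cancel_all dispatch runs in the background. If reason fidelity
        // across shutdowns matters, use the synchronous variant instead:
        // server.cancel_flow_wait(&args).await?;
    }
    _ => {} // any undocumented variants
}
```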
pub async fn cancel_flow_wait(
    &self,
    args: &CancelFlowArgs,
) -> Result<CancelFlowResult, ServerError>
Cancel a flow and wait for all member cancellations to complete
inline. Slower than Self::cancel_flow for large flows, but
guarantees every member is in a terminal state on return.
pub async fn stage_dependency_edge(
    &self,
    args: &StageDependencyEdgeArgs,
) -> Result<StageDependencyEdgeResult, ServerError>
Stage a dependency edge between two executions in a flow.
Runs on the flow partition {fp:N}. KEYS (6), ARGV (8) — matches lua/flow.lua ff_stage_dependency_edge.
pub async fn apply_dependency_to_child(
    &self,
    args: &ApplyDependencyToChildArgs,
) -> Result<ApplyDependencyToChildResult, ServerError>
Apply a staged dependency edge to the child execution.
Runs on the child execution partition {p:N}. KEYS (8), ARGV (7) — matches lua/flow.lua ff_apply_dependency_to_child.
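A sketch of the two-step edge protocol implied by the partition notes above, with both Args values assumed in scope (their fields aren't listed here).

```rust
// Step 1: stage on the flow partition {fp:N}.
let _staged = server.stage_dependency_edge(&stage_args).await?;
// Step 2: apply the staged edge on the child's execution partition {p:N}.
let _applied = server.apply_dependency_to_child(&apply_args).await?;
```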
pub async fn deliver_signal(
    &self,
    args: &DeliverSignalArgs,
) -> Result<DeliverSignalResult, ServerError>
Deliver a signal to a suspended (or pending-waitpoint) execution.
Pre-reads exec_core for waitpoint/suspension fields needed for KEYS. KEYS (13), ARGV (17) — matches lua/signal.lua ff_deliver_signal.
pub async fn change_priority(
    &self,
    execution_id: &ExecutionId,
    new_priority: i32,
) -> Result<ChangePriorityResult, ServerError>
Change an execution’s priority.
KEYS (2), ARGV (2) — matches lua/scheduling.lua ff_change_priority.
pub async fn claim_for_worker(
    &self,
    lane: &LaneId,
    worker_id: &WorkerId,
    worker_instance_id: &WorkerInstanceId,
    worker_capabilities: &BTreeSet<String>,
    grant_ttl_ms: u64,
) -> Result<Option<ClaimGrant>, ServerError>
Scheduler-routed claim entry point (Batch C item 2 PR-B).
Delegates to ff_scheduler::Scheduler::claim_for_worker which
runs budget + quota + capability admission before issuing the
grant. Returns Ok(None) when no eligible execution exists on
the lane at this scan cycle. The worker’s subsequent
claim_from_grant(lane, grant) mints the lease.
Keeping the claim-grant mint inside the server (rather than the
worker) means capability CSV validation, budget/quota breach
checks, and lane routing run in one place for every tenant
worker — the same invariants as the direct-valkey-claim path
enforces inline, but gated at a single server choke point.
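A claim-loop sketch with the lane/worker identifiers in scope; the capability set and 30s grant TTL are illustrative, and claim_from_grant is the worker-side call named above.

```rust
use std::collections::BTreeSet;

let caps: BTreeSet<String> = BTreeSet::from(["gpu".to_string(), "linux".to_string()]);
match server
    .claim_for_worker(&lane, &worker_id, &worker_instance_id, &caps, 30_000)
    .await?
{
    Some(grant) => {
        // Budget, quota, and capability admission already passed server-side;
        // the worker now mints the lease via claim_from_grant(lane, grant).
        let _ = grant;
    }
    None => {
        // Nothing eligible on this lane this scan cycle; back off and retry.
    }
}
```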
pub async fn revoke_lease(
    &self,
    execution_id: &ExecutionId,
) -> Result<RevokeLeaseResult, ServerError>
Revoke an active lease (operator-initiated).
pub async fn get_execution(
    &self,
    execution_id: &ExecutionId,
) -> Result<ExecutionInfo, ServerError>
Get full execution info via HGETALL on exec_core.
pub async fn list_executions_page(
    &self,
    partition_id: u16,
    cursor: Option<ExecutionId>,
    limit: usize,
) -> Result<ListExecutionsPage, ServerError>
Partition-scoped forward-only cursor listing of executions.
Parity-wrapper around the Valkey body of
ff_core::engine_backend::EngineBackend::list_executions.
Issue #182 replaced the previous offset + lane + state-filter
shape with this cursor-based API (per owner adjudication:
cursor-everywhere, HTTP surface unreleased). Reads
ff:idx:{p:N}:all_executions, sorts lexicographically on
ExecutionId, filters > cursor, and trims to limit.
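A drain-the-partition sketch; the items and next_cursor field names on ListExecutionsPage are assumptions, and partition 0 with limit 100 is illustrative.

```rust
let mut cursor: Option<ExecutionId> = None;
loop {
    let page = server.list_executions_page(0, cursor.take(), 100).await?;
    for _exec in &page.items { // assumed field name
        // ... inspect each execution id ...
    }
    match page.next_cursor { // assumed field name
        Some(next) => cursor = Some(next),
        None => break, // end of partition
    }
}
```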
pub async fn replay_execution(
    &self,
    execution_id: &ExecutionId,
) -> Result<ReplayExecutionResult, ServerError>
Replay a terminal execution.
Pre-reads exec_core for flow_id and dep edges (variable KEYS). KEYS (4+N), ARGV (2+N) — matches lua/flow.lua ff_replay_execution.
pub async fn read_attempt_stream(
    &self,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    from_id: &str,
    to_id: &str,
    count_limit: u64,
) -> Result<StreamFrames, ServerError>
Read frames from an attempt’s stream (XRANGE wrapper) plus terminal
markers (closed_at, closed_reason) so consumers can stop polling
when the producer finalizes.
from_id and to_id accept XRANGE special markers: "-" for
earliest, "+" for latest. count_limit MUST be >= 1 —
0 returns a ServerError::InvalidInput (matches the REST boundary
and the Lua-side reject).
Cluster-safe: the attempt’s {p:N} partition is derived from the
execution id, so all KEYS share the same slot.
pub async fn tail_attempt_stream(
    &self,
    execution_id: &ExecutionId,
    attempt_index: AttemptIndex,
    last_id: &str,
    block_ms: u64,
    count_limit: u64,
) -> Result<StreamFrames, ServerError>
Tail a live attempt’s stream (XREAD BLOCK wrapper). Returns frames plus the terminal signal so a polling consumer can exit when the producer closes the stream.
last_id is exclusive — XREAD returns entries with id > last_id.
Pass "0-0" to read from the beginning.
block_ms == 0 → non-blocking peek (returns immediately).
block_ms > 0 → blocks up to that many ms. Empty frames +
closed_at=None → timeout, no new data, still open.
count_limit MUST be >= 1; 0 returns InvalidInput.
Implemented as a direct XREAD command (not FCALL) because blocking
commands are rejected inside Valkey Functions. The terminal
markers come from a companion HMGET on stream_meta — see
ff_script::stream_tail module docs.
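A tail-polling sketch per the contract above, with server, execution_id, and attempt_index in scope; the frames and closed_at field names on StreamFrames are assumptions.

```rust
let mut last_id = String::from("0-0"); // exclusive: start from the beginning
loop {
    let out = server
        .tail_attempt_stream(&execution_id, attempt_index, &last_id, 5_000, 100)
        .await?;
    for frame in &out.frames { // assumed field name
        // ... consume the frame, advancing last_id from its stream id ...
        let _ = frame;
    }
    if out.closed_at.is_some() { // assumed field name: producer finalized
        break;
    }
    // Empty frames with closed_at == None: block_ms timeout, still open.
}
```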
pub async fn shutdown(self)
Graceful shutdown — stops scanners, drains background handler tasks (e.g. cancel_flow member dispatch) with a bounded timeout, then waits for scanners to finish.
Shutdown order is chosen so in-flight stream ops (read/tail) drain cleanly without new arrivals piling up:
- stream_semaphore.close() — new read/tail attempts fail fast with ServerError::OperationFailed("stream semaphore closed …"), which the REST layer surfaces as a 500 with retryable=false (ops tooling may choose to wait and retry on 503-class responses; the body clearly names the shutdown reason).
- Drain handler-spawned background tasks with a 15s ceiling.
- engine.shutdown() stops scanners.
Existing in-flight tails finish on their natural block_ms
boundary (up to ~30s); the tail_client is dropped when Server
is dropped after this function returns. We do NOT wait for tails
to drain explicitly — the semaphore-close + natural-timeout
combination bounds shutdown to roughly block_ms + 15s in the
worst case. Callers observing a dropped connection retry against
whatever replacement is coming up.
impl Server
pub async fn rotate_waitpoint_secret(
    &self,
    new_kid: &str,
    new_secret_hex: &str,
) -> Result<RotateWaitpointSecretResult, ServerError>
Rotate the waitpoint HMAC secret. Promotes the current kid to previous
(accepted within FF_WAITPOINT_HMAC_GRACE_MS), installs new_secret_hex
as the new current kid. Idempotent: re-running with the same new_kid
and new_secret_hex converges partitions to the same state.
Returns a structured result so operators can see which partitions failed. HTTP layer returns 200 if any partition succeeded, 500 only if all fail.
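An operator-side rotation sketch; the kid string and secret variable are illustrative.

```rust
// Promote the current kid to previous (grace window) and install the new one.
let result = server
    .rotate_waitpoint_secret("wp-key-2", &new_secret_hex)
    .await?;
// Idempotent: on partial failure, retry with the same kid + secret until
// every partition converges; inspect `result` for the ones that failed.
let _ = result;
```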
§Auto Trait Implementations
impl Freeze for Server
impl !RefUnwindSafe for Server
impl Send for Server
impl Sync for Server
impl Unpin for Server
impl UnsafeUnpin for Server
impl !UnwindSafe for Server