Struct Server 

pub struct Server { /* private fields */ }

FlowFabric server — connects everything together.

Manages the Valkey connection, Lua library loading, and the background scanners, and provides a minimal API for Phase 1.

Implementations§

impl Server

pub async fn start(config: ServerConfig) -> Result<Self, ServerError>

Start the FlowFabric server.

Boot sequence:

  1. Connect to Valkey
  2. Validate or create partition config key
  3. Load the FlowFabric Lua library
  4. Start engine (14 background scanners)
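
A minimal lifecycle sketch (ServerConfig construction is not shown on this page):

    async fn run(config: ServerConfig) -> Result<(), ServerError> {
        // Runs the boot sequence above; fails if Valkey is unreachable,
        // the partition config key is invalid, or the Lua library fails
        // to load.
        let server = Server::start(config).await?;

        // ... serve traffic ...

        // Graceful shutdown (see shutdown below); consumes the server.
        server.shutdown().await;
        Ok(())
    }
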
pub fn client(&self) -> &Client

Get a reference to the ferriskey client.

pub fn config(&self) -> &ServerConfig

Get the server config.

pub fn partition_config(&self) -> &PartitionConfig

Get the partition config.

pub async fn create_execution(&self, args: &CreateExecutionArgs) -> Result<CreateExecutionResult, ServerError>

Create a new execution.

Uses raw FCALL — will migrate to typed ff-script wrappers in Step 1.2.

pub async fn cancel_execution(&self, args: &CancelExecutionArgs) -> Result<CancelExecutionResult, ServerError>

Cancel an execution.

pub async fn get_execution_state(&self, execution_id: &ExecutionId) -> Result<PublicState, ServerError>

Get the public state of an execution.

Reads public_state from the exec_core hash. Returns the parsed PublicState enum. If the execution is not found, returns an error.

pub async fn get_execution_result(&self, execution_id: &ExecutionId) -> Result<Option<Vec<u8>>, ServerError>

Read the raw result payload written by ff_complete_execution.

The Lua side stores the payload at ctx.result() via plain SET. No FCALL — this is a direct GET; returns Ok(None) when the execution is missing, not yet complete, or (in a future retention-policy world) when the result was trimmed.

§Contract vs get_execution_state

get_execution_state is the authoritative completion signal. If a caller observes state == completed but get_execution_result returns None, the result bytes are unavailable — not a caller bug and not a server bug, just the retention policy trimming the blob. V1 sets no retention, so callers on v1 can treat state == completed + Ok(None) as a server bug.

§Ordering

Callers MUST wait for state == completed before calling this method; polls issued before the state transition may hit a narrow window where the completion Lua has written public_state = completed but the result key SET is still on-wire. The current Lua ff_complete_execution writes both in the same atomic script, so the window is effectively zero for direct callers — but retries via ff_replay_execution open it briefly.
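
A hedged polling sketch of this contract (the PublicState variant names are assumed; only the method contracts above come from this page):

    async fn fetch_result(
        server: &Server,
        id: &ExecutionId,
    ) -> Result<Option<Vec<u8>>, ServerError> {
        loop {
            match server.get_execution_state(id).await? {
                // Variant name assumed; real callers should also bail out
                // on failed/cancelled terminal states instead of spinning.
                PublicState::Completed => break,
                _ => tokio::time::sleep(std::time::Duration::from_millis(200)).await,
            }
        }
        // state == completed happens-before the result SET for direct
        // callers (same atomic Lua script), so this read is safe now.
        server.get_execution_result(id).await
    }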

pub async fn list_pending_waitpoints(&self, execution_id: &ExecutionId) -> Result<Vec<PendingWaitpointInfo>, ServerError>

List the open (pending or active) waitpoints for an execution.

Returns one PendingWaitpointInfo per open waitpoint, including the HMAC-SHA1 waitpoint_token needed to deliver authenticated signals. closed waitpoints are elided — callers looking at history should read the stream or lease history instead.

Read plan: SSCAN ctx.waitpoints() with COUNT 100 (bounded page size, matching the unblock / flow-projector / budget-reconciler convention) to enumerate waitpoint IDs, then two pipelines:

  • Pass 1 — single round-trip containing, per waitpoint, one HMGET over the documented field set + one HGET for total_matchers on the condition hash.
  • Pass 2 (conditional) — single round-trip containing an HMGET per waitpoint with total_matchers > 0 to read the matcher:N:name fields.

No FCALL — this is a read-only view built from already-persisted state, so skipping Lua keeps the Valkey single-writer path uncontended. HMGET (vs HGETALL) bounds the per-waitpoint read to the documented field set and defends against a poisoned waitpoint hash with unbounded extra fields accumulating response memory.

§Empty result semantics (TOCTOU)

An empty Vec is returned in three cases:

  1. The execution exists but has never suspended.
  2. All existing waitpoints are closed (already resolved).
  3. A narrow teardown race: SSCAN read the waitpoint set after a concurrent ff_close_waitpoint or execution-cleanup script deleted the waitpoint hashes but before it SREM’d the set members. Each HMGET returns all-None and we skip.

Callers that get an unexpected empty list should cross-check execution state (get_execution_state) to distinguish “pipeline moved past suspended” from “nothing to review yet”.

A waitpoint hash that’s present but missing its waitpoint_token field is similarly elided and a server-side WARN is emitted — this indicates storage corruption (a write that half-populated the hash) and operators should investigate.
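
A sketch of that cross-check (the Suspended variant name on PublicState is assumed):

    async fn review(server: &Server, id: &ExecutionId) -> Result<(), ServerError> {
        let open = server.list_pending_waitpoints(id).await?;
        if open.is_empty() {
            // Disambiguate the three empty cases listed above.
            match server.get_execution_state(id).await? {
                // Still suspended but no open waitpoints: likely the
                // narrow teardown race; retry shortly.
                PublicState::Suspended => {}
                // Anything else: the pipeline moved past suspended, or
                // the execution never suspended at all.
                _ => {}
            }
        }
        Ok(())
    }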

pub async fn create_budget(&self, args: &CreateBudgetArgs) -> Result<CreateBudgetResult, ServerError>

Create a new budget policy.

pub async fn create_quota_policy(&self, args: &CreateQuotaPolicyArgs) -> Result<CreateQuotaPolicyResult, ServerError>

Create a new quota/rate-limit policy.

pub async fn get_budget_status(&self, budget_id: &BudgetId) -> Result<BudgetStatus, ServerError>

Read-only budget status for operator visibility.

pub async fn report_usage(&self, budget_id: &BudgetId, args: &ReportUsageArgs) -> Result<ReportUsageResult, ServerError>

Report usage against a budget and check limits.

pub async fn reset_budget(&self, budget_id: &BudgetId) -> Result<ResetBudgetResult, ServerError>

Reset a budget’s usage counters and schedule the next reset.
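
A lifecycle sketch across the budget calls above (Args construction is elided because the field sets are not shown on this page, and budget_id is assumed to come from CreateBudgetResult):

    let _created = server.create_budget(&budget_args).await?;
    // Meter work and learn about limit breaches in the same call:
    let _usage = server.report_usage(&budget_id, &usage_args).await?;
    // Read-only operator view:
    let _status = server.get_budget_status(&budget_id).await?;
    // Zero the counters and schedule the next reset window:
    let _reset = server.reset_budget(&budget_id).await?;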

pub async fn create_flow(&self, args: &CreateFlowArgs) -> Result<CreateFlowResult, ServerError>

Create a new flow container.

pub async fn add_execution_to_flow(&self, args: &AddExecutionToFlowArgs) -> Result<AddExecutionToFlowResult, ServerError>

Add an execution to a flow.

§Atomic single-FCALL commit (RFC-011 §7.3)

Post-RFC-011 phase-3, exec_core co-locates with flow_core under hash-tag routing (both hash to {fp:N} via the exec id’s embedded partition). A single atomic FCALL writes:

  • members_set SADD (flow membership)
  • exec_core.flow_id HSET (back-pointer)
  • flow_index SADD (self-heal)
  • flow_core HINCRBY node_count / graph_revision + HSET last_mutation_at

All four writes commit atomically or none do (Valkey scripting contract: validates-before-writing in the Lua body means flow_not_found / flow_already_terminal early-returns fire BEFORE any redis.call() mutation, and a mid-body error after writes is not expected because all writes are on the same slot).

The pre-RFC-011 two-phase contract (membership FCALL on {fp:N} plus separate HSET on {p:N}), orphan window, and reconciliation-scanner plan (issue #21, now superseded) are all retired.

§Consumer contract

The caller’s args.execution_id must be co-located with args.flow_id’s partition — i.e. minted via ExecutionId::for_flow(&args.flow_id, config). Passing a solo-minted id (or any exec id hashing to a different {fp:N} than the flow’s) will fail at the Valkey level with CROSSSLOT on a clustered deploy.

Callers with a flow context in scope always use for_flow; this is the only supported mint path for flow-member execs post-RFC-011. Test fixtures that pre-date the co-location contract use TestCluster::new_execution_id_on_partition to pin to a specific hash-tag index for fcall_create_flow-style helpers that hard-code their flow partition.
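
A sketch of the mint-then-attach path under this contract (the FlowId type name and the exact for_flow signature are assumed; the Args field sets are not shown on this page):

    async fn attach_member(server: &Server, flow_id: &FlowId) -> Result<(), ServerError> {
        // Mint co-located with the flow's {fp:N} slot; a solo-minted id
        // fails with CROSSSLOT on a clustered deploy.
        let _exec_id = ExecutionId::for_flow(flow_id, server.config());
        // ... create the execution under this id, build
        // AddExecutionToFlowArgs, then:
        // server.add_execution_to_flow(&args).await?;
        Ok(())
    }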

pub async fn cancel_flow(&self, args: &CancelFlowArgs) -> Result<CancelFlowResult, ServerError>

Cancel a flow.

Flips public_flow_state to cancelled atomically via ff_cancel_flow on {fp:N}. For cancel_all policy, member executions must be cancelled cross-partition; this dispatch runs in the background and the call returns CancelFlowResult::CancellationScheduled immediately. For all other policies (or flows with no members), or when the flow was already in a terminal state (idempotent retry), the call returns CancelFlowResult::Cancelled.

Clients that need synchronous completion can call Self::cancel_flow_wait.

§Backpressure

Each call that hits the async dispatch path spawns a new task into the shared background JoinSet. Rapid repeated calls against the same flow will spawn multiple overlapping dispatch tasks. This is not a correctness issue — each member cancel is idempotent and terminal flows short-circuit via ParsedCancelFlow::AlreadyTerminal — but heavy burst callers should either use ?wait=true (serialises the dispatch on the HTTP thread, giving natural backpressure) or implement client-side deduplication on flow_id. The JoinSet is drained with a 15s timeout on Self::shutdown, so very long dispatch tails may be aborted during graceful shutdown.

§Orphan-member semantics on shutdown abort

If shutdown fires JoinSet::abort_all() after its drain timeout while a dispatch loop is mid-iteration, the already-issued ff_cancel_execution FCALLs (atomic Lua) complete cleanly with terminal_outcome = cancelled and the caller-supplied reason. The members not yet visited are abandoned mid-loop. They remain in whichever state they were in (active/eligible/suspended) until the natural lifecycle scanners reach them: active leases expire (lease_expiry) and attempt-timeout them to expired, suspended members time out to skipped, eligible ones sit until retention trim. So no orphan state — but the terminal_outcome for the abandoned members will be expired/skipped rather than cancelled, and the operator-supplied reason is lost for them. Audit tooling that requires reason fidelity across shutdowns should use ?wait=true.
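
Sketch of handling the two result variants named above:

    match server.cancel_flow(&args).await? {
        // Terminal already, no members, or a non-cancel_all policy.
        CancelFlowResult::Cancelled => {}
        // cancel_all: member cancellation dispatches in the background.
        // Use cancel_flow_wait when synchronous completion or reason
        // fidelity across shutdowns matters.
        CancelFlowResult::CancellationScheduled => {}
    }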

pub async fn cancel_flow_wait(&self, args: &CancelFlowArgs) -> Result<CancelFlowResult, ServerError>

Cancel a flow and wait for all member cancellations to complete inline. Slower than Self::cancel_flow for large flows, but guarantees every member is in a terminal state on return.

pub async fn stage_dependency_edge(&self, args: &StageDependencyEdgeArgs) -> Result<StageDependencyEdgeResult, ServerError>

Stage a dependency edge between two executions in a flow.

Runs on the flow partition {fp:N}. KEYS (6), ARGV (8) — matches lua/flow.lua ff_stage_dependency_edge.

pub async fn apply_dependency_to_child(&self, args: &ApplyDependencyToChildArgs) -> Result<ApplyDependencyToChildResult, ServerError>

Apply a staged dependency edge to the child execution.

Runs on the child execution partition {p:N}. KEYS (7), ARGV (7) — matches lua/flow.lua ff_apply_dependency_to_child.
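
The two calls form a two-step, two-partition handoff; a sketch (Args construction elided, field sets not shown on this page):

    // Step 1, on the flow partition {fp:N}: record the staged edge.
    let _staged = server.stage_dependency_edge(&stage_args).await?;
    // Step 2, on the child's partition {p:N}: apply it to the child.
    let _applied = server.apply_dependency_to_child(&apply_args).await?;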

pub async fn deliver_signal(&self, args: &DeliverSignalArgs) -> Result<DeliverSignalResult, ServerError>

Deliver a signal to a suspended (or pending-waitpoint) execution.

Pre-reads exec_core for waitpoint/suspension fields needed for KEYS. KEYS (13), ARGV (17) — matches lua/signal.lua ff_deliver_signal.

pub async fn change_priority(&self, execution_id: &ExecutionId, new_priority: i32) -> Result<ChangePriorityResult, ServerError>

Change an execution’s priority.

KEYS (2), ARGV (2) — matches lua/scheduling.lua ff_change_priority.

pub async fn claim_for_worker(&self, lane: &LaneId, worker_id: &WorkerId, worker_instance_id: &WorkerInstanceId, worker_capabilities: &BTreeSet<String>, grant_ttl_ms: u64) -> Result<Option<ClaimGrant>, ServerError>

Scheduler-routed claim entry point (Batch C item 2 PR-B).

Delegates to ff_scheduler::Scheduler::claim_for_worker which runs budget + quota + capability admission before issuing the grant. Returns Ok(None) when no eligible execution exists on the lane at this scan cycle. The worker’s subsequent claim_from_grant(lane, grant) mints the lease.

Keeping the claim-grant mint inside the server (rather than the worker) means capability CSV validation, budget/quota breach checks, and lane routing run in one place for every tenant worker — the same invariants as the direct-valkey-claim path enforces inline, but gated at a single server choke point.
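
A hedged worker-loop sketch over this entry point (the poll interval and grant TTL are arbitrary; claim_from_grant is the worker-side call named above):

    async fn next_grant(
        server: &Server,
        lane: &LaneId,
        worker_id: &WorkerId,
        instance_id: &WorkerInstanceId,
        caps: &BTreeSet<String>,
    ) -> Result<ClaimGrant, ServerError> {
        loop {
            if let Some(grant) = server
                .claim_for_worker(lane, worker_id, instance_id, caps, 30_000)
                .await?
            {
                // The worker's claim_from_grant(lane, grant) mints the lease.
                return Ok(grant);
            }
            // Ok(None): nothing eligible on the lane this scan cycle.
            tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        }
    }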

pub async fn revoke_lease(&self, execution_id: &ExecutionId) -> Result<RevokeLeaseResult, ServerError>

Revoke an active lease (operator-initiated).

pub async fn get_execution(&self, execution_id: &ExecutionId) -> Result<ExecutionInfo, ServerError>

Get full execution info via HGETALL on exec_core.

pub async fn list_executions(&self, partition_id: u16, lane: &LaneId, state_filter: &str, offset: u64, limit: u64) -> Result<ListExecutionsResult, ServerError>

List executions from a partition’s index ZSET.

No FCALL — direct ZRANGE + pipelined HMGET reads.
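
A paging sketch (the accepted state_filter values are not documented on this page; "all" is assumed):

    // One page of up to 100 executions from partition 0.
    let _page = server
        .list_executions(0, &lane, "all", 0, 100)
        .await?;
    // Advance offset by limit to walk the index ZSET window by window.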

pub async fn replay_execution(&self, execution_id: &ExecutionId) -> Result<ReplayExecutionResult, ServerError>

Replay a terminal execution.

Pre-reads exec_core for flow_id and dep edges (variable KEYS). KEYS (4+N), ARGV (2+N) — matches lua/flow.lua ff_replay_execution.

pub async fn read_attempt_stream(&self, execution_id: &ExecutionId, attempt_index: AttemptIndex, from_id: &str, to_id: &str, count_limit: u64) -> Result<StreamFrames, ServerError>

Read frames from an attempt’s stream (XRANGE wrapper) plus terminal markers (closed_at, closed_reason) so consumers can stop polling when the producer finalizes.

from_id and to_id accept XRANGE special markers: "-" for earliest, "+" for latest. count_limit MUST be >= 1; 0 returns ServerError::InvalidInput (matches the REST boundary and the Lua-side reject).

Cluster-safe: the attempt’s {p:N} partition is derived from the execution id, so all KEYS share the same slot.
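
A full-history read sketch using the special markers:

    // "-" to "+" spans the whole stream; count_limit bounds the page size.
    let frames = server
        .read_attempt_stream(&exec_id, attempt_index, "-", "+", 1_000)
        .await?;
    // frames also carries closed_at / closed_reason, so the consumer can
    // tell whether the producer has finalized.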

pub async fn tail_attempt_stream(&self, execution_id: &ExecutionId, attempt_index: AttemptIndex, last_id: &str, block_ms: u64, count_limit: u64) -> Result<StreamFrames, ServerError>

Tail a live attempt’s stream (XREAD BLOCK wrapper). Returns frames plus the terminal signal so a polling consumer can exit when the producer closes the stream.

last_id is exclusive — XREAD returns entries with id > last_id. Pass "0-0" to read from the beginning.

block_ms == 0 → non-blocking peek (returns immediately). block_ms > 0 → blocks up to that many ms. Empty frames + closed_at=None → timeout, no new data, still open.

count_limit MUST be >= 1; 0 returns InvalidInput.

Implemented as a direct XREAD command (not FCALL) because blocking commands are rejected inside Valkey Functions. The terminal markers come from a companion HMGET on stream_meta — see ff_script::stream_tail module docs.
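
A tail-until-closed sketch built on that contract (the StreamFrames field names and the per-frame id accessor are assumed for illustration):

    let mut last_id = String::from("0-0"); // exclusive: start at the beginning
    loop {
        let batch = server
            .tail_attempt_stream(&exec_id, attempt_index, &last_id, 5_000, 100)
            .await?;
        if let Some(frame) = batch.frames.last() {
            last_id = frame.id.clone(); // field/accessor names assumed
        }
        // ... consume batch.frames ...
        if batch.closed_at.is_some() {
            break; // producer finalized the stream; stop polling
        }
        // Empty frames with closed_at == None: block_ms timeout, still open.
    }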

pub async fn shutdown(self)

Graceful shutdown — stops scanners, drains background handler tasks (e.g. cancel_flow member dispatch) with a bounded timeout, then waits for scanners to finish.

Shutdown order is chosen so in-flight stream ops (read/tail) drain cleanly without new arrivals piling up:

  1. stream_semaphore.close() — new read/tail attempts fail fast with ServerError::OperationFailed("stream semaphore closed …") which the REST layer surfaces as a 500 with retryable=false (ops tooling may choose to wait + retry on 503-class responses; the body clearly names the shutdown reason).
  2. Drain handler-spawned background tasks with a 15s ceiling.
  3. engine.shutdown() stops scanners.

Existing in-flight tails finish on their natural block_ms boundary (up to ~30s); the tail_client is dropped when Server is dropped after this function returns. We do NOT wait for tails to drain explicitly — the semaphore-close + natural-timeout combination bounds shutdown to roughly block_ms + 15s in the worst case. Callers observing a dropped connection retry against whatever replacement is coming up.

impl Server

pub async fn rotate_waitpoint_secret(&self, new_kid: &str, new_secret_hex: &str) -> Result<RotateWaitpointSecretResult, ServerError>

Rotate the waitpoint HMAC secret. Promotes the current kid to previous (accepted within FF_WAITPOINT_HMAC_GRACE_MS), installs new_secret_hex as the new current kid. Idempotent: re-running with the same new_kid and new_secret_hex converges partitions to the same state.

Returns a structured result so operators can see which partitions failed. HTTP layer returns 200 if any partition succeeded, 500 only if all fail.
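
A rotation sketch (the kid and secret are placeholders; result field names are not shown on this page):

    // Promotes the current kid to previous (still accepted within
    // FF_WAITPOINT_HMAC_GRACE_MS) and installs the new current secret.
    // Idempotent: re-running the same pair converges all partitions.
    let outcome = server
        .rotate_waitpoint_secret("kid-2", "0a1b2c...") // placeholder values
        .await?;
    // Check outcome for per-partition failures before retiring the old
    // secret.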
