Skip to main content

FlowFabricWorker

Struct FlowFabricWorker 

Source
pub struct FlowFabricWorker { /* private fields */ }
Expand description

FlowFabric worker — connects to Valkey, claims executions, and provides the worker-facing API.

§Admission control

claim_next() lives behind the direct-valkey-claim feature flag and bypasses the scheduler’s admission controls: it reads the eligible ZSET directly and mints its own claim grant without consulting budget ({b:M}) or quota ({q:K}) policies. Default-off. Intended for benchmarks, tests, and single-tenant development where the scheduler hop is measurement noise, not for production.

For production deployments, consume scheduler-issued grants via FlowFabricWorker::claim_from_grant — the scheduler enforces budget breach, quota sliding-window, concurrency cap, and capability-match checks before issuing grants.

§Usage

use ff_sdk::{FlowFabricWorker, WorkerConfig};

let config = WorkerConfig::new("localhost", 6379, "w1", "w1-i1", "default", "main");
let worker = FlowFabricWorker::connect(config).await?;

loop {
    if let Some(task) = worker.claim_next().await? {
        // Process task...
        task.complete(Some(b"result".to_vec())).await?;
    } else {
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

Implementations§

Source§

impl FlowFabricWorker

Source

pub async fn describe_execution( &self, id: &ExecutionId, ) -> Result<Option<ExecutionSnapshot>, SdkError>

Read a typed snapshot of one execution.

Returns Ok(None) when no execution exists at id (exec_core hash absent). Returns Ok(Some(snapshot)) on success. Errors propagate Valkey transport faults and decode failures.

§Consistency

The snapshot is assembled from two pipelined HGETALLs — one for exec_core, one for the sibling tags hash — issued in a single round trip against the partition holding the execution. The two reads share a hash-tag so they always land on the same Valkey slot in cluster mode. They are NOT MULTI/EXEC-atomic: a concurrent FCALL that HSETs both keys can interleave, and a caller may observe exec_core fields from epoch N+1 alongside tags from epoch N (or vice versa). This matches the last-write-wins-per-field semantics every existing HGETALL consumer already assumes.

§Field semantics
  • public_state is the engine-maintained derived label written atomically by every state-mutating FCALL. Parsed from the snake_case string stored on exec_core.
  • blocking_reason / blocking_detailNone when the exec_core field is the empty string (cleared on transition).
  • current_attemptNone before the first claim (exec_core current_attempt_id empty).
  • current_leaseNone when no lease is held (typical for terminal, suspended, or pre-claim executions).
  • current_waitpointNone unless an active suspension has pinned a waitpoint id.
  • tags — empty map if the tags hash is absent (common for executions created without tags_json).
Source§

impl FlowFabricWorker

Source

pub async fn describe_flow( &self, id: &FlowId, ) -> Result<Option<FlowSnapshot>, SdkError>

Read a typed snapshot of one flow.

Returns Ok(None) when no flow exists at id (flow_core hash absent). Returns Ok(Some(snapshot)) on success. Errors propagate Valkey transport faults and decode failures.

§Consistency

The snapshot is assembled from a single HGETALL flow_core. No second key is pipelined: unlike describe_execution, flow tags live inline on flow_core under the namespaced-prefix convention (see FlowSnapshot::tags). A single round trip is sufficient and reflects last-write-wins-per-field semantics under concurrent FCALLs — identical to every existing HGETALL consumer.

§Field semantics
  • public_flow_state — engine-written literal (open, running, blocked, cancelled, completed, failed). Surfaced as String because FF has no typed enum yet.
  • cancelled_at / cancel_reason / cancellation_policy — populated only after cancel_flow. None for live flows and for pre-cancel_flow-persistence-era flows.
  • tags — any flow_core field matching ^[a-z][a-z0-9_]*\. (the namespaced-tag convention). FF’s own fields stay in snake_case without dots, so there’s no collision. Fields that match neither shape are treated as corruption and fail loud.
Source§

impl FlowFabricWorker

Source

pub async fn describe_edge( &self, flow_id: &FlowId, edge_id: &EdgeId, ) -> Result<Option<EdgeSnapshot>, SdkError>

Read a typed snapshot of one dependency edge.

Takes both flow_id and edge_id: the edge hash is stored under the flow’s partition (ff:flow:{fp:N}:<flow_id>:edge:<edge_id>) and FF does not maintain a global edge_id -> flow_id index. The caller already knows the flow from the staging call result or the consumer’s own metadata.

Returns Ok(None) when the edge hash is absent (never staged, or staged under a different flow). Returns Ok(Some(snapshot)) on success.

§Consistency

Single HGETALL — the flow-scoped edge hash is written once at staging time and never mutated (per-execution resolution state lives on a separate dep:<edge_id> hash), so a single round trip is authoritative.

Source

pub async fn list_outgoing_edges( &self, upstream_eid: &ExecutionId, ) -> Result<Vec<EdgeSnapshot>, SdkError>

List all outgoing dependency edges originating from an execution.

Returns an empty Vec when the execution has no outgoing edges (including the case where the execution is standalone, i.e. not attached to any flow — such executions cannot participate in dependency edges).

§Reads
  1. HGET exec_core flow_id — resolve the flow owning the adjacency set. Missing or empty flow_id returns an empty Vec.
  2. SMEMBERS on the flow-scoped out:<upstream_eid> set.
  3. One pipelined round trip issuing one HGETALL per edge_id.

Ordering is unspecified — the adjacency set is an unordered SET. Callers that need deterministic order should sort by EdgeSnapshot::edge_id (or created_at) themselves.

Source

pub async fn list_incoming_edges( &self, downstream_eid: &ExecutionId, ) -> Result<Vec<EdgeSnapshot>, SdkError>

List all incoming dependency edges landing on an execution. See [list_outgoing_edges] for the read shape.

Source§

impl FlowFabricWorker

Source

pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError>

Connect to Valkey and prepare the worker.

Establishes the ferriskey connection. Does NOT load the FlowFabric library — that is the server’s responsibility (ff-server calls ff_script::loader::ensure_library() on startup). The SDK assumes the library is already loaded.

Source

pub async fn connect_with( config: WorkerConfig, backend: Arc<dyn EngineBackend>, ) -> Result<Self, SdkError>

Store a pre-built EngineBackend on the worker. Builds the worker via the legacy FlowFabricWorker::connect path first (so the embedded ferriskey::Client that the Stage 1b non-migrated hot paths still use is dialed), then replaces the default ValkeyBackend wrapper with the caller-supplied Arc<dyn EngineBackend>.

Stage 1b scope — what the injected backend covers today. After this PR, ClaimedTask’s 8 migrated ops (renew_lease / update_progress / resume_signals / delay_execution / move_to_waiting_children / complete / cancel / fail) route through the injected backend. That’s the first material use of connect_with: a mock backend now genuinely sees the worker’s per-task write-surface calls. The 4 trait-shape-deferred ops (create_pending_waitpoint, append_frame, suspend, report_usage) still reach the embedded ferriskey::Client directly until issue #117’s trait amendment lands; claim_next / claim_from_grant / claim_from_reclaim_grant / deliver_signal / admin queries are Stage 1c hot-path work. Stage 1d removes the embedded client entirely.

Today’s constructor is therefore NOT yet a drop-in way to swap in a non-Valkey backend — it requires a reachable Valkey node for the 4 deferred + hot-path ops. Tests that exercise only the 8 migrated ops can run fully against a mock backend.

Source

pub fn backend(&self) -> Option<&Arc<dyn EngineBackend>>

Borrow the EngineBackend this worker forwards Stage-1b trait ops through.

RFC-012 Stage 1b. Always returns Some(&self.backend) — the Option wrapper is retained for API stability with the Stage-1a shape. Stage 1c narrows the return type to &Arc<dyn EngineBackend>.

Source

pub fn client(&self) -> &Client

Get a reference to the underlying ferriskey client.

Source

pub fn config(&self) -> &WorkerConfig

Get the worker config.

Source

pub fn partition_config(&self) -> &PartitionConfig

Get the server-published partition config this worker bound to at connect(). Callers need this when computing partition hash-tags for direct-client reads (e.g. ff-sdk::read_stream) to stay aligned with the server’s num_flow_partitions — using PartitionConfig::default() assumes 256 partitions and silently misses data on deployments with any other value.

Source

pub async fn claim_from_grant( &self, lane: LaneId, grant: ClaimGrant, ) -> Result<ClaimedTask, SdkError>

Consume a ClaimGrant and claim the granted execution on this worker. The intended production entry point: pair with ff_scheduler::Scheduler::claim_for_worker to flow scheduler-issued grants into the SDK without enabling the direct-valkey-claim feature (which bypasses budget/quota admission control).

The worker’s concurrency semaphore is checked BEFORE the FCALL so a saturated worker does not consume the grant: the grant stays valid for its remaining TTL and the caller can either release it back to the scheduler or retry after some other in-flight task completes.

On success the returned ClaimedTask holds a concurrency permit that releases automatically on complete/fail/cancel/drop — same contract as claim_next.

§Arguments
  • lane — the lane the grant was issued for. Must match what was passed to Scheduler::claim_for_worker; the Lua FCALL uses it to look up lane_eligible, lane_active, and the worker_leases index slot.
  • grant — the ClaimGrant returned by the scheduler.
§Errors
  • SdkError::WorkerAtCapacitymax_concurrent_tasks permits all held. Retryable; the grant is untouched.
  • ScriptError::InvalidClaimGrant — grant missing, consumed, or worker_id mismatch (wrapped in SdkError::Engine).
  • ScriptError::ClaimGrantExpired — grant TTL elapsed (wrapped in SdkError::Engine).
  • ScriptError::CapabilityMismatch — execution’s required capabilities not a subset of this worker’s caps (wrapped in SdkError::Engine). Surfaced post-grant if a race between grant issuance and caps change allows it.
  • ScriptError::Parseff_claim_execution returned an unexpected shape (wrapped in SdkError::Engine).
  • SdkError::Valkey / SdkError::ValkeyContext — transport error during the FCALL or the read_execution_context follow-up.
Source

pub async fn claim_via_server( &self, admin: &FlowFabricAdminClient, lane: &LaneId, grant_ttl_ms: u64, ) -> Result<Option<ClaimedTask>, SdkError>

Scheduler-routed claim: POST the server’s /v1/workers/{id}/claim, then chain to Self::claim_from_grant.

Batch C item 2 PR-B. This is the production entry point — budget + quota + capability admission run server-side inside ff_scheduler::Scheduler::claim_for_worker. Callers don’t enable the direct-valkey-claim feature.

Returns Ok(None) when the server says no eligible execution (HTTP 204). Callers typically back off by config.claim_poll_interval_ms and try again, same cadence as the direct-claim path’s Ok(None).

The admin client is the established HTTP surface (FlowFabricAdminClient) reused here so workers don’t keep a second reqwest client around. Build once at worker boot and hand in by reference on every claim.

Source

pub async fn claim_from_reclaim_grant( &self, grant: ReclaimGrant, ) -> Result<ClaimedTask, SdkError>

Consume a ReclaimGrant and transition the granted attempt_interrupted execution into a started state on this worker. Symmetric partner to claim_from_grant for the resume path.

The grant must have been issued to THIS worker (matching worker_id at grant time). A mismatch returns Err(Script(InvalidClaimGrant)). The grant is consumed atomically by ff_claim_resumed_execution; a second call with the same grant also returns InvalidClaimGrant.

§Concurrency

The worker’s concurrency semaphore is checked BEFORE the FCALL (same contract as claim_from_grant). Reclaim does NOT assume pre-existing capacity on this worker — a reclaim can land on a fresh worker instance that just came up after a crash/restart and is picking up a previously-interrupted execution. If the worker is saturated, the grant stays valid for its remaining TTL and the caller can release it or retry.

On success the returned ClaimedTask holds a concurrency permit that releases automatically on complete/fail/cancel/drop.

§Errors
  • SdkError::WorkerAtCapacitymax_concurrent_tasks permits all held. Retryable; the grant is untouched (no FCALL was issued, so ff_claim_resumed_execution did not atomically consume the grant key).
  • ScriptError::InvalidClaimGrant — grant missing, consumed, or worker_id mismatch.
  • ScriptError::ClaimGrantExpired — grant TTL elapsed.
  • ScriptError::NotAResumedExecutionattempt_state is not attempt_interrupted.
  • ScriptError::ExecutionNotLeaseablelifecycle_phase is not runnable.
  • ScriptError::ExecutionNotFound — core key missing.
  • SdkError::Valkey / SdkError::ValkeyContext — transport.
Source

pub async fn deliver_signal( &self, execution_id: &ExecutionId, waitpoint_id: &WaitpointId, signal: Signal, ) -> Result<SignalOutcome, SdkError>

Deliver a signal to a suspended execution’s waitpoint.

The engine atomically records the signal, evaluates the resume condition, and optionally transitions the execution from suspended to runnable.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> IntoEither for T

Source§

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise. Read more
Source§

impl<T> PolicyExt for T
where T: ?Sized,

Source§

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow. Read more
Source§

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

Source§

fn vzip(self) -> V

Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more