Struct FlowFabricWorker 

pub struct FlowFabricWorker { /* private fields */ }

FlowFabric worker — connects to Valkey, claims executions, and provides the worker-facing API.

§Admission control

claim_next() lives behind the direct-valkey-claim feature flag and bypasses the scheduler’s admission controls: it reads the eligible ZSET directly and mints its own claim grant without consulting budget ({b:M}) or quota ({q:K}) policies. Default-off. Intended for benchmarks, tests, and single-tenant development where the scheduler hop is measurement noise, not for production.

For production deployments, consume scheduler-issued grants via FlowFabricWorker::claim_from_grant — the scheduler enforces budget breach, quota sliding-window, concurrency cap, and capability-match checks before issuing grants.

§Usage

use std::time::Duration;

use ff_sdk::{FlowFabricWorker, WorkerConfig};

let config = WorkerConfig::new("localhost", 6379, "w1", "w1-i1", "default", "main");
let worker = FlowFabricWorker::connect(config).await?;

loop {
    // claim_next() requires the direct-valkey-claim feature (see Admission control).
    if let Some(task) = worker.claim_next().await? {
        // Process task...
        task.complete(Some(b"result".to_vec())).await?;
    } else {
        // Nothing eligible: wait before polling again.
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

Implementations

impl FlowFabricWorker

pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError>

Connect to Valkey and prepare the worker.

Establishes the ferriskey connection. Does NOT load the FlowFabric library — that is the server’s responsibility (ff-server calls ff_script::loader::ensure_library() on startup). The SDK assumes the library is already loaded.

pub fn client(&self) -> &Client

Get a reference to the underlying ferriskey client.

pub fn config(&self) -> &WorkerConfig

Get the worker config.

pub fn partition_config(&self) -> &PartitionConfig

Get the server-published partition config this worker bound to at connect(). Callers need this when computing partition hash-tags for direct-client reads (e.g. ff_sdk::read_stream) so that they stay aligned with the server’s num_flow_partitions. Using PartitionConfig::default() assumes 256 partitions and silently misses data on deployments with any other value.
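To illustrate why a mismatched partition count silently misses data, here is a minimal sketch. It assumes a partition is chosen as a hash of the flow id modulo the partition count. The FNV-1a hash, the `{fp:N}` tag layout, and the function names are all hypothetical stand-ins; the hashing and key format FlowFabric actually uses are not documented on this page.

```rust
/// FNV-1a, used only as a deterministic stand-in hash (assumption: the real
/// hash function is not specified on this page).
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
    for b in bytes {
        hash ^= u64::from(*b);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
    }
    hash
}

/// Hypothetical partition index: hash modulo the partition count.
fn partition_for(flow_id: &str, num_flow_partitions: u64) -> u64 {
    fnv1a(flow_id.as_bytes()) % num_flow_partitions
}

/// Hypothetical hash-tag layout; the real key format may differ.
fn hash_tag(flow_id: &str, num_flow_partitions: u64) -> String {
    format!("{{fp:{}}}", partition_for(flow_id, num_flow_partitions))
}

/// Counts the ids that land on a different partition when a reader assumes
/// `assumed` partitions while the server actually uses `actual`.
fn misrouted(ids: &[String], assumed: u64, actual: u64) -> usize {
    ids.iter()
        .filter(|id| partition_for(id.as_str(), assumed) != partition_for(id.as_str(), actual))
        .count()
}
```

Under these assumptions, a reader that builds keys with the default of 256 partitions against a server configured with another value would look up a different hash-tag for many flows and find nothing there, without any error being raised. Reading the count from partition_config() avoids that.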

pub async fn claim_from_grant( &self, lane: LaneId, grant: ClaimGrant, ) -> Result<ClaimedTask, SdkError>

Consume a ClaimGrant and claim the granted execution on this worker. The intended production entry point: pair with ff_scheduler::Scheduler::claim_for_worker to flow scheduler-issued grants into the SDK without enabling the direct-valkey-claim feature (which bypasses budget/quota admission control).

The worker’s concurrency semaphore is checked BEFORE the FCALL so a saturated worker does not consume the grant: the grant stays valid for its remaining TTL and the caller can either release it back to the scheduler or retry after some other in-flight task completes.

On success the returned ClaimedTask holds a concurrency permit that releases automatically on complete/fail/cancel/drop — same contract as claim_next.
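The ordering described above (check capacity first, consume the grant second, release the permit on drop) can be sketched with standard-library atomics. This is an illustration only, not the SDK's implementation: `Grant`, `Permit`, `Worker`, and `ClaimError` are stand-in types, and setting `consumed` stands in for the FCALL.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Stand-in for a grant; only its single-use flag matters here.
struct Grant {
    consumed: bool,
}

/// A concurrency permit that frees its slot when dropped.
struct Permit {
    in_flight: Arc<AtomicUsize>,
}

impl Drop for Permit {
    fn drop(&mut self) {
        self.in_flight.fetch_sub(1, Ordering::AcqRel);
    }
}

#[derive(Debug, PartialEq)]
enum ClaimError {
    WorkerAtCapacity,
}

struct Worker {
    in_flight: Arc<AtomicUsize>,
    max_concurrent_tasks: usize,
}

impl Worker {
    fn new(max_concurrent_tasks: usize) -> Self {
        Worker { in_flight: Arc::new(AtomicUsize::new(0)), max_concurrent_tasks }
    }

    /// Takes a slot only if one is free; never blocks.
    fn try_permit(&self) -> Option<Permit> {
        let max = self.max_concurrent_tasks;
        self.in_flight
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
                if n < max { Some(n + 1) } else { None }
            })
            .ok()
            .map(|_| Permit { in_flight: Arc::clone(&self.in_flight) })
    }

    /// Capacity is checked before the grant is touched, so a saturated
    /// worker returns early and leaves the grant valid.
    fn claim(&self, grant: &mut Grant) -> Result<Permit, ClaimError> {
        let permit = self.try_permit().ok_or(ClaimError::WorkerAtCapacity)?;
        grant.consumed = true; // stands in for the FCALL that consumes the grant
        Ok(permit)
    }
}
```

Because the permit is tied to the returned value, dropping it on any path (completion, failure, cancellation, or an early return) frees the slot without an explicit release call.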

§Arguments
  • lane — the lane the grant was issued for. Must match what was passed to Scheduler::claim_for_worker; the Lua FCALL uses it to look up lane_eligible, lane_active, and the worker_leases index slot.
  • grant — the ClaimGrant returned by the scheduler.
§Errors
  • SdkError::WorkerAtCapacity: max_concurrent_tasks permits all held. Retryable; the grant is untouched.
  • ScriptError::InvalidClaimGrant: grant missing, consumed, or worker_id mismatch (wrapped in SdkError::Script).
  • ScriptError::ClaimGrantExpired: grant TTL elapsed (wrapped in SdkError::Script).
  • ScriptError::CapabilityMismatch: execution’s required capabilities not a subset of this worker’s caps (wrapped in SdkError::Script). Surfaced post-grant if a race between grant issuance and caps change allows it.
  • ScriptError::Parse: ff_claim_execution returned an unexpected shape (wrapped in SdkError::Script).
  • SdkError::Valkey / SdkError::ValkeyContext: transport error during the FCALL or the read_execution_context follow-up.
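One way a caller might act on these errors is sketched below. The enums are local stand-ins that only mirror the variant names listed above (the real variants may carry payloads), and the `NextStep` policy is an assumption for illustration, not something the SDK prescribes.

```rust
/// Stand-in mirroring the script-level error names listed above.
#[derive(Debug)]
enum ScriptError {
    InvalidClaimGrant,
    ClaimGrantExpired,
    CapabilityMismatch,
    Parse,
}

/// Stand-in mirroring the SDK-level error names listed above.
#[derive(Debug)]
enum SdkError {
    WorkerAtCapacity,
    Script(ScriptError),
    Valkey,
    ValkeyContext,
}

/// Hypothetical caller-side policy.
#[derive(Debug, PartialEq)]
enum NextStep {
    /// The grant is untouched: retry it, or release it to the scheduler.
    RetrySameGrant,
    /// The grant can no longer be used by this worker: ask for a new one.
    RequestNewGrant,
    /// Unexpected script reply: log and investigate rather than retry.
    Investigate,
    /// Transport failure: back off before doing anything else.
    BackOff,
}

fn next_step(err: &SdkError) -> NextStep {
    match err {
        SdkError::WorkerAtCapacity => NextStep::RetrySameGrant,
        SdkError::Script(
            ScriptError::InvalidClaimGrant
            | ScriptError::ClaimGrantExpired
            | ScriptError::CapabilityMismatch,
        ) => NextStep::RequestNewGrant,
        SdkError::Script(ScriptError::Parse) => NextStep::Investigate,
        SdkError::Valkey | SdkError::ValkeyContext => NextStep::BackOff,
    }
}
```

Only WorkerAtCapacity is documented as leaving the grant untouched; for transport errors the sketch does not assume whether the grant was consumed, which is why it backs off rather than retrying immediately.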

pub async fn claim_via_server( &self, admin: &FlowFabricAdminClient, lane: &LaneId, grant_ttl_ms: u64, ) -> Result<Option<ClaimedTask>, SdkError>

Scheduler-routed claim: POST the server’s /v1/workers/{id}/claim, then chain to Self::claim_from_grant.

Batch C item 2 PR-B. This is the production entry point: budget, quota, and capability admission run server-side inside ff_scheduler::Scheduler::claim_for_worker, so callers do not need to enable the direct-valkey-claim feature.

Returns Ok(None) when the server says no eligible execution (HTTP 204). Callers typically back off by config.claim_poll_interval_ms and try again, same cadence as the direct-claim path’s Ok(None).

The admin client is the established HTTP surface (FlowFabricAdminClient) reused here so workers don’t keep a second reqwest client around. Build once at worker boot and hand in by reference on every claim.
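The back-off cadence described above can be sketched as a small polling helper. It is a synchronous, generic stand-in: the `claim` closure takes the place of a call such as claim_via_server, `poll_until_claimed` and `max_attempts` are hypothetical names, and real async code would await tokio::time::sleep instead of blocking the thread.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `claim` until it yields a task, an error, or `max_attempts` is
/// reached. `Ok(None)` from `claim` means "nothing eligible right now".
fn poll_until_claimed<T, E>(
    mut claim: impl FnMut() -> Result<Option<T>, E>,
    poll_interval: Duration,
    max_attempts: usize,
) -> Result<Option<T>, E> {
    for attempt in 0..max_attempts {
        // Errors propagate immediately; only Ok(None) triggers a back-off.
        if let Some(task) = claim()? {
            return Ok(Some(task));
        }
        // Skip the sleep after the final attempt.
        if attempt + 1 < max_attempts {
            sleep(poll_interval);
        }
    }
    Ok(None)
}
```

In a worker, the interval would typically come from config.claim_poll_interval_ms, for example `Duration::from_millis(interval_ms)`.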

pub async fn claim_from_reclaim_grant( &self, grant: ReclaimGrant, ) -> Result<ClaimedTask, SdkError>

Consume a ReclaimGrant and transition the granted attempt_interrupted execution into a started state on this worker. Symmetric partner to claim_from_grant for the resume path.

The grant must have been issued to THIS worker (matching worker_id at grant time). A mismatch returns Err(Script(InvalidClaimGrant)). The grant is consumed atomically by ff_claim_resumed_execution; a second call with the same grant also returns InvalidClaimGrant.
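The single-use behaviour can be pictured with an in-memory stand-in. `GrantStore` and its methods are hypothetical; in FlowFabric the check and the consumption happen inside ff_claim_resumed_execution. The sketch also assumes that a worker_id mismatch leaves the grant in place, which this page does not state.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum GrantError {
    InvalidClaimGrant,
}

/// Hypothetical in-memory grant table: grant id -> worker id it was issued to.
#[derive(Default)]
struct GrantStore {
    grants: HashMap<String, String>,
}

impl GrantStore {
    fn issue(&mut self, grant_id: &str, worker_id: &str) {
        self.grants.insert(grant_id.to_string(), worker_id.to_string());
    }

    /// Succeeds at most once per grant, and only for the worker it was issued to.
    /// `&mut self` gives the exclusivity that the server-side function provides.
    fn consume(&mut self, grant_id: &str, worker_id: &str) -> Result<(), GrantError> {
        let issued_to_caller =
            self.grants.get(grant_id).map(String::as_str) == Some(worker_id);
        if issued_to_caller {
            self.grants.remove(grant_id); // the grant is gone after first use
            Ok(())
        } else {
            // Missing, already consumed, or issued to another worker.
            Err(GrantError::InvalidClaimGrant)
        }
    }
}
```

Note that the three failure causes are indistinguishable to the caller, matching the single InvalidClaimGrant error described above.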

§Concurrency

The worker’s concurrency semaphore is checked BEFORE the FCALL (same contract as claim_from_grant). Reclaim does NOT assume pre-existing capacity on this worker — a reclaim can land on a fresh worker instance that just came up after a crash/restart and is picking up a previously-interrupted execution. If the worker is saturated, the grant stays valid for its remaining TTL and the caller can release it or retry.

On success the returned ClaimedTask holds a concurrency permit that releases automatically on complete/fail/cancel/drop.

§Errors
  • SdkError::WorkerAtCapacity: max_concurrent_tasks permits all held. Retryable; the grant is untouched (no FCALL was issued, so ff_claim_resumed_execution did not atomically consume the grant key).
  • ScriptError::InvalidClaimGrant: grant missing, consumed, or worker_id mismatch.
  • ScriptError::ClaimGrantExpired: grant TTL elapsed.
  • ScriptError::NotAResumedExecution: attempt_state is not attempt_interrupted.
  • ScriptError::ExecutionNotLeaseable: lifecycle_phase is not runnable.
  • ScriptError::ExecutionNotFound: core key missing.
  • SdkError::Valkey / SdkError::ValkeyContext: transport error.

pub async fn deliver_signal( &self, execution_id: &ExecutionId, waitpoint_id: &WaitpointId, signal: Signal, ) -> Result<SignalOutcome, SdkError>

Deliver a signal to a suspended execution’s waitpoint.

The engine atomically records the signal, evaluates the resume condition, and optionally transitions the execution from suspended to runnable.
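The record, evaluate, and transition sequence can be sketched as a small state machine. Everything here is illustrative: the `Waitpoint` type, the `Outcome` variants, and especially the resume condition ("all required signal names have arrived") are assumptions, since this page does not describe the actual conditions or the variants of SignalOutcome.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq, Clone, Copy)]
enum Phase {
    Suspended,
    Runnable,
}

/// Hypothetical outcome; the real SignalOutcome variants are not listed here.
#[derive(Debug, PartialEq)]
enum Outcome {
    /// Signal stored, execution phase unchanged.
    Recorded,
    /// Signal stored and the resume condition became true.
    Resumed,
}

struct Waitpoint {
    required: HashSet<String>, // assumed condition: every name here must arrive
    received: HashSet<String>,
    phase: Phase,
}

impl Waitpoint {
    fn new(required: &[&str]) -> Self {
        Waitpoint {
            required: required.iter().map(|s| s.to_string()).collect(),
            received: HashSet::new(),
            phase: Phase::Suspended,
        }
    }

    /// Record, evaluate, transition. `&mut self` makes the three steps
    /// indivisible here, standing in for the engine's atomic execution.
    fn deliver(&mut self, signal: &str) -> Outcome {
        self.received.insert(signal.to_string());
        if self.phase == Phase::Suspended && self.required.is_subset(&self.received) {
            self.phase = Phase::Runnable;
            Outcome::Resumed
        } else {
            Outcome::Recorded
        }
    }
}
```

The transition is optional in the sense shown: a signal that does not complete the condition, or that arrives after the execution is already runnable, is recorded without changing the phase.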
