pub struct FlowFabricWorker { /* private fields */ }
FlowFabric worker — connects to Valkey, claims executions, and provides the worker-facing API.
§Admission control
claim_next() lives behind the direct-valkey-claim feature flag and
bypasses the scheduler’s admission controls: it reads the eligible
ZSET directly and mints its own claim grant without consulting budget
({b:M}) or quota ({q:K}) policies. Default-off. Intended for
benchmarks, tests, and single-tenant development where the scheduler
hop is measurement noise, not for production.
For production deployments, consume scheduler-issued grants via
FlowFabricWorker::claim_from_grant — the scheduler enforces
budget breach, quota sliding-window, concurrency cap, and
capability-match checks before issuing grants.
§Usage
use std::time::Duration;
use ff_core::backend::BackendConfig;
use ff_core::types::{LaneId, Namespace, WorkerId, WorkerInstanceId};
use ff_sdk::{FlowFabricWorker, WorkerConfig};
let config = WorkerConfig {
    backend: BackendConfig::valkey("localhost", 6379),
    worker_id: WorkerId::new("w1"),
    worker_instance_id: WorkerInstanceId::new("w1-i1"),
    namespace: Namespace::new("default"),
    lanes: vec![LaneId::new("main")],
    capabilities: Vec::new(),
    lease_ttl_ms: 30_000,
    claim_poll_interval_ms: 1_000,
    max_concurrent_tasks: 1,
};
let worker = FlowFabricWorker::connect(config).await?;
loop {
    // claim_next() requires the `direct-valkey-claim` feature (see above).
    if let Some(task) = worker.claim_next().await? {
        // Process task...
        task.complete(Some(b"result".to_vec())).await?;
    } else {
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

Implementations§
impl FlowFabricWorker
pub async fn describe_execution(
    &self,
    id: &ExecutionId,
) -> Result<Option<ExecutionSnapshot>, SdkError>
Read a typed snapshot of one execution. See
ExecutionSnapshot for field semantics.
Post-T3 this is a thin forwarder onto the bundled
EngineBackend::describe_execution
impl; the HGETALL pipeline body and strict-parse decoder both
live in ff-backend-valkey / ff-core so alternate backends
can serve the same call shape. Parse failures surface as
SdkError::Engine(EngineError::Validation { kind: Corruption, .. }).
pub async fn describe_flow(
    &self,
    id: &FlowId,
) -> Result<Option<FlowSnapshot>, SdkError>
Read a typed snapshot of one flow. See FlowSnapshot for
field semantics.
Post-T3 this is a thin forwarder onto the bundled
EngineBackend::describe_flow
impl.
pub async fn list_flows(
    &self,
    partition: PartitionKey,
    cursor: Option<FlowId>,
    limit: usize,
) -> Result<ListFlowsPage, SdkError>
List flows on a partition with cursor-based pagination (issue #185).
partition is an opaque PartitionKey — typically obtained
from a crate::ClaimGrant or from
ff_core::partition::flow_partition when the caller already
knows a FlowId on the partition it wants to enumerate.
cursor is None on the first call and the previous page’s
next_cursor on subsequent calls; iteration terminates when
the returned next_cursor is None.
Thin forwarder onto
EngineBackend::list_flows.
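The same start-at-None, feed-next_cursor-back, stop-at-None contract applies to every list_* method on this page. A minimal, self-contained sketch of that loop shape; the Page struct and fetch_page function below are illustrative stand-ins for ListFlowsPage and the backend call, not real ff-sdk types:

```rust
// Illustrative stand-in for ListFlowsPage: a batch of items plus an
// optional cursor naming the position to resume from.
struct Page {
    items: Vec<u32>,
    next_cursor: Option<u32>,
}

// Stand-in page source: serves ascending ids out of 0..total in
// pages of at most `limit`, with an exclusive cursor.
fn fetch_page(total: u32, cursor: Option<u32>, limit: u32) -> Page {
    let start = cursor.map_or(0, |c| c + 1);
    let items: Vec<u32> = (start..total).take(limit as usize).collect();
    let next_cursor = match items.last() {
        Some(&last) if last + 1 < total => Some(last),
        _ => None,
    };
    Page { items, next_cursor }
}

// The loop contract shared by list_flows / list_lanes /
// list_suspended / list_executions: start from None, feed each
// page's next_cursor back in, and stop when it comes back None.
fn collect_all(total: u32, limit: u32) -> Vec<u32> {
    let mut out = Vec::new();
    let mut cursor = None;
    loop {
        let page = fetch_page(total, cursor, limit);
        out.extend(page.items);
        match page.next_cursor {
            Some(c) => cursor = Some(c),
            None => break,
        }
    }
    out
}

fn main() {
    assert_eq!(collect_all(10, 3), (0..10).collect::<Vec<u32>>());
}
```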
pub async fn describe_edge(
    &self,
    flow_id: &FlowId,
    edge_id: &EdgeId,
) -> Result<Option<EdgeSnapshot>, SdkError>
Read a typed snapshot of one dependency edge.
Takes both flow_id and edge_id: the edge hash is stored
under the flow’s partition
(ff:flow:{fp:N}:<flow_id>:edge:<edge_id>) and FF does not
maintain a global edge_id -> flow_id index. The caller
already knows the flow from the staging call result or the
consumer’s own metadata.
Returns Ok(None) when the edge hash is absent (never staged,
or staged under a different flow).
Post-#160 this is a thin forwarder onto the bundled
EngineBackend::describe_edge
impl.
pub async fn list_outgoing_edges(
    &self,
    upstream_eid: &ExecutionId,
) -> Result<Vec<EdgeSnapshot>, SdkError>
List all outgoing dependency edges originating from an execution.
Returns an empty Vec when the execution has no outgoing edges
— including standalone executions not attached to any flow.
§Reads
- EngineBackend::resolve_execution_flow_id (single HGET on exec_core.flow_id for the Valkey backend).
- EngineBackend::list_edges on the resolved flow (SMEMBERS + pipelined HGETALL on the flow’s partition).
Ordering is unspecified — the adjacency set is an unordered
SET. Callers that need deterministic order should sort by
EdgeSnapshot::edge_id or created_at.
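A caller can impose that deterministic order with a plain sort. The Edge struct below is an illustrative stand-in for EdgeSnapshot, reduced to the two candidate sort keys:

```rust
// Illustrative stand-in for EdgeSnapshot, reduced to the two fields
// the doc above suggests as sort keys.
#[derive(Debug, PartialEq, Eq)]
struct Edge {
    edge_id: String,
    created_at: u64,
}

// Impose a deterministic order on an unordered adjacency read:
// primary key created_at, with edge_id as a tiebreaker for edges
// created at the same instant.
fn sort_edges(edges: &mut [Edge]) {
    edges.sort_by(|a, b| {
        a.created_at
            .cmp(&b.created_at)
            .then_with(|| a.edge_id.cmp(&b.edge_id))
    });
}

fn main() {
    let mut edges = vec![
        Edge { edge_id: "e2".to_string(), created_at: 5 },
        Edge { edge_id: "e1".to_string(), created_at: 5 },
        Edge { edge_id: "e3".to_string(), created_at: 1 },
    ];
    sort_edges(&mut edges);
    let ids: Vec<&str> = edges.iter().map(|e| e.edge_id.as_str()).collect();
    assert_eq!(ids, ["e3", "e1", "e2"]);
}
```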
pub async fn list_incoming_edges(
    &self,
    downstream_eid: &ExecutionId,
) -> Result<Vec<EdgeSnapshot>, SdkError>
List all incoming dependency edges landing on an execution.
See list_outgoing_edges for the read shape.
pub async fn list_lanes(
    &self,
    cursor: Option<LaneId>,
    limit: usize,
) -> Result<ListLanesPage, SdkError>
Enumerate registered lanes with cursor-based pagination.
Thin forwarder onto
EngineBackend::list_lanes.
Lanes are global (not partition-scoped); the Valkey backend
serves this from the ff:idx:lanes SET, sorts by lane name,
and returns a limit-sized page starting after cursor
(exclusive). Loop until ListLanesPage::next_cursor is
None to read the full registry.
pub async fn list_suspended(
    &self,
    partition: PartitionKey,
    cursor: Option<ExecutionId>,
    limit: usize,
) -> Result<ListSuspendedPage, SdkError>
List suspended executions in one partition, cursor-paginated,
with each entry’s suspension reason_code populated (issue
#183).
Thin forwarder onto the bundled
EngineBackend::list_suspended
impl. cursor = None starts a fresh scan; feed the returned
ListSuspendedPage::next_cursor back in to page forward until
it returns None. See
ff_core::contracts::SuspendedExecutionEntry for the per-row
fields (including the free-form reason code).
pub async fn list_executions(
    &self,
    partition: PartitionKey,
    cursor: Option<ExecutionId>,
    limit: usize,
) -> Result<ListExecutionsPage, SdkError>
Forward-only paginated listing of executions in a partition.
Thin forwarder onto
EngineBackend::list_executions.
Pagination is cursor-based: pass cursor = None for the first
page and feed the returned
ListExecutionsPage::next_cursor back as cursor until it
returns None. See the trait rustdoc for the full contract.
impl FlowFabricWorker
pub async fn connect(config: WorkerConfig) -> Result<Self, SdkError>
Connect to Valkey and prepare the worker.
Establishes the ferriskey connection. Does NOT load the FlowFabric
library — that is the server’s responsibility (ff-server calls
ff_script::loader::ensure_library() on startup). The SDK assumes
the library is already loaded.
§Smoke / dev scripts: rotate WorkerInstanceId
The SDK writes a SET-NX liveness sentinel keyed on the worker’s
WorkerInstanceId. When a smoke / dev script reuses the same
WorkerInstanceId across restarts, subsequent runs trap behind
the prior run’s SET-NX until the liveness key’s TTL (≈ 2× the
configured lease TTL) expires — the worker appears stuck and
claims nothing. Iterative scripts should synthesise a fresh
WorkerInstanceId per process (e.g. WorkerInstanceId::new()
or embed a UUID/timestamp) rather than hard-coding a stable
value. Production workers that cleanly shut down release the
key; only crashed / kill -9’d processes hit this trap.
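One way a dev script can synthesise such a per-process value using only the standard library. The worker-id prefix and pid+timestamp scheme below are illustrative, not an ff-sdk API; production code would more likely reach for a UUID:

```rust
use std::process;
use std::time::{SystemTime, UNIX_EPOCH};

// Build a per-process instance-id string so a restarted dev script
// never collides with a crashed predecessor's SET-NX liveness
// sentinel. The "<worker>-<pid>-<millis>" scheme is illustrative.
fn fresh_instance_id(worker_id: &str) -> String {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch")
        .as_millis();
    format!("{}-{}-{}", worker_id, process::id(), millis)
}

fn main() {
    // e.g. "w1-48213-1717429046123"; fresh every process start.
    println!("{}", fresh_instance_id("w1"));
}
```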
pub async fn connect_with(
    config: WorkerConfig,
    backend: Arc<dyn EngineBackend>,
    completion: Option<Arc<dyn CompletionBackend>>,
) -> Result<Self, SdkError>
Store pre-built EngineBackend and (optional)
CompletionBackend handles on the worker. Builds the worker
via the legacy FlowFabricWorker::connect path first (so the
embedded ferriskey::Client that the Stage 1b non-migrated hot
paths still use is dialed), then replaces the default
ValkeyBackend wrapper with the caller-supplied trait objects.
The completion argument is explicit: 0.3.3 previously accepted
only backend and completion_backend() silently returned
None on this path because Arc<dyn EngineBackend> cannot be
upcast to Arc<dyn CompletionBackend> without loss of
trait-object identity. 0.3.4 lets the caller decide.
- Some(arc) — caller supplies a completion backend. Self::completion_backend returns Some(clone).
- None — this backend does not support push-based completion (future Postgres backend without LISTEN/NOTIFY, test mocks). Self::completion_backend returns None.
When the underlying backend implements both traits (as
ValkeyBackend does), pass the same Arc twice — the two
trait-object views share one allocation:
use std::sync::Arc;
use ff_backend_valkey::ValkeyBackend;
use ff_sdk::{FlowFabricWorker, WorkerConfig};
// Valkey (completion supported):
let valkey = Arc::new(ValkeyBackend::connect(backend_config).await?);
let worker = FlowFabricWorker::connect_with(
    worker_config,
    valkey.clone(),
    Some(valkey),
).await?;

Backend without completion support:
let worker = FlowFabricWorker::connect_with(
    worker_config,
    backend,
    None,
).await?;

Stage 1b + Round-7 scope — what the injected backend covers today.
The injected backend currently covers these per-task
ClaimedTask ops: update_progress / resume_signals /
delay_execution / move_to_waiting_children / complete /
cancel / fail / create_pending_waitpoint /
append_frame / report_usage. A mock backend therefore sees
that portion of the worker’s per-task write surface. Lease
renewal also routes through backend.renew(&handle). Round-7
(#135/#145) closed the four trait-shape gaps tracked by #117,
but suspend still reaches the embedded ferriskey::Client
directly via ff_suspend_execution — this is the deferred
suspend per RFC-012 §R7.6.1, pending Stage 1d input-shape
work. claim_next / claim_from_grant /
claim_from_reclaim_grant / deliver_signal / admin queries
are Stage 1c hot-path work. Stage 1d removes the embedded
client entirely.
Today’s constructor is therefore NOT yet a drop-in way to swap
in a non-Valkey backend — it requires a reachable Valkey node
for suspend plus the remaining hot-path ops. Tests that
exercise only the migrated per-task ops can run fully against
a mock backend.
pub fn backend(&self) -> Option<&Arc<dyn EngineBackend>>
Borrow the EngineBackend this worker forwards Stage-1b trait
ops through.
RFC-012 Stage 1b. Always returns Some(&self.backend) —
the Option wrapper is retained for API stability with the
Stage-1a shape. Stage 1c narrows the return type to
&Arc<dyn EngineBackend>.
pub fn completion_backend(&self) -> Option<Arc<dyn CompletionBackend>>
Handle to the completion-event subscription backend, for consumers that need to observe execution completions (DAG reconcilers, tenant-isolated subscribers).
Returns Some when the worker was built through
Self::connect on the default valkey-default feature
(the bundled ValkeyBackend implements
CompletionBackend),
or via Self::connect_with with a Some(..) completion
handle. Returns None when the caller passed None to
Self::connect_with — i.e. the backend does not support
push-based completion streams (future Postgres without
LISTEN/NOTIFY, test mocks).
The returned handle shares the same underlying allocation as
Self::backend; calls through it (e.g.
subscribe_completions_filtered) hit the same connection
the worker itself uses.
pub fn config(&self) -> &WorkerConfig
Get the worker config.
pub fn partition_config(&self) -> &PartitionConfig
Get the server-published partition config this worker bound to at
connect(). Exposed so consumers that mint custom
ExecutionIds (e.g. for describe_execution lookups on ids
produced outside this worker) stay aligned with the server’s
num_flow_partitions — using PartitionConfig::default()
assumes 256 partitions and silently misses data on deployments
with any other value.
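The failure mode is easy to demonstrate with a toy partition router (the hash-modulo scheme below is illustrative, not the real ff_core::partition routine): the same id routed with an assumed count of 256 generally lands in a different slot than with the server's actual count, so reads issued against the wrong slot simply come back empty.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Toy partition router: hash the id, take it modulo the partition
// count. Illustrative only; the real ff_core::partition routine may
// use a different hash and key shape.
fn partition_of(id: &str, num_partitions: u64) -> u64 {
    let mut h = DefaultHasher::new();
    id.hash(&mut h);
    h.finish() % num_partitions
}

fn main() {
    // A caller assuming the 256-partition default computes one slot;
    // a 64-partition deployment actually stores the data elsewhere.
    let assumed = partition_of("exec-01HXYZ", 256);
    let actual = partition_of("exec-01HXYZ", 64);
    println!("assumed slot {assumed}, actual slot {actual}");
}
```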
pub async fn claim_from_grant(
    &self,
    lane: LaneId,
    grant: ClaimGrant,
) -> Result<ClaimedTask, SdkError>
Consume a ClaimGrant and claim the granted execution on
this worker. The intended production entry point: pair with
ff_scheduler::Scheduler::claim_for_worker to flow
scheduler-issued grants into the SDK without enabling the
direct-valkey-claim feature (which bypasses budget/quota
admission control).
The worker’s concurrency semaphore is checked BEFORE the FCALL so a saturated worker does not consume the grant: the grant stays valid for its remaining TTL and the caller can either release it back to the scheduler or retry after some other in-flight task completes.
On success the returned ClaimedTask holds a concurrency
permit that releases automatically on
complete/fail/cancel/drop — same contract as
claim_next.
§Arguments
- lane — the lane the grant was issued for. Must match what was passed to Scheduler::claim_for_worker; the Lua FCALL uses it to look up lane_eligible, lane_active, and the worker_leases index slot.
- grant — the ClaimGrant returned by the scheduler.
§Errors
- SdkError::WorkerAtCapacity — max_concurrent_tasks permits all held. Retryable; the grant is untouched.
- ScriptError::InvalidClaimGrant — grant missing, consumed, or worker_id mismatch (wrapped in SdkError::Engine).
- ScriptError::ClaimGrantExpired — grant TTL elapsed (wrapped in SdkError::Engine).
- ScriptError::CapabilityMismatch — execution’s required capabilities not a subset of this worker’s caps (wrapped in SdkError::Engine). Surfaced post-grant if a race between grant issuance and caps change allows it.
- ScriptError::Parse — ff_claim_execution returned an unexpected shape (wrapped in SdkError::Engine).
- SdkError::Backend / SdkError::BackendContext — transport error during the FCALL or the read_execution_context follow-up.
pub async fn claim_via_server(
    &self,
    admin: &FlowFabricAdminClient,
    lane: &LaneId,
    grant_ttl_ms: u64,
) -> Result<Option<ClaimedTask>, SdkError>
Scheduler-routed claim: POST the server’s
/v1/workers/{id}/claim, then chain to
Self::claim_from_grant.
Batch C item 2 PR-B. This is the production entry point —
budget + quota + capability admission run server-side inside
ff_scheduler::Scheduler::claim_for_worker. Callers don’t
enable the direct-valkey-claim feature.
Returns Ok(None) when the server says no eligible execution
(HTTP 204). Callers typically back off by
config.claim_poll_interval_ms and try again, same cadence
as the direct-claim path’s Ok(None).
The admin client is the established HTTP surface
(FlowFabricAdminClient) reused here so workers don’t keep a
second reqwest client around. Build once at worker boot and
hand in by reference on every claim.
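The resulting poll-and-back-off loop shape can be sketched generically. The drain function below is illustrative, with a synchronous closure standing in for the async claim_via_server call and thread::sleep standing in for the tokio back-off:

```rust
use std::time::Duration;

// Generic shape of the server-routed claim loop: poll a source that
// may yield nothing, back off on empty polls, and stop after
// handling `want` items. `poll` stands in for claim_via_server
// returning Ok(None) on HTTP 204; the fixed `backoff` mirrors
// config.claim_poll_interval_ms.
fn drain<T>(
    mut poll: impl FnMut() -> Option<T>,
    mut handle: impl FnMut(T),
    backoff: Duration,
    want: usize,
) -> usize {
    let mut handled = 0;
    while handled < want {
        match poll() {
            Some(task) => {
                handle(task);
                handled += 1;
            }
            // Nothing eligible right now: sleep, then re-poll.
            None => std::thread::sleep(backoff),
        }
    }
    handled
}

fn main() {
    let mut src = vec![None, Some(1), None, Some(2)].into_iter();
    let mut got = Vec::new();
    let n = drain(
        || src.next().flatten(),
        |t| got.push(t),
        Duration::from_millis(1),
        2,
    );
    assert_eq!(n, 2);
    assert_eq!(got, vec![1, 2]);
}
```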
pub async fn claim_from_reclaim_grant(
    &self,
    grant: ReclaimGrant,
) -> Result<ClaimedTask, SdkError>
Consume a ReclaimGrant and transition the granted
attempt_interrupted execution into a started state on this
worker. Symmetric partner to claim_from_grant for the
resume path.
The grant must have been issued to THIS worker (matching
worker_id at grant time). A mismatch returns
Err(Script(InvalidClaimGrant)). The grant is consumed
atomically by ff_claim_resumed_execution; a second call with
the same grant also returns InvalidClaimGrant.
§Concurrency
The worker’s concurrency semaphore is checked BEFORE the FCALL
(same contract as claim_from_grant). Reclaim does NOT
assume pre-existing capacity on this worker — a reclaim can
land on a fresh worker instance that just came up after a
crash/restart and is picking up a previously-interrupted
execution. If the worker is saturated, the grant stays valid
for its remaining TTL and the caller can release it or retry.
On success the returned ClaimedTask holds a concurrency
permit that releases automatically on
complete/fail/cancel/drop.
§Errors
- SdkError::WorkerAtCapacity — max_concurrent_tasks permits all held. Retryable; the grant is untouched (no FCALL was issued, so ff_claim_resumed_execution did not atomically consume the grant key).
- ScriptError::InvalidClaimGrant — grant missing, consumed, or worker_id mismatch.
- ScriptError::ClaimGrantExpired — grant TTL elapsed.
- ScriptError::NotAResumedExecution — attempt_state is not attempt_interrupted.
- ScriptError::ExecutionNotLeaseable — lifecycle_phase is not runnable.
- ScriptError::ExecutionNotFound — core key missing.
- SdkError::Backend / SdkError::BackendContext — transport.
pub async fn deliver_signal(
    &self,
    execution_id: &ExecutionId,
    waitpoint_id: &WaitpointId,
    signal: Signal,
) -> Result<SignalOutcome, SdkError>
Deliver a signal to a suspended execution’s waitpoint.
The engine atomically records the signal, evaluates the resume condition,
and optionally transitions the execution from suspended to runnable.
Forwards through
EngineBackend::deliver_signal
— the trait-level trigger surface landed in issue #150.
Auto Trait Implementations§
impl Freeze for FlowFabricWorker
impl !RefUnwindSafe for FlowFabricWorker
impl Send for FlowFabricWorker
impl Sync for FlowFabricWorker
impl Unpin for FlowFabricWorker
impl UnsafeUnpin for FlowFabricWorker
impl !UnwindSafe for FlowFabricWorker
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true.
Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self> otherwise.