ff_core/backend.rs
1//! Backend-trait supporting types (RFC-012 Stage 0).
2//!
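//! This module carries the public types referenced by the `EngineBackend`
//! trait signatures in RFC-012 §3.3. The trait itself lands in Stage 1
//! (issue #89 follow-up); Stage 0 is strictly type-plumbing plus the
//! `ResumeSignal` crate move. Later work appends further supporting types
//! here (Stage 1a's `FailOutcome` / `BackendConfig` family and issue
//! #122's `ScannerFilter`).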
7//!
8//! Public structs/enums whose fields or variants are expected to grow
9//! are marked `#[non_exhaustive]` per project convention — consumers
10//! must write `_`-terminated matches and use the provided constructors
11//! rather than struct literals. Exceptions:
12//!
13//! * Opaque single-field wrapper newtypes ([`HandleOpaque`],
14//! [`WaitpointHmac`]) hide their inner field and need no non-exhaustive
15//! annotation — the wrapped value is unreachable from outside.
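//! * [`ResumeSignal`] is intentionally NOT `#[non_exhaustive]` so the
//!   ff-sdk crate move (Stage 0) preserves struct-literal compatibility
//!   at its existing call site.
//! * [`FailOutcome`] is intentionally NOT `#[non_exhaustive]` so existing
//!   consumers can keep matching it exhaustively (see its docs).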
19//!
20//! See `rfcs/RFC-012-engine-backend-trait.md` §3.3.0 for the authoritative
21//! type inventory and §4.1-§4.2 for the `Handle` / `EngineError` shapes.
22
23use crate::contracts::ReclaimGrant;
24use crate::types::{Namespace, TimestampMs, WaitpointToken};
25use std::collections::BTreeMap;
26use std::time::Duration;
27
28// ── §4.1 Handle trio ────────────────────────────────────────────────────
29
/// Runtime discriminator recording which backend implementation owns a
/// [`Handle`], so each op can be routed to the matching backend.
///
/// Marked `#[non_exhaustive]` so further backends (e.g. `Postgres` in
/// Stage 2, or hypothetical third backends later) can be added without a
/// breaking change.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BackendTag {
    /// Implementation backed by Valkey FCALLs.
    Valkey,
}
42
/// Lifecycle stage recorded inside a [`Handle`].
///
/// Backends check `kind` when each op is entered and answer with
/// `EngineError::State` if it does not fit the op. This runtime tag
/// supersedes round-1's compile-time split into separate `Handle` /
/// `ResumeHandle` / `SuspendToken` types (RFC-012 §4.1).
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HandleKind {
    /// Newly claimed; produced by `claim` / `claim_from_grant`.
    Fresh,
    /// Picked back up through reclaim; produced by `claim_from_reclaim`.
    Resumed,
    /// Parked by `suspend`. This ends the lease: resuming later mints a
    /// new Handle through `claim_from_reclaim`.
    Suspended,
}
59
/// Opaque, backend-private bytes embedded in a [`Handle`].
///
/// The backend stores whatever state it needs here (on Valkey: exec id,
/// attempt id, lease id, lease epoch, capability binding, partition).
/// Consumers never build or decode these bytes. The backend mints them on
/// claim/resume and reads them back on every op.
///
/// Held as `Box<[u8]>` instead of `bytes::Bytes` (RFC-012 §7.17) so the
/// public type does not drag in a transitive dependency on `bytes`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HandleOpaque(Box<[u8]>);

impl HandleOpaque {
    /// Wraps bytes produced by a backend. Intended for backend impls only.
    pub fn new(bytes: Box<[u8]>) -> Self {
        HandleOpaque(bytes)
    }

    /// Returns a view of the wrapped bytes (backend-internal use).
    pub fn as_bytes(&self) -> &[u8] {
        &self.0[..]
    }
}
83
84/// Opaque attempt cookie held by the worker for the duration of an
85/// attempt. Produced by `claim` / `claim_from_reclaim` / `suspend`;
86/// borrowed by every op (renew, progress, append_frame, complete, fail,
87/// cancel, suspend, delay, wait_children, observe_signals, report_usage).
88///
89/// See RFC-012 §4.1 for the round-4 design — terminal ops borrow rather
90/// than consume so callers can retry after a transport error.
91#[derive(Clone, Debug, PartialEq, Eq, Hash)]
92#[non_exhaustive]
93pub struct Handle {
94 pub backend: BackendTag,
95 pub kind: HandleKind,
96 pub opaque: HandleOpaque,
97}
98
99impl Handle {
100 /// Construct a new Handle. Called by backend impls only; consumer
101 /// code receives Handles from `claim` / `suspend` / `claim_from_reclaim`.
102 pub fn new(backend: BackendTag, kind: HandleKind, opaque: HandleOpaque) -> Self {
103 Self {
104 backend,
105 kind,
106 opaque,
107 }
108 }
109}
110
111// ── §3.3.0 Claim / lifecycle supporting types ──────────────────────────
112
/// Capability tokens a worker advertises to the scheduler and to `claim`.
///
/// `WorkerConfig` currently stores these as a bare `Vec<String>`. Naming
/// the set lets trait signatures refer to capabilities without committing
/// to a concrete container type.
///
/// Bitfield versus stringly-typed is the open question in §7.2. Stage 0
/// follows the round-2 preference: a newtype over `Vec<String>`.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CapabilitySet {
    pub tokens: Vec<String>,
}

impl CapabilitySet {
    /// Collects any iterable of string-like items into a capability set.
    ///
    /// Iteration order is preserved and duplicates are kept as-is.
    pub fn new<I, S>(tokens: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let mut collected: Vec<String> = Vec::new();
        for token in tokens {
            collected.push(token.into());
        }
        CapabilitySet { tokens: collected }
    }

    /// Returns `true` when no capability tokens are present.
    pub fn is_empty(&self) -> bool {
        self.tokens.is_empty()
    }
}
144
/// Policy knobs for `claim`.
///
/// Kept minimal at Stage 0 per RFC-012 §3.3.0 ("Bikeshed-prone; keep
/// minimal at Stage 0"). Further fields, such as retry count or fairness
/// hints, can be added later without breaking callers.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ClaimPolicy {
    /// Upper bound on how long `claim` may block.
    ///
    /// `None` defers to the backend default. Today that default is
    /// non-blocking (immediate return).
    pub max_wait: Option<Duration>,
}

impl ClaimPolicy {
    /// Non-blocking claim, matching today's SDK default.
    ///
    /// This leaves `max_wait` as `None`, which is identical to
    /// `ClaimPolicy::default()`. It therefore relies on the backend
    /// default rather than pinning an explicit zero timeout.
    pub fn immediate() -> Self {
        Self::default()
    }

    /// Claim that may block for at most `max_wait`.
    pub fn with_max_wait(max_wait: Duration) -> Self {
        ClaimPolicy {
            max_wait: Some(max_wait),
        }
    }
}
169
/// Classification of a frame passed to `append_frame`.
///
/// Mirrors the `frame_type` ARGV of the Lua-side `ff_append_frame`.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FrameKind {
    /// Progress output or log line visible to operators.
    Stdout,
    /// Error or warning output visible to operators.
    Stderr,
    /// Structured event carrying a JSON payload.
    Event,
    /// Opaque / binary payload.
    Blob,
}
184
185/// Single stream frame appended via `append_frame` (RFC-012 §3.3.0).
186/// Today's FCALL takes the byte payload + frame_type + optional seq as
187/// discrete ARGV; Stage 0 collects them into a named type for trait
188/// signatures.
189#[derive(Clone, Debug, PartialEq, Eq)]
190#[non_exhaustive]
191pub struct Frame {
192 pub bytes: Vec<u8>,
193 pub kind: FrameKind,
194 /// Optional monotonic sequence. Set by the caller when the stream
195 /// protocol is sequence-bound; `None` lets the backend assign.
196 pub seq: Option<u64>,
197}
198
199impl Frame {
200 /// Construct a frame. `seq` defaults to `None` (backend-assigned);
201 /// callers that need an explicit sequence use
202 /// [`Frame::with_seq`].
203 pub fn new(bytes: Vec<u8>, kind: FrameKind) -> Self {
204 Self {
205 bytes,
206 kind,
207 seq: None,
208 }
209 }
210
211 /// Construct a frame with an explicit monotonic sequence.
212 pub fn with_seq(bytes: Vec<u8>, kind: FrameKind, seq: u64) -> Self {
213 Self {
214 bytes,
215 kind,
216 seq: Some(seq),
217 }
218 }
219}
220
221// ── §3.3.0 Suspend / waitpoint types ────────────────────────────────────
222
/// How a waitpoint matches incoming signals.
///
/// Mirrors today's suspend/close matcher kinds (signal name, correlation
/// id, etc.).
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WaitpointKind {
    /// Match on the signal's name.
    SignalName,
    /// Match on the correlation id.
    CorrelationId,
    /// Generic waitpoint satisfied through external signal delivery.
    External,
}
235
236/// HMAC token that binds a waitpoint to its mint-time identity. Wire
237/// shape `kid:40hex`.
238///
239/// Wraps [`crate::types::WaitpointToken`] so bearer-credential Debug /
240/// Display redaction (`WaitpointToken("kid1:<REDACTED:len=40>")`) flows
241/// through automatically — no derived formatter can leak the raw
242/// digest when a [`WaitpointSpec`] is debug-printed via
243/// `tracing::debug!(spec=?spec)`.
244///
245/// Newtype-wrapping (vs. a `pub use` alias) keeps trait signatures
246/// naming the waitpoint-bound HMAC role explicitly.
247#[derive(Clone, PartialEq, Eq, Hash)]
248pub struct WaitpointHmac(WaitpointToken);
249
250impl WaitpointHmac {
251 pub fn new(token: impl Into<String>) -> Self {
252 Self(WaitpointToken::from(token.into()))
253 }
254
255 /// Borrow the wrapped token. The wrapped type's `Debug`/`Display`
256 /// redact — call sites that need the raw digest must explicitly
257 /// call [`WaitpointToken::as_str`].
258 pub fn token(&self) -> &WaitpointToken {
259 &self.0
260 }
261
262 /// Borrow the raw `kid:40hex` string. Prefer [`Self::token`] for
263 /// non-redacted call sites; this method exists only for transport
264 /// / FCALL ARGV construction where the raw wire bytes are required.
265 pub fn as_str(&self) -> &str {
266 self.0.as_str()
267 }
268}
269
270// Forward Debug / Display to the wrapped WaitpointToken so the
271// redaction guarantees on that type extend here. Derived Debug would
272// expose the raw string field and defeat the wrap.
273impl std::fmt::Debug for WaitpointHmac {
274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275 write!(f, "WaitpointHmac({:?})", self.0)
276 }
277}
278
279/// One waitpoint inside a suspend request. `suspend` takes a
280/// `Vec<WaitpointSpec>`; the resume condition (`any` / `all`) lives on
281/// the enclosing suspend args in the Phase-1 contract.
282#[derive(Clone, Debug, PartialEq, Eq)]
283#[non_exhaustive]
284pub struct WaitpointSpec {
285 pub kind: WaitpointKind,
286 pub matcher: Vec<u8>,
287 pub hmac_token: WaitpointHmac,
288}
289
290impl WaitpointSpec {
291 pub fn new(kind: WaitpointKind, matcher: Vec<u8>, hmac_token: WaitpointHmac) -> Self {
292 Self {
293 kind,
294 matcher,
295 hmac_token,
296 }
297 }
298}
299
300// ── §3.3.0 Failure classification types ─────────────────────────────────
301
/// Human-readable failure description plus optional structured detail.
///
/// Supersedes the ad-hoc string argument that `fail` takes today.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FailureReason {
    pub message: String,
    pub detail: Option<Vec<u8>>,
}

impl FailureReason {
    /// Reason carrying only a message (`detail = None`).
    pub fn new(message: impl Into<String>) -> Self {
        Self::from_parts(message.into(), None)
    }

    /// Reason carrying a message plus structured `detail` bytes.
    pub fn with_detail(message: impl Into<String>, detail: Vec<u8>) -> Self {
        Self::from_parts(message.into(), Some(detail))
    }

    /// Shared constructor body behind `new` and `with_detail`.
    fn from_parts(message: String, detail: Option<Vec<u8>>) -> Self {
        FailureReason { message, detail }
    }
}
326
/// Failure classification that decides retry disposition on the Lua side.
///
/// Mirrors the Lua-side classification codes.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FailureClass {
    /// Transient error that may be retried.
    Transient,
    /// Permanent error; no retries.
    Permanent,
    /// Crash / process death, inferred from lease expiry.
    InfraCrash,
    /// Timed out at the attempt or operation level.
    Timeout,
    /// Cancelled cooperatively by an operator or by `cancel_flow`.
    Cancelled,
}
343
344// ── §3.3.0 Usage / budget types ─────────────────────────────────────────
345
/// Usage report passed to `report_usage`.
///
/// Mirrors the ARGV of today's `ff_report_usage_and_check`: token counts,
/// wall time and caller-defined dimensions.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct UsageDimensions {
    /// Input tokens consumed (LLM-shaped usage); `0` when not applicable.
    pub input_tokens: u64,
    /// Output tokens produced (LLM-shaped usage).
    pub output_tokens: u64,
    /// Wall-clock milliseconds attributable to this report.
    ///
    /// `None` for reports that only carry token counts.
    pub wall_ms: Option<u64>,
    /// Arbitrary caller-defined dimensions.
    ///
    /// A `BTreeMap` keeps iteration order stable, which matters for the
    /// budget schemes that derive dedup keys from these entries.
    pub custom: BTreeMap<String, u64>,
}
364
/// Admission verdict returned by `report_usage`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AdmissionDecision {
    /// Usage accepted; the caller may continue.
    Admitted,
    /// Rate-limited or concurrency-capped. Retry once the suggested
    /// interval (`retry_after_ms`) has passed.
    Throttled { retry_after_ms: u64 },
    /// Refused outright: budget exhausted or policy violation.
    Rejected { reason: String },
}
377
378// ── §3.3.0 Reclaim / lease types ────────────────────────────────────────
379
380/// Opaque cookie returned by the reclaim scanner; consumed by
381/// `claim_from_reclaim` to mint a resumed Handle.
382///
383/// Wraps [`ReclaimGrant`] today (the scanner's existing product).
384/// Kept as a newtype so trait signatures name the reclaim-bound role
385/// explicitly and so the wrapped shape can evolve without breaking the
386/// trait.
387#[derive(Clone, Debug, PartialEq, Eq)]
388#[non_exhaustive]
389pub struct ReclaimToken {
390 pub grant: ReclaimGrant,
391}
392
393impl ReclaimToken {
394 pub fn new(grant: ReclaimGrant) -> Self {
395 Self { grant }
396 }
397}
398
/// Outcome of a successful `renew` call.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LeaseRenewal {
    /// Lease expiry after the renewal (monotonic ms).
    pub expires_at_ms: u64,
    /// Lease epoch after the renewal. Monotonic non-decreasing.
    pub lease_epoch: u64,
}

impl LeaseRenewal {
    /// Builds a renewal result from its expiry and epoch.
    pub fn new(expires_at_ms: u64, lease_epoch: u64) -> Self {
        LeaseRenewal {
            lease_epoch,
            expires_at_ms,
        }
    }
}
417
418// ── §3.3.0 Cancel-flow supporting types ─────────────────────────────────
419
/// What `cancel_flow` does with the flow's member executions.
///
/// Today this is a `String` on `CancelFlowArgs`. Stage 0 lifts the policy
/// into this typed enum (RFC-012 §3.3.0).
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CancelFlowPolicy {
    /// Cancel the flow record only; leave members untouched.
    FlowOnly,
    /// Cancel the flow and every member execution that is not terminal.
    CancelAll,
    /// Cancel the flow and every member in `Pending` / `Blocked` /
    /// `Eligible`. `Running` executions are left alone to drain.
    CancelPending,
}
435
/// How long `cancel_flow` blocks before returning to the caller.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CancelFlowWait {
    /// Return as soon as the flow state has flipped. Member cancellations
    /// are dispatched asynchronously.
    NoWait,
    /// Block until member cancellations complete, waiting at most the
    /// wrapped [`Duration`]. The tuple field is unnamed; earlier docs
    /// referred to it as `timeout`.
    WaitTimeout(Duration),
    /// Block until member cancellations complete, with no deadline.
    WaitIndefinite,
}
448
449// ── §3.3.0 Completion stream payload ────────────────────────────────────
450
451/// One completion event delivered through the `CompletionStream`
452/// (RFC-012 §4.3). Also the payload type for issue #90's subscription
453/// API. Stage 0 authorises the type; issue #90 fixes the wire shape.
454///
455/// `flow_id` was added in issue #90 so DAG-dependency routing
456/// (`dispatch_dependency_resolution`) has the partition-routable flow
457/// handle without reparsing the Lua-emitted JSON downstream.
458/// `#[non_exhaustive]` keeps future field additions additive; use
459/// [`CompletionPayload::new`] and [`CompletionPayload::with_flow_id`]
460/// for construction.
461///
462/// `payload_bytes` / `produced_at_ms` are authorised but not yet
463/// populated by the Valkey Lua emitters — consumers on the
464/// `CompletionStream` read `execution_id` + `flow_id` today and must
465/// tolerate `payload_bytes = None` / `produced_at_ms = 0`.
466#[derive(Clone, Debug, PartialEq, Eq)]
467#[non_exhaustive]
468pub struct CompletionPayload {
469 pub execution_id: crate::types::ExecutionId,
470 pub outcome: String,
471 pub payload_bytes: Option<Vec<u8>>,
472 pub produced_at_ms: TimestampMs,
473 /// Flow handle for partition routing. Added in issue #90 (#90);
474 /// `None` for emitters that don't yet surface it.
475 pub flow_id: Option<crate::types::FlowId>,
476}
477
478impl CompletionPayload {
479 pub fn new(
480 execution_id: crate::types::ExecutionId,
481 outcome: impl Into<String>,
482 payload_bytes: Option<Vec<u8>>,
483 produced_at_ms: TimestampMs,
484 ) -> Self {
485 Self {
486 execution_id,
487 outcome: outcome.into(),
488 payload_bytes,
489 produced_at_ms,
490 flow_id: None,
491 }
492 }
493
494 /// Attach a flow handle to the payload. Additive builder so adding
495 /// `flow_id` didn't require a breaking change to [`Self::new`].
496 #[must_use]
497 pub fn with_flow_id(mut self, flow_id: crate::types::FlowId) -> Self {
498 self.flow_id = Some(flow_id);
499 self
500 }
501}
502
503// ── §3.3.0 ResumeSignal (crate move from ff-sdk::task) ──────────────────
504
505/// Signal that satisfied a waitpoint matcher and is therefore part of
506/// the reason an execution resumed. Returned by `observe_signals`
507/// (RFC-012 §3.1.2) and by `ClaimedTask::resume_signals` in ff-sdk.
508///
509/// Moved in Stage 0 from `ff_sdk::task`; `ff_sdk::ResumeSignal` remains
510/// re-exported through the 0.4.x window (removal scheduled for 0.5.0).
511///
512/// Returned only for signals whose matcher slot in the waitpoint's
513/// resume condition is marked satisfied. Pre-buffered-but-unmatched
514/// signals are not included.
515///
516/// Note: NOT `#[non_exhaustive]` to preserve struct-literal compatibility
517/// with ff-sdk call sites that constructed `ResumeSignal { .. }` before
518/// the Stage 0 crate move.
519#[derive(Clone, Debug, PartialEq, Eq)]
520pub struct ResumeSignal {
521 pub signal_id: crate::types::SignalId,
522 pub signal_name: String,
523 pub signal_category: String,
524 pub source_type: String,
525 pub source_identity: String,
526 pub correlation_id: String,
527 /// Valkey-server `now_ms` timestamp at which `ff_deliver_signal`
528 /// accepted this signal. `0` if the stored `accepted_at` field is
529 /// missing or non-numeric (a Lua-side defect — not expected at
530 /// runtime).
531 pub accepted_at: TimestampMs,
532 /// Raw payload bytes, if the signal was delivered with one. `None`
533 /// for signals delivered without a payload.
534 pub payload: Option<Vec<u8>>,
535}
536
537// ── Stage 1a: FailOutcome move ──────────────────────────────────────────
538
539/// Outcome of a `fail()` call.
540///
541/// **RFC-012 Stage 1a:** moved from `ff_sdk::task::FailOutcome` to
542/// `ff_core::backend::FailOutcome` so it is nameable by the
543/// `EngineBackend` trait signature. `ff_sdk::task` retains a
544/// `pub use` shim preserving the `ff_sdk::FailOutcome` path.
545///
546/// Not `#[non_exhaustive]` because existing consumers (ff-test,
547/// ff-readiness-tests) construct and match this enum exhaustively;
548/// the shape has been stable since the `fail()` API landed and any
549/// additive growth would be a follow-up RFC's deliberate break.
550#[derive(Clone, Debug, PartialEq, Eq)]
551pub enum FailOutcome {
552 /// Retry was scheduled — execution is in delayed backoff.
553 RetryScheduled {
554 delay_until: crate::types::TimestampMs,
555 },
556 /// No retries left — execution is terminal failed.
557 TerminalFailed,
558}
559
560// ── Stage 1a: BackendConfig + sub-types ─────────────────────────────────
561
/// Connection timing tunables shared across backend connections: the
/// per-request timeout and the idle keepalive interval.
///
/// The struct (not its individual fields) is `#[non_exhaustive]` per
/// project convention, because connection tunables grow as new backends
/// land. Code in other crates therefore cannot use a struct literal. It
/// should start from `BackendTimeouts::default()` and assign the public
/// fields.
///
/// `Default` matches the Phase-1 Valkey client's out-of-box shape: no
/// explicit timeout and no explicit pool cap, so ferriskey's defaults
/// apply.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendTimeouts {
    /// Per-request timeout. `None` ⇒ backend default.
    pub request: Option<Duration>,
    /// Idle-connection keepalive interval. `None` ⇒ backend default.
    pub keepalive: Option<Duration>,
}
576
/// Retry policy shared across backend connections.
///
/// Stage 1a ships only the minimal shape the trait signatures need to
/// reference it. Further knobs will be added additively.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct BackendRetry {
    /// Maximum retry attempts on transient transport errors.
    /// `None` ⇒ backend default.
    pub max_attempts: Option<u32>,
    /// Base backoff duration. `None` ⇒ backend default.
    pub base_backoff: Option<Duration>,
}
588
/// Connection parameters specific to Valkey.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ValkeyConnection {
    /// Hostname of the Valkey server.
    pub host: String,
    /// Port of the Valkey server.
    pub port: u16,
    /// Whether the connection uses TLS.
    pub tls: bool,
    /// Whether Valkey cluster mode is enabled.
    pub cluster: bool,
}

impl ValkeyConnection {
    /// Plain connection to `host:port`, with TLS and cluster mode both off.
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        let host: String = host.into();
        ValkeyConnection {
            host,
            port,
            tls: false,
            cluster: false,
        }
    }
}
613
614/// Discriminated union over per-backend connection shapes. Stage 1a
615/// ships the Valkey arm; future backends (Postgres) land additively
616/// under the `#[non_exhaustive]` guard.
617#[derive(Clone, Debug, PartialEq, Eq)]
618#[non_exhaustive]
619pub enum BackendConnection {
620 Valkey(ValkeyConnection),
621}
622
623/// Configuration passed to `ValkeyBackend::connect` (and, later, to
624/// other backend `connect` constructors). Carries the connection
625/// details + shared timing/retry policy. Replaces the Valkey-specific
626/// fields today on `WorkerConfig` (RFC-012 §5.1 migration plan).
627///
628/// `BackendConfig` is the replacement target for `WorkerConfig`'s
629/// `host` / `port` / `tls` / `cluster` fields. The full migration
630/// lands across Stage 1a (type introduction) and Stage 1c
631/// (`WorkerConfig` forwarding); worker-policy fields (lease TTL,
632/// claim poll interval, capability set) stay on `WorkerConfig`.
633#[derive(Clone, Debug, PartialEq, Eq)]
634#[non_exhaustive]
635pub struct BackendConfig {
636 pub connection: BackendConnection,
637 pub timeouts: BackendTimeouts,
638 pub retry: BackendRetry,
639}
640
641impl BackendConfig {
642 /// Build a Valkey BackendConfig from host+port. Other fields take
643 /// backend defaults.
644 pub fn valkey(host: impl Into<String>, port: u16) -> Self {
645 Self {
646 connection: BackendConnection::Valkey(ValkeyConnection::new(host, port)),
647 timeouts: BackendTimeouts::default(),
648 retry: BackendRetry::default(),
649 }
650 }
651}
652
653// ── Issue #122: ScannerFilter ───────────────────────────────────────────
654
655/// Per-consumer filter applied by FlowFabric's background scanners and
656/// completion subscribers so multiple FlowFabric instances sharing a
657/// single Valkey keyspace can operate on disjoint subsets of
658/// executions without mutual interference (issue #122).
659///
660/// Sibling of [`CompletionPayload`]: the former is a scan *output*
661/// shape, this is the *input* predicate scanners and completion
662/// subscribers consult per candidate.
663///
664/// # Fields & backing storage
665///
666/// * `namespace` — matches against the `namespace` field on
667/// `exec_core` (Valkey hash `ff:exec:{fp:N}:<eid>:core`). Cost:
668/// one HGET per candidate when set.
669/// * `instance_tag` — matches against an entry in the execution's
670/// user-supplied tags hash at the canonical key
671/// **`ff:exec:{p}:<eid>:tags`** (where `{p}` is the partition
672/// hash-tag, e.g. `{fp:42}`). Written by the Lua function
673/// `ff_create_execution` (see `lua/execution.lua`) and
674/// `ff_set_execution_tags`. The tuple is `(tag_key, tag_value)`:
675/// the HGET targets `tag_key` on the tags hash and compares the
676/// returned string (if any) byte-for-byte against `tag_value`.
677/// Cost: one HGET per candidate when set.
678///
679/// When both are set, scanners check `namespace` first (short-circuit
680/// on mismatch) then `instance_tag`, for a maximum of 2 HGETs per
681/// candidate.
682///
683/// # Semantics
684///
685/// * [`Self::is_noop`] returns true when both fields are `None` —
686/// the filter accepts every candidate. Used by the
687/// `subscribe_completions_filtered` default body to fall back to
688/// the unfiltered subscription.
689/// * [`Self::matches`] is the tight in-memory predicate once the
690/// HGET values have been fetched. Scanners fetch the fields
691/// lazily (namespace first) and pass the results in; the helper
692/// returns false as soon as one component mismatches.
693///
694/// # Scope
695///
696/// Today the filter is consulted by execution-shaped scanners
697/// (lease_expiry, attempt_timeout, execution_deadline,
698/// suspension_timeout, pending_wp_expiry, delayed_promoter,
699/// dependency_reconciler, cancel_reconciler, unblock,
700/// index_reconciler, retention_trimmer) and by completion
701/// subscribers. Non-execution scanners (budget_reconciler,
702/// budget_reset, quota_reconciler, flow_projector) accept a filter
703/// for API uniformity but do not apply it — their iteration domains
704/// (budgets, quotas, flows) are not keyed by the
705/// per-execution namespace / instance_tag shape.
706///
707/// `#[non_exhaustive]` so future dimensions (e.g. `lane_id`,
708/// `worker_instance`) can land additively.
709#[derive(Clone, Debug, Default, PartialEq, Eq)]
710#[non_exhaustive]
711pub struct ScannerFilter {
712 /// Tenant / workspace scope. Matches against the `namespace`
713 /// field on `exec_core`.
714 pub namespace: Option<Namespace>,
715 /// Instance-scoped tag predicate `(tag_key, tag_value)`. Matches
716 /// against an entry in `ff:exec:{p}:<eid>:tags` (the tags hash
717 /// written by `ff_create_execution`).
718 pub instance_tag: Option<(String, String)>,
719}
720
721impl ScannerFilter {
722 /// Shared no-op filter — useful as the default for the
723 /// `Scanner::filter()` trait method so implementors that don't
724 /// override can hand back a `&'static` reference without
725 /// allocating per call.
726 pub const NOOP: Self = Self {
727 namespace: None,
728 instance_tag: None,
729 };
730
731 /// Create an empty filter (equivalent to [`Self::NOOP`]).
732 ///
733 /// Provided for external-crate consumers: `ScannerFilter` is
734 /// `#[non_exhaustive]`, so struct-literal and functional-update
735 /// construction are unavailable across crate boundaries. Start
736 /// from `new()` and chain `with_*` setters to build a filter.
737 pub fn new() -> Self {
738 Self::default()
739 }
740
741 /// Set the tenant-scope namespace filter dimension. Consumes
742 /// and returns `self` for chaining.
743 pub fn with_namespace(mut self, ns: Namespace) -> Self {
744 self.namespace = Some(ns);
745 self
746 }
747
748 /// Set the exact-match exec-tag filter dimension. Consumes and
749 /// returns `self` for chaining. At filter time, scanners read
750 /// the `ff:exec:{p:N}:<eid>:tags` hash and compare the value at
751 /// `key` byte-for-byte against `value`.
752 pub fn with_instance_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
753 self.instance_tag = Some((key.into(), value.into()));
754 self
755 }
756
757 /// True iff the filter has no dimensions set — every candidate
758 /// passes. Callers use this to short-circuit filtered subscribe
759 /// paths back to the unfiltered ones.
760 pub fn is_noop(&self) -> bool {
761 self.namespace.is_none() && self.instance_tag.is_none()
762 }
763
764 /// Post-HGET in-memory match check.
765 ///
766 /// `core_namespace` should be the `namespace` field read from
767 /// `exec_core` (None if the HGET returned nil / was skipped).
768 /// `tag_value` should be the HGET result for the configured
769 /// `instance_tag.0` key on the execution's tags hash (None if
770 /// the HGET returned nil / was skipped).
771 ///
772 /// When a filter dimension is `None` the corresponding argument
773 /// is ignored — callers may pass `None` to skip the HGET and
774 /// save a round-trip.
775 pub fn matches(
776 &self,
777 core_namespace: Option<&Namespace>,
778 tag_value: Option<&str>,
779 ) -> bool {
780 if let Some(ref want) = self.namespace {
781 match core_namespace {
782 Some(have) if have == want => {}
783 _ => return false,
784 }
785 }
786 if let Some((_, ref want_value)) = self.instance_tag {
787 match tag_value {
788 Some(have) if have == want_value.as_str() => {}
789 _ => return false,
790 }
791 }
792 true
793 }
794}
795
796// ── Tests ───────────────────────────────────────────────────────────────
797
798#[cfg(test)]
799mod tests {
800 use super::*;
801 use crate::partition::{Partition, PartitionFamily};
802 use crate::types::{ExecutionId, LaneId, SignalId};
803
804 #[test]
805 fn backend_tag_derives() {
806 let a = BackendTag::Valkey;
807 let b = a;
808 assert_eq!(a, b);
809 assert_eq!(format!("{a:?}"), "Valkey");
810 }
811
812 #[test]
813 fn handle_kind_derives() {
814 for k in [HandleKind::Fresh, HandleKind::Resumed, HandleKind::Suspended] {
815 let c = k;
816 assert_eq!(k, c);
817 // Debug formatter reachable
818 let _ = format!("{k:?}");
819 }
820 }
821
822 #[test]
823 fn handle_opaque_roundtrips() {
824 let bytes: Box<[u8]> = Box::new([1u8, 2, 3, 4]);
825 let o = HandleOpaque::new(bytes.clone());
826 assert_eq!(o.as_bytes(), &[1u8, 2, 3, 4]);
827 assert_eq!(o, o.clone());
828 let _ = format!("{o:?}");
829 }
830
831 #[test]
832 fn handle_composes() {
833 let h = Handle::new(
834 BackendTag::Valkey,
835 HandleKind::Fresh,
836 HandleOpaque::new(Box::new([0u8; 4])),
837 );
838 assert_eq!(h.backend, BackendTag::Valkey);
839 assert_eq!(h.kind, HandleKind::Fresh);
840 assert_eq!(h.clone(), h);
841 }
842
843 #[test]
844 fn capability_set_derives() {
845 let c1 = CapabilitySet::new(["gpu", "cuda"]);
846 let c2 = CapabilitySet::new(["gpu", "cuda"]);
847 assert_eq!(c1, c2);
848 assert!(!c1.is_empty());
849 assert!(CapabilitySet::default().is_empty());
850 let _ = format!("{c1:?}");
851 }
852
853 #[test]
854 fn claim_policy_derives() {
855 let p = ClaimPolicy::with_max_wait(Duration::from_millis(500));
856 assert_eq!(p.max_wait, Some(Duration::from_millis(500)));
857 assert_eq!(p.clone(), p);
858 assert_eq!(ClaimPolicy::immediate(), ClaimPolicy::default());
859 }
860
861 #[test]
862 fn frame_and_kind_derive() {
863 let f = Frame {
864 bytes: b"hello".to_vec(),
865 kind: FrameKind::Stdout,
866 seq: Some(3),
867 };
868 assert_eq!(f.clone(), f);
869 assert_eq!(f.kind, FrameKind::Stdout);
870 assert_ne!(FrameKind::Stderr, FrameKind::Event);
871 let _ = format!("{f:?}");
872 }
873
874 #[test]
875 fn waitpoint_spec_derives() {
876 let spec = WaitpointSpec {
877 kind: WaitpointKind::SignalName,
878 matcher: b"approved".to_vec(),
879 hmac_token: WaitpointHmac::new("kid1:deadbeef"),
880 };
881 assert_eq!(spec.clone(), spec);
882 assert_eq!(spec.hmac_token.as_str(), "kid1:deadbeef");
883 assert_eq!(
884 WaitpointHmac::new("a"),
885 WaitpointHmac::new(String::from("a"))
886 );
887 }
888
889 #[test]
890 fn failure_reason_and_class() {
891 let r1 = FailureReason::new("boom");
892 let r2 = FailureReason::with_detail("boom", b"stack".to_vec());
893 assert_eq!(r1.message, "boom");
894 assert!(r1.detail.is_none());
895 assert_eq!(r2.detail.as_deref(), Some(&b"stack"[..]));
896 assert_eq!(r1.clone(), r1);
897 assert_ne!(FailureClass::Transient, FailureClass::Permanent);
898 }
899
900 #[test]
901 fn usage_dimensions_default_and_eq() {
902 let u = UsageDimensions {
903 input_tokens: 10,
904 output_tokens: 20,
905 wall_ms: Some(150),
906 custom: BTreeMap::from([("net_bytes".to_string(), 42)]),
907 };
908 assert_eq!(u.clone(), u);
909 assert_eq!(UsageDimensions::default().input_tokens, 0);
910 }
911
912 #[test]
913 fn admission_decision_derives() {
914 let a = AdmissionDecision::Admitted;
915 let t = AdmissionDecision::Throttled { retry_after_ms: 25 };
916 let r = AdmissionDecision::Rejected {
917 reason: "quota".into(),
918 };
919 assert_eq!(a.clone(), a);
920 assert_eq!(t.clone(), t);
921 assert_ne!(a, r);
922 }
923
924 #[test]
925 fn reclaim_token_wraps_grant() {
926 let grant = ReclaimGrant {
927 execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
928 partition_key: crate::partition::PartitionKey::from(&Partition {
929 family: PartitionFamily::Flow,
930 index: 0,
931 }),
932 grant_key: "gkey".into(),
933 expires_at_ms: 123,
934 lane_id: LaneId::new("default"),
935 };
936 let t = ReclaimToken::new(grant.clone());
937 assert_eq!(t.grant, grant);
938 assert_eq!(t.clone(), t);
939 }
940
941 #[test]
942 fn lease_renewal_is_copy() {
943 let r = LeaseRenewal {
944 expires_at_ms: 100,
945 lease_epoch: 2,
946 };
947 let s = r; // Copy
948 assert_eq!(r, s);
949 }
950
951 #[test]
952 fn cancel_flow_policy_and_wait() {
953 assert_ne!(CancelFlowPolicy::FlowOnly, CancelFlowPolicy::CancelAll);
954 let w = CancelFlowWait::WaitTimeout(Duration::from_secs(1));
955 assert_eq!(w, w);
956 assert_ne!(CancelFlowWait::NoWait, CancelFlowWait::WaitIndefinite);
957 }
958
959 #[test]
960 fn completion_payload_derives() {
961 let c = CompletionPayload {
962 execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
963 outcome: "success".into(),
964 payload_bytes: Some(b"ok".to_vec()),
965 produced_at_ms: TimestampMs::from_millis(1234),
966 flow_id: None,
967 };
968 assert_eq!(c.clone(), c);
969 let _ = format!("{c:?}");
970 }
971
972 #[test]
973 fn resume_signal_moved_and_derives() {
974 let s = ResumeSignal {
975 signal_id: SignalId::new(),
976 signal_name: "approve".into(),
977 signal_category: "decision".into(),
978 source_type: "user".into(),
979 source_identity: "u1".into(),
980 correlation_id: "c1".into(),
981 accepted_at: TimestampMs::from_millis(10),
982 payload: None,
983 };
984 assert_eq!(s.clone(), s);
985 let _ = format!("{s:?}");
986 }
987
988 #[test]
989 fn fail_outcome_variants() {
990 let retry = FailOutcome::RetryScheduled {
991 delay_until: TimestampMs::from_millis(42),
992 };
993 let terminal = FailOutcome::TerminalFailed;
994 assert_ne!(retry, terminal);
995 assert_eq!(retry.clone(), retry);
996 }
997
998 #[test]
999 fn scanner_filter_noop_and_default() {
1000 let f = ScannerFilter::default();
1001 assert!(f.is_noop());
1002 assert_eq!(f, ScannerFilter::NOOP);
1003 // A no-op filter matches any candidate, including ones that
1004 // produced no HGET results.
1005 assert!(f.matches(None, None));
1006 assert!(f.matches(Some(&Namespace::new("t1")), Some("v")));
1007 }
1008
1009 #[test]
1010 fn scanner_filter_namespace_match() {
1011 let f = ScannerFilter {
1012 namespace: Some(Namespace::new("tenant-a")),
1013 instance_tag: None,
1014 };
1015 assert!(!f.is_noop());
1016 assert!(f.matches(Some(&Namespace::new("tenant-a")), None));
1017 assert!(!f.matches(Some(&Namespace::new("tenant-b")), None));
1018 // Missing core namespace ⇒ no match.
1019 assert!(!f.matches(None, None));
1020 }
1021
1022 #[test]
1023 fn scanner_filter_instance_tag_match() {
1024 let f = ScannerFilter {
1025 namespace: None,
1026 instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
1027 };
1028 assert!(f.matches(None, Some("i-1")));
1029 assert!(!f.matches(None, Some("i-2")));
1030 assert!(!f.matches(None, None));
1031 }
1032
1033 #[test]
1034 fn scanner_filter_both_dimensions() {
1035 let f = ScannerFilter {
1036 namespace: Some(Namespace::new("tenant-a")),
1037 instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
1038 };
1039 assert!(f.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
1040 assert!(!f.matches(Some(&Namespace::new("tenant-a")), Some("i-2")));
1041 assert!(!f.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
1042 assert!(!f.matches(None, Some("i-1")));
1043 }
1044
1045 #[test]
1046 fn scanner_filter_builder_construction() {
1047 // Simulates external-crate usage: only public constructors
1048 // and chainable setters (no struct-literal access to the
1049 // `#[non_exhaustive]` fields).
1050 let empty = ScannerFilter::new();
1051 assert!(empty.is_noop());
1052 assert_eq!(empty, ScannerFilter::NOOP);
1053
1054 let ns_only = ScannerFilter::new().with_namespace(Namespace::new("tenant-a"));
1055 assert!(!ns_only.is_noop());
1056 assert!(ns_only.matches(Some(&Namespace::new("tenant-a")), None));
1057
1058 let tag_only = ScannerFilter::new().with_instance_tag("cairn.instance_id", "i-1");
1059 assert!(!tag_only.is_noop());
1060 assert!(tag_only.matches(None, Some("i-1")));
1061
1062 let both = ScannerFilter::new()
1063 .with_namespace(Namespace::new("tenant-a"))
1064 .with_instance_tag("cairn.instance_id", "i-1");
1065 assert!(both.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
1066 assert!(!both.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
1067 }
1068
1069 #[test]
1070 fn backend_config_valkey_ctor() {
1071 let c = BackendConfig::valkey("host.local", 6379);
1072 // Same-crate match against an otherwise `#[non_exhaustive]`
1073 // enum is irrefutable — no wildcard needed and `let-else`
1074 // would trip `irrefutable_let_patterns`.
1075 let BackendConnection::Valkey(v) = &c.connection;
1076 assert_eq!(v.host, "host.local");
1077 assert_eq!(v.port, 6379);
1078 assert!(!v.tls);
1079 assert!(!v.cluster);
1080 assert_eq!(c.timeouts, BackendTimeouts::default());
1081 assert_eq!(c.retry, BackendRetry::default());
1082 assert_eq!(c.clone(), c);
1083 }
1084}