// ff_core/backend.rs

//! Backend-trait supporting types (RFC-012 Stage 0).
//!
//! This module carries the public types referenced by the `EngineBackend`
//! trait signatures in RFC-012 §3.3. The trait itself lands in Stage 1
//! (issue #89 follow-up); Stage 0 is strictly type-plumbing and the
//! `ResumeSignal` crate move.
//!
//! Public structs/enums whose fields or variants are expected to grow
//! are marked `#[non_exhaustive]` per project convention — consumers
//! must write `_`-terminated matches and use the provided constructors
//! rather than struct literals. Exceptions:
//!
//! * Opaque single-field wrapper newtypes ([`HandleOpaque`],
//!   [`WaitpointHmac`]) hide their inner field and need no non-exhaustive
//!   annotation — the wrapped value is unreachable from outside.
//! * [`ResumeSignal`] is intentionally NOT `#[non_exhaustive]` so the
//!   ff-sdk crate-move (Stage 0) preserves struct-literal compatibility
//!   at its existing call site.
//! * [`FailOutcome`] is NOT `#[non_exhaustive]` because existing
//!   consumers construct and match it exhaustively.
//!
//! See `rfcs/RFC-012-engine-backend-trait.md` §3.3.0 for the authoritative
//! type inventory and §4.1-§4.2 for the `Handle` / `EngineError` shapes.

use crate::contracts::ReclaimGrant;
use crate::types::{Namespace, TimestampMs, WaitpointToken};
use std::collections::BTreeMap;
use std::time::Duration;

// ── §4.1 Handle trio ────────────────────────────────────────────────────

/// Backend-tag discriminator embedded in every [`Handle`] so ops can
/// dispatch to the correct backend implementation at runtime.
///
/// `#[non_exhaustive]`: new backend variants land additively as impls
/// come online (e.g. `Postgres` in Stage 2, hypothetical third backends
/// later).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum BackendTag {
    /// The Valkey FCALL-backed implementation.
    Valkey,
}

/// Lifecycle kind carried inside a [`Handle`]. Backends validate `kind`
/// on entry to each op and return `EngineError::State` on mismatch.
///
/// Replaces round-1's compile-time `Handle` / `ResumeHandle` /
/// `SuspendToken` type split (RFC-012 §4.1).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum HandleKind {
    /// Fresh claim — returned by `claim` / `claim_from_grant`.
    Fresh,
    /// Resumed from reclaim — returned by `claim_from_reclaim`.
    Resumed,
    /// Suspended — returned by `suspend`. Terminal for the lease;
    /// resumption mints a new Handle via `claim_from_reclaim`.
    Suspended,
}

/// Backend-private opaque payload carried inside a [`Handle`].
///
/// Encodes backend-specific state (on Valkey: exec id, attempt id,
/// lease id, lease epoch, capability binding, partition). Consumers do
/// not construct or inspect the bytes — they are produced by the
/// backend on claim/resume and consumed by the backend on each op.
///
/// `Box<[u8]>` chosen over `bytes::Bytes` (RFC-012 §7.17) to avoid a
/// public-type transitive dep on the `bytes` crate.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct HandleOpaque(Box<[u8]>);

impl HandleOpaque {
    /// Construct from backend-owned bytes. Only backend impls call this.
    pub fn new(bytes: Box<[u8]>) -> Self {
        Self(bytes)
    }

    /// Borrow the underlying bytes (backend-internal use).
    pub fn as_bytes(&self) -> &[u8] {
        &self.0
    }
}

84/// Opaque attempt cookie held by the worker for the duration of an
85/// attempt. Produced by `claim` / `claim_from_reclaim` / `suspend`;
86/// borrowed by every op (renew, progress, append_frame, complete, fail,
87/// cancel, suspend, delay, wait_children, observe_signals, report_usage).
88///
89/// See RFC-012 §4.1 for the round-4 design — terminal ops borrow rather
90/// than consume so callers can retry after a transport error.
91#[derive(Clone, Debug, PartialEq, Eq, Hash)]
92#[non_exhaustive]
93pub struct Handle {
94    pub backend: BackendTag,
95    pub kind: HandleKind,
96    pub opaque: HandleOpaque,
97}
98
99impl Handle {
100    /// Construct a new Handle. Called by backend impls only; consumer
101    /// code receives Handles from `claim` / `suspend` / `claim_from_reclaim`.
102    pub fn new(backend: BackendTag, kind: HandleKind, opaque: HandleOpaque) -> Self {
103        Self {
104            backend,
105            kind,
106            opaque,
107        }
108    }
109}
110
// ── §3.3.0 Claim / lifecycle supporting types ──────────────────────────

/// Worker capability set — the tokens the worker advertises to the
/// scheduler and to `claim`. Today stored as `Vec<String>` on
/// `WorkerConfig`; promoted to a named newtype so the trait signatures
/// can talk about capabilities without committing to a concrete
/// container shape.
///
/// Bitfield vs stringly-typed is §7.2 open question; Stage 0 keeps the
/// round-2 lean (newtype over `Vec<String>`).
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub struct CapabilitySet {
    /// Capability tokens, in the order supplied to [`CapabilitySet::new`].
    pub tokens: Vec<String>,
}

impl CapabilitySet {
    /// Build from any iterable of string-like capability tokens.
    pub fn new<I, S>(tokens: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let tokens = tokens.into_iter().map(Into::into).collect();
        Self { tokens }
    }

    /// True iff the set contains no tokens.
    pub fn is_empty(&self) -> bool {
        self.tokens.is_empty()
    }
}

/// Policy hints for `claim`. Minimal at Stage 0 per RFC-012 §3.3.0
/// ("Bikeshed-prone; keep minimal at Stage 0"). Future fields (retry
/// count, fairness hints) land additively.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub struct ClaimPolicy {
    /// Maximum blocking wait. `None` means backend-default (today:
    /// non-blocking / immediate return).
    pub max_wait: Option<Duration>,
}

impl ClaimPolicy {
    /// Claim without an explicit wait bound: `max_wait` is left as
    /// `None`, i.e. the backend default (today: non-blocking). Equal to
    /// [`ClaimPolicy::default`]. Matches today's SDK default.
    pub fn immediate() -> Self {
        Self { max_wait: None }
    }

    /// Claim with an explicit blocking bound.
    pub fn with_max_wait(max_wait: Duration) -> Self {
        Self {
            max_wait: Some(max_wait),
        }
    }
}

/// Frame classification for `append_frame`. Mirrors the Lua-side
/// `ff_append_frame` `frame_type` ARGV.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FrameKind {
    /// Operator-visible progress output / log line.
    Stdout,
    /// Operator-visible error / warning output.
    Stderr,
    /// Structured event (JSON payload).
    Event,
    /// Binary / opaque payload.
    Blob,
}

185/// Single stream frame appended via `append_frame` (RFC-012 §3.3.0).
186/// Today's FCALL takes the byte payload + frame_type + optional seq as
187/// discrete ARGV; Stage 0 collects them into a named type for trait
188/// signatures.
189#[derive(Clone, Debug, PartialEq, Eq)]
190#[non_exhaustive]
191pub struct Frame {
192    pub bytes: Vec<u8>,
193    pub kind: FrameKind,
194    /// Optional monotonic sequence. Set by the caller when the stream
195    /// protocol is sequence-bound; `None` lets the backend assign.
196    pub seq: Option<u64>,
197}
198
199impl Frame {
200    /// Construct a frame. `seq` defaults to `None` (backend-assigned);
201    /// callers that need an explicit sequence use
202    /// [`Frame::with_seq`].
203    pub fn new(bytes: Vec<u8>, kind: FrameKind) -> Self {
204        Self {
205            bytes,
206            kind,
207            seq: None,
208        }
209    }
210
211    /// Construct a frame with an explicit monotonic sequence.
212    pub fn with_seq(bytes: Vec<u8>, kind: FrameKind, seq: u64) -> Self {
213        Self {
214            bytes,
215            kind,
216            seq: Some(seq),
217        }
218    }
219}
220
// ── §3.3.0 Suspend / waitpoint types ────────────────────────────────────

/// Waitpoint matcher mode (mirrors today's suspend/close matcher kinds
/// — signal name, correlation id, etc.).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum WaitpointKind {
    /// Match by signal name.
    SignalName,
    /// Match by correlation id.
    CorrelationId,
    /// Generic external-signal waitpoint (external delivery).
    External,
}

236/// HMAC token that binds a waitpoint to its mint-time identity. Wire
237/// shape `kid:40hex`.
238///
239/// Wraps [`crate::types::WaitpointToken`] so bearer-credential Debug /
240/// Display redaction (`WaitpointToken("kid1:<REDACTED:len=40>")`) flows
241/// through automatically — no derived formatter can leak the raw
242/// digest when a [`WaitpointSpec`] is debug-printed via
243/// `tracing::debug!(spec=?spec)`.
244///
245/// Newtype-wrapping (vs. a `pub use` alias) keeps trait signatures
246/// naming the waitpoint-bound HMAC role explicitly.
247#[derive(Clone, PartialEq, Eq, Hash)]
248pub struct WaitpointHmac(WaitpointToken);
249
250impl WaitpointHmac {
251    pub fn new(token: impl Into<String>) -> Self {
252        Self(WaitpointToken::from(token.into()))
253    }
254
255    /// Borrow the wrapped token. The wrapped type's `Debug`/`Display`
256    /// redact — call sites that need the raw digest must explicitly
257    /// call [`WaitpointToken::as_str`].
258    pub fn token(&self) -> &WaitpointToken {
259        &self.0
260    }
261
262    /// Borrow the raw `kid:40hex` string. Prefer [`Self::token`] for
263    /// non-redacted call sites; this method exists only for transport
264    /// / FCALL ARGV construction where the raw wire bytes are required.
265    pub fn as_str(&self) -> &str {
266        self.0.as_str()
267    }
268}
269
270// Forward Debug / Display to the wrapped WaitpointToken so the
271// redaction guarantees on that type extend here. Derived Debug would
272// expose the raw string field and defeat the wrap.
273impl std::fmt::Debug for WaitpointHmac {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        write!(f, "WaitpointHmac({:?})", self.0)
276    }
277}
278
279/// One waitpoint inside a suspend request. `suspend` takes a
280/// `Vec<WaitpointSpec>`; the resume condition (`any` / `all`) lives on
281/// the enclosing suspend args in the Phase-1 contract.
282#[derive(Clone, Debug, PartialEq, Eq)]
283#[non_exhaustive]
284pub struct WaitpointSpec {
285    pub kind: WaitpointKind,
286    pub matcher: Vec<u8>,
287    pub hmac_token: WaitpointHmac,
288}
289
290impl WaitpointSpec {
291    pub fn new(kind: WaitpointKind, matcher: Vec<u8>, hmac_token: WaitpointHmac) -> Self {
292        Self {
293            kind,
294            matcher,
295            hmac_token,
296        }
297    }
298}
299
// ── §3.3.0 Failure classification types ─────────────────────────────────

/// Human-readable failure description + optional structured detail.
/// Replaces today's ad-hoc string arg to `fail`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct FailureReason {
    /// Human-readable failure description.
    pub message: String,
    /// Optional structured detail bytes.
    pub detail: Option<Vec<u8>>,
}

impl FailureReason {
    /// Build a reason carrying only a message (`detail` is `None`).
    pub fn new(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
            detail: None,
        }
    }

    /// Build a reason carrying a message plus structured detail bytes.
    pub fn with_detail(message: impl Into<String>, detail: Vec<u8>) -> Self {
        Self {
            detail: Some(detail),
            ..Self::new(message)
        }
    }
}

/// Failure classification — determines retry disposition on the Lua
/// side. Mirrors the Lua-side classification codes.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FailureClass {
    /// Retryable transient error.
    Transient,
    /// Permanent error — no retries.
    Permanent,
    /// Crash / process death inferred from lease expiry.
    InfraCrash,
    /// Timeout at the attempt or operation level.
    Timeout,
    /// Cooperative cancellation by operator or cancel_flow.
    Cancelled,
}

// ── §3.3.0 Usage / budget types ─────────────────────────────────────────

/// Usage report for `report_usage`. Mirrors today's
/// `ff_report_usage_and_check` ARGV: token-counts, wall-time, custom
/// dimensions.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub struct UsageDimensions {
    /// Input tokens consumed (LLM-shaped usage). `0` if not applicable.
    pub input_tokens: u64,
    /// Output tokens produced (LLM-shaped usage).
    pub output_tokens: u64,
    /// Wall-clock duration, in milliseconds, attributable to this
    /// report. `None` for pure token-count reports.
    pub wall_ms: Option<u64>,
    /// Arbitrary caller-defined dimensions. Use `BTreeMap` for stable
    /// iteration order (important for dedup-key derivation on some
    /// budget schemes).
    pub custom: BTreeMap<String, u64>,
}

/// Admission outcome returned by `report_usage`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum AdmissionDecision {
    /// Usage accepted; caller may continue.
    Admitted,
    /// Rate-limited or concurrency-capped; retry after the suggested
    /// interval.
    Throttled { retry_after_ms: u64 },
    /// Rejected outright — budget exhausted or policy violation.
    Rejected { reason: String },
}

378// ── §3.3.0 Reclaim / lease types ────────────────────────────────────────
379
380/// Opaque cookie returned by the reclaim scanner; consumed by
381/// `claim_from_reclaim` to mint a resumed Handle.
382///
383/// Wraps [`ReclaimGrant`] today (the scanner's existing product).
384/// Kept as a newtype so trait signatures name the reclaim-bound role
385/// explicitly and so the wrapped shape can evolve without breaking the
386/// trait.
387#[derive(Clone, Debug, PartialEq, Eq)]
388#[non_exhaustive]
389pub struct ReclaimToken {
390    pub grant: ReclaimGrant,
391}
392
393impl ReclaimToken {
394    pub fn new(grant: ReclaimGrant) -> Self {
395        Self { grant }
396    }
397}
398
/// Result of a successful `renew` call.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct LeaseRenewal {
    /// New lease expiry (monotonic ms).
    pub expires_at_ms: u64,
    /// Lease epoch after renewal. Monotonic non-decreasing.
    pub lease_epoch: u64,
}

impl LeaseRenewal {
    /// Construct from the new expiry and the post-renewal lease epoch.
    pub fn new(expires_at_ms: u64, lease_epoch: u64) -> Self {
        Self {
            expires_at_ms,
            lease_epoch,
        }
    }
}

// ── §3.3.0 Cancel-flow supporting types ─────────────────────────────────

/// Cancel-flow policy — what to do with the flow's members. Today
/// encoded as a `String` on `CancelFlowArgs`; Stage 0 extracts the
/// policy shape as a typed enum (RFC-012 §3.3.0).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CancelFlowPolicy {
    /// Cancel only the flow record; leave members alone.
    FlowOnly,
    /// Cancel the flow and every non-terminal member execution.
    CancelAll,
    /// Cancel the flow and every member currently in `Pending` /
    /// `Blocked` / `Eligible` — leave `Running` executions alone to
    /// drain.
    CancelPending,
}

/// Caller wait posture for `cancel_flow`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CancelFlowWait {
    /// Return after the flow-state flip; member cancellations dispatch
    /// asynchronously.
    NoWait,
    /// Block until member cancellations complete, up to `timeout`.
    WaitTimeout(Duration),
    /// Block until member cancellations complete, no deadline.
    WaitIndefinite,
}

449// ── §3.3.0 Completion stream payload ────────────────────────────────────
450
451/// One completion event delivered through the `CompletionStream`
452/// (RFC-012 §4.3). Also the payload type for issue #90's subscription
453/// API. Stage 0 authorises the type; issue #90 fixes the wire shape.
454///
455/// `flow_id` was added in issue #90 so DAG-dependency routing
456/// (`dispatch_dependency_resolution`) has the partition-routable flow
457/// handle without reparsing the Lua-emitted JSON downstream.
458/// `#[non_exhaustive]` keeps future field additions additive; use
459/// [`CompletionPayload::new`] and [`CompletionPayload::with_flow_id`]
460/// for construction.
461///
462/// `payload_bytes` / `produced_at_ms` are authorised but not yet
463/// populated by the Valkey Lua emitters — consumers on the
464/// `CompletionStream` read `execution_id` + `flow_id` today and must
465/// tolerate `payload_bytes = None` / `produced_at_ms = 0`.
466#[derive(Clone, Debug, PartialEq, Eq)]
467#[non_exhaustive]
468pub struct CompletionPayload {
469    pub execution_id: crate::types::ExecutionId,
470    pub outcome: String,
471    pub payload_bytes: Option<Vec<u8>>,
472    pub produced_at_ms: TimestampMs,
473    /// Flow handle for partition routing. Added in issue #90 (#90);
474    /// `None` for emitters that don't yet surface it.
475    pub flow_id: Option<crate::types::FlowId>,
476}
477
478impl CompletionPayload {
479    pub fn new(
480        execution_id: crate::types::ExecutionId,
481        outcome: impl Into<String>,
482        payload_bytes: Option<Vec<u8>>,
483        produced_at_ms: TimestampMs,
484    ) -> Self {
485        Self {
486            execution_id,
487            outcome: outcome.into(),
488            payload_bytes,
489            produced_at_ms,
490            flow_id: None,
491        }
492    }
493
494    /// Attach a flow handle to the payload. Additive builder so adding
495    /// `flow_id` didn't require a breaking change to [`Self::new`].
496    #[must_use]
497    pub fn with_flow_id(mut self, flow_id: crate::types::FlowId) -> Self {
498        self.flow_id = Some(flow_id);
499        self
500    }
501}
502
503// ── §3.3.0 ResumeSignal (crate move from ff-sdk::task) ──────────────────
504
505/// Signal that satisfied a waitpoint matcher and is therefore part of
506/// the reason an execution resumed. Returned by `observe_signals`
507/// (RFC-012 §3.1.2) and by `ClaimedTask::resume_signals` in ff-sdk.
508///
509/// Moved in Stage 0 from `ff_sdk::task`; `ff_sdk::ResumeSignal` remains
510/// re-exported through the 0.4.x window (removal scheduled for 0.5.0).
511///
512/// Returned only for signals whose matcher slot in the waitpoint's
513/// resume condition is marked satisfied. Pre-buffered-but-unmatched
514/// signals are not included.
515///
516/// Note: NOT `#[non_exhaustive]` to preserve struct-literal compatibility
517/// with ff-sdk call sites that constructed `ResumeSignal { .. }` before
518/// the Stage 0 crate move.
519#[derive(Clone, Debug, PartialEq, Eq)]
520pub struct ResumeSignal {
521    pub signal_id: crate::types::SignalId,
522    pub signal_name: String,
523    pub signal_category: String,
524    pub source_type: String,
525    pub source_identity: String,
526    pub correlation_id: String,
527    /// Valkey-server `now_ms` timestamp at which `ff_deliver_signal`
528    /// accepted this signal. `0` if the stored `accepted_at` field is
529    /// missing or non-numeric (a Lua-side defect — not expected at
530    /// runtime).
531    pub accepted_at: TimestampMs,
532    /// Raw payload bytes, if the signal was delivered with one. `None`
533    /// for signals delivered without a payload.
534    pub payload: Option<Vec<u8>>,
535}
536
537// ── Stage 1a: FailOutcome move ──────────────────────────────────────────
538
539/// Outcome of a `fail()` call.
540///
541/// **RFC-012 Stage 1a:** moved from `ff_sdk::task::FailOutcome` to
542/// `ff_core::backend::FailOutcome` so it is nameable by the
543/// `EngineBackend` trait signature. `ff_sdk::task` retains a
544/// `pub use` shim preserving the `ff_sdk::FailOutcome` path.
545///
546/// Not `#[non_exhaustive]` because existing consumers (ff-test,
547/// ff-readiness-tests) construct and match this enum exhaustively;
548/// the shape has been stable since the `fail()` API landed and any
549/// additive growth would be a follow-up RFC's deliberate break.
550#[derive(Clone, Debug, PartialEq, Eq)]
551pub enum FailOutcome {
552    /// Retry was scheduled — execution is in delayed backoff.
553    RetryScheduled {
554        delay_until: crate::types::TimestampMs,
555    },
556    /// No retries left — execution is terminal failed.
557    TerminalFailed,
558}
559
// ── Stage 1a: BackendConfig + sub-types ─────────────────────────────────

/// Pool + keepalive timing shared across backend connections.
///
/// The struct is `#[non_exhaustive]` per project convention — connection
/// tunables grow as new backends land. Default is the Phase-1 Valkey
/// client's out-of-box shape (no explicit timeout, no explicit pool
/// cap; inherits ferriskey defaults).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendTimeouts {
    /// Per-request timeout. `None` ⇒ backend default.
    pub request: Option<Duration>,
    /// Idle-connection keepalive interval. `None` ⇒ backend default.
    pub keepalive: Option<Duration>,
}

/// Retry policy shared across backend connections. Additive; Stage 1a
/// ships the minimal shape so the trait signatures can reference it.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendRetry {
    /// Max retry attempts on transient transport errors. `None` ⇒
    /// backend default.
    pub max_attempts: Option<u32>,
    /// Base backoff. `None` ⇒ backend default.
    pub base_backoff: Option<Duration>,
}

/// Valkey-specific connection parameters.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ValkeyConnection {
    /// Valkey hostname.
    pub host: String,
    /// Valkey port.
    pub port: u16,
    /// Enable TLS for the Valkey connection.
    pub tls: bool,
    /// Enable Valkey cluster mode.
    pub cluster: bool,
}

impl ValkeyConnection {
    /// Build connection parameters for `host:port` with TLS and cluster
    /// mode both disabled; set the public fields to enable them.
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        Self {
            host: host.into(),
            port,
            tls: false,
            cluster: false,
        }
    }
}

614/// Discriminated union over per-backend connection shapes. Stage 1a
615/// ships the Valkey arm; future backends (Postgres) land additively
616/// under the `#[non_exhaustive]` guard.
617#[derive(Clone, Debug, PartialEq, Eq)]
618#[non_exhaustive]
619pub enum BackendConnection {
620    Valkey(ValkeyConnection),
621}
622
623/// Configuration passed to `ValkeyBackend::connect` (and, later, to
624/// other backend `connect` constructors). Carries the connection
625/// details + shared timing/retry policy. Replaces the Valkey-specific
626/// fields today on `WorkerConfig` (RFC-012 §5.1 migration plan).
627///
628/// `BackendConfig` is the replacement target for `WorkerConfig`'s
629/// `host` / `port` / `tls` / `cluster` fields. The full migration
630/// lands across Stage 1a (type introduction) and Stage 1c
631/// (`WorkerConfig` forwarding); worker-policy fields (lease TTL,
632/// claim poll interval, capability set) stay on `WorkerConfig`.
633#[derive(Clone, Debug, PartialEq, Eq)]
634#[non_exhaustive]
635pub struct BackendConfig {
636    pub connection: BackendConnection,
637    pub timeouts: BackendTimeouts,
638    pub retry: BackendRetry,
639}
640
641impl BackendConfig {
642    /// Build a Valkey BackendConfig from host+port. Other fields take
643    /// backend defaults.
644    pub fn valkey(host: impl Into<String>, port: u16) -> Self {
645        Self {
646            connection: BackendConnection::Valkey(ValkeyConnection::new(host, port)),
647            timeouts: BackendTimeouts::default(),
648            retry: BackendRetry::default(),
649        }
650    }
651}
652
653// ── Issue #122: ScannerFilter ───────────────────────────────────────────
654
655/// Per-consumer filter applied by FlowFabric's background scanners and
656/// completion subscribers so multiple FlowFabric instances sharing a
657/// single Valkey keyspace can operate on disjoint subsets of
658/// executions without mutual interference (issue #122).
659///
660/// Sibling of [`CompletionPayload`]: the former is a scan *output*
661/// shape, this is the *input* predicate scanners and completion
662/// subscribers consult per candidate.
663///
664/// # Fields & backing storage
665///
666/// * `namespace` — matches against the `namespace` field on
667///   `exec_core` (Valkey hash `ff:exec:{fp:N}:<eid>:core`). Cost:
668///   one HGET per candidate when set.
669/// * `instance_tag` — matches against an entry in the execution's
670///   user-supplied tags hash at the canonical key
671///   **`ff:exec:{p}:<eid>:tags`** (where `{p}` is the partition
672///   hash-tag, e.g. `{fp:42}`). Written by the Lua function
673///   `ff_create_execution` (see `lua/execution.lua`) and
674///   `ff_set_execution_tags`. The tuple is `(tag_key, tag_value)`:
675///   the HGET targets `tag_key` on the tags hash and compares the
676///   returned string (if any) byte-for-byte against `tag_value`.
677///   Cost: one HGET per candidate when set.
678///
679/// When both are set, scanners check `namespace` first (short-circuit
680/// on mismatch) then `instance_tag`, for a maximum of 2 HGETs per
681/// candidate.
682///
683/// # Semantics
684///
685/// * [`Self::is_noop`] returns true when both fields are `None` —
686///   the filter accepts every candidate. Used by the
687///   `subscribe_completions_filtered` default body to fall back to
688///   the unfiltered subscription.
689/// * [`Self::matches`] is the tight in-memory predicate once the
690///   HGET values have been fetched. Scanners fetch the fields
691///   lazily (namespace first) and pass the results in; the helper
692///   returns false as soon as one component mismatches.
693///
694/// # Scope
695///
696/// Today the filter is consulted by execution-shaped scanners
697/// (lease_expiry, attempt_timeout, execution_deadline,
698/// suspension_timeout, pending_wp_expiry, delayed_promoter,
699/// dependency_reconciler, cancel_reconciler, unblock,
700/// index_reconciler, retention_trimmer) and by completion
701/// subscribers. Non-execution scanners (budget_reconciler,
702/// budget_reset, quota_reconciler, flow_projector) accept a filter
703/// for API uniformity but do not apply it — their iteration domains
704/// (budgets, quotas, flows) are not keyed by the
705/// per-execution namespace / instance_tag shape.
706///
707/// `#[non_exhaustive]` so future dimensions (e.g. `lane_id`,
708/// `worker_instance`) can land additively.
709#[derive(Clone, Debug, Default, PartialEq, Eq)]
710#[non_exhaustive]
711pub struct ScannerFilter {
712    /// Tenant / workspace scope. Matches against the `namespace`
713    /// field on `exec_core`.
714    pub namespace: Option<Namespace>,
715    /// Instance-scoped tag predicate `(tag_key, tag_value)`. Matches
716    /// against an entry in `ff:exec:{p}:<eid>:tags` (the tags hash
717    /// written by `ff_create_execution`).
718    pub instance_tag: Option<(String, String)>,
719}
720
721impl ScannerFilter {
722    /// Shared no-op filter — useful as the default for the
723    /// `Scanner::filter()` trait method so implementors that don't
724    /// override can hand back a `&'static` reference without
725    /// allocating per call.
726    pub const NOOP: Self = Self {
727        namespace: None,
728        instance_tag: None,
729    };
730
731    /// Create an empty filter (equivalent to [`Self::NOOP`]).
732    ///
733    /// Provided for external-crate consumers: `ScannerFilter` is
734    /// `#[non_exhaustive]`, so struct-literal and functional-update
735    /// construction are unavailable across crate boundaries. Start
736    /// from `new()` and chain `with_*` setters to build a filter.
737    pub fn new() -> Self {
738        Self::default()
739    }
740
741    /// Set the tenant-scope namespace filter dimension. Consumes
742    /// and returns `self` for chaining.
743    pub fn with_namespace(mut self, ns: Namespace) -> Self {
744        self.namespace = Some(ns);
745        self
746    }
747
748    /// Set the exact-match exec-tag filter dimension. Consumes and
749    /// returns `self` for chaining. At filter time, scanners read
750    /// the `ff:exec:{p:N}:<eid>:tags` hash and compare the value at
751    /// `key` byte-for-byte against `value`.
752    pub fn with_instance_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
753        self.instance_tag = Some((key.into(), value.into()));
754        self
755    }
756
757    /// True iff the filter has no dimensions set — every candidate
758    /// passes. Callers use this to short-circuit filtered subscribe
759    /// paths back to the unfiltered ones.
760    pub fn is_noop(&self) -> bool {
761        self.namespace.is_none() && self.instance_tag.is_none()
762    }
763
764    /// Post-HGET in-memory match check.
765    ///
766    /// `core_namespace` should be the `namespace` field read from
767    /// `exec_core` (None if the HGET returned nil / was skipped).
768    /// `tag_value` should be the HGET result for the configured
769    /// `instance_tag.0` key on the execution's tags hash (None if
770    /// the HGET returned nil / was skipped).
771    ///
772    /// When a filter dimension is `None` the corresponding argument
773    /// is ignored — callers may pass `None` to skip the HGET and
774    /// save a round-trip.
775    pub fn matches(
776        &self,
777        core_namespace: Option<&Namespace>,
778        tag_value: Option<&str>,
779    ) -> bool {
780        if let Some(ref want) = self.namespace {
781            match core_namespace {
782                Some(have) if have == want => {}
783                _ => return false,
784            }
785        }
786        if let Some((_, ref want_value)) = self.instance_tag {
787            match tag_value {
788                Some(have) if have == want_value.as_str() => {}
789                _ => return false,
790            }
791        }
792        true
793    }
794}
795
796// ── Tests ───────────────────────────────────────────────────────────────
797
798#[cfg(test)]
799mod tests {
800    use super::*;
801    use crate::partition::{Partition, PartitionFamily};
802    use crate::types::{ExecutionId, LaneId, SignalId};
803
804    #[test]
805    fn backend_tag_derives() {
806        let a = BackendTag::Valkey;
807        let b = a;
808        assert_eq!(a, b);
809        assert_eq!(format!("{a:?}"), "Valkey");
810    }
811
812    #[test]
813    fn handle_kind_derives() {
814        for k in [HandleKind::Fresh, HandleKind::Resumed, HandleKind::Suspended] {
815            let c = k;
816            assert_eq!(k, c);
817            // Debug formatter reachable
818            let _ = format!("{k:?}");
819        }
820    }
821
822    #[test]
823    fn handle_opaque_roundtrips() {
824        let bytes: Box<[u8]> = Box::new([1u8, 2, 3, 4]);
825        let o = HandleOpaque::new(bytes.clone());
826        assert_eq!(o.as_bytes(), &[1u8, 2, 3, 4]);
827        assert_eq!(o, o.clone());
828        let _ = format!("{o:?}");
829    }
830
831    #[test]
832    fn handle_composes() {
833        let h = Handle::new(
834            BackendTag::Valkey,
835            HandleKind::Fresh,
836            HandleOpaque::new(Box::new([0u8; 4])),
837        );
838        assert_eq!(h.backend, BackendTag::Valkey);
839        assert_eq!(h.kind, HandleKind::Fresh);
840        assert_eq!(h.clone(), h);
841    }
842
843    #[test]
844    fn capability_set_derives() {
845        let c1 = CapabilitySet::new(["gpu", "cuda"]);
846        let c2 = CapabilitySet::new(["gpu", "cuda"]);
847        assert_eq!(c1, c2);
848        assert!(!c1.is_empty());
849        assert!(CapabilitySet::default().is_empty());
850        let _ = format!("{c1:?}");
851    }
852
853    #[test]
854    fn claim_policy_derives() {
855        let p = ClaimPolicy::with_max_wait(Duration::from_millis(500));
856        assert_eq!(p.max_wait, Some(Duration::from_millis(500)));
857        assert_eq!(p.clone(), p);
858        assert_eq!(ClaimPolicy::immediate(), ClaimPolicy::default());
859    }
860
861    #[test]
862    fn frame_and_kind_derive() {
863        let f = Frame {
864            bytes: b"hello".to_vec(),
865            kind: FrameKind::Stdout,
866            seq: Some(3),
867        };
868        assert_eq!(f.clone(), f);
869        assert_eq!(f.kind, FrameKind::Stdout);
870        assert_ne!(FrameKind::Stderr, FrameKind::Event);
871        let _ = format!("{f:?}");
872    }
873
874    #[test]
875    fn waitpoint_spec_derives() {
876        let spec = WaitpointSpec {
877            kind: WaitpointKind::SignalName,
878            matcher: b"approved".to_vec(),
879            hmac_token: WaitpointHmac::new("kid1:deadbeef"),
880        };
881        assert_eq!(spec.clone(), spec);
882        assert_eq!(spec.hmac_token.as_str(), "kid1:deadbeef");
883        assert_eq!(
884            WaitpointHmac::new("a"),
885            WaitpointHmac::new(String::from("a"))
886        );
887    }
888
889    #[test]
890    fn failure_reason_and_class() {
891        let r1 = FailureReason::new("boom");
892        let r2 = FailureReason::with_detail("boom", b"stack".to_vec());
893        assert_eq!(r1.message, "boom");
894        assert!(r1.detail.is_none());
895        assert_eq!(r2.detail.as_deref(), Some(&b"stack"[..]));
896        assert_eq!(r1.clone(), r1);
897        assert_ne!(FailureClass::Transient, FailureClass::Permanent);
898    }
899
900    #[test]
901    fn usage_dimensions_default_and_eq() {
902        let u = UsageDimensions {
903            input_tokens: 10,
904            output_tokens: 20,
905            wall_ms: Some(150),
906            custom: BTreeMap::from([("net_bytes".to_string(), 42)]),
907        };
908        assert_eq!(u.clone(), u);
909        assert_eq!(UsageDimensions::default().input_tokens, 0);
910    }
911
912    #[test]
913    fn admission_decision_derives() {
914        let a = AdmissionDecision::Admitted;
915        let t = AdmissionDecision::Throttled { retry_after_ms: 25 };
916        let r = AdmissionDecision::Rejected {
917            reason: "quota".into(),
918        };
919        assert_eq!(a.clone(), a);
920        assert_eq!(t.clone(), t);
921        assert_ne!(a, r);
922    }
923
924    #[test]
925    fn reclaim_token_wraps_grant() {
926        let grant = ReclaimGrant {
927            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
928            partition_key: crate::partition::PartitionKey::from(&Partition {
929                family: PartitionFamily::Flow,
930                index: 0,
931            }),
932            grant_key: "gkey".into(),
933            expires_at_ms: 123,
934            lane_id: LaneId::new("default"),
935        };
936        let t = ReclaimToken::new(grant.clone());
937        assert_eq!(t.grant, grant);
938        assert_eq!(t.clone(), t);
939    }
940
941    #[test]
942    fn lease_renewal_is_copy() {
943        let r = LeaseRenewal {
944            expires_at_ms: 100,
945            lease_epoch: 2,
946        };
947        let s = r; // Copy
948        assert_eq!(r, s);
949    }
950
951    #[test]
952    fn cancel_flow_policy_and_wait() {
953        assert_ne!(CancelFlowPolicy::FlowOnly, CancelFlowPolicy::CancelAll);
954        let w = CancelFlowWait::WaitTimeout(Duration::from_secs(1));
955        assert_eq!(w, w);
956        assert_ne!(CancelFlowWait::NoWait, CancelFlowWait::WaitIndefinite);
957    }
958
959    #[test]
960    fn completion_payload_derives() {
961        let c = CompletionPayload {
962            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
963            outcome: "success".into(),
964            payload_bytes: Some(b"ok".to_vec()),
965            produced_at_ms: TimestampMs::from_millis(1234),
966            flow_id: None,
967        };
968        assert_eq!(c.clone(), c);
969        let _ = format!("{c:?}");
970    }
971
972    #[test]
973    fn resume_signal_moved_and_derives() {
974        let s = ResumeSignal {
975            signal_id: SignalId::new(),
976            signal_name: "approve".into(),
977            signal_category: "decision".into(),
978            source_type: "user".into(),
979            source_identity: "u1".into(),
980            correlation_id: "c1".into(),
981            accepted_at: TimestampMs::from_millis(10),
982            payload: None,
983        };
984        assert_eq!(s.clone(), s);
985        let _ = format!("{s:?}");
986    }
987
988    #[test]
989    fn fail_outcome_variants() {
990        let retry = FailOutcome::RetryScheduled {
991            delay_until: TimestampMs::from_millis(42),
992        };
993        let terminal = FailOutcome::TerminalFailed;
994        assert_ne!(retry, terminal);
995        assert_eq!(retry.clone(), retry);
996    }
997
998    #[test]
999    fn scanner_filter_noop_and_default() {
1000        let f = ScannerFilter::default();
1001        assert!(f.is_noop());
1002        assert_eq!(f, ScannerFilter::NOOP);
1003        // A no-op filter matches any candidate, including ones that
1004        // produced no HGET results.
1005        assert!(f.matches(None, None));
1006        assert!(f.matches(Some(&Namespace::new("t1")), Some("v")));
1007    }
1008
1009    #[test]
1010    fn scanner_filter_namespace_match() {
1011        let f = ScannerFilter {
1012            namespace: Some(Namespace::new("tenant-a")),
1013            instance_tag: None,
1014        };
1015        assert!(!f.is_noop());
1016        assert!(f.matches(Some(&Namespace::new("tenant-a")), None));
1017        assert!(!f.matches(Some(&Namespace::new("tenant-b")), None));
1018        // Missing core namespace ⇒ no match.
1019        assert!(!f.matches(None, None));
1020    }
1021
1022    #[test]
1023    fn scanner_filter_instance_tag_match() {
1024        let f = ScannerFilter {
1025            namespace: None,
1026            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
1027        };
1028        assert!(f.matches(None, Some("i-1")));
1029        assert!(!f.matches(None, Some("i-2")));
1030        assert!(!f.matches(None, None));
1031    }
1032
1033    #[test]
1034    fn scanner_filter_both_dimensions() {
1035        let f = ScannerFilter {
1036            namespace: Some(Namespace::new("tenant-a")),
1037            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
1038        };
1039        assert!(f.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
1040        assert!(!f.matches(Some(&Namespace::new("tenant-a")), Some("i-2")));
1041        assert!(!f.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
1042        assert!(!f.matches(None, Some("i-1")));
1043    }
1044
1045    #[test]
1046    fn scanner_filter_builder_construction() {
1047        // Simulates external-crate usage: only public constructors
1048        // and chainable setters (no struct-literal access to the
1049        // `#[non_exhaustive]` fields).
1050        let empty = ScannerFilter::new();
1051        assert!(empty.is_noop());
1052        assert_eq!(empty, ScannerFilter::NOOP);
1053
1054        let ns_only = ScannerFilter::new().with_namespace(Namespace::new("tenant-a"));
1055        assert!(!ns_only.is_noop());
1056        assert!(ns_only.matches(Some(&Namespace::new("tenant-a")), None));
1057
1058        let tag_only = ScannerFilter::new().with_instance_tag("cairn.instance_id", "i-1");
1059        assert!(!tag_only.is_noop());
1060        assert!(tag_only.matches(None, Some("i-1")));
1061
1062        let both = ScannerFilter::new()
1063            .with_namespace(Namespace::new("tenant-a"))
1064            .with_instance_tag("cairn.instance_id", "i-1");
1065        assert!(both.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
1066        assert!(!both.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
1067    }
1068
1069    #[test]
1070    fn backend_config_valkey_ctor() {
1071        let c = BackendConfig::valkey("host.local", 6379);
1072        // Same-crate match against an otherwise `#[non_exhaustive]`
1073        // enum is irrefutable — no wildcard needed and `let-else`
1074        // would trip `irrefutable_let_patterns`.
1075        let BackendConnection::Valkey(v) = &c.connection;
1076        assert_eq!(v.host, "host.local");
1077        assert_eq!(v.port, 6379);
1078        assert!(!v.tls);
1079        assert!(!v.cluster);
1080        assert_eq!(c.timeouts, BackendTimeouts::default());
1081        assert_eq!(c.retry, BackendRetry::default());
1082        assert_eq!(c.clone(), c);
1083    }
1084}