Skip to main content

ff_core/
backend.rs

1//! Backend-trait supporting types (RFC-012 Stage 0).
2//!
3//! This module carries the public types referenced by the `EngineBackend`
4//! trait signatures in RFC-012 §3.3. The trait itself lands in Stage 1
5//! (issue #89 follow-up); Stage 0 is strictly type-plumbing and the
6//! `ResumeSignal` crate move.
7//!
8//! Public structs/enums whose fields or variants are expected to grow
9//! are marked `#[non_exhaustive]` per project convention — consumers
10//! must write `_`-terminated matches and use the provided constructors
11//! rather than struct literals. Exceptions:
12//!
13//! * Opaque single-field wrapper newtypes ([`HandleOpaque`],
14//!   [`WaitpointHmac`]) hide their inner field and need no non-exhaustive
15//!   annotation — the wrapped value is unreachable from outside.
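//! * [`ResumeSignal`] is intentionally NOT `#[non_exhaustive]` so the
//!   ff-sdk crate-move (Stage 0) preserves struct-literal compatibility
//!   at its existing call site.
//! * [`FailOutcome`] is likewise NOT `#[non_exhaustive]`: existing
//!   consumers construct and match it exhaustively.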
19//!
20//! See `rfcs/RFC-012-engine-backend-trait.md` §3.3.0 for the authoritative
21//! type inventory and §4.1-§4.2 for the `Handle` / `EngineError` shapes.
22
23use crate::contracts::ReclaimGrant;
24use crate::types::{Namespace, TimestampMs, WaitpointToken};
25use std::collections::BTreeMap;
26use std::time::Duration;
27
28// ── §4.1 Handle trio ────────────────────────────────────────────────────
29
/// Identifies which backend implementation a [`Handle`] belongs to, so
/// that ops can be routed to the matching backend at runtime.
///
/// Marked `#[non_exhaustive]` because further variants are expected to
/// arrive additively as backend impls come online (e.g. `Postgres` in
/// Stage 2, or hypothetical third backends later).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum BackendTag {
    /// Implementation backed by Valkey FCALLs.
    Valkey,
}
42
/// Lifecycle state recorded inside a [`Handle`]. Backends check `kind`
/// at the start of every op and answer with `EngineError::State` when
/// it does not fit the op being performed.
///
/// This runtime tag supersedes round-1's compile-time split into
/// `Handle` / `ResumeHandle` / `SuspendToken` (RFC-012 §4.1).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum HandleKind {
    /// A fresh claim, as returned by `claim` / `claim_from_grant`.
    Fresh,
    /// A claim resumed from reclaim, as returned by `claim_from_reclaim`.
    Resumed,
    /// A suspended attempt, as returned by `suspend`. Terminal for the
    /// lease: resuming mints a new Handle through `claim_from_reclaim`.
    Suspended,
}
59
/// Opaque, backend-private payload stored in a [`Handle`].
///
/// The bytes encode backend-specific state (for Valkey: exec id,
/// attempt id, lease id, lease epoch, capability binding, partition).
/// The backend produces them on claim/resume and reads them back on
/// every op; consumers neither build nor interpret them.
///
/// `Box<[u8]>` is used instead of `bytes::Bytes` (RFC-012 §7.17) so
/// this public type does not drag in a transitive dependency on the
/// `bytes` crate.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HandleOpaque(Box<[u8]>);

impl HandleOpaque {
    /// Wrap bytes owned by a backend. Intended for backend impls only.
    pub fn new(bytes: Box<[u8]>) -> Self {
        HandleOpaque(bytes)
    }

    /// View the wrapped bytes as a slice (backend-internal use).
    pub fn as_bytes(&self) -> &[u8] {
        &self.0[..]
    }
}
83
84/// Opaque attempt cookie held by the worker for the duration of an
85/// attempt. Produced by `claim` / `claim_from_reclaim` / `suspend`;
86/// borrowed by every op (renew, progress, append_frame, complete, fail,
87/// cancel, suspend, delay, wait_children, observe_signals, report_usage).
88///
89/// See RFC-012 §4.1 for the round-4 design — terminal ops borrow rather
90/// than consume so callers can retry after a transport error.
91#[derive(Clone, Debug, PartialEq, Eq, Hash)]
92#[non_exhaustive]
93pub struct Handle {
94    pub backend: BackendTag,
95    pub kind: HandleKind,
96    pub opaque: HandleOpaque,
97}
98
99impl Handle {
100    /// Construct a new Handle. Called by backend impls only; consumer
101    /// code receives Handles from `claim` / `suspend` / `claim_from_reclaim`.
102    pub fn new(backend: BackendTag, kind: HandleKind, opaque: HandleOpaque) -> Self {
103        Self {
104            backend,
105            kind,
106            opaque,
107        }
108    }
109}
110
111// ── §3.3.0 Claim / lifecycle supporting types ──────────────────────────
112
/// The capability tokens a worker advertises to the scheduler and to
/// `claim`. These live as a `Vec<String>` on `WorkerConfig` today; the
/// named newtype lets trait signatures refer to capabilities without
/// fixing a concrete container shape.
///
/// Whether this should be a bitfield or stay stringly-typed is the
/// §7.2 open question; Stage 0 keeps the round-2 lean (newtype over
/// `Vec<String>`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct CapabilitySet {
    /// Capability tokens, in the order they were supplied.
    pub tokens: Vec<String>,
}

impl CapabilitySet {
    /// Collect capability tokens from any iterable of string-like items.
    pub fn new<I, S>(tokens: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let tokens: Vec<String> = tokens.into_iter().map(|token| token.into()).collect();
        CapabilitySet { tokens }
    }

    /// Returns `true` exactly when no tokens are present.
    pub fn is_empty(&self) -> bool {
        self.tokens.is_empty()
    }
}
144
/// Policy hints passed to `claim`. Deliberately minimal at Stage 0 per
/// RFC-012 §3.3.0 ("Bikeshed-prone; keep minimal at Stage 0"); further
/// fields (retry count, fairness hints) are expected to land additively.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct ClaimPolicy {
    /// Upper bound on how long the claim may block. `None` defers to
    /// the backend default (today: non-blocking / immediate return).
    pub max_wait: Option<Duration>,
}

impl ClaimPolicy {
    /// Non-blocking claim: leaves `max_wait` unset, which equals the
    /// `Default` value. Matches today's SDK default.
    pub fn immediate() -> Self {
        Self::default()
    }

    /// Claim that may block for at most `max_wait`.
    pub fn with_max_wait(max_wait: Duration) -> Self {
        let max_wait = Some(max_wait);
        Self { max_wait }
    }
}
169
/// Classifies a frame handed to `append_frame`. Mirrors the
/// `frame_type` ARGV of the Lua-side `ff_append_frame`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FrameKind {
    /// Progress output or log line visible to operators.
    Stdout,
    /// Error or warning output visible to operators.
    Stderr,
    /// A structured event carrying a JSON payload.
    Event,
    /// A binary / opaque payload.
    Blob,
}
184
185/// Single stream frame appended via `append_frame` (RFC-012 §3.3.0).
186/// Today's FCALL takes the byte payload + frame_type + optional seq as
187/// discrete ARGV; Stage 0 collects them into a named type for trait
188/// signatures.
189#[derive(Clone, Debug, PartialEq, Eq)]
190#[non_exhaustive]
191pub struct Frame {
192    pub bytes: Vec<u8>,
193    pub kind: FrameKind,
194    /// Optional monotonic sequence. Set by the caller when the stream
195    /// protocol is sequence-bound; `None` lets the backend assign.
196    pub seq: Option<u64>,
197}
198
199impl Frame {
200    /// Construct a frame. `seq` defaults to `None` (backend-assigned);
201    /// callers that need an explicit sequence use
202    /// [`Frame::with_seq`].
203    pub fn new(bytes: Vec<u8>, kind: FrameKind) -> Self {
204        Self {
205            bytes,
206            kind,
207            seq: None,
208        }
209    }
210
211    /// Construct a frame with an explicit monotonic sequence.
212    pub fn with_seq(bytes: Vec<u8>, kind: FrameKind, seq: u64) -> Self {
213        Self {
214            bytes,
215            kind,
216            seq: Some(seq),
217        }
218    }
219}
220
221// ── §3.3.0 Suspend / waitpoint types ────────────────────────────────────
222
/// Matcher mode of a waitpoint. Mirrors today's suspend/close matcher
/// kinds (signal name, correlation id, etc.).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum WaitpointKind {
    /// The waitpoint matches on signal name.
    SignalName,
    /// The waitpoint matches on correlation id.
    CorrelationId,
    /// Generic external-signal waitpoint (external delivery).
    External,
}
235
236/// HMAC token that binds a waitpoint to its mint-time identity. Wire
237/// shape `kid:40hex`.
238///
239/// Wraps [`crate::types::WaitpointToken`] so bearer-credential Debug /
240/// Display redaction (`WaitpointToken("kid1:<REDACTED:len=40>")`) flows
241/// through automatically — no derived formatter can leak the raw
242/// digest when a [`WaitpointSpec`] is debug-printed via
243/// `tracing::debug!(spec=?spec)`.
244///
245/// Newtype-wrapping (vs. a `pub use` alias) keeps trait signatures
246/// naming the waitpoint-bound HMAC role explicitly.
247#[derive(Clone, PartialEq, Eq, Hash)]
248pub struct WaitpointHmac(WaitpointToken);
249
250impl WaitpointHmac {
251    pub fn new(token: impl Into<String>) -> Self {
252        Self(WaitpointToken::from(token.into()))
253    }
254
255    /// Borrow the wrapped token. The wrapped type's `Debug`/`Display`
256    /// redact — call sites that need the raw digest must explicitly
257    /// call [`WaitpointToken::as_str`].
258    pub fn token(&self) -> &WaitpointToken {
259        &self.0
260    }
261
262    /// Borrow the raw `kid:40hex` string. Prefer [`Self::token`] for
263    /// non-redacted call sites; this method exists only for transport
264    /// / FCALL ARGV construction where the raw wire bytes are required.
265    pub fn as_str(&self) -> &str {
266        self.0.as_str()
267    }
268}
269
270// Forward Debug / Display to the wrapped WaitpointToken so the
271// redaction guarantees on that type extend here. Derived Debug would
272// expose the raw string field and defeat the wrap.
273impl std::fmt::Debug for WaitpointHmac {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        write!(f, "WaitpointHmac({:?})", self.0)
276    }
277}
278
279/// One waitpoint inside a suspend request. `suspend` takes a
280/// `Vec<WaitpointSpec>`; the resume condition (`any` / `all`) lives on
281/// the enclosing suspend args in the Phase-1 contract.
282#[derive(Clone, Debug, PartialEq, Eq)]
283#[non_exhaustive]
284pub struct WaitpointSpec {
285    pub kind: WaitpointKind,
286    pub matcher: Vec<u8>,
287    pub hmac_token: WaitpointHmac,
288}
289
290impl WaitpointSpec {
291    pub fn new(kind: WaitpointKind, matcher: Vec<u8>, hmac_token: WaitpointHmac) -> Self {
292        Self {
293            kind,
294            matcher,
295            hmac_token,
296        }
297    }
298}
299
300// ── §3.3.0 Failure classification types ─────────────────────────────────
301
/// A human-readable failure description with optional structured
/// detail. Takes the place of the ad-hoc string argument `fail`
/// accepts today.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct FailureReason {
    /// Human-readable description of the failure.
    pub message: String,
    /// Optional structured detail, as raw bytes.
    pub detail: Option<Vec<u8>>,
}

impl FailureReason {
    /// Build a reason that carries only a message (`detail = None`).
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        FailureReason { message, detail: None }
    }

    /// Build a reason that carries both a message and detail bytes.
    pub fn with_detail(message: impl Into<String>, detail: Vec<u8>) -> Self {
        FailureReason {
            detail: Some(detail),
            ..FailureReason::new(message)
        }
    }
}
326
/// Classification of a failure. It drives the retry disposition on the
/// Lua side and mirrors the Lua-side classification codes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FailureClass {
    /// A transient error that may be retried.
    Transient,
    /// A permanent error; no retries.
    Permanent,
    /// Crash / process death, inferred from lease expiry.
    InfraCrash,
    /// A timeout at the attempt or operation level.
    Timeout,
    /// Cooperative cancellation, by an operator or by cancel_flow.
    Cancelled,
}
343
344// ── §3.3.0 Usage / budget types ─────────────────────────────────────────
345
/// Usage report passed to `report_usage`. Mirrors the ARGV of today's
/// `ff_report_usage_and_check`: token counts, wall time and custom
/// dimensions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct UsageDimensions {
    /// Input tokens consumed (LLM-shaped usage); `0` when not
    /// applicable.
    pub input_tokens: u64,
    /// Output tokens produced (LLM-shaped usage).
    pub output_tokens: u64,
    /// Wall-clock duration in milliseconds attributable to this report;
    /// `None` for reports that only carry token counts.
    pub wall_ms: Option<u64>,
    /// Caller-defined dimensions. A `BTreeMap` is used so iteration
    /// order is stable, which matters for dedup-key derivation on some
    /// budget schemes.
    pub custom: BTreeMap<String, u64>,
}
364
/// Admission outcome handed back by `report_usage`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum AdmissionDecision {
    /// The usage was accepted and the caller may carry on.
    Admitted,
    /// The caller was rate-limited or concurrency-capped and should
    /// retry after the suggested interval.
    Throttled {
        /// Suggested wait before retrying, in milliseconds.
        retry_after_ms: u64,
    },
    /// The usage was rejected outright: budget exhausted or policy
    /// violation.
    Rejected {
        /// Why the usage was rejected.
        reason: String,
    },
}
377
378// ── §3.3.0 Reclaim / lease types ────────────────────────────────────────
379
380/// Opaque cookie returned by the reclaim scanner; consumed by
381/// `claim_from_reclaim` to mint a resumed Handle.
382///
383/// Wraps [`ReclaimGrant`] today (the scanner's existing product).
384/// Kept as a newtype so trait signatures name the reclaim-bound role
385/// explicitly and so the wrapped shape can evolve without breaking the
386/// trait.
387#[derive(Clone, Debug, PartialEq, Eq)]
388#[non_exhaustive]
389pub struct ReclaimToken {
390    pub grant: ReclaimGrant,
391}
392
393impl ReclaimToken {
394    pub fn new(grant: ReclaimGrant) -> Self {
395        Self { grant }
396    }
397}
398
/// What a successful `renew` call returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct LeaseRenewal {
    /// The lease's new expiry (monotonic ms).
    pub expires_at_ms: u64,
    /// The lease epoch after the renewal. Monotonic non-decreasing.
    pub lease_epoch: u64,
}

impl LeaseRenewal {
    /// Build a renewal result from the new expiry and lease epoch.
    pub fn new(expires_at_ms: u64, lease_epoch: u64) -> Self {
        LeaseRenewal { expires_at_ms, lease_epoch }
    }
}
417
418// ── §3.3.0 Cancel-flow supporting types ─────────────────────────────────
419
/// Policy for `cancel_flow`: what happens to the members of the flow.
/// `CancelFlowArgs` encodes this as a `String` today; Stage 0 lifts the
/// policy shape into a typed enum (RFC-012 §3.3.0).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CancelFlowPolicy {
    /// Cancel just the flow record and leave the members untouched.
    FlowOnly,
    /// Cancel the flow together with every non-terminal member
    /// execution.
    CancelAll,
    /// Cancel the flow together with every member that is currently
    /// `Pending` / `Blocked` / `Eligible`; `Running` executions are
    /// left alone so they can drain.
    CancelPending,
}
435
/// How long the caller of `cancel_flow` is prepared to wait.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CancelFlowWait {
    /// Return once the flow state has flipped; member cancellations
    /// are dispatched asynchronously.
    NoWait,
    /// Block until member cancellations finish, for at most the given
    /// timeout.
    WaitTimeout(Duration),
    /// Block until member cancellations finish, with no deadline.
    WaitIndefinite,
}
448
449// ── §3.3.0 Completion stream payload ────────────────────────────────────
450
451/// One completion event delivered through the `CompletionStream`
452/// (RFC-012 §4.3). Also the payload type for issue #90's subscription
453/// API. Stage 0 authorises the type; issue #90 fixes the wire shape.
454///
455/// `flow_id` was added in issue #90 so DAG-dependency routing
456/// (`dispatch_dependency_resolution`) has the partition-routable flow
457/// handle without reparsing the Lua-emitted JSON downstream.
458/// `#[non_exhaustive]` keeps future field additions additive; use
459/// [`CompletionPayload::new`] and [`CompletionPayload::with_flow_id`]
460/// for construction.
461///
462/// `payload_bytes` / `produced_at_ms` are authorised but not yet
463/// populated by the Valkey Lua emitters — consumers on the
464/// `CompletionStream` read `execution_id` + `flow_id` today and must
465/// tolerate `payload_bytes = None` / `produced_at_ms = 0`.
466#[derive(Clone, Debug, PartialEq, Eq)]
467#[non_exhaustive]
468pub struct CompletionPayload {
469    pub execution_id: crate::types::ExecutionId,
470    pub outcome: String,
471    pub payload_bytes: Option<Vec<u8>>,
472    pub produced_at_ms: TimestampMs,
473    /// Flow handle for partition routing. Added in issue #90 (#90);
474    /// `None` for emitters that don't yet surface it.
475    pub flow_id: Option<crate::types::FlowId>,
476}
477
478impl CompletionPayload {
479    pub fn new(
480        execution_id: crate::types::ExecutionId,
481        outcome: impl Into<String>,
482        payload_bytes: Option<Vec<u8>>,
483        produced_at_ms: TimestampMs,
484    ) -> Self {
485        Self {
486            execution_id,
487            outcome: outcome.into(),
488            payload_bytes,
489            produced_at_ms,
490            flow_id: None,
491        }
492    }
493
494    /// Attach a flow handle to the payload. Additive builder so adding
495    /// `flow_id` didn't require a breaking change to [`Self::new`].
496    #[must_use]
497    pub fn with_flow_id(mut self, flow_id: crate::types::FlowId) -> Self {
498        self.flow_id = Some(flow_id);
499        self
500    }
501}
502
503// ── §3.3.0 ResumeSignal (crate move from ff-sdk::task) ──────────────────
504
505/// Signal that satisfied a waitpoint matcher and is therefore part of
506/// the reason an execution resumed. Returned by `observe_signals`
507/// (RFC-012 §3.1.2) and by `ClaimedTask::resume_signals` in ff-sdk.
508///
509/// Moved in Stage 0 from `ff_sdk::task`; `ff_sdk::ResumeSignal` remains
510/// re-exported through the 0.4.x window (removal scheduled for 0.5.0).
511///
512/// Returned only for signals whose matcher slot in the waitpoint's
513/// resume condition is marked satisfied. Pre-buffered-but-unmatched
514/// signals are not included.
515///
516/// Note: NOT `#[non_exhaustive]` to preserve struct-literal compatibility
517/// with ff-sdk call sites that constructed `ResumeSignal { .. }` before
518/// the Stage 0 crate move.
519#[derive(Clone, Debug, PartialEq, Eq)]
520pub struct ResumeSignal {
521    pub signal_id: crate::types::SignalId,
522    pub signal_name: String,
523    pub signal_category: String,
524    pub source_type: String,
525    pub source_identity: String,
526    pub correlation_id: String,
527    /// Valkey-server `now_ms` timestamp at which `ff_deliver_signal`
528    /// accepted this signal. `0` if the stored `accepted_at` field is
529    /// missing or non-numeric (a Lua-side defect — not expected at
530    /// runtime).
531    pub accepted_at: TimestampMs,
532    /// Raw payload bytes, if the signal was delivered with one. `None`
533    /// for signals delivered without a payload.
534    pub payload: Option<Vec<u8>>,
535}
536
537// ── Stage 1a: FailOutcome move ──────────────────────────────────────────
538
539/// Outcome of a `fail()` call.
540///
541/// **RFC-012 Stage 1a:** moved from `ff_sdk::task::FailOutcome` to
542/// `ff_core::backend::FailOutcome` so it is nameable by the
543/// `EngineBackend` trait signature. `ff_sdk::task` retains a
544/// `pub use` shim preserving the `ff_sdk::FailOutcome` path.
545///
546/// Not `#[non_exhaustive]` because existing consumers (ff-test,
547/// ff-readiness-tests) construct and match this enum exhaustively;
548/// the shape has been stable since the `fail()` API landed and any
549/// additive growth would be a follow-up RFC's deliberate break.
550#[derive(Clone, Debug, PartialEq, Eq)]
551pub enum FailOutcome {
552    /// Retry was scheduled — execution is in delayed backoff.
553    RetryScheduled {
554        delay_until: crate::types::TimestampMs,
555    },
556    /// No retries left — execution is terminal failed.
557    TerminalFailed,
558}
559
560// ── Stage 1a: BackendConfig + sub-types ─────────────────────────────────
561
/// Timing settings shared across backend connections.
///
/// The struct is `#[non_exhaustive]` per project convention, since
/// connection tunables are expected to grow as new backends land. The
/// `Default` value sets no explicit request timeout, which matches the
/// Phase-1 Valkey client's out-of-box shape (it inherits the ferriskey
/// defaults).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendTimeouts {
    /// Timeout applied per request. `None` ⇒ backend default.
    pub request: Option<Duration>,
}
574
/// Retry settings shared across backend connections.
///
/// The shape follows ferriskey's `ConnectionRetryStrategy` (see
/// `ferriskey/src/client/types.rs:151`), which makes Stage 1c's Valkey
/// wiring a direct pass-through instead of a reimplementation of what
/// ferriskey already offers. The future Postgres backend either
/// interprets the same fields under its own retry semantics or maps
/// `None` to its own defaults.
///
/// Every field is an `Option<u32>`. `None` ⇒ backend default (for
/// Valkey: `ConnectionRetryStrategy::default()`); `Some(v)` ⇒ `v` is
/// passed straight through.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendRetry {
    /// Base of the exponent in the backoff curve. `None` ⇒ backend
    /// default.
    pub exponent_base: Option<u32>,
    /// Factor each backoff step is multiplied by. `None` ⇒ backend
    /// default.
    pub factor: Option<u32>,
    /// Upper bound on retry attempts for transient transport errors.
    /// `None` ⇒ backend default.
    pub number_of_retries: Option<u32>,
    /// Jitter, as a percentage of the computed backoff. `None` ⇒
    /// backend default.
    pub jitter_percent: Option<u32>,
}
601
/// Connection parameters specific to Valkey.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct ValkeyConnection {
    /// Hostname of the Valkey server.
    pub host: String,
    /// Port of the Valkey server.
    pub port: u16,
    /// Whether TLS is enabled for the Valkey connection.
    pub tls: bool,
    /// Whether Valkey cluster mode is enabled.
    pub cluster: bool,
}

impl ValkeyConnection {
    /// Build connection parameters for `host:port` with TLS and cluster
    /// mode both switched off.
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        let host: String = host.into();
        ValkeyConnection {
            host,
            port,
            tls: false,
            cluster: false,
        }
    }
}
626
627/// Discriminated union over per-backend connection shapes. Stage 1a
628/// ships the Valkey arm; future backends (Postgres) land additively
629/// under the `#[non_exhaustive]` guard.
630#[derive(Clone, Debug, PartialEq, Eq)]
631#[non_exhaustive]
632pub enum BackendConnection {
633    Valkey(ValkeyConnection),
634}
635
636/// Configuration passed to `ValkeyBackend::connect` (and, later, to
637/// other backend `connect` constructors). Carries the connection
638/// details + shared timing/retry policy. Replaces the Valkey-specific
639/// fields today on `WorkerConfig` (RFC-012 §5.1 migration plan).
640///
641/// `BackendConfig` is the replacement target for `WorkerConfig`'s
642/// `host` / `port` / `tls` / `cluster` fields. The full migration
643/// lands across Stage 1a (type introduction) and Stage 1c
644/// (`WorkerConfig` forwarding); worker-policy fields (lease TTL,
645/// claim poll interval, capability set) stay on `WorkerConfig`.
646#[derive(Clone, Debug, PartialEq, Eq)]
647#[non_exhaustive]
648pub struct BackendConfig {
649    pub connection: BackendConnection,
650    pub timeouts: BackendTimeouts,
651    pub retry: BackendRetry,
652}
653
654impl BackendConfig {
655    /// Build a Valkey BackendConfig from host+port. Other fields take
656    /// backend defaults.
657    pub fn valkey(host: impl Into<String>, port: u16) -> Self {
658        Self {
659            connection: BackendConnection::Valkey(ValkeyConnection::new(host, port)),
660            timeouts: BackendTimeouts::default(),
661            retry: BackendRetry::default(),
662        }
663    }
664}
665
666// ── Issue #122: ScannerFilter ───────────────────────────────────────────
667
668/// Per-consumer filter applied by FlowFabric's background scanners and
669/// completion subscribers so multiple FlowFabric instances sharing a
670/// single Valkey keyspace can operate on disjoint subsets of
671/// executions without mutual interference (issue #122).
672///
673/// Sibling of [`CompletionPayload`]: the former is a scan *output*
674/// shape, this is the *input* predicate scanners and completion
675/// subscribers consult per candidate.
676///
677/// # Fields & backing storage
678///
679/// * `namespace` — matches against the `namespace` field on
680///   `exec_core` (Valkey hash `ff:exec:{fp:N}:<eid>:core`). Cost:
681///   one HGET per candidate when set.
682/// * `instance_tag` — matches against an entry in the execution's
683///   user-supplied tags hash at the canonical key
684///   **`ff:exec:{p}:<eid>:tags`** (where `{p}` is the partition
685///   hash-tag, e.g. `{fp:42}`). Written by the Lua function
686///   `ff_create_execution` (see `lua/execution.lua`) and
687///   `ff_set_execution_tags`. The tuple is `(tag_key, tag_value)`:
688///   the HGET targets `tag_key` on the tags hash and compares the
689///   returned string (if any) byte-for-byte against `tag_value`.
690///   Cost: one HGET per candidate when set.
691///
692/// When both are set, scanners check `namespace` first (short-circuit
693/// on mismatch) then `instance_tag`, for a maximum of 2 HGETs per
694/// candidate.
695///
696/// # Semantics
697///
698/// * [`Self::is_noop`] returns true when both fields are `None` —
699///   the filter accepts every candidate. Used by the
700///   `subscribe_completions_filtered` default body to fall back to
701///   the unfiltered subscription.
702/// * [`Self::matches`] is the tight in-memory predicate once the
703///   HGET values have been fetched. Scanners fetch the fields
704///   lazily (namespace first) and pass the results in; the helper
705///   returns false as soon as one component mismatches.
706///
707/// # Scope
708///
709/// Today the filter is consulted by execution-shaped scanners
710/// (lease_expiry, attempt_timeout, execution_deadline,
711/// suspension_timeout, pending_wp_expiry, delayed_promoter,
712/// dependency_reconciler, cancel_reconciler, unblock,
713/// index_reconciler, retention_trimmer) and by completion
714/// subscribers. Non-execution scanners (budget_reconciler,
715/// budget_reset, quota_reconciler, flow_projector) accept a filter
716/// for API uniformity but do not apply it — their iteration domains
717/// (budgets, quotas, flows) are not keyed by the
718/// per-execution namespace / instance_tag shape.
719///
720/// `#[non_exhaustive]` so future dimensions (e.g. `lane_id`,
721/// `worker_instance`) can land additively.
722#[derive(Clone, Debug, Default, PartialEq, Eq)]
723#[non_exhaustive]
724pub struct ScannerFilter {
725    /// Tenant / workspace scope. Matches against the `namespace`
726    /// field on `exec_core`.
727    pub namespace: Option<Namespace>,
728    /// Instance-scoped tag predicate `(tag_key, tag_value)`. Matches
729    /// against an entry in `ff:exec:{p}:<eid>:tags` (the tags hash
730    /// written by `ff_create_execution`).
731    pub instance_tag: Option<(String, String)>,
732}
733
734impl ScannerFilter {
735    /// Shared no-op filter — useful as the default for the
736    /// `Scanner::filter()` trait method so implementors that don't
737    /// override can hand back a `&'static` reference without
738    /// allocating per call.
739    pub const NOOP: Self = Self {
740        namespace: None,
741        instance_tag: None,
742    };
743
744    /// Create an empty filter (equivalent to [`Self::NOOP`]).
745    ///
746    /// Provided for external-crate consumers: `ScannerFilter` is
747    /// `#[non_exhaustive]`, so struct-literal and functional-update
748    /// construction are unavailable across crate boundaries. Start
749    /// from `new()` and chain `with_*` setters to build a filter.
750    pub fn new() -> Self {
751        Self::default()
752    }
753
754    /// Set the tenant-scope namespace filter dimension. Consumes
755    /// and returns `self` for chaining.
756    pub fn with_namespace(mut self, ns: Namespace) -> Self {
757        self.namespace = Some(ns);
758        self
759    }
760
761    /// Set the exact-match exec-tag filter dimension. Consumes and
762    /// returns `self` for chaining. At filter time, scanners read
763    /// the `ff:exec:{p:N}:<eid>:tags` hash and compare the value at
764    /// `key` byte-for-byte against `value`.
765    pub fn with_instance_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
766        self.instance_tag = Some((key.into(), value.into()));
767        self
768    }
769
770    /// True iff the filter has no dimensions set — every candidate
771    /// passes. Callers use this to short-circuit filtered subscribe
772    /// paths back to the unfiltered ones.
773    pub fn is_noop(&self) -> bool {
774        self.namespace.is_none() && self.instance_tag.is_none()
775    }
776
777    /// Post-HGET in-memory match check.
778    ///
779    /// `core_namespace` should be the `namespace` field read from
780    /// `exec_core` (None if the HGET returned nil / was skipped).
781    /// `tag_value` should be the HGET result for the configured
782    /// `instance_tag.0` key on the execution's tags hash (None if
783    /// the HGET returned nil / was skipped).
784    ///
785    /// When a filter dimension is `None` the corresponding argument
786    /// is ignored — callers may pass `None` to skip the HGET and
787    /// save a round-trip.
788    pub fn matches(
789        &self,
790        core_namespace: Option<&Namespace>,
791        tag_value: Option<&str>,
792    ) -> bool {
793        if let Some(ref want) = self.namespace {
794            match core_namespace {
795                Some(have) if have == want => {}
796                _ => return false,
797            }
798        }
799        if let Some((_, ref want_value)) = self.instance_tag {
800            match tag_value {
801                Some(have) if have == want_value.as_str() => {}
802                _ => return false,
803            }
804        }
805        true
806    }
807}
808
809// ── Tests ───────────────────────────────────────────────────────────────
810
811#[cfg(test)]
812mod tests {
813    use super::*;
814    use crate::partition::{Partition, PartitionFamily};
815    use crate::types::{ExecutionId, LaneId, SignalId};
816
817    #[test]
818    fn backend_tag_derives() {
819        let a = BackendTag::Valkey;
820        let b = a;
821        assert_eq!(a, b);
822        assert_eq!(format!("{a:?}"), "Valkey");
823    }
824
825    #[test]
826    fn handle_kind_derives() {
827        for k in [HandleKind::Fresh, HandleKind::Resumed, HandleKind::Suspended] {
828            let c = k;
829            assert_eq!(k, c);
830            // Debug formatter reachable
831            let _ = format!("{k:?}");
832        }
833    }
834
835    #[test]
836    fn handle_opaque_roundtrips() {
837        let bytes: Box<[u8]> = Box::new([1u8, 2, 3, 4]);
838        let o = HandleOpaque::new(bytes.clone());
839        assert_eq!(o.as_bytes(), &[1u8, 2, 3, 4]);
840        assert_eq!(o, o.clone());
841        let _ = format!("{o:?}");
842    }
843
844    #[test]
845    fn handle_composes() {
846        let h = Handle::new(
847            BackendTag::Valkey,
848            HandleKind::Fresh,
849            HandleOpaque::new(Box::new([0u8; 4])),
850        );
851        assert_eq!(h.backend, BackendTag::Valkey);
852        assert_eq!(h.kind, HandleKind::Fresh);
853        assert_eq!(h.clone(), h);
854    }
855
856    #[test]
857    fn capability_set_derives() {
858        let c1 = CapabilitySet::new(["gpu", "cuda"]);
859        let c2 = CapabilitySet::new(["gpu", "cuda"]);
860        assert_eq!(c1, c2);
861        assert!(!c1.is_empty());
862        assert!(CapabilitySet::default().is_empty());
863        let _ = format!("{c1:?}");
864    }
865
866    #[test]
867    fn claim_policy_derives() {
868        let p = ClaimPolicy::with_max_wait(Duration::from_millis(500));
869        assert_eq!(p.max_wait, Some(Duration::from_millis(500)));
870        assert_eq!(p.clone(), p);
871        assert_eq!(ClaimPolicy::immediate(), ClaimPolicy::default());
872    }
873
874    #[test]
875    fn frame_and_kind_derive() {
876        let f = Frame {
877            bytes: b"hello".to_vec(),
878            kind: FrameKind::Stdout,
879            seq: Some(3),
880        };
881        assert_eq!(f.clone(), f);
882        assert_eq!(f.kind, FrameKind::Stdout);
883        assert_ne!(FrameKind::Stderr, FrameKind::Event);
884        let _ = format!("{f:?}");
885    }
886
887    #[test]
888    fn waitpoint_spec_derives() {
889        let spec = WaitpointSpec {
890            kind: WaitpointKind::SignalName,
891            matcher: b"approved".to_vec(),
892            hmac_token: WaitpointHmac::new("kid1:deadbeef"),
893        };
894        assert_eq!(spec.clone(), spec);
895        assert_eq!(spec.hmac_token.as_str(), "kid1:deadbeef");
896        assert_eq!(
897            WaitpointHmac::new("a"),
898            WaitpointHmac::new(String::from("a"))
899        );
900    }
901
902    #[test]
903    fn failure_reason_and_class() {
904        let r1 = FailureReason::new("boom");
905        let r2 = FailureReason::with_detail("boom", b"stack".to_vec());
906        assert_eq!(r1.message, "boom");
907        assert!(r1.detail.is_none());
908        assert_eq!(r2.detail.as_deref(), Some(&b"stack"[..]));
909        assert_eq!(r1.clone(), r1);
910        assert_ne!(FailureClass::Transient, FailureClass::Permanent);
911    }
912
913    #[test]
914    fn usage_dimensions_default_and_eq() {
915        let u = UsageDimensions {
916            input_tokens: 10,
917            output_tokens: 20,
918            wall_ms: Some(150),
919            custom: BTreeMap::from([("net_bytes".to_string(), 42)]),
920        };
921        assert_eq!(u.clone(), u);
922        assert_eq!(UsageDimensions::default().input_tokens, 0);
923    }
924
925    #[test]
926    fn admission_decision_derives() {
927        let a = AdmissionDecision::Admitted;
928        let t = AdmissionDecision::Throttled { retry_after_ms: 25 };
929        let r = AdmissionDecision::Rejected {
930            reason: "quota".into(),
931        };
932        assert_eq!(a.clone(), a);
933        assert_eq!(t.clone(), t);
934        assert_ne!(a, r);
935    }
936
937    #[test]
938    fn reclaim_token_wraps_grant() {
939        let grant = ReclaimGrant {
940            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
941            partition_key: crate::partition::PartitionKey::from(&Partition {
942                family: PartitionFamily::Flow,
943                index: 0,
944            }),
945            grant_key: "gkey".into(),
946            expires_at_ms: 123,
947            lane_id: LaneId::new("default"),
948        };
949        let t = ReclaimToken::new(grant.clone());
950        assert_eq!(t.grant, grant);
951        assert_eq!(t.clone(), t);
952    }
953
954    #[test]
955    fn lease_renewal_is_copy() {
956        let r = LeaseRenewal {
957            expires_at_ms: 100,
958            lease_epoch: 2,
959        };
960        let s = r; // Copy
961        assert_eq!(r, s);
962    }
963
964    #[test]
965    fn cancel_flow_policy_and_wait() {
966        assert_ne!(CancelFlowPolicy::FlowOnly, CancelFlowPolicy::CancelAll);
967        let w = CancelFlowWait::WaitTimeout(Duration::from_secs(1));
968        assert_eq!(w, w);
969        assert_ne!(CancelFlowWait::NoWait, CancelFlowWait::WaitIndefinite);
970    }
971
972    #[test]
973    fn completion_payload_derives() {
974        let c = CompletionPayload {
975            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
976            outcome: "success".into(),
977            payload_bytes: Some(b"ok".to_vec()),
978            produced_at_ms: TimestampMs::from_millis(1234),
979            flow_id: None,
980        };
981        assert_eq!(c.clone(), c);
982        let _ = format!("{c:?}");
983    }
984
985    #[test]
986    fn resume_signal_moved_and_derives() {
987        let s = ResumeSignal {
988            signal_id: SignalId::new(),
989            signal_name: "approve".into(),
990            signal_category: "decision".into(),
991            source_type: "user".into(),
992            source_identity: "u1".into(),
993            correlation_id: "c1".into(),
994            accepted_at: TimestampMs::from_millis(10),
995            payload: None,
996        };
997        assert_eq!(s.clone(), s);
998        let _ = format!("{s:?}");
999    }
1000
1001    #[test]
1002    fn fail_outcome_variants() {
1003        let retry = FailOutcome::RetryScheduled {
1004            delay_until: TimestampMs::from_millis(42),
1005        };
1006        let terminal = FailOutcome::TerminalFailed;
1007        assert_ne!(retry, terminal);
1008        assert_eq!(retry.clone(), retry);
1009    }
1010
1011    #[test]
1012    fn scanner_filter_noop_and_default() {
1013        let f = ScannerFilter::default();
1014        assert!(f.is_noop());
1015        assert_eq!(f, ScannerFilter::NOOP);
1016        // A no-op filter matches any candidate, including ones that
1017        // produced no HGET results.
1018        assert!(f.matches(None, None));
1019        assert!(f.matches(Some(&Namespace::new("t1")), Some("v")));
1020    }
1021
1022    #[test]
1023    fn scanner_filter_namespace_match() {
1024        let f = ScannerFilter {
1025            namespace: Some(Namespace::new("tenant-a")),
1026            instance_tag: None,
1027        };
1028        assert!(!f.is_noop());
1029        assert!(f.matches(Some(&Namespace::new("tenant-a")), None));
1030        assert!(!f.matches(Some(&Namespace::new("tenant-b")), None));
1031        // Missing core namespace ⇒ no match.
1032        assert!(!f.matches(None, None));
1033    }
1034
1035    #[test]
1036    fn scanner_filter_instance_tag_match() {
1037        let f = ScannerFilter {
1038            namespace: None,
1039            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
1040        };
1041        assert!(f.matches(None, Some("i-1")));
1042        assert!(!f.matches(None, Some("i-2")));
1043        assert!(!f.matches(None, None));
1044    }
1045
1046    #[test]
1047    fn scanner_filter_both_dimensions() {
1048        let f = ScannerFilter {
1049            namespace: Some(Namespace::new("tenant-a")),
1050            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
1051        };
1052        assert!(f.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
1053        assert!(!f.matches(Some(&Namespace::new("tenant-a")), Some("i-2")));
1054        assert!(!f.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
1055        assert!(!f.matches(None, Some("i-1")));
1056    }
1057
1058    #[test]
1059    fn scanner_filter_builder_construction() {
1060        // Simulates external-crate usage: only public constructors
1061        // and chainable setters (no struct-literal access to the
1062        // `#[non_exhaustive]` fields).
1063        let empty = ScannerFilter::new();
1064        assert!(empty.is_noop());
1065        assert_eq!(empty, ScannerFilter::NOOP);
1066
1067        let ns_only = ScannerFilter::new().with_namespace(Namespace::new("tenant-a"));
1068        assert!(!ns_only.is_noop());
1069        assert!(ns_only.matches(Some(&Namespace::new("tenant-a")), None));
1070
1071        let tag_only = ScannerFilter::new().with_instance_tag("cairn.instance_id", "i-1");
1072        assert!(!tag_only.is_noop());
1073        assert!(tag_only.matches(None, Some("i-1")));
1074
1075        let both = ScannerFilter::new()
1076            .with_namespace(Namespace::new("tenant-a"))
1077            .with_instance_tag("cairn.instance_id", "i-1");
1078        assert!(both.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
1079        assert!(!both.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
1080    }
1081
1082    #[test]
1083    fn backend_config_valkey_ctor() {
1084        let c = BackendConfig::valkey("host.local", 6379);
1085        // Same-crate match against an otherwise `#[non_exhaustive]`
1086        // enum is irrefutable — no wildcard needed and `let-else`
1087        // would trip `irrefutable_let_patterns`.
1088        let BackendConnection::Valkey(v) = &c.connection;
1089        assert_eq!(v.host, "host.local");
1090        assert_eq!(v.port, 6379);
1091        assert!(!v.tls);
1092        assert!(!v.cluster);
1093        assert_eq!(c.timeouts, BackendTimeouts::default());
1094        assert_eq!(c.retry, BackendRetry::default());
1095        assert_eq!(c.clone(), c);
1096    }
1097}