ff_core/backend.rs
1//! Backend-trait supporting types (RFC-012 Stage 0).
2//!
3//! This module carries the public types referenced by the `EngineBackend`
4//! trait signatures in RFC-012 §3.3. The trait itself lands in Stage 1
5//! (issue #89 follow-up); Stage 0 is strictly type-plumbing and the
6//! `ResumeSignal` crate move.
7//!
8//! Public structs/enums whose fields or variants are expected to grow
9//! are marked `#[non_exhaustive]` per project convention — consumers
10//! must write `_`-terminated matches and use the provided constructors
11//! rather than struct literals. Exceptions:
12//!
13//! * Opaque single-field wrapper newtypes ([`HandleOpaque`],
14//! [`WaitpointHmac`]) hide their inner field and need no non-exhaustive
15//! annotation — the wrapped value is unreachable from outside.
16//! * [`ResumeSignal`] is intentionally NOT `#[non_exhaustive]` so the
17//! ff-sdk crate-move (Stage 0) preserves struct-literal compatibility
18//! at its existing call site.
19//!
20//! See `rfcs/RFC-012-engine-backend-trait.md` §3.3.0 for the authoritative
21//! type inventory and §4.1-§4.2 for the `Handle` / `EngineError` shapes.
22
23use crate::contracts::ReclaimGrant;
24use crate::types::{Namespace, TimestampMs, WaitpointToken};
25use std::collections::BTreeMap;
26use std::time::Duration;
27
28// ── §4.1 Handle trio ────────────────────────────────────────────────────
29
/// Runtime discriminator stored in every [`Handle`], identifying which
/// backend implementation owns it so ops can route to that backend.
///
/// Marked `#[non_exhaustive]` so further backends (e.g. `Postgres` in
/// Stage 2, or a hypothetical third backend later) can be added without
/// a breaking change.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum BackendTag {
    /// Implementation built on Valkey FCALLs.
    Valkey,
}
42
/// Lifecycle stage recorded inside a [`Handle`]. Backends check it on
/// entry to every op and return `EngineError::State` when it does not fit.
///
/// This runtime field replaces round 1's compile-time split into the
/// `Handle` / `ResumeHandle` / `SuspendToken` types (RFC-012 §4.1).
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum HandleKind {
    /// Newly claimed; handed out by `claim` / `claim_from_grant`.
    Fresh,
    /// Picked back up through reclaim; handed out by `claim_from_reclaim`.
    Resumed,
    /// Handed out by `suspend`. Ends the current lease; a later
    /// resumption obtains a new Handle through `claim_from_reclaim`.
    Suspended,
}
59
/// Backend-owned opaque bytes stored inside a [`Handle`].
///
/// The contents are backend-specific (on Valkey: exec id, attempt id,
/// lease id, lease epoch, capability binding, partition). Consumers never
/// build or interpret them: the backend emits the bytes on claim/resume
/// and reads them back on each subsequent op.
///
/// Stored as `Box<[u8]>` instead of `bytes::Bytes` (RFC-012 §7.17) so this
/// public type does not drag in a transitive dependency on `bytes`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct HandleOpaque(Box<[u8]>);

impl HandleOpaque {
    /// Wrap bytes produced by a backend. Intended for backend impls only.
    pub fn new(bytes: Box<[u8]>) -> Self {
        HandleOpaque(bytes)
    }

    /// View the wrapped bytes (for backend-internal decoding).
    pub fn as_bytes(&self) -> &[u8] {
        let HandleOpaque(inner) = self;
        &inner[..]
    }
}
83
84/// Opaque attempt cookie held by the worker for the duration of an
85/// attempt. Produced by `claim` / `claim_from_reclaim` / `suspend`;
86/// borrowed by every op (renew, progress, append_frame, complete, fail,
87/// cancel, suspend, delay, wait_children, observe_signals, report_usage).
88///
89/// See RFC-012 §4.1 for the round-4 design — terminal ops borrow rather
90/// than consume so callers can retry after a transport error.
91#[derive(Clone, Debug, PartialEq, Eq, Hash)]
92#[non_exhaustive]
93pub struct Handle {
94 pub backend: BackendTag,
95 pub kind: HandleKind,
96 pub opaque: HandleOpaque,
97}
98
99impl Handle {
100 /// Construct a new Handle. Called by backend impls only; consumer
101 /// code receives Handles from `claim` / `suspend` / `claim_from_reclaim`.
102 pub fn new(backend: BackendTag, kind: HandleKind, opaque: HandleOpaque) -> Self {
103 Self {
104 backend,
105 kind,
106 opaque,
107 }
108 }
109}
110
111// ── §3.3.0 Claim / lifecycle supporting types ──────────────────────────
112
/// Capability tokens a worker advertises to the scheduler and to `claim`.
///
/// Today `WorkerConfig` stores these as a plain `Vec<String>`. They are
/// promoted to a named newtype so trait signatures can refer to
/// capabilities without committing to a concrete container.
///
/// Whether to use a bitfield or strings is open question §7.2. Stage 0
/// keeps the round-2 lean: a newtype over `Vec<String>`.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct CapabilitySet {
    pub tokens: Vec<String>,
}

impl CapabilitySet {
    /// Collect capability tokens from any iterable of string-like values.
    /// Input order is preserved.
    pub fn new<I, S>(tokens: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let collected: Vec<String> = tokens.into_iter().map(|token| token.into()).collect();
        CapabilitySet { tokens: collected }
    }

    /// Returns `true` when no capability tokens are present.
    pub fn is_empty(&self) -> bool {
        self.tokens.is_empty()
    }
}
144
/// Policy hints passed to `claim`.
///
/// Deliberately minimal at Stage 0, per RFC-012 §3.3.0 ("Bikeshed-prone;
/// keep minimal at Stage 0"). Future fields such as retry count or
/// fairness hints are added without breaking callers.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct ClaimPolicy {
    /// Upper bound on how long `claim` may block. `None` defers to the
    /// backend default (today: non-blocking, return immediately).
    pub max_wait: Option<Duration>,
}

impl ClaimPolicy {
    /// Non-blocking claim with no wait bound; equivalent to `Default`
    /// and matching today's SDK default.
    pub fn immediate() -> Self {
        ClaimPolicy::default()
    }

    /// Claim that may block for at most `max_wait`.
    pub fn with_max_wait(max_wait: Duration) -> Self {
        let bound = Some(max_wait);
        ClaimPolicy { max_wait: bound }
    }
}
169
/// Classification of a frame passed to `append_frame`. Mirrors the
/// `frame_type` ARGV of the Lua-side `ff_append_frame`.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum FrameKind {
    /// Progress output or log line visible to operators.
    Stdout,
    /// Error or warning output visible to operators.
    Stderr,
    /// Structured event carrying a JSON payload.
    Event,
    /// Opaque / binary payload.
    Blob,
}

/// A single stream frame passed to `append_frame` (RFC-012 §3.3.0).
///
/// Today's FCALL receives the payload bytes, the frame type and an
/// optional sequence number as separate ARGV entries. Stage 0 bundles
/// them into one named type for use in trait signatures.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
    pub bytes: Vec<u8>,
    pub kind: FrameKind,
    /// Optional monotonic sequence number. The caller supplies it when
    /// the stream protocol is sequence-bound; `None` leaves assignment
    /// to the backend.
    pub seq: Option<u64>,
}

impl Frame {
    /// Build a frame whose sequence is left to the backend (`seq = None`).
    /// Use [`Frame::with_seq`] to pin an explicit sequence instead.
    pub fn new(bytes: Vec<u8>, kind: FrameKind) -> Self {
        Frame {
            bytes,
            kind,
            seq: None,
        }
    }

    /// Build a frame carrying the explicit monotonic sequence `seq`.
    pub fn with_seq(bytes: Vec<u8>, kind: FrameKind, seq: u64) -> Self {
        let mut frame = Frame::new(bytes, kind);
        frame.seq = Some(seq);
        frame
    }
}
220
221// ── §3.3.0 Suspend / waitpoint types ────────────────────────────────────
222
/// How a waitpoint matches incoming signals. Mirrors today's
/// suspend/close matcher kinds (signal name, correlation id, ...).
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum WaitpointKind {
    /// Matches on the signal name.
    SignalName,
    /// Matches on the correlation id.
    CorrelationId,
    /// Generic waitpoint satisfied by external delivery.
    External,
}
235
236/// HMAC token that binds a waitpoint to its mint-time identity. Wire
237/// shape `kid:40hex`.
238///
239/// Wraps [`crate::types::WaitpointToken`] so bearer-credential Debug /
240/// Display redaction (`WaitpointToken("kid1:<REDACTED:len=40>")`) flows
241/// through automatically — no derived formatter can leak the raw
242/// digest when a [`WaitpointSpec`] is debug-printed via
243/// `tracing::debug!(spec=?spec)`.
244///
245/// Newtype-wrapping (vs. a `pub use` alias) keeps trait signatures
246/// naming the waitpoint-bound HMAC role explicitly.
247#[derive(Clone, PartialEq, Eq, Hash)]
248pub struct WaitpointHmac(WaitpointToken);
249
250impl WaitpointHmac {
251 pub fn new(token: impl Into<String>) -> Self {
252 Self(WaitpointToken::from(token.into()))
253 }
254
255 /// Borrow the wrapped token. The wrapped type's `Debug`/`Display`
256 /// redact — call sites that need the raw digest must explicitly
257 /// call [`WaitpointToken::as_str`].
258 pub fn token(&self) -> &WaitpointToken {
259 &self.0
260 }
261
262 /// Borrow the raw `kid:40hex` string. Prefer [`Self::token`] for
263 /// non-redacted call sites; this method exists only for transport
264 /// / FCALL ARGV construction where the raw wire bytes are required.
265 pub fn as_str(&self) -> &str {
266 self.0.as_str()
267 }
268}
269
270// Forward Debug / Display to the wrapped WaitpointToken so the
271// redaction guarantees on that type extend here. Derived Debug would
272// expose the raw string field and defeat the wrap.
273impl std::fmt::Debug for WaitpointHmac {
274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275 write!(f, "WaitpointHmac({:?})", self.0)
276 }
277}
278
279/// One waitpoint inside a suspend request. `suspend` takes a
280/// `Vec<WaitpointSpec>`; the resume condition (`any` / `all`) lives on
281/// the enclosing suspend args in the Phase-1 contract.
282#[derive(Clone, Debug, PartialEq, Eq)]
283#[non_exhaustive]
284pub struct WaitpointSpec {
285 pub kind: WaitpointKind,
286 pub matcher: Vec<u8>,
287 pub hmac_token: WaitpointHmac,
288}
289
290impl WaitpointSpec {
291 pub fn new(kind: WaitpointKind, matcher: Vec<u8>, hmac_token: WaitpointHmac) -> Self {
292 Self {
293 kind,
294 matcher,
295 hmac_token,
296 }
297 }
298}
299
300// ── §3.3.0 Failure classification types ─────────────────────────────────
301
/// Human-readable failure message, optionally accompanied by structured
/// detail bytes. Supersedes the ad-hoc string argument `fail` takes today.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FailureReason {
    pub message: String,
    pub detail: Option<Vec<u8>>,
}

impl FailureReason {
    /// Failure described by `message` alone (`detail` is `None`).
    pub fn new(message: impl Into<String>) -> Self {
        FailureReason {
            message: message.into(),
            detail: None,
        }
    }

    /// Failure described by `message` plus structured `detail` bytes.
    pub fn with_detail(message: impl Into<String>, detail: Vec<u8>) -> Self {
        let mut reason = FailureReason::new(message);
        reason.detail = Some(detail);
        reason
    }
}
326
/// Failure classification, which drives the retry disposition on the Lua
/// side. Mirrors the Lua-side classification codes.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum FailureClass {
    /// Transient error; eligible for retry.
    Transient,
    /// Permanent error; never retried.
    Permanent,
    /// Crash or process death, inferred from lease expiry.
    InfraCrash,
    /// Timeout at either the attempt or the operation level.
    Timeout,
    /// Cooperative cancellation by an operator or by cancel_flow.
    Cancelled,
}
344// ── §3.3.0 Usage / budget types ─────────────────────────────────────────
345
/// Payload for `report_usage`.
///
/// Mirrors the ARGV of today's `ff_report_usage_and_check`: token counts,
/// wall time, and caller-defined dimensions.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct UsageDimensions {
    /// Input tokens consumed (LLM-shaped usage); `0` when not applicable.
    pub input_tokens: u64,
    /// Output tokens produced (LLM-shaped usage).
    pub output_tokens: u64,
    /// Wall-clock milliseconds attributable to this report; `None` for
    /// reports that only carry token counts.
    pub wall_ms: Option<u64>,
    /// Arbitrary caller-defined dimensions. A `BTreeMap` gives a stable
    /// iteration order, which matters for dedup-key derivation under
    /// some budget schemes.
    pub custom: BTreeMap<String, u64>,
}
364
/// Admission verdict returned by `report_usage`.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AdmissionDecision {
    /// Usage accepted; the caller may proceed.
    Admitted,
    /// Rate-limited or concurrency-capped; retry after the suggested
    /// interval.
    Throttled { retry_after_ms: u64 },
    /// Refused outright: budget exhausted or a policy violation.
    Rejected { reason: String },
}
377
378// ── §3.3.0 Reclaim / lease types ────────────────────────────────────────
379
380/// Opaque cookie returned by the reclaim scanner; consumed by
381/// `claim_from_reclaim` to mint a resumed Handle.
382///
383/// Wraps [`ReclaimGrant`] today (the scanner's existing product).
384/// Kept as a newtype so trait signatures name the reclaim-bound role
385/// explicitly and so the wrapped shape can evolve without breaking the
386/// trait.
387#[derive(Clone, Debug, PartialEq, Eq)]
388#[non_exhaustive]
389pub struct ReclaimToken {
390 pub grant: ReclaimGrant,
391}
392
393impl ReclaimToken {
394 pub fn new(grant: ReclaimGrant) -> Self {
395 Self { grant }
396 }
397}
398
/// Outcome of a successful `renew`.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct LeaseRenewal {
    /// Updated lease expiry, in monotonic milliseconds.
    pub expires_at_ms: u64,
    /// Lease epoch after the renewal; never decreases.
    pub lease_epoch: u64,
}

impl LeaseRenewal {
    /// Record a renewal's new expiry and epoch.
    pub fn new(expires_at_ms: u64, lease_epoch: u64) -> Self {
        LeaseRenewal {
            expires_at_ms,
            lease_epoch,
        }
    }
}
417
418// ── §3.3.0 Cancel-flow supporting types ─────────────────────────────────
419
/// What `cancel_flow` does with the flow's member executions.
///
/// `CancelFlowArgs` currently encodes this as a `String`. Stage 0 lifts
/// the policy into a typed enum (RFC-012 §3.3.0).
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum CancelFlowPolicy {
    /// Cancel just the flow record and leave members untouched.
    FlowOnly,
    /// Cancel the flow plus every member execution not yet terminal.
    CancelAll,
    /// Cancel the flow plus members in `Pending` / `Blocked` /
    /// `Eligible`. `Running` executions are left alone to drain.
    CancelPending,
}
435
/// How long the caller of `cancel_flow` waits.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum CancelFlowWait {
    /// Return once the flow state has flipped; member cancellations are
    /// dispatched asynchronously.
    NoWait,
    /// Wait for member cancellations to finish, for at most the given
    /// duration.
    WaitTimeout(Duration),
    /// Wait for member cancellations to finish, with no deadline.
    WaitIndefinite,
}
448
449// ── §3.3.0 Completion stream payload ────────────────────────────────────
450
451/// One completion event delivered through the `CompletionStream`
452/// (RFC-012 §4.3). Also the payload type for issue #90's subscription
453/// API. Stage 0 authorises the type; issue #90 fixes the wire shape.
454///
455/// `flow_id` was added in issue #90 so DAG-dependency routing
456/// (`dispatch_dependency_resolution`) has the partition-routable flow
457/// handle without reparsing the Lua-emitted JSON downstream.
458/// `#[non_exhaustive]` keeps future field additions additive; use
459/// [`CompletionPayload::new`] and [`CompletionPayload::with_flow_id`]
460/// for construction.
461///
462/// `payload_bytes` / `produced_at_ms` are authorised but not yet
463/// populated by the Valkey Lua emitters — consumers on the
464/// `CompletionStream` read `execution_id` + `flow_id` today and must
465/// tolerate `payload_bytes = None` / `produced_at_ms = 0`.
466#[derive(Clone, Debug, PartialEq, Eq)]
467#[non_exhaustive]
468pub struct CompletionPayload {
469 pub execution_id: crate::types::ExecutionId,
470 pub outcome: String,
471 pub payload_bytes: Option<Vec<u8>>,
472 pub produced_at_ms: TimestampMs,
473 /// Flow handle for partition routing. Added in issue #90 (#90);
474 /// `None` for emitters that don't yet surface it.
475 pub flow_id: Option<crate::types::FlowId>,
476}
477
478impl CompletionPayload {
479 pub fn new(
480 execution_id: crate::types::ExecutionId,
481 outcome: impl Into<String>,
482 payload_bytes: Option<Vec<u8>>,
483 produced_at_ms: TimestampMs,
484 ) -> Self {
485 Self {
486 execution_id,
487 outcome: outcome.into(),
488 payload_bytes,
489 produced_at_ms,
490 flow_id: None,
491 }
492 }
493
494 /// Attach a flow handle to the payload. Additive builder so adding
495 /// `flow_id` didn't require a breaking change to [`Self::new`].
496 #[must_use]
497 pub fn with_flow_id(mut self, flow_id: crate::types::FlowId) -> Self {
498 self.flow_id = Some(flow_id);
499 self
500 }
501}
502
503// ── §3.3.0 ResumeSignal (crate move from ff-sdk::task) ──────────────────
504
505/// Signal that satisfied a waitpoint matcher and is therefore part of
506/// the reason an execution resumed. Returned by `observe_signals`
507/// (RFC-012 §3.1.2) and by `ClaimedTask::resume_signals` in ff-sdk.
508///
509/// Moved in Stage 0 from `ff_sdk::task`; `ff_sdk::ResumeSignal` remains
510/// re-exported through the 0.4.x window (removal scheduled for 0.5.0).
511///
512/// Returned only for signals whose matcher slot in the waitpoint's
513/// resume condition is marked satisfied. Pre-buffered-but-unmatched
514/// signals are not included.
515///
516/// Note: NOT `#[non_exhaustive]` to preserve struct-literal compatibility
517/// with ff-sdk call sites that constructed `ResumeSignal { .. }` before
518/// the Stage 0 crate move.
519#[derive(Clone, Debug, PartialEq, Eq)]
520pub struct ResumeSignal {
521 pub signal_id: crate::types::SignalId,
522 pub signal_name: String,
523 pub signal_category: String,
524 pub source_type: String,
525 pub source_identity: String,
526 pub correlation_id: String,
527 /// Valkey-server `now_ms` timestamp at which `ff_deliver_signal`
528 /// accepted this signal. `0` if the stored `accepted_at` field is
529 /// missing or non-numeric (a Lua-side defect — not expected at
530 /// runtime).
531 pub accepted_at: TimestampMs,
532 /// Raw payload bytes, if the signal was delivered with one. `None`
533 /// for signals delivered without a payload.
534 pub payload: Option<Vec<u8>>,
535}
536
537// ── Stage 1a: FailOutcome move ──────────────────────────────────────────
538
539/// Outcome of a `fail()` call.
540///
541/// **RFC-012 Stage 1a:** moved from `ff_sdk::task::FailOutcome` to
542/// `ff_core::backend::FailOutcome` so it is nameable by the
543/// `EngineBackend` trait signature. `ff_sdk::task` retains a
544/// `pub use` shim preserving the `ff_sdk::FailOutcome` path.
545///
546/// Not `#[non_exhaustive]` because existing consumers (ff-test,
547/// ff-readiness-tests) construct and match this enum exhaustively;
548/// the shape has been stable since the `fail()` API landed and any
549/// additive growth would be a follow-up RFC's deliberate break.
550#[derive(Clone, Debug, PartialEq, Eq)]
551pub enum FailOutcome {
552 /// Retry was scheduled — execution is in delayed backoff.
553 RetryScheduled {
554 delay_until: crate::types::TimestampMs,
555 },
556 /// No retries left — execution is terminal failed.
557 TerminalFailed,
558}
559
560// ── Stage 1a: BackendConfig + sub-types ─────────────────────────────────
561
/// Timing knobs shared across every connection a backend opens.
///
/// The struct is `#[non_exhaustive]` per project convention, because more
/// connection tunables are expected as new backends land. Code outside
/// this crate therefore cannot use a struct literal: start from
/// [`Default`] and assign the public fields.
///
/// The default mirrors the Phase-1 Valkey client's out-of-box behaviour.
/// No explicit request timeout is set, so ferriskey defaults apply.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendTimeouts {
    /// Per-request timeout. `None` ⇒ backend default.
    pub request: Option<Duration>,
}
574
/// Retry policy shared across every connection a backend opens.
///
/// The field set follows ferriskey's `ConnectionRetryStrategy` (see
/// `ferriskey/src/client/types.rs:151`). Stage 1c's Valkey wiring can then
/// pass values straight through instead of re-implementing what ferriskey
/// already offers. A future Postgres backend will read the same fields
/// under its own retry semantics, or map `None` to its own defaults.
///
/// Every field is `Option<u32>`. `None` means "use the backend default"
/// (for Valkey, `ConnectionRetryStrategy::default()`); `Some(v)` forwards
/// `v` unchanged.
#[non_exhaustive]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct BackendRetry {
    /// Base of the exponential backoff curve. `None` ⇒ backend default.
    pub exponent_base: Option<u32>,
    /// Multiplier applied at each backoff step. `None` ⇒ backend default.
    pub factor: Option<u32>,
    /// Maximum retries on transient transport errors. `None` ⇒ backend
    /// default.
    pub number_of_retries: Option<u32>,
    /// Jitter, expressed as a percentage of the computed backoff.
    /// `None` ⇒ backend default.
    pub jitter_percent: Option<u32>,
}
601
/// Connection parameters specific to Valkey.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ValkeyConnection {
    /// Host name of the Valkey server.
    pub host: String,
    /// Port of the Valkey server.
    pub port: u16,
    /// Whether the connection uses TLS.
    pub tls: bool,
    /// Whether Valkey cluster mode is enabled.
    pub cluster: bool,
}

impl ValkeyConnection {
    /// Plain connection to `host:port`, with TLS and cluster mode off.
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        let host = host.into();
        ValkeyConnection {
            host,
            port,
            tls: false,
            cluster: false,
        }
    }
}
626
627/// Discriminated union over per-backend connection shapes. Stage 1a
628/// ships the Valkey arm; future backends (Postgres) land additively
629/// under the `#[non_exhaustive]` guard.
630#[derive(Clone, Debug, PartialEq, Eq)]
631#[non_exhaustive]
632pub enum BackendConnection {
633 Valkey(ValkeyConnection),
634}
635
636/// Configuration passed to `ValkeyBackend::connect` (and, later, to
637/// other backend `connect` constructors). Carries the connection
638/// details + shared timing/retry policy. Replaces the Valkey-specific
639/// fields today on `WorkerConfig` (RFC-012 §5.1 migration plan).
640///
641/// `BackendConfig` is the replacement target for `WorkerConfig`'s
642/// `host` / `port` / `tls` / `cluster` fields. The full migration
643/// lands across Stage 1a (type introduction) and Stage 1c
644/// (`WorkerConfig` forwarding); worker-policy fields (lease TTL,
645/// claim poll interval, capability set) stay on `WorkerConfig`.
646#[derive(Clone, Debug, PartialEq, Eq)]
647#[non_exhaustive]
648pub struct BackendConfig {
649 pub connection: BackendConnection,
650 pub timeouts: BackendTimeouts,
651 pub retry: BackendRetry,
652}
653
654impl BackendConfig {
655 /// Build a Valkey BackendConfig from host+port. Other fields take
656 /// backend defaults.
657 pub fn valkey(host: impl Into<String>, port: u16) -> Self {
658 Self {
659 connection: BackendConnection::Valkey(ValkeyConnection::new(host, port)),
660 timeouts: BackendTimeouts::default(),
661 retry: BackendRetry::default(),
662 }
663 }
664}
665
666// ── Issue #122: ScannerFilter ───────────────────────────────────────────
667
668/// Per-consumer filter applied by FlowFabric's background scanners and
669/// completion subscribers so multiple FlowFabric instances sharing a
670/// single Valkey keyspace can operate on disjoint subsets of
671/// executions without mutual interference (issue #122).
672///
673/// Sibling of [`CompletionPayload`]: the former is a scan *output*
674/// shape, this is the *input* predicate scanners and completion
675/// subscribers consult per candidate.
676///
677/// # Fields & backing storage
678///
679/// * `namespace` — matches against the `namespace` field on
680/// `exec_core` (Valkey hash `ff:exec:{fp:N}:<eid>:core`). Cost:
681/// one HGET per candidate when set.
682/// * `instance_tag` — matches against an entry in the execution's
683/// user-supplied tags hash at the canonical key
684/// **`ff:exec:{p}:<eid>:tags`** (where `{p}` is the partition
685/// hash-tag, e.g. `{fp:42}`). Written by the Lua function
686/// `ff_create_execution` (see `lua/execution.lua`) and
687/// `ff_set_execution_tags`. The tuple is `(tag_key, tag_value)`:
688/// the HGET targets `tag_key` on the tags hash and compares the
689/// returned string (if any) byte-for-byte against `tag_value`.
690/// Cost: one HGET per candidate when set.
691///
692/// When both are set, scanners check `namespace` first (short-circuit
693/// on mismatch) then `instance_tag`, for a maximum of 2 HGETs per
694/// candidate.
695///
696/// # Semantics
697///
698/// * [`Self::is_noop`] returns true when both fields are `None` —
699/// the filter accepts every candidate. Used by the
700/// `subscribe_completions_filtered` default body to fall back to
701/// the unfiltered subscription.
702/// * [`Self::matches`] is the tight in-memory predicate once the
703/// HGET values have been fetched. Scanners fetch the fields
704/// lazily (namespace first) and pass the results in; the helper
705/// returns false as soon as one component mismatches.
706///
707/// # Scope
708///
709/// Today the filter is consulted by execution-shaped scanners
710/// (lease_expiry, attempt_timeout, execution_deadline,
711/// suspension_timeout, pending_wp_expiry, delayed_promoter,
712/// dependency_reconciler, cancel_reconciler, unblock,
713/// index_reconciler, retention_trimmer) and by completion
714/// subscribers. Non-execution scanners (budget_reconciler,
715/// budget_reset, quota_reconciler, flow_projector) accept a filter
716/// for API uniformity but do not apply it — their iteration domains
717/// (budgets, quotas, flows) are not keyed by the
718/// per-execution namespace / instance_tag shape.
719///
720/// `#[non_exhaustive]` so future dimensions (e.g. `lane_id`,
721/// `worker_instance`) can land additively.
722#[derive(Clone, Debug, Default, PartialEq, Eq)]
723#[non_exhaustive]
724pub struct ScannerFilter {
725 /// Tenant / workspace scope. Matches against the `namespace`
726 /// field on `exec_core`.
727 pub namespace: Option<Namespace>,
728 /// Instance-scoped tag predicate `(tag_key, tag_value)`. Matches
729 /// against an entry in `ff:exec:{p}:<eid>:tags` (the tags hash
730 /// written by `ff_create_execution`).
731 pub instance_tag: Option<(String, String)>,
732}
733
734impl ScannerFilter {
735 /// Shared no-op filter — useful as the default for the
736 /// `Scanner::filter()` trait method so implementors that don't
737 /// override can hand back a `&'static` reference without
738 /// allocating per call.
739 pub const NOOP: Self = Self {
740 namespace: None,
741 instance_tag: None,
742 };
743
744 /// Create an empty filter (equivalent to [`Self::NOOP`]).
745 ///
746 /// Provided for external-crate consumers: `ScannerFilter` is
747 /// `#[non_exhaustive]`, so struct-literal and functional-update
748 /// construction are unavailable across crate boundaries. Start
749 /// from `new()` and chain `with_*` setters to build a filter.
750 pub fn new() -> Self {
751 Self::default()
752 }
753
754 /// Set the tenant-scope namespace filter dimension. Consumes
755 /// and returns `self` for chaining.
756 pub fn with_namespace(mut self, ns: Namespace) -> Self {
757 self.namespace = Some(ns);
758 self
759 }
760
761 /// Set the exact-match exec-tag filter dimension. Consumes and
762 /// returns `self` for chaining. At filter time, scanners read
763 /// the `ff:exec:{p:N}:<eid>:tags` hash and compare the value at
764 /// `key` byte-for-byte against `value`.
765 pub fn with_instance_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
766 self.instance_tag = Some((key.into(), value.into()));
767 self
768 }
769
770 /// True iff the filter has no dimensions set — every candidate
771 /// passes. Callers use this to short-circuit filtered subscribe
772 /// paths back to the unfiltered ones.
773 pub fn is_noop(&self) -> bool {
774 self.namespace.is_none() && self.instance_tag.is_none()
775 }
776
777 /// Post-HGET in-memory match check.
778 ///
779 /// `core_namespace` should be the `namespace` field read from
780 /// `exec_core` (None if the HGET returned nil / was skipped).
781 /// `tag_value` should be the HGET result for the configured
782 /// `instance_tag.0` key on the execution's tags hash (None if
783 /// the HGET returned nil / was skipped).
784 ///
785 /// When a filter dimension is `None` the corresponding argument
786 /// is ignored — callers may pass `None` to skip the HGET and
787 /// save a round-trip.
788 pub fn matches(
789 &self,
790 core_namespace: Option<&Namespace>,
791 tag_value: Option<&str>,
792 ) -> bool {
793 if let Some(ref want) = self.namespace {
794 match core_namespace {
795 Some(have) if have == want => {}
796 _ => return false,
797 }
798 }
799 if let Some((_, ref want_value)) = self.instance_tag {
800 match tag_value {
801 Some(have) if have == want_value.as_str() => {}
802 _ => return false,
803 }
804 }
805 true
806 }
807}
808
809// ── Tests ───────────────────────────────────────────────────────────────
810
811#[cfg(test)]
812mod tests {
813 use super::*;
814 use crate::partition::{Partition, PartitionFamily};
815 use crate::types::{ExecutionId, LaneId, SignalId};
816
817 #[test]
818 fn backend_tag_derives() {
819 let a = BackendTag::Valkey;
820 let b = a;
821 assert_eq!(a, b);
822 assert_eq!(format!("{a:?}"), "Valkey");
823 }
824
825 #[test]
826 fn handle_kind_derives() {
827 for k in [HandleKind::Fresh, HandleKind::Resumed, HandleKind::Suspended] {
828 let c = k;
829 assert_eq!(k, c);
830 // Debug formatter reachable
831 let _ = format!("{k:?}");
832 }
833 }
834
835 #[test]
836 fn handle_opaque_roundtrips() {
837 let bytes: Box<[u8]> = Box::new([1u8, 2, 3, 4]);
838 let o = HandleOpaque::new(bytes.clone());
839 assert_eq!(o.as_bytes(), &[1u8, 2, 3, 4]);
840 assert_eq!(o, o.clone());
841 let _ = format!("{o:?}");
842 }
843
844 #[test]
845 fn handle_composes() {
846 let h = Handle::new(
847 BackendTag::Valkey,
848 HandleKind::Fresh,
849 HandleOpaque::new(Box::new([0u8; 4])),
850 );
851 assert_eq!(h.backend, BackendTag::Valkey);
852 assert_eq!(h.kind, HandleKind::Fresh);
853 assert_eq!(h.clone(), h);
854 }
855
856 #[test]
857 fn capability_set_derives() {
858 let c1 = CapabilitySet::new(["gpu", "cuda"]);
859 let c2 = CapabilitySet::new(["gpu", "cuda"]);
860 assert_eq!(c1, c2);
861 assert!(!c1.is_empty());
862 assert!(CapabilitySet::default().is_empty());
863 let _ = format!("{c1:?}");
864 }
865
866 #[test]
867 fn claim_policy_derives() {
868 let p = ClaimPolicy::with_max_wait(Duration::from_millis(500));
869 assert_eq!(p.max_wait, Some(Duration::from_millis(500)));
870 assert_eq!(p.clone(), p);
871 assert_eq!(ClaimPolicy::immediate(), ClaimPolicy::default());
872 }
873
874 #[test]
875 fn frame_and_kind_derive() {
876 let f = Frame {
877 bytes: b"hello".to_vec(),
878 kind: FrameKind::Stdout,
879 seq: Some(3),
880 };
881 assert_eq!(f.clone(), f);
882 assert_eq!(f.kind, FrameKind::Stdout);
883 assert_ne!(FrameKind::Stderr, FrameKind::Event);
884 let _ = format!("{f:?}");
885 }
886
887 #[test]
888 fn waitpoint_spec_derives() {
889 let spec = WaitpointSpec {
890 kind: WaitpointKind::SignalName,
891 matcher: b"approved".to_vec(),
892 hmac_token: WaitpointHmac::new("kid1:deadbeef"),
893 };
894 assert_eq!(spec.clone(), spec);
895 assert_eq!(spec.hmac_token.as_str(), "kid1:deadbeef");
896 assert_eq!(
897 WaitpointHmac::new("a"),
898 WaitpointHmac::new(String::from("a"))
899 );
900 }
901
902 #[test]
903 fn failure_reason_and_class() {
904 let r1 = FailureReason::new("boom");
905 let r2 = FailureReason::with_detail("boom", b"stack".to_vec());
906 assert_eq!(r1.message, "boom");
907 assert!(r1.detail.is_none());
908 assert_eq!(r2.detail.as_deref(), Some(&b"stack"[..]));
909 assert_eq!(r1.clone(), r1);
910 assert_ne!(FailureClass::Transient, FailureClass::Permanent);
911 }
912
913 #[test]
914 fn usage_dimensions_default_and_eq() {
915 let u = UsageDimensions {
916 input_tokens: 10,
917 output_tokens: 20,
918 wall_ms: Some(150),
919 custom: BTreeMap::from([("net_bytes".to_string(), 42)]),
920 };
921 assert_eq!(u.clone(), u);
922 assert_eq!(UsageDimensions::default().input_tokens, 0);
923 }
924
925 #[test]
926 fn admission_decision_derives() {
927 let a = AdmissionDecision::Admitted;
928 let t = AdmissionDecision::Throttled { retry_after_ms: 25 };
929 let r = AdmissionDecision::Rejected {
930 reason: "quota".into(),
931 };
932 assert_eq!(a.clone(), a);
933 assert_eq!(t.clone(), t);
934 assert_ne!(a, r);
935 }
936
937 #[test]
938 fn reclaim_token_wraps_grant() {
939 let grant = ReclaimGrant {
940 execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
941 partition_key: crate::partition::PartitionKey::from(&Partition {
942 family: PartitionFamily::Flow,
943 index: 0,
944 }),
945 grant_key: "gkey".into(),
946 expires_at_ms: 123,
947 lane_id: LaneId::new("default"),
948 };
949 let t = ReclaimToken::new(grant.clone());
950 assert_eq!(t.grant, grant);
951 assert_eq!(t.clone(), t);
952 }
953
954 #[test]
955 fn lease_renewal_is_copy() {
956 let r = LeaseRenewal {
957 expires_at_ms: 100,
958 lease_epoch: 2,
959 };
960 let s = r; // Copy
961 assert_eq!(r, s);
962 }
963
964 #[test]
965 fn cancel_flow_policy_and_wait() {
966 assert_ne!(CancelFlowPolicy::FlowOnly, CancelFlowPolicy::CancelAll);
967 let w = CancelFlowWait::WaitTimeout(Duration::from_secs(1));
968 assert_eq!(w, w);
969 assert_ne!(CancelFlowWait::NoWait, CancelFlowWait::WaitIndefinite);
970 }
971
972 #[test]
973 fn completion_payload_derives() {
974 let c = CompletionPayload {
975 execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
976 outcome: "success".into(),
977 payload_bytes: Some(b"ok".to_vec()),
978 produced_at_ms: TimestampMs::from_millis(1234),
979 flow_id: None,
980 };
981 assert_eq!(c.clone(), c);
982 let _ = format!("{c:?}");
983 }
984
985 #[test]
986 fn resume_signal_moved_and_derives() {
987 let s = ResumeSignal {
988 signal_id: SignalId::new(),
989 signal_name: "approve".into(),
990 signal_category: "decision".into(),
991 source_type: "user".into(),
992 source_identity: "u1".into(),
993 correlation_id: "c1".into(),
994 accepted_at: TimestampMs::from_millis(10),
995 payload: None,
996 };
997 assert_eq!(s.clone(), s);
998 let _ = format!("{s:?}");
999 }
1000
1001 #[test]
1002 fn fail_outcome_variants() {
1003 let retry = FailOutcome::RetryScheduled {
1004 delay_until: TimestampMs::from_millis(42),
1005 };
1006 let terminal = FailOutcome::TerminalFailed;
1007 assert_ne!(retry, terminal);
1008 assert_eq!(retry.clone(), retry);
1009 }
1010
1011 #[test]
1012 fn scanner_filter_noop_and_default() {
1013 let f = ScannerFilter::default();
1014 assert!(f.is_noop());
1015 assert_eq!(f, ScannerFilter::NOOP);
1016 // A no-op filter matches any candidate, including ones that
1017 // produced no HGET results.
1018 assert!(f.matches(None, None));
1019 assert!(f.matches(Some(&Namespace::new("t1")), Some("v")));
1020 }
1021
1022 #[test]
1023 fn scanner_filter_namespace_match() {
1024 let f = ScannerFilter {
1025 namespace: Some(Namespace::new("tenant-a")),
1026 instance_tag: None,
1027 };
1028 assert!(!f.is_noop());
1029 assert!(f.matches(Some(&Namespace::new("tenant-a")), None));
1030 assert!(!f.matches(Some(&Namespace::new("tenant-b")), None));
1031 // Missing core namespace ⇒ no match.
1032 assert!(!f.matches(None, None));
1033 }
1034
1035 #[test]
1036 fn scanner_filter_instance_tag_match() {
1037 let f = ScannerFilter {
1038 namespace: None,
1039 instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
1040 };
1041 assert!(f.matches(None, Some("i-1")));
1042 assert!(!f.matches(None, Some("i-2")));
1043 assert!(!f.matches(None, None));
1044 }
1045
1046 #[test]
1047 fn scanner_filter_both_dimensions() {
1048 let f = ScannerFilter {
1049 namespace: Some(Namespace::new("tenant-a")),
1050 instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
1051 };
1052 assert!(f.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
1053 assert!(!f.matches(Some(&Namespace::new("tenant-a")), Some("i-2")));
1054 assert!(!f.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
1055 assert!(!f.matches(None, Some("i-1")));
1056 }
1057
1058 #[test]
1059 fn scanner_filter_builder_construction() {
1060 // Simulates external-crate usage: only public constructors
1061 // and chainable setters (no struct-literal access to the
1062 // `#[non_exhaustive]` fields).
1063 let empty = ScannerFilter::new();
1064 assert!(empty.is_noop());
1065 assert_eq!(empty, ScannerFilter::NOOP);
1066
1067 let ns_only = ScannerFilter::new().with_namespace(Namespace::new("tenant-a"));
1068 assert!(!ns_only.is_noop());
1069 assert!(ns_only.matches(Some(&Namespace::new("tenant-a")), None));
1070
1071 let tag_only = ScannerFilter::new().with_instance_tag("cairn.instance_id", "i-1");
1072 assert!(!tag_only.is_noop());
1073 assert!(tag_only.matches(None, Some("i-1")));
1074
1075 let both = ScannerFilter::new()
1076 .with_namespace(Namespace::new("tenant-a"))
1077 .with_instance_tag("cairn.instance_id", "i-1");
1078 assert!(both.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
1079 assert!(!both.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
1080 }
1081
1082 #[test]
1083 fn backend_config_valkey_ctor() {
1084 let c = BackendConfig::valkey("host.local", 6379);
1085 // Same-crate match against an otherwise `#[non_exhaustive]`
1086 // enum is irrefutable — no wildcard needed and `let-else`
1087 // would trip `irrefutable_let_patterns`.
1088 let BackendConnection::Valkey(v) = &c.connection;
1089 assert_eq!(v.host, "host.local");
1090 assert_eq!(v.port, 6379);
1091 assert!(!v.tls);
1092 assert!(!v.cluster);
1093 assert_eq!(c.timeouts, BackendTimeouts::default());
1094 assert_eq!(c.retry, BackendRetry::default());
1095 assert_eq!(c.clone(), c);
1096 }
1097}