Skip to main content

ff_core/
backend.rs

1//! Backend-trait supporting types (RFC-012 Stage 0).
2//!
3//! This module carries the public types referenced by the `EngineBackend`
4//! trait signatures in RFC-012 §3.3. The trait itself lands in Stage 1
5//! (issue #89 follow-up); Stage 0 is strictly type-plumbing and the
6//! `ResumeSignal` crate move.
7//!
8//! Public structs/enums whose fields or variants are expected to grow
9//! are marked `#[non_exhaustive]` per project convention — consumers
10//! must write `_`-terminated matches and use the provided constructors
11//! rather than struct literals. Exceptions:
12//!
13//! * Opaque single-field wrapper newtypes ([`HandleOpaque`],
14//!   [`WaitpointHmac`]) hide their inner field and need no non-exhaustive
15//!   annotation — the wrapped value is unreachable from outside.
16//! * [`ResumeSignal`] is intentionally NOT `#[non_exhaustive]` so the
17//!   ff-sdk crate-move (Stage 0) preserves struct-literal compatibility
18//!   at its existing call site.
19//!
20//! See `rfcs/RFC-012-engine-backend-trait.md` §3.3.0 for the authoritative
21//! type inventory and §4.1-§4.2 for the `Handle` / `EngineError` shapes.
22
23use crate::contracts::ReclaimGrant;
24use crate::types::{TimestampMs, WaitpointToken};
25
26// DX (HHH v0.3.4 re-smoke): `Namespace` lives in `ff_core::types` but
27// is used on `BackendConfig` + `ScannerFilter` (both defined in this
28// module). Re-export here so consumers already scoped to
29// `ff_core::backend::*` can grab it without a second `use` line
30// crossing into `ff_core::types`. Also brings `Namespace` into local
31// scope for the definitions below.
32pub use crate::types::Namespace;
33
34// RFC-013 Stage 1d — re-export the typed suspend trait surface so
35// external crates using `ff_core::backend::*` can reach the new types
36// without a second `use ff_core::contracts` line. Keeps the trait
37// naming surface coherent (`PendingWaitpoint`, `WaitpointSpec`, and
38// the new `SuspendArgs` / `SuspendOutcome` all live at the same path).
39pub use crate::contracts::{
40    CompositeBody, IdempotencyKey, ResumeCondition, ResumePolicy, ResumeTarget, SignalMatcher,
41    SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, SuspensionReasonCode,
42    SuspensionRequester, TimeoutBehavior, WaitpointBinding,
43};
44use std::collections::BTreeMap;
45use std::time::Duration;
46
47// ── §4.1 Handle trio ────────────────────────────────────────────────────
48
49/// Backend-tag discriminator embedded in every [`Handle`] so ops can
50/// dispatch to the correct backend implementation at runtime.
51///
52/// `#[non_exhaustive]`: new backend variants land additively as impls
53/// come online (e.g. `Postgres` in Stage 2, hypothetical third backends
54/// later).
55#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
56#[non_exhaustive]
57pub enum BackendTag {
58    /// The Valkey FCALL-backed implementation.
59    Valkey,
60}
61
62/// Lifecycle kind carried inside a [`Handle`]. Backends validate `kind`
63/// on entry to each op and return `EngineError::State` on mismatch.
64///
65/// Replaces round-1's compile-time `Handle` / `ResumeHandle` /
66/// `SuspendToken` type split (RFC-012 §4.1).
67#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
68#[non_exhaustive]
69pub enum HandleKind {
70    /// Fresh claim — returned by `claim` / `claim_from_grant`.
71    Fresh,
72    /// Resumed from reclaim — returned by `claim_from_reclaim`.
73    Resumed,
74    /// Suspended — returned by `suspend`. Terminal for the lease;
75    /// resumption mints a new Handle via `claim_from_reclaim`.
76    Suspended,
77}
78
79/// Backend-private opaque payload carried inside a [`Handle`].
80///
81/// Encodes backend-specific state (on Valkey: exec id, attempt id,
82/// lease id, lease epoch, capability binding, partition). Consumers do
83/// not construct or inspect the bytes — they are produced by the
84/// backend on claim/resume and consumed by the backend on each op.
85///
86/// `Box<[u8]>` chosen over `bytes::Bytes` (RFC-012 §7.17) to avoid a
87/// public-type transitive dep on the `bytes` crate.
88#[derive(Clone, Debug, PartialEq, Eq, Hash)]
89pub struct HandleOpaque(Box<[u8]>);
90
91impl HandleOpaque {
92    /// Construct from backend-owned bytes. Only backend impls call this.
93    pub fn new(bytes: Box<[u8]>) -> Self {
94        Self(bytes)
95    }
96
97    /// Borrow the underlying bytes (backend-internal use).
98    pub fn as_bytes(&self) -> &[u8] {
99        &self.0
100    }
101}
102
103/// Opaque attempt cookie held by the worker for the duration of an
104/// attempt. Produced by `claim` / `claim_from_reclaim` / `suspend`;
105/// borrowed by every op (renew, progress, append_frame, complete, fail,
106/// cancel, suspend, delay, wait_children, observe_signals, report_usage).
107///
108/// See RFC-012 §4.1 for the round-4 design — terminal ops borrow rather
109/// than consume so callers can retry after a transport error.
110#[derive(Clone, Debug, PartialEq, Eq, Hash)]
111#[non_exhaustive]
112pub struct Handle {
113    pub backend: BackendTag,
114    pub kind: HandleKind,
115    pub opaque: HandleOpaque,
116}
117
118impl Handle {
119    /// Construct a new Handle. Called by backend impls only; consumer
120    /// code receives Handles from `claim` / `suspend` / `claim_from_reclaim`.
121    pub fn new(backend: BackendTag, kind: HandleKind, opaque: HandleOpaque) -> Self {
122        Self {
123            backend,
124            kind,
125            opaque,
126        }
127    }
128}
129
130// ── §3.3.0 Claim / lifecycle supporting types ──────────────────────────
131
132/// Worker capability set — the tokens the worker advertises to the
133/// scheduler and to `claim`. Today stored as `Vec<String>` on
134/// `WorkerConfig`; promoted to a named newtype so the trait signatures
135/// can talk about capabilities without committing to a concrete
136/// container shape.
137///
138/// Bitfield vs stringly-typed is §7.2 open question; Stage 0 keeps the
139/// round-2 lean (newtype over `Vec<String>`).
140#[derive(Clone, Debug, PartialEq, Eq, Default)]
141#[non_exhaustive]
142pub struct CapabilitySet {
143    pub tokens: Vec<String>,
144}
145
146impl CapabilitySet {
147    /// Build from any iterable of string-like capability tokens.
148    pub fn new<I, S>(tokens: I) -> Self
149    where
150        I: IntoIterator<Item = S>,
151        S: Into<String>,
152    {
153        Self {
154            tokens: tokens.into_iter().map(Into::into).collect(),
155        }
156    }
157
158    /// True iff the set contains no tokens.
159    pub fn is_empty(&self) -> bool {
160        self.tokens.is_empty()
161    }
162}
163
164/// Policy hints for `claim`. Minimal at Stage 0 per RFC-012 §3.3.0
165/// ("Bikeshed-prone; keep minimal at Stage 0"). Future fields (retry
166/// count, fairness hints) land additively.
167#[derive(Clone, Debug, PartialEq, Eq, Default)]
168#[non_exhaustive]
169pub struct ClaimPolicy {
170    /// Maximum blocking wait. `None` means backend-default (today:
171    /// non-blocking / immediate return).
172    pub max_wait: Option<Duration>,
173}
174
175impl ClaimPolicy {
176    /// Zero-timeout claim (non-blocking). Matches today's SDK default.
177    pub fn immediate() -> Self {
178        Self { max_wait: None }
179    }
180
181    /// Claim with an explicit blocking bound.
182    pub fn with_max_wait(max_wait: Duration) -> Self {
183        Self {
184            max_wait: Some(max_wait),
185        }
186    }
187}
188
189/// Frame classification for `append_frame`. Mirrors the Lua-side
190/// `ff_append_frame` `frame_type` ARGV.
191#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
192#[non_exhaustive]
193pub enum FrameKind {
194    /// Operator-visible progress output / log line.
195    Stdout,
196    /// Operator-visible error / warning output.
197    Stderr,
198    /// Structured event (JSON payload).
199    Event,
200    /// Binary / opaque payload.
201    Blob,
202}
203
204// ── RFC-015 §1–§6: Stream-durability modes ──────────────────────────────
205
206/// Patch format used by [`StreamMode::DurableSummary`] to apply each
207/// frame's payload against the server-side rolling summary document.
208///
209/// v0.6 ships a single variant, `JsonMergePatch` (RFC 7396). The enum
210/// is `#[non_exhaustive]` so the future `PatchKind::StringAppend`
211/// variant flagged in RFC-015 §11 can land additively without a
212/// breaking change.
213#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
214#[non_exhaustive]
215pub enum PatchKind {
216    /// RFC 7396 JSON Merge Patch. Locked choice for v0.6 (RFC-015 §3.2).
217    JsonMergePatch,
218}
219
220/// Per-call durability mode for [`EngineBackend::append_frame`].
221///
222/// RFC-015 §1. Mode is **per-call** — workers routinely mix frame types
223/// (tokens + progress + final summary) in a single attempt and the right
224/// durability differs per frame type. See RFC-015 §5 for the caveat on
225/// mixed-mode streams.
226///
227/// `#[non_exhaustive]`: new modes land additively (the v0.6 PR deliberately
228/// excludes `KeepAllDeltas`, dropped by owner adjudication; a future
229/// mode for e.g. per-frame-replicated streams would land here).
230///
231/// ## Doc-comment contract on [`Self::Durable`]
232///
233/// If the same stream also receives [`Self::BestEffortLive`] frames,
234/// `Durable` frames are subject to the best-effort MAXLEN trim (see
235/// RFC-015 §5). Callers that need strict retention for `Durable` frames
236/// alongside best-effort telemetry must not mix modes on one stream or
237/// should place the durable frames on a sibling stream.
238#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
239#[non_exhaustive]
240pub enum StreamMode {
241    /// Current default — every frame XADDs as a durable entry.
242    #[default]
243    Durable,
244    /// Server-side rolling-summary collapse. Each frame's payload is a
245    /// delta/patch applied atomically to a summary Hash (per
246    /// [`PatchKind`]); the delta is also XADDed to the stream with
247    /// `mode=summary` fields so live tailers continue to observe change
248    /// events. RFC-015 §3.
249    DurableSummary { patch_kind: PatchKind },
250    /// Short-lived frame. XADDed with `mode=best_effort`; per-stream
251    /// MAXLEN rolls it off and the stream key gets a TTL refresh only
252    /// when the stream has never held a durable frame (RFC-015 §4.1).
253    ///
254    /// The per-stream MAXLEN is computed dynamically in Lua from an
255    /// EMA of observed append rate (RFC-015 §4.2). See
256    /// [`BestEffortLiveConfig`] for the tunable knobs.
257    BestEffortLive { config: BestEffortLiveConfig },
258}
259
260/// Configuration knobs for [`StreamMode::BestEffortLive`] — RFC-015
261/// §4.2 dynamic MAXLEN sizing.
262///
263/// Defaults derived from the Phase 0 benchmark + RFC §4.1/§4.3 final
264/// design:
265///   - `ttl_ms` — caller-supplied visibility target.
266///   - `maxlen_floor = 64` — RFC §4.1 round-2 default, used for
267///     low-rate streams where the EMA formula would under-size.
268///   - `maxlen_ceiling = 16_384` — cap on per-stream retained entries
269///     (raised from the original §4.2 draft of 2048 after Phase 0
270///     showed 200–4000 Hz LLM-token bursts saturate any lower clamp).
271///   - `ema_alpha = 0.2` — RFC §4.3 gate value. Weights the latest
272///     per-append instantaneous rate at 20 %, decays prior samples
273///     at 80 % each append.
274#[derive(Clone, Copy, Debug, PartialEq)]
275#[non_exhaustive]
276pub struct BestEffortLiveConfig {
277    pub ttl_ms: u32,
278    pub maxlen_floor: u32,
279    pub maxlen_ceiling: u32,
280    pub ema_alpha: f64,
281}
282
283impl BestEffortLiveConfig {
284    /// Construct with the given `ttl_ms` and defaults for the other
285    /// knobs. Matches the shorthand used by
286    /// [`StreamMode::best_effort_live`].
287    pub fn with_ttl(ttl_ms: u32) -> Self {
288        Self {
289            ttl_ms,
290            maxlen_floor: 64,
291            maxlen_ceiling: 16_384,
292            ema_alpha: 0.2,
293        }
294    }
295
296    /// Builder: override [`Self::maxlen_floor`]. Chainable.
297    pub fn with_maxlen_floor(mut self, floor: u32) -> Self {
298        self.maxlen_floor = floor;
299        self
300    }
301
302    /// Builder: override [`Self::maxlen_ceiling`]. Chainable.
303    pub fn with_maxlen_ceiling(mut self, ceiling: u32) -> Self {
304        self.maxlen_ceiling = ceiling;
305        self
306    }
307
308    /// Builder: override [`Self::ema_alpha`]. Chainable. Callers are
309    /// expected to keep α in `(0.0, 1.0]`; the Lua side clamps to that
310    /// range defensively.
311    pub fn with_ema_alpha(mut self, alpha: f64) -> Self {
312        self.ema_alpha = alpha;
313        self
314    }
315}
316
317// Eq/Hash on an f64 field is unsafe (NaN); derive only what's sound.
318impl Eq for BestEffortLiveConfig {}
319impl std::hash::Hash for BestEffortLiveConfig {
320    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
321        self.ttl_ms.hash(state);
322        self.maxlen_floor.hash(state);
323        self.maxlen_ceiling.hash(state);
324        self.ema_alpha.to_bits().hash(state);
325    }
326}
327
328impl StreamMode {
329    /// The v0.6 default — [`Self::Durable`]. Provided for symmetry with
330    /// the other constructors (the enum is `#[non_exhaustive]` so
331    /// cross-crate consumers cannot construct variants by name).
332    pub fn durable() -> Self {
333        StreamMode::Durable
334    }
335
336    /// [`Self::DurableSummary`] with [`PatchKind::JsonMergePatch`] —
337    /// the only supported `PatchKind` in v0.6.
338    pub fn durable_summary() -> Self {
339        StreamMode::DurableSummary {
340            patch_kind: PatchKind::JsonMergePatch,
341        }
342    }
343
344    /// [`Self::BestEffortLive`] with the caller-supplied `ttl_ms` and
345    /// default [`BestEffortLiveConfig`] knobs.
346    /// RFC-015 §4 guidance: `5_000..=30_000` ms. Below ~1000 ms a live
347    /// tailer may not connect in time; above ~60_000 ms the memory
348    /// argument against plain [`Self::Durable`] weakens.
349    pub fn best_effort_live(ttl_ms: u32) -> Self {
350        StreamMode::BestEffortLive {
351            config: BestEffortLiveConfig::with_ttl(ttl_ms),
352        }
353    }
354
355    /// [`Self::BestEffortLive`] with a fully-specified
356    /// [`BestEffortLiveConfig`]. Use for per-workload tuning of α, the
357    /// MAXLEN clamp, or the TTL — defaults are wired from
358    /// [`BestEffortLiveConfig::with_ttl`].
359    pub fn best_effort_live_with_config(config: BestEffortLiveConfig) -> Self {
360        StreamMode::BestEffortLive { config }
361    }
362
363    /// Stable wire-level token for this mode, written to the XADD entry
364    /// `mode` field (RFC-015 §6.1). Tail filters compare against these
365    /// string values server-side.
366    pub fn wire_str(&self) -> &'static str {
367        match self {
368            StreamMode::Durable => "durable",
369            StreamMode::DurableSummary { .. } => "summary",
370            StreamMode::BestEffortLive { .. } => "best_effort",
371        }
372    }
373}
374
375/// Tail-stream visibility filter (RFC-015 §6).
376///
377/// Default [`Self::All`] preserves v1 behaviour; opt-in
378/// [`Self::ExcludeBestEffort`] filters out `BestEffortLive` frames on
379/// the server side (the XADD `mode` field is a cheap field check in
380/// `ff_read_attempt_stream` / `xread_block`).
381#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
382#[non_exhaustive]
383pub enum TailVisibility {
384    /// Default. Returns every XADD entry in the stream regardless of
385    /// mode.
386    #[default]
387    All,
388    /// Returns only frames appended under [`StreamMode::Durable`] or
389    /// [`StreamMode::DurableSummary`] (i.e. filters out
390    /// [`StreamMode::BestEffortLive`]). Named to be self-describing:
391    /// the filter excludes best-effort frames, not "only Durable" —
392    /// `DurableSummary` deltas are included because they have a
393    /// durable backing (the summary Hash).
394    ExcludeBestEffort,
395}
396
397impl TailVisibility {
398    /// Stable wire-level token pushed as an ARGV to the Lua tail/read
399    /// implementations. `""` (empty) means default = `All` (no filter).
400    pub fn wire_str(&self) -> &'static str {
401        match self {
402            TailVisibility::All => "",
403            TailVisibility::ExcludeBestEffort => "exclude_best_effort",
404        }
405    }
406}
407
408/// Byte-exact null-sentinel used by [`StreamMode::DurableSummary`] +
409/// [`PatchKind::JsonMergePatch`] to set a field to JSON `null` (RFC-015
410/// §3.2).
411///
412/// RFC 7396 treats `null` as "delete the key", so callers that actually
413/// want a literal JSON `null` leaf send this sentinel string as the
414/// scalar leaf value; the Lua apply-path rewrites the sentinel to JSON
415/// `null` after the merge.
416///
417/// # Round-trip invariant
418///
419/// The summary document — as returned by a `read_summary` call — NEVER
420/// contains the sentinel string. Any `null` observed always means "this
421/// field is explicitly null." See [`SummaryDocument`].
422pub const SUMMARY_NULL_SENTINEL: &str = "__ff_null__";
423
424/// Materialised rolling summary document returned by a summary-read
425/// call (RFC-015 §6.3).
426///
427/// `document` is the caller-owned JSON value; scalar leaves that the
428/// caller wrote through the [`SUMMARY_NULL_SENTINEL`] contract appear
429/// here as JSON `null` (not as the sentinel string — the round-trip
430/// invariant erases the sentinel on read).
431///
432/// `#[non_exhaustive]` keeps future additive fields (e.g. a compacted
433/// delta cursor, a schema-version tag) additive. Construct via
434/// [`Self::new`].
435#[derive(Clone, Debug, PartialEq, Eq)]
436#[non_exhaustive]
437pub struct SummaryDocument {
438    /// The rolling summary as JSON bytes (UTF-8, encoded by the
439    /// server-side applier after merge). Stored as `Vec<u8>` instead
440    /// of `serde_json::Value` to keep `ff_core::backend` free of a
441    /// public `serde_json` type dependency.
442    pub document_json: Vec<u8>,
443    /// Monotonic version bumped on every delta applied. `0` is never
444    /// observed — the first applied delta returns `1`.
445    pub version: u64,
446    /// Which [`PatchKind`] was used to build the document.
447    pub patch_kind: PatchKind,
448    /// Unix millis of the most recent delta application.
449    pub last_updated_ms: u64,
450    /// Unix millis of the first delta applied to this attempt.
451    pub first_applied_ms: u64,
452}
453
454impl SummaryDocument {
455    /// Build a summary snapshot. Used by backends parsing the Hash
456    /// fields; external consumers receive these from `read_summary`.
457    pub fn new(
458        document_json: Vec<u8>,
459        version: u64,
460        patch_kind: PatchKind,
461        last_updated_ms: u64,
462        first_applied_ms: u64,
463    ) -> Self {
464        Self {
465            document_json,
466            version,
467            patch_kind,
468            last_updated_ms,
469            first_applied_ms,
470        }
471    }
472}
473
474/// Single stream frame appended via `append_frame` (RFC-012 §3.3.0).
475/// Today's FCALL takes the byte payload + frame_type + optional seq as
476/// discrete ARGV; Stage 0 collects them into a named type for trait
477/// signatures.
478///
479/// **Round-7 follow-up (PR #145 → #146):** extended with
480/// `frame_type: String` (the SDK-public free-form classifier — values
481/// like `"delta"`, `"log"`, `"agent_step"`, `"summary_token"`,
482/// `"transcribe_line"`, `"progress"` — distinct from the coarse
483/// [`FrameKind`] enum) and `correlation_id: Option<String>` (the
484/// wire-level `correlation_id` ARGV, surfaced at the SDK as
485/// `metadata: Option<&str>`). Adding these lets
486/// `ClaimedTask::append_frame` forward through the trait without
487/// wire-parity regression.
488///
489/// `frame_type` is free-form and is what the backend writes into the
490/// Lua-side `frame_type` ARGV. [`FrameKind`] remains for typed
491/// classification at the trait surface; when callers populate only
492/// `kind`, the backend falls back to a stable encoding of the enum
493/// variant (see `frame_kind_to_str` in `ff-backend-valkey`).
494#[derive(Clone, Debug, PartialEq, Eq)]
495#[non_exhaustive]
496pub struct Frame {
497    pub bytes: Vec<u8>,
498    pub kind: FrameKind,
499    /// Optional monotonic sequence. Set by the caller when the stream
500    /// protocol is sequence-bound; `None` lets the backend assign.
501    pub seq: Option<u64>,
502    /// Free-form classifier written to the Lua-side `frame_type` ARGV.
503    /// Empty string means "defer to [`FrameKind`]" — the backend
504    /// substitutes the enum-variant encoding.
505    pub frame_type: String,
506    /// Optional correlation id (wire `correlation_id` ARGV). `None`
507    /// encodes as the empty string on the wire.
508    pub correlation_id: Option<String>,
509    /// Durability mode for this frame (RFC-015 §1). Defaults to
510    /// [`StreamMode::Durable`] for v1-caller parity — the field was
511    /// added additively so pre-RFC-015 construction via
512    /// [`Self::new`] / [`Self::with_seq`] sees `Durable` without code
513    /// change.
514    pub mode: StreamMode,
515}
516
517impl Frame {
518    /// Construct a frame. `seq` defaults to `None` (backend-assigned);
519    /// `frame_type` defaults to empty (backend falls back to
520    /// `FrameKind` encoding); `correlation_id` defaults to `None`.
521    /// Callers that need an explicit sequence use [`Frame::with_seq`];
522    /// callers on the SDK forwarder path populate `frame_type` +
523    /// `correlation_id` via [`Frame::with_frame_type`] /
524    /// [`Frame::with_correlation_id`].
525    pub fn new(bytes: Vec<u8>, kind: FrameKind) -> Self {
526        Self {
527            bytes,
528            kind,
529            seq: None,
530            frame_type: String::new(),
531            correlation_id: None,
532            mode: StreamMode::Durable,
533        }
534    }
535
536    /// Construct a frame with an explicit monotonic sequence.
537    pub fn with_seq(bytes: Vec<u8>, kind: FrameKind, seq: u64) -> Self {
538        Self {
539            bytes,
540            kind,
541            seq: Some(seq),
542            frame_type: String::new(),
543            correlation_id: None,
544            mode: StreamMode::Durable,
545        }
546    }
547
548    /// Builder-style setter for the RFC-015 durability [`StreamMode`].
549    /// Defaults to [`StreamMode::Durable`] (v1 parity) when unset.
550    pub fn with_mode(mut self, mode: StreamMode) -> Self {
551        self.mode = mode;
552        self
553    }
554
555    /// Builder-style setter for the free-form `frame_type` classifier.
556    pub fn with_frame_type(mut self, frame_type: impl Into<String>) -> Self {
557        self.frame_type = frame_type.into();
558        self
559    }
560
561    /// Builder-style setter for the optional `correlation_id`.
562    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
563        self.correlation_id = Some(correlation_id.into());
564        self
565    }
566}
567
568// ── §3.3.0 Suspend / waitpoint types ────────────────────────────────────
569
570/// Waitpoint matcher mode (mirrors today's suspend/close matcher kinds
571/// — signal name, correlation id, etc.).
572#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
573#[non_exhaustive]
574pub enum WaitpointKind {
575    /// Match by signal name.
576    SignalName,
577    /// Match by correlation id.
578    CorrelationId,
579    /// Generic external-signal waitpoint (external delivery).
580    External,
581}
582
583/// HMAC token that binds a waitpoint to its mint-time identity. Wire
584/// shape `kid:40hex`.
585///
586/// Wraps [`crate::types::WaitpointToken`] so bearer-credential Debug /
587/// Display redaction (`WaitpointToken("kid1:<REDACTED:len=40>")`) flows
588/// through automatically — no derived formatter can leak the raw
589/// digest when a [`WaitpointSpec`] is debug-printed via
590/// `tracing::debug!(spec=?spec)`.
591///
592/// Newtype-wrapping (vs. a `pub use` alias) keeps trait signatures
593/// naming the waitpoint-bound HMAC role explicitly.
594#[derive(Clone, PartialEq, Eq, Hash)]
595pub struct WaitpointHmac(WaitpointToken);
596
597impl WaitpointHmac {
598    pub fn new(token: impl Into<String>) -> Self {
599        Self(WaitpointToken::from(token.into()))
600    }
601
602    /// Borrow the wrapped token. The wrapped type's `Debug`/`Display`
603    /// redact — call sites that need the raw digest must explicitly
604    /// call [`WaitpointToken::as_str`].
605    pub fn token(&self) -> &WaitpointToken {
606        &self.0
607    }
608
609    /// Borrow the raw `kid:40hex` string. Prefer [`Self::token`] for
610    /// non-redacted call sites; this method exists only for transport
611    /// / FCALL ARGV construction where the raw wire bytes are required.
612    pub fn as_str(&self) -> &str {
613        self.0.as_str()
614    }
615}
616
617// Forward Debug / Display to the wrapped WaitpointToken so the
618// redaction guarantees on that type extend here. Derived Debug would
619// expose the raw string field and defeat the wrap.
620impl std::fmt::Debug for WaitpointHmac {
621    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
622        write!(f, "WaitpointHmac({:?})", self.0)
623    }
624}
625
626/// Handle returned by `create_waitpoint` — the id of the newly-minted
627/// pending waitpoint plus its HMAC token. Signals targeted at the
628/// waitpoint must present the token; a later `suspend` call transitions
629/// the waitpoint from `pending` to `active` (RFC-012 §R7.2.2).
630///
631/// `WaitpointHmac` redacts on `Debug`/`Display`, so deriving `Debug`
632/// here cannot leak the raw digest.
633#[derive(Clone, Debug, PartialEq, Eq)]
634#[non_exhaustive]
635pub struct PendingWaitpoint {
636    pub waitpoint_id: crate::types::WaitpointId,
637    pub hmac_token: WaitpointHmac,
638}
639
640impl PendingWaitpoint {
641    pub fn new(waitpoint_id: crate::types::WaitpointId, hmac_token: WaitpointHmac) -> Self {
642        Self {
643            waitpoint_id,
644            hmac_token,
645        }
646    }
647}
648
649/// One waitpoint inside a suspend request. `suspend` takes a
650/// `Vec<WaitpointSpec>`; the resume condition (`any` / `all`) lives on
651/// the enclosing suspend args in the Phase-1 contract.
652#[derive(Clone, Debug, PartialEq, Eq)]
653#[non_exhaustive]
654pub struct WaitpointSpec {
655    pub kind: WaitpointKind,
656    pub matcher: Vec<u8>,
657    pub hmac_token: WaitpointHmac,
658}
659
660impl WaitpointSpec {
661    pub fn new(kind: WaitpointKind, matcher: Vec<u8>, hmac_token: WaitpointHmac) -> Self {
662        Self {
663            kind,
664            matcher,
665            hmac_token,
666        }
667    }
668}
669
670// ── §3.3.0 Failure classification types ─────────────────────────────────
671
672/// Human-readable failure description + optional structured detail.
673/// Replaces today's ad-hoc string arg to `fail`.
674#[derive(Clone, Debug, PartialEq, Eq)]
675#[non_exhaustive]
676pub struct FailureReason {
677    pub message: String,
678    pub detail: Option<Vec<u8>>,
679}
680
681impl FailureReason {
682    pub fn new(message: impl Into<String>) -> Self {
683        Self {
684            message: message.into(),
685            detail: None,
686        }
687    }
688
689    pub fn with_detail(message: impl Into<String>, detail: Vec<u8>) -> Self {
690        Self {
691            message: message.into(),
692            detail: Some(detail),
693        }
694    }
695}
696
697/// Failure classification — determines retry disposition on the Lua
698/// side. Mirrors the Lua-side classification codes.
699#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
700#[non_exhaustive]
701pub enum FailureClass {
702    /// Retryable transient error.
703    Transient,
704    /// Permanent error — no retries.
705    Permanent,
706    /// Crash / process death inferred from lease expiry.
707    InfraCrash,
708    /// Timeout at the attempt or operation level.
709    Timeout,
710    /// Cooperative cancellation by operator or cancel_flow.
711    Cancelled,
712}
713
714// ── §3.3.0 Usage / budget types ─────────────────────────────────────────
715
716/// Usage report for `report_usage`. Mirrors today's
717/// `ff_report_usage_and_check` ARGV: token-counts, wall-time, custom
718/// dimensions.
719#[derive(Clone, Debug, PartialEq, Eq, Default)]
720#[non_exhaustive]
721pub struct UsageDimensions {
722    /// Input tokens consumed (LLM-shaped usage). `0` if not applicable.
723    pub input_tokens: u64,
724    /// Output tokens produced (LLM-shaped usage).
725    pub output_tokens: u64,
726    /// Wall-clock duration, in milliseconds, attributable to this
727    /// report. `None` for pure token-count reports.
728    pub wall_ms: Option<u64>,
729    /// Arbitrary caller-defined dimensions. Use `BTreeMap` for stable
730    /// iteration order (important for dedup-key derivation on some
731    /// budget schemes).
732    pub custom: BTreeMap<String, u64>,
733    /// Optional caller-supplied idempotency key. When set, the backend
734    /// rejects a repeat application of the same key with
735    /// `ReportUsageResult::AlreadyApplied` rather than double-counting
736    /// (RFC-012 §R7.4; Lua `ff_report_usage_and_check` threads this as
737    /// the trailing ARGV). `None` / empty string disables dedup.
738    pub dedup_key: Option<String>,
739}
740
741impl UsageDimensions {
742    /// Create an empty usage report (all dimensions zero / `None`).
743    ///
744    /// Provided for external-crate consumers: `UsageDimensions` is
745    /// `#[non_exhaustive]`, so struct-literal and functional-update
746    /// construction are unavailable across crate boundaries. Start
747    /// from `new()` and chain `with_*` setters to build a report.
748    pub fn new() -> Self {
749        Self::default()
750    }
751
752    /// Set the input-token count dimension. Consumes and returns
753    /// `self` for chaining.
754    pub fn with_input_tokens(mut self, tokens: u64) -> Self {
755        self.input_tokens = tokens;
756        self
757    }
758
759    /// Set the output-token count dimension. Consumes and returns
760    /// `self` for chaining.
761    pub fn with_output_tokens(mut self, tokens: u64) -> Self {
762        self.output_tokens = tokens;
763        self
764    }
765
766    /// Set the wall-clock duration dimension, in milliseconds.
767    /// Consumes and returns `self` for chaining.
768    pub fn with_wall_ms(mut self, ms: u64) -> Self {
769        self.wall_ms = Some(ms);
770        self
771    }
772
773    /// Set the optional caller-supplied idempotency key. When set,
774    /// the backend rejects a repeat application of the same key with
775    /// `ReportUsageResult::AlreadyApplied` rather than double-counting
776    /// (RFC-012 §R7.4). Consumes and returns `self` for chaining.
777    pub fn with_dedup_key(mut self, key: impl Into<String>) -> Self {
778        self.dedup_key = Some(key.into());
779        self
780    }
781}
782
783// ── §3.3.0 Reclaim / lease types ────────────────────────────────────────
784
785/// Opaque cookie returned by the reclaim scanner; consumed by
786/// `claim_from_reclaim` to mint a resumed Handle.
787///
788/// Wraps [`ReclaimGrant`] today (the scanner's existing product).
789/// Kept as a newtype so trait signatures name the reclaim-bound role
790/// explicitly and so the wrapped shape can evolve without breaking the
791/// trait.
792#[derive(Clone, Debug, PartialEq, Eq)]
793#[non_exhaustive]
794pub struct ReclaimToken {
795    pub grant: ReclaimGrant,
796}
797
798impl ReclaimToken {
799    pub fn new(grant: ReclaimGrant) -> Self {
800        Self { grant }
801    }
802}
803
804/// Result of a successful `renew` call.
805#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
806#[non_exhaustive]
807pub struct LeaseRenewal {
808    /// New lease expiry (monotonic ms).
809    pub expires_at_ms: u64,
810    /// Lease epoch after renewal. Monotonic non-decreasing.
811    pub lease_epoch: u64,
812}
813
814impl LeaseRenewal {
815    pub fn new(expires_at_ms: u64, lease_epoch: u64) -> Self {
816        Self {
817            expires_at_ms,
818            lease_epoch,
819        }
820    }
821}
822
823// ── §3.3.0 Cancel-flow supporting types ─────────────────────────────────
824
825/// Cancel-flow policy — what to do with the flow's members. Today
826/// encoded as a `String` on `CancelFlowArgs`; Stage 0 extracts the
827/// policy shape as a typed enum (RFC-012 §3.3.0).
828#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
829#[non_exhaustive]
830pub enum CancelFlowPolicy {
831    /// Cancel only the flow record; leave members alone.
832    FlowOnly,
833    /// Cancel the flow and every non-terminal member execution.
834    CancelAll,
835    /// Cancel the flow and every member currently in `Pending` /
836    /// `Blocked` / `Eligible` — leave `Running` executions alone to
837    /// drain.
838    CancelPending,
839}
840
841/// Caller wait posture for `cancel_flow`.
842#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
843#[non_exhaustive]
844pub enum CancelFlowWait {
845    /// Return after the flow-state flip; member cancellations dispatch
846    /// asynchronously.
847    NoWait,
848    /// Block until member cancellations complete, up to `timeout`.
849    WaitTimeout(Duration),
850    /// Block until member cancellations complete, no deadline.
851    WaitIndefinite,
852}
853
854// ── §3.3.0 Completion stream payload ────────────────────────────────────
855
856/// One completion event delivered through the `CompletionStream`
857/// (RFC-012 §4.3). Also the payload type for issue #90's subscription
858/// API. Stage 0 authorises the type; issue #90 fixes the wire shape.
859///
860/// `flow_id` was added in issue #90 so DAG-dependency routing
861/// (`dispatch_dependency_resolution`) has the partition-routable flow
862/// handle without reparsing the Lua-emitted JSON downstream.
863/// `#[non_exhaustive]` keeps future field additions additive; use
864/// [`CompletionPayload::new`] and [`CompletionPayload::with_flow_id`]
865/// for construction.
866///
867/// `payload_bytes` / `produced_at_ms` are authorised but not yet
868/// populated by the Valkey Lua emitters — consumers on the
869/// `CompletionStream` read `execution_id` + `flow_id` today and must
870/// tolerate `payload_bytes = None` / `produced_at_ms = 0`.
871#[derive(Clone, Debug, PartialEq, Eq)]
872#[non_exhaustive]
873pub struct CompletionPayload {
874    pub execution_id: crate::types::ExecutionId,
875    pub outcome: String,
876    pub payload_bytes: Option<Vec<u8>>,
877    pub produced_at_ms: TimestampMs,
878    /// Flow handle for partition routing. Added in issue #90 (#90);
879    /// `None` for emitters that don't yet surface it.
880    pub flow_id: Option<crate::types::FlowId>,
881}
882
883impl CompletionPayload {
884    pub fn new(
885        execution_id: crate::types::ExecutionId,
886        outcome: impl Into<String>,
887        payload_bytes: Option<Vec<u8>>,
888        produced_at_ms: TimestampMs,
889    ) -> Self {
890        Self {
891            execution_id,
892            outcome: outcome.into(),
893            payload_bytes,
894            produced_at_ms,
895            flow_id: None,
896        }
897    }
898
899    /// Attach a flow handle to the payload. Additive builder so adding
900    /// `flow_id` didn't require a breaking change to [`Self::new`].
901    #[must_use]
902    pub fn with_flow_id(mut self, flow_id: crate::types::FlowId) -> Self {
903        self.flow_id = Some(flow_id);
904        self
905    }
906}
907
908// ── §3.3.0 ResumeSignal (crate move from ff-sdk::task) ──────────────────
909
910/// Signal that satisfied a waitpoint matcher and is therefore part of
911/// the reason an execution resumed. Returned by `observe_signals`
912/// (RFC-012 §3.1.2) and by `ClaimedTask::resume_signals` in ff-sdk.
913///
914/// Moved in Stage 0 from `ff_sdk::task`; `ff_sdk::ResumeSignal` remains
915/// re-exported through the 0.4.x window (removal scheduled for 0.5.0).
916///
917/// Returned only for signals whose matcher slot in the waitpoint's
918/// resume condition is marked satisfied. Pre-buffered-but-unmatched
919/// signals are not included.
920///
921/// Note: NOT `#[non_exhaustive]` to preserve struct-literal compatibility
922/// with ff-sdk call sites that constructed `ResumeSignal { .. }` before
923/// the Stage 0 crate move.
924#[derive(Clone, Debug, PartialEq, Eq)]
925pub struct ResumeSignal {
926    pub signal_id: crate::types::SignalId,
927    pub signal_name: String,
928    pub signal_category: String,
929    pub source_type: String,
930    pub source_identity: String,
931    pub correlation_id: String,
932    /// Valkey-server `now_ms` timestamp at which `ff_deliver_signal`
933    /// accepted this signal. `0` if the stored `accepted_at` field is
934    /// missing or non-numeric (a Lua-side defect — not expected at
935    /// runtime).
936    pub accepted_at: TimestampMs,
937    /// Raw payload bytes, if the signal was delivered with one. `None`
938    /// for signals delivered without a payload.
939    pub payload: Option<Vec<u8>>,
940}
941
942// ── Stage 1a: FailOutcome move ──────────────────────────────────────────
943
944/// Outcome of a `fail()` call.
945///
946/// **RFC-012 Stage 1a:** moved from `ff_sdk::task::FailOutcome` to
947/// `ff_core::backend::FailOutcome` so it is nameable by the
948/// `EngineBackend` trait signature. `ff_sdk::task` retains a
949/// `pub use` shim preserving the `ff_sdk::FailOutcome` path.
950///
951/// Not `#[non_exhaustive]` because existing consumers (ff-test,
952/// ff-readiness-tests) construct and match this enum exhaustively;
953/// the shape has been stable since the `fail()` API landed and any
954/// additive growth would be a follow-up RFC's deliberate break.
955#[derive(Clone, Debug, PartialEq, Eq)]
956pub enum FailOutcome {
957    /// Retry was scheduled — execution is in delayed backoff.
958    RetryScheduled {
959        delay_until: crate::types::TimestampMs,
960    },
961    /// No retries left — execution is terminal failed.
962    TerminalFailed,
963}
964
965// ── RFC-012 §R7: AppendFrameOutcome move ────────────────────────────────
966
967/// Outcome of an `append_frame()` call.
968///
969/// **RFC-012 §R7.2.1:** moved from `ff_sdk::task::AppendFrameOutcome`
970/// to `ff_core::backend::AppendFrameOutcome` so it is nameable by the
971/// `EngineBackend::append_frame` trait return. `ff_sdk::task` retains
972/// a `pub use` shim preserving the `ff_sdk::task::AppendFrameOutcome`
973/// path through 0.4.x.
974///
975/// Derive set matches the `FailOutcome` precedent
976/// (`Clone, Debug, PartialEq, Eq`). Not `#[non_exhaustive]`:
977/// construction is internal to the backend today (parser in
978/// `ff-backend-valkey`), and no external constructors are anticipated
979/// (consumer-shape evidence per §R7.2.1 / MN3).
980///
981/// `stream_id: String` is a stable shape commitment — a future typed
982/// `StreamId` newtype would be its own breaking change (§R7.5.6 / MD2).
983///
984/// RFC-015 §9 made this type `#[non_exhaustive]` so the new
985/// `summary_version: Option<u64>` field (populated only for
986/// [`StreamMode::DurableSummary`] appends) can land additively and
987/// future per-mode outcome fields can follow. Construct via
988/// [`Self::new`] + the chainable setters; cross-crate consumers cannot
989/// use struct-literal construction.
990#[derive(Clone, Debug, PartialEq, Eq, Default)]
991#[non_exhaustive]
992pub struct AppendFrameOutcome {
993    /// Valkey Stream entry ID assigned to this frame (e.g. `1234567890-0`).
994    pub stream_id: String,
995    /// Total frame count in the stream after this append.
996    pub frame_count: u64,
997    /// Rolling summary `version` after the delta applied, populated
998    /// only for [`StreamMode::DurableSummary`] appends (RFC-015 §3.3
999    /// step 6). `None` for [`StreamMode::Durable`] /
1000    /// [`StreamMode::BestEffortLive`] appends — callers that need
1001    /// "total deltas applied" read this field.
1002    pub summary_version: Option<u64>,
1003}
1004
1005impl AppendFrameOutcome {
1006    /// Build an outcome with the mandatory `stream_id` / `frame_count`
1007    /// fields. `summary_version` defaults to `None` — call
1008    /// [`Self::with_summary_version`] for [`StreamMode::DurableSummary`]
1009    /// appends.
1010    pub fn new(stream_id: impl Into<String>, frame_count: u64) -> Self {
1011        Self {
1012            stream_id: stream_id.into(),
1013            frame_count,
1014            summary_version: None,
1015        }
1016    }
1017
1018    /// Attach a rolling summary version (RFC-015 §9).
1019    #[must_use]
1020    pub fn with_summary_version(mut self, version: u64) -> Self {
1021        self.summary_version = Some(version);
1022        self
1023    }
1024}
1025
1026// ── Stage 1a: BackendConfig + sub-types ─────────────────────────────────
1027
1028/// Pool timing shared across backend connections.
1029///
1030/// Fields are `#[non_exhaustive]` per project convention — connection
1031/// tunables grow as new backends land. Default is the Phase-1 Valkey
1032/// client's out-of-box shape (no explicit timeout, no explicit pool
1033/// cap; inherits ferriskey defaults).
1034#[derive(Clone, Debug, Default, PartialEq, Eq)]
1035#[non_exhaustive]
1036pub struct BackendTimeouts {
1037    /// Per-request timeout. `None` ⇒ backend default.
1038    pub request: Option<Duration>,
1039}
1040
1041/// Retry policy shared across backend connections.
1042///
1043/// Matches ferriskey's `ConnectionRetryStrategy` shape (see
1044/// `ferriskey/src/client/types.rs:151`) so Stage 1c's Valkey wiring is a
1045/// direct pass-through — we don't reimplement what ferriskey already
1046/// provides. The Postgres backend (future) interprets the same fields
1047/// under its own retry semantics, or maps `None` to its own defaults.
1048///
1049/// Each field is `Option<u32>`: `None` ⇒ backend default (for Valkey,
1050/// this means `ConnectionRetryStrategy::default()`); `Some(v)` ⇒ pass
1051/// `v` straight through.
1052#[derive(Clone, Debug, Default, PartialEq, Eq)]
1053#[non_exhaustive]
1054pub struct BackendRetry {
1055    /// Exponent base for the backoff curve. `None` ⇒ backend default.
1056    pub exponent_base: Option<u32>,
1057    /// Multiplicative factor applied to each backoff step. `None` ⇒
1058    /// backend default.
1059    pub factor: Option<u32>,
1060    /// Maximum number of retry attempts on transient transport errors.
1061    /// `None` ⇒ backend default.
1062    pub number_of_retries: Option<u32>,
1063    /// Jitter as a percentage of the computed backoff. `None` ⇒ backend
1064    /// default.
1065    pub jitter_percent: Option<u32>,
1066}
1067
1068/// Valkey-specific connection parameters.
1069#[derive(Clone, Debug, PartialEq, Eq)]
1070#[non_exhaustive]
1071pub struct ValkeyConnection {
1072    /// Valkey hostname.
1073    pub host: String,
1074    /// Valkey port.
1075    pub port: u16,
1076    /// Enable TLS for the Valkey connection.
1077    pub tls: bool,
1078    /// Enable Valkey cluster mode.
1079    pub cluster: bool,
1080}
1081
1082impl ValkeyConnection {
1083    pub fn new(host: impl Into<String>, port: u16) -> Self {
1084        Self {
1085            host: host.into(),
1086            port,
1087            tls: false,
1088            cluster: false,
1089        }
1090    }
1091}
1092
1093/// Discriminated union over per-backend connection shapes. Stage 1a
1094/// ships the Valkey arm; future backends (Postgres) land additively
1095/// under the `#[non_exhaustive]` guard.
1096#[derive(Clone, Debug, PartialEq, Eq)]
1097#[non_exhaustive]
1098pub enum BackendConnection {
1099    Valkey(ValkeyConnection),
1100}
1101
1102/// Configuration passed to `ValkeyBackend::connect` (and, later, to
1103/// other backend `connect` constructors). Carries the connection
1104/// details + shared timing/retry policy. Replaces the Valkey-specific
1105/// fields today on `WorkerConfig` (RFC-012 §5.1 migration plan).
1106///
1107/// `BackendConfig` is the replacement target for `WorkerConfig`'s
1108/// `host` / `port` / `tls` / `cluster` fields. The full migration
1109/// lands across Stage 1a (type introduction) and Stage 1c
1110/// (`WorkerConfig` forwarding); worker-policy fields (lease TTL,
1111/// claim poll interval, capability set) stay on `WorkerConfig`.
1112#[derive(Clone, Debug, PartialEq, Eq)]
1113#[non_exhaustive]
1114pub struct BackendConfig {
1115    pub connection: BackendConnection,
1116    pub timeouts: BackendTimeouts,
1117    pub retry: BackendRetry,
1118}
1119
1120impl BackendConfig {
1121    /// Build a Valkey BackendConfig from host+port. Other fields take
1122    /// backend defaults.
1123    pub fn valkey(host: impl Into<String>, port: u16) -> Self {
1124        Self {
1125            connection: BackendConnection::Valkey(ValkeyConnection::new(host, port)),
1126            timeouts: BackendTimeouts::default(),
1127            retry: BackendRetry::default(),
1128        }
1129    }
1130}
1131
1132// ── Issue #122: ScannerFilter ───────────────────────────────────────────
1133
1134/// Per-consumer filter applied by FlowFabric's background scanners and
1135/// completion subscribers so multiple FlowFabric instances sharing a
1136/// single Valkey keyspace can operate on disjoint subsets of
1137/// executions without mutual interference (issue #122).
1138///
1139/// Sibling of [`CompletionPayload`]: the former is a scan *output*
1140/// shape, this is the *input* predicate scanners and completion
1141/// subscribers consult per candidate.
1142///
1143/// # Fields & backing storage
1144///
1145/// * `namespace` — matches against the `namespace` field on
1146///   `exec_core` (Valkey hash `ff:exec:{fp:N}:<eid>:core`). Cost:
1147///   one HGET per candidate when set.
1148/// * `instance_tag` — matches against an entry in the execution's
1149///   user-supplied tags hash at the canonical key
1150///   **`ff:exec:{p}:<eid>:tags`** (where `{p}` is the partition
1151///   hash-tag, e.g. `{fp:42}`). Written by the Lua function
1152///   `ff_create_execution` (see `lua/execution.lua`) and
1153///   `ff_set_execution_tags`. The tuple is `(tag_key, tag_value)`:
1154///   the HGET targets `tag_key` on the tags hash and compares the
1155///   returned string (if any) byte-for-byte against `tag_value`.
1156///   Cost: one HGET per candidate when set.
1157///
1158/// When both are set, scanners check `namespace` first (short-circuit
1159/// on mismatch) then `instance_tag`, for a maximum of 2 HGETs per
1160/// candidate.
1161///
1162/// # Semantics
1163///
1164/// * [`Self::is_noop`] returns true when both fields are `None` —
1165///   the filter accepts every candidate. Used by the
1166///   `subscribe_completions_filtered` default body to fall back to
1167///   the unfiltered subscription.
1168/// * [`Self::matches`] is the tight in-memory predicate once the
1169///   HGET values have been fetched. Scanners fetch the fields
1170///   lazily (namespace first) and pass the results in; the helper
1171///   returns false as soon as one component mismatches.
1172///
1173/// # Scope
1174///
1175/// Today the filter is consulted by execution-shaped scanners
1176/// (lease_expiry, attempt_timeout, execution_deadline,
1177/// suspension_timeout, pending_wp_expiry, delayed_promoter,
1178/// dependency_reconciler, cancel_reconciler, unblock,
1179/// index_reconciler, retention_trimmer) and by completion
1180/// subscribers. Non-execution scanners (budget_reconciler,
1181/// budget_reset, quota_reconciler, flow_projector) accept a filter
1182/// for API uniformity but do not apply it — their iteration domains
1183/// (budgets, quotas, flows) are not keyed by the
1184/// per-execution namespace / instance_tag shape.
1185///
1186/// `#[non_exhaustive]` so future dimensions (e.g. `lane_id`,
1187/// `worker_instance`) can land additively.
1188#[derive(Clone, Debug, Default, PartialEq, Eq)]
1189#[non_exhaustive]
1190pub struct ScannerFilter {
1191    /// Tenant / workspace scope. Matches against the `namespace`
1192    /// field on `exec_core`.
1193    pub namespace: Option<Namespace>,
1194    /// Instance-scoped tag predicate `(tag_key, tag_value)`. Matches
1195    /// against an entry in `ff:exec:{p}:<eid>:tags` (the tags hash
1196    /// written by `ff_create_execution`).
1197    pub instance_tag: Option<(String, String)>,
1198}
1199
1200impl ScannerFilter {
1201    /// Shared no-op filter — useful as the default for the
1202    /// `Scanner::filter()` trait method so implementors that don't
1203    /// override can hand back a `&'static` reference without
1204    /// allocating per call.
1205    pub const NOOP: Self = Self {
1206        namespace: None,
1207        instance_tag: None,
1208    };
1209
1210    /// Create an empty filter (equivalent to [`Self::NOOP`]).
1211    ///
1212    /// Provided for external-crate consumers: `ScannerFilter` is
1213    /// `#[non_exhaustive]`, so struct-literal and functional-update
1214    /// construction are unavailable across crate boundaries. Start
1215    /// from `new()` and chain `with_*` setters to build a filter.
1216    pub fn new() -> Self {
1217        Self::default()
1218    }
1219
1220    /// Set the tenant-scope namespace filter dimension. Consumes
1221    /// and returns `self` for chaining.
1222    ///
1223    /// Accepts anything that converts into [`Namespace`] so callers
1224    /// can pass `&str` / `String` directly without the
1225    /// `Namespace::new(...)` ceremony (the conversion is infallible;
1226    /// [`Namespace`] has `From<&str>` and `From<String>` via the
1227    /// crate's `string_id!` macro).
1228    pub fn with_namespace(mut self, ns: impl Into<Namespace>) -> Self {
1229        self.namespace = Some(ns.into());
1230        self
1231    }
1232
1233    /// Set the exact-match exec-tag filter dimension. Consumes and
1234    /// returns `self` for chaining. At filter time, scanners read
1235    /// the `ff:exec:{p:N}:<eid>:tags` hash and compare the value at
1236    /// `key` byte-for-byte against `value`.
1237    pub fn with_instance_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
1238        self.instance_tag = Some((key.into(), value.into()));
1239        self
1240    }
1241
1242    /// True iff the filter has no dimensions set — every candidate
1243    /// passes. Callers use this to short-circuit filtered subscribe
1244    /// paths back to the unfiltered ones.
1245    pub fn is_noop(&self) -> bool {
1246        self.namespace.is_none() && self.instance_tag.is_none()
1247    }
1248
1249    /// Post-HGET in-memory match check.
1250    ///
1251    /// `core_namespace` should be the `namespace` field read from
1252    /// `exec_core` (None if the HGET returned nil / was skipped).
1253    /// `tag_value` should be the HGET result for the configured
1254    /// `instance_tag.0` key on the execution's tags hash (None if
1255    /// the HGET returned nil / was skipped).
1256    ///
1257    /// When a filter dimension is `None` the corresponding argument
1258    /// is ignored — callers may pass `None` to skip the HGET and
1259    /// save a round-trip.
1260    pub fn matches(
1261        &self,
1262        core_namespace: Option<&Namespace>,
1263        tag_value: Option<&str>,
1264    ) -> bool {
1265        if let Some(ref want) = self.namespace {
1266            match core_namespace {
1267                Some(have) if have == want => {}
1268                _ => return false,
1269            }
1270        }
1271        if let Some((_, ref want_value)) = self.instance_tag {
1272            match tag_value {
1273                Some(have) if have == want_value.as_str() => {}
1274                _ => return false,
1275            }
1276        }
1277        true
1278    }
1279}
1280
1281// ── Tests ───────────────────────────────────────────────────────────────
1282
1283#[cfg(test)]
1284mod tests {
1285    use super::*;
1286    use crate::partition::{Partition, PartitionFamily};
1287    use crate::types::{ExecutionId, LaneId, SignalId};
1288
1289    #[test]
1290    fn backend_tag_derives() {
1291        let a = BackendTag::Valkey;
1292        let b = a;
1293        assert_eq!(a, b);
1294        assert_eq!(format!("{a:?}"), "Valkey");
1295    }
1296
1297    #[test]
1298    fn handle_kind_derives() {
1299        for k in [HandleKind::Fresh, HandleKind::Resumed, HandleKind::Suspended] {
1300            let c = k;
1301            assert_eq!(k, c);
1302            // Debug formatter reachable
1303            let _ = format!("{k:?}");
1304        }
1305    }
1306
1307    #[test]
1308    fn handle_opaque_roundtrips() {
1309        let bytes: Box<[u8]> = Box::new([1u8, 2, 3, 4]);
1310        let o = HandleOpaque::new(bytes.clone());
1311        assert_eq!(o.as_bytes(), &[1u8, 2, 3, 4]);
1312        assert_eq!(o, o.clone());
1313        let _ = format!("{o:?}");
1314    }
1315
1316    #[test]
1317    fn handle_composes() {
1318        let h = Handle::new(
1319            BackendTag::Valkey,
1320            HandleKind::Fresh,
1321            HandleOpaque::new(Box::new([0u8; 4])),
1322        );
1323        assert_eq!(h.backend, BackendTag::Valkey);
1324        assert_eq!(h.kind, HandleKind::Fresh);
1325        assert_eq!(h.clone(), h);
1326    }
1327
1328    #[test]
1329    fn capability_set_derives() {
1330        let c1 = CapabilitySet::new(["gpu", "cuda"]);
1331        let c2 = CapabilitySet::new(["gpu", "cuda"]);
1332        assert_eq!(c1, c2);
1333        assert!(!c1.is_empty());
1334        assert!(CapabilitySet::default().is_empty());
1335        let _ = format!("{c1:?}");
1336    }
1337
1338    #[test]
1339    fn claim_policy_derives() {
1340        let p = ClaimPolicy::with_max_wait(Duration::from_millis(500));
1341        assert_eq!(p.max_wait, Some(Duration::from_millis(500)));
1342        assert_eq!(p.clone(), p);
1343        assert_eq!(ClaimPolicy::immediate(), ClaimPolicy::default());
1344    }
1345
1346    #[test]
1347    fn frame_and_kind_derive() {
1348        let f = Frame {
1349            bytes: b"hello".to_vec(),
1350            kind: FrameKind::Stdout,
1351            seq: Some(3),
1352            frame_type: "delta".to_owned(),
1353            correlation_id: Some("req-42".to_owned()),
1354            mode: StreamMode::Durable,
1355        };
1356        assert_eq!(f.clone(), f);
1357        assert_eq!(f.kind, FrameKind::Stdout);
1358        assert_eq!(f.frame_type, "delta");
1359        assert_eq!(f.correlation_id.as_deref(), Some("req-42"));
1360        assert_ne!(FrameKind::Stderr, FrameKind::Event);
1361        let _ = format!("{f:?}");
1362    }
1363
1364    #[test]
1365    fn frame_builders_populate_extended_fields() {
1366        let f = Frame::new(b"payload".to_vec(), FrameKind::Event)
1367            .with_frame_type("agent_step")
1368            .with_correlation_id("corr-1");
1369        assert_eq!(f.frame_type, "agent_step");
1370        assert_eq!(f.correlation_id.as_deref(), Some("corr-1"));
1371        assert_eq!(f.seq, None);
1372
1373        let bare = Frame::new(b"p".to_vec(), FrameKind::Event);
1374        assert_eq!(bare.frame_type, "");
1375        assert_eq!(bare.correlation_id, None);
1376    }
1377
1378    #[test]
1379    fn waitpoint_spec_derives() {
1380        let spec = WaitpointSpec {
1381            kind: WaitpointKind::SignalName,
1382            matcher: b"approved".to_vec(),
1383            hmac_token: WaitpointHmac::new("kid1:deadbeef"),
1384        };
1385        assert_eq!(spec.clone(), spec);
1386        assert_eq!(spec.hmac_token.as_str(), "kid1:deadbeef");
1387        assert_eq!(
1388            WaitpointHmac::new("a"),
1389            WaitpointHmac::new(String::from("a"))
1390        );
1391    }
1392
1393    #[test]
1394    fn failure_reason_and_class() {
1395        let r1 = FailureReason::new("boom");
1396        let r2 = FailureReason::with_detail("boom", b"stack".to_vec());
1397        assert_eq!(r1.message, "boom");
1398        assert!(r1.detail.is_none());
1399        assert_eq!(r2.detail.as_deref(), Some(&b"stack"[..]));
1400        assert_eq!(r1.clone(), r1);
1401        assert_ne!(FailureClass::Transient, FailureClass::Permanent);
1402    }
1403
1404    #[test]
1405    fn usage_dimensions_default_and_eq() {
1406        let u = UsageDimensions {
1407            input_tokens: 10,
1408            output_tokens: 20,
1409            wall_ms: Some(150),
1410            custom: BTreeMap::from([("net_bytes".to_string(), 42)]),
1411            dedup_key: Some("k1".into()),
1412        };
1413        assert_eq!(u.clone(), u);
1414        assert_eq!(UsageDimensions::default().input_tokens, 0);
1415        assert_eq!(UsageDimensions::default().dedup_key, None);
1416    }
1417
1418    #[test]
1419    fn usage_dimensions_builder_chain() {
1420        let u = UsageDimensions::new()
1421            .with_input_tokens(10)
1422            .with_output_tokens(20)
1423            .with_wall_ms(150)
1424            .with_dedup_key("k1");
1425        assert_eq!(u.input_tokens, 10);
1426        assert_eq!(u.output_tokens, 20);
1427        assert_eq!(u.wall_ms, Some(150));
1428        assert_eq!(u.dedup_key.as_deref(), Some("k1"));
1429        assert!(u.custom.is_empty());
1430        // `new()` is equivalent to `default()`.
1431        assert_eq!(UsageDimensions::new(), UsageDimensions::default());
1432    }
1433
1434    #[test]
1435    fn reclaim_token_wraps_grant() {
1436        let grant = ReclaimGrant {
1437            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
1438            partition_key: crate::partition::PartitionKey::from(&Partition {
1439                family: PartitionFamily::Flow,
1440                index: 0,
1441            }),
1442            grant_key: "gkey".into(),
1443            expires_at_ms: 123,
1444            lane_id: LaneId::new("default"),
1445        };
1446        let t = ReclaimToken::new(grant.clone());
1447        assert_eq!(t.grant, grant);
1448        assert_eq!(t.clone(), t);
1449    }
1450
1451    #[test]
1452    fn lease_renewal_is_copy() {
1453        let r = LeaseRenewal {
1454            expires_at_ms: 100,
1455            lease_epoch: 2,
1456        };
1457        let s = r; // Copy
1458        assert_eq!(r, s);
1459    }
1460
1461    #[test]
1462    fn cancel_flow_policy_and_wait() {
1463        assert_ne!(CancelFlowPolicy::FlowOnly, CancelFlowPolicy::CancelAll);
1464        let w = CancelFlowWait::WaitTimeout(Duration::from_secs(1));
1465        assert_eq!(w, w);
1466        assert_ne!(CancelFlowWait::NoWait, CancelFlowWait::WaitIndefinite);
1467    }
1468
1469    #[test]
1470    fn completion_payload_derives() {
1471        let c = CompletionPayload {
1472            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
1473            outcome: "success".into(),
1474            payload_bytes: Some(b"ok".to_vec()),
1475            produced_at_ms: TimestampMs::from_millis(1234),
1476            flow_id: None,
1477        };
1478        assert_eq!(c.clone(), c);
1479        let _ = format!("{c:?}");
1480    }
1481
1482    #[test]
1483    fn resume_signal_moved_and_derives() {
1484        let s = ResumeSignal {
1485            signal_id: SignalId::new(),
1486            signal_name: "approve".into(),
1487            signal_category: "decision".into(),
1488            source_type: "user".into(),
1489            source_identity: "u1".into(),
1490            correlation_id: "c1".into(),
1491            accepted_at: TimestampMs::from_millis(10),
1492            payload: None,
1493        };
1494        assert_eq!(s.clone(), s);
1495        let _ = format!("{s:?}");
1496    }
1497
1498    #[test]
1499    fn fail_outcome_variants() {
1500        let retry = FailOutcome::RetryScheduled {
1501            delay_until: TimestampMs::from_millis(42),
1502        };
1503        let terminal = FailOutcome::TerminalFailed;
1504        assert_ne!(retry, terminal);
1505        assert_eq!(retry.clone(), retry);
1506    }
1507
1508    #[test]
1509    fn scanner_filter_noop_and_default() {
1510        let f = ScannerFilter::default();
1511        assert!(f.is_noop());
1512        assert_eq!(f, ScannerFilter::NOOP);
1513        // A no-op filter matches any candidate, including ones that
1514        // produced no HGET results.
1515        assert!(f.matches(None, None));
1516        assert!(f.matches(Some(&Namespace::new("t1")), Some("v")));
1517    }
1518
1519    #[test]
1520    fn scanner_filter_namespace_match() {
1521        let f = ScannerFilter {
1522            namespace: Some(Namespace::new("tenant-a")),
1523            instance_tag: None,
1524        };
1525        assert!(!f.is_noop());
1526        assert!(f.matches(Some(&Namespace::new("tenant-a")), None));
1527        assert!(!f.matches(Some(&Namespace::new("tenant-b")), None));
1528        // Missing core namespace ⇒ no match.
1529        assert!(!f.matches(None, None));
1530    }
1531
1532    #[test]
1533    fn scanner_filter_instance_tag_match() {
1534        let f = ScannerFilter {
1535            namespace: None,
1536            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
1537        };
1538        assert!(f.matches(None, Some("i-1")));
1539        assert!(!f.matches(None, Some("i-2")));
1540        assert!(!f.matches(None, None));
1541    }
1542
1543    #[test]
1544    fn scanner_filter_both_dimensions() {
1545        let f = ScannerFilter {
1546            namespace: Some(Namespace::new("tenant-a")),
1547            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
1548        };
1549        assert!(f.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
1550        assert!(!f.matches(Some(&Namespace::new("tenant-a")), Some("i-2")));
1551        assert!(!f.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
1552        assert!(!f.matches(None, Some("i-1")));
1553    }
1554
1555    #[test]
1556    fn scanner_filter_builder_construction() {
1557        // Simulates external-crate usage: only public constructors
1558        // and chainable setters (no struct-literal access to the
1559        // `#[non_exhaustive]` fields).
1560        let empty = ScannerFilter::new();
1561        assert!(empty.is_noop());
1562        assert_eq!(empty, ScannerFilter::NOOP);
1563
1564        let ns_only = ScannerFilter::new().with_namespace(Namespace::new("tenant-a"));
1565        assert!(!ns_only.is_noop());
1566        assert!(ns_only.matches(Some(&Namespace::new("tenant-a")), None));
1567
1568        let tag_only = ScannerFilter::new().with_instance_tag("cairn.instance_id", "i-1");
1569        assert!(!tag_only.is_noop());
1570        assert!(tag_only.matches(None, Some("i-1")));
1571
1572        let both = ScannerFilter::new()
1573            .with_namespace(Namespace::new("tenant-a"))
1574            .with_instance_tag("cairn.instance_id", "i-1");
1575        assert!(both.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
1576        assert!(!both.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
1577    }
1578
1579    // ── RFC-015 Stream-durability-mode types ──
1580
1581    #[test]
1582    fn stream_mode_constructors_and_wire_str() {
1583        assert_eq!(StreamMode::durable().wire_str(), "durable");
1584        assert_eq!(StreamMode::durable(), StreamMode::Durable);
1585
1586        let s = StreamMode::durable_summary();
1587        assert_eq!(s.wire_str(), "summary");
1588        match s {
1589            StreamMode::DurableSummary { patch_kind } => {
1590                assert_eq!(patch_kind, PatchKind::JsonMergePatch);
1591            }
1592            _ => panic!("expected DurableSummary"),
1593        }
1594
1595        let b = StreamMode::best_effort_live(15_000);
1596        assert_eq!(b.wire_str(), "best_effort");
1597        match b {
1598            StreamMode::BestEffortLive { config } => {
1599                assert_eq!(config.ttl_ms, 15_000);
1600                assert_eq!(config.maxlen_floor, 64);
1601                assert_eq!(config.maxlen_ceiling, 16_384);
1602                assert!((config.ema_alpha - 0.2).abs() < 1e-9);
1603            }
1604            _ => panic!("expected BestEffortLive"),
1605        }
1606
1607        let cfg = BestEffortLiveConfig::with_ttl(10_000)
1608            .with_maxlen_floor(128)
1609            .with_maxlen_ceiling(8_192)
1610            .with_ema_alpha(0.3);
1611        let b2 = StreamMode::best_effort_live_with_config(cfg);
1612        match b2 {
1613            StreamMode::BestEffortLive { config } => {
1614                assert_eq!(config.ttl_ms, 10_000);
1615                assert_eq!(config.maxlen_floor, 128);
1616                assert_eq!(config.maxlen_ceiling, 8_192);
1617                assert!((config.ema_alpha - 0.3).abs() < 1e-9);
1618            }
1619            _ => panic!("expected BestEffortLive"),
1620        }
1621
1622        assert_eq!(StreamMode::default(), StreamMode::Durable);
1623    }
1624
1625    #[test]
1626    fn tail_visibility_default_and_wire() {
1627        assert_eq!(TailVisibility::default(), TailVisibility::All);
1628        assert_eq!(TailVisibility::All.wire_str(), "");
1629        assert_eq!(
1630            TailVisibility::ExcludeBestEffort.wire_str(),
1631            "exclude_best_effort"
1632        );
1633    }
1634
1635    #[test]
1636    fn append_frame_outcome_summary_version_builder() {
1637        let base = AppendFrameOutcome::new("1713-0", 3);
1638        assert_eq!(base.stream_id, "1713-0");
1639        assert_eq!(base.frame_count, 3);
1640        assert_eq!(base.summary_version, None);
1641
1642        let with_v = AppendFrameOutcome::new("1713-0", 3).with_summary_version(7);
1643        assert_eq!(with_v.summary_version, Some(7));
1644        assert_eq!(with_v.clone(), with_v);
1645    }
1646
1647    /// RFC-015 §3.2 null-sentinel round-trip invariant.
1648    ///
1649    /// The sentinel (`"__ff_null__"`) is the byte-exact string callers
1650    /// use to encode "set this leaf to JSON null" in a JSON Merge Patch
1651    /// frame (because RFC 7396 otherwise treats `null` as delete-key).
1652    /// The invariant: on read, the summary document NEVER contains the
1653    /// sentinel string — it is always rewritten to JSON null.
1654    ///
1655    /// The full end-to-end invariant is exercised against the Lua
1656    /// applier in `crates/ff-test/tests/engine_backend_stream_modes.rs`
1657    /// (integration). Here we just pin the byte sequence + the
1658    /// type-level constant so any accidental edit of the sentinel
1659    /// string fails the unit build before it reaches an integration
1660    /// run.
1661    #[test]
1662    fn summary_null_sentinel_byte_exact() {
1663        assert_eq!(SUMMARY_NULL_SENTINEL, "__ff_null__");
1664        assert_eq!(SUMMARY_NULL_SENTINEL.len(), 11);
1665        assert!(SUMMARY_NULL_SENTINEL.bytes().all(|b| b.is_ascii()));
1666        assert_eq!(SUMMARY_NULL_SENTINEL.trim(), SUMMARY_NULL_SENTINEL);
1667    }
1668
1669    #[test]
1670    fn summary_document_constructor() {
1671        let doc = SummaryDocument::new(
1672            br#"{"tokens":3}"#.to_vec(),
1673            2,
1674            PatchKind::JsonMergePatch,
1675            1_700_000_100,
1676            1_700_000_000,
1677        );
1678        assert_eq!(doc.version, 2);
1679        assert_eq!(doc.patch_kind, PatchKind::JsonMergePatch);
1680        assert_eq!(doc.first_applied_ms, 1_700_000_000);
1681        assert_eq!(doc.clone(), doc);
1682    }
1683
1684    #[test]
1685    fn frame_builder_sets_mode() {
1686        let f = Frame::new(b"p".to_vec(), FrameKind::Event);
1687        assert_eq!(f.mode, StreamMode::Durable);
1688
1689        let f = Frame::new(b"p".to_vec(), FrameKind::Event)
1690            .with_mode(StreamMode::durable_summary());
1691        match f.mode {
1692            StreamMode::DurableSummary { patch_kind } => {
1693                assert_eq!(patch_kind, PatchKind::JsonMergePatch);
1694            }
1695            other => panic!("expected DurableSummary, got {other:?}"),
1696        }
1697    }
1698
1699    #[test]
1700    fn backend_config_valkey_ctor() {
1701        let c = BackendConfig::valkey("host.local", 6379);
1702        // Same-crate match against an otherwise `#[non_exhaustive]`
1703        // enum is irrefutable — no wildcard needed and `let-else`
1704        // would trip `irrefutable_let_patterns`.
1705        let BackendConnection::Valkey(v) = &c.connection;
1706        assert_eq!(v.host, "host.local");
1707        assert_eq!(v.port, 6379);
1708        assert!(!v.tls);
1709        assert!(!v.cluster);
1710        assert_eq!(c.timeouts, BackendTimeouts::default());
1711        assert_eq!(c.retry, BackendRetry::default());
1712        assert_eq!(c.clone(), c);
1713    }
1714}