Skip to main content

ff_core/
backend.rs

1//! Backend-trait supporting types (RFC-012 Stage 0).
2//!
3//! This module carries the public types referenced by the `EngineBackend`
4//! trait signatures in RFC-012 §3.3. The trait itself lands in Stage 1
5//! (issue #89 follow-up); Stage 0 is strictly type-plumbing and the
6//! `ResumeSignal` crate move.
7//!
8//! Public structs/enums whose fields or variants are expected to grow
9//! are marked `#[non_exhaustive]` per project convention — consumers
10//! must write `_`-terminated matches and use the provided constructors
11//! rather than struct literals. Exceptions:
12//!
13//! * Opaque single-field wrapper newtypes ([`HandleOpaque`],
14//!   [`WaitpointHmac`]) hide their inner field and need no non-exhaustive
15//!   annotation — the wrapped value is unreachable from outside.
16//! * [`ResumeSignal`] is intentionally NOT `#[non_exhaustive]` so the
17//!   ff-sdk crate-move (Stage 0) preserves struct-literal compatibility
18//!   at its existing call site.
19//!
20//! See `rfcs/RFC-012-engine-backend-trait.md` §3.3.0 for the authoritative
21//! type inventory and §4.1-§4.2 for the `Handle` / `EngineError` shapes.
22
23use crate::contracts::ResumeGrant;
24use crate::types::{TimestampMs, WaitpointToken, WorkerId, WorkerInstanceId};
25use serde::{Deserialize, Serialize};
26
27// DX (HHH v0.3.4 re-smoke): `Namespace` lives in `ff_core::types` but
28// is used on `BackendConfig` + `ScannerFilter` (both defined in this
29// module). Re-export here so consumers already scoped to
30// `ff_core::backend::*` can grab it without a second `use` line
31// crossing into `ff_core::types`. Also brings `Namespace` into local
32// scope for the definitions below.
33pub use crate::types::Namespace;
34
35// RFC-013 Stage 1d — re-export the typed suspend trait surface so
36// external crates using `ff_core::backend::*` can reach the new types
37// without a second `use ff_core::contracts` line. Keeps the trait
38// naming surface coherent (`PendingWaitpoint`, `WaitpointSpec`, and
39// the new `SuspendArgs` / `SuspendOutcome` all live at the same path).
40pub use crate::contracts::{
41    CompositeBody, IdempotencyKey, ResumeCondition, ResumePolicy, ResumeTarget, SignalMatcher,
42    SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, SuspensionReasonCode,
43    SuspensionRequester, TimeoutBehavior, WaitpointBinding,
44};
45use std::collections::BTreeMap;
46use std::time::Duration;
47
48// ── §4.1 Handle trio ────────────────────────────────────────────────────
49
50/// Backend-tag discriminator embedded in every [`Handle`] so ops can
51/// dispatch to the correct backend implementation at runtime.
52///
53/// `#[non_exhaustive]`: new backend variants land additively as impls
54/// come online (e.g. `Postgres` in Stage 2, hypothetical third backends
55/// later).
56#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
57#[non_exhaustive]
58pub enum BackendTag {
59    /// The Valkey FCALL-backed implementation.
60    Valkey,
61    /// The Postgres-backed implementation (RFC-v0.7 Wave 1c).
62    Postgres,
63    /// The SQLite-backed implementation — dev-only (RFC-023).
64    Sqlite,
65}
66
67impl BackendTag {
68    /// Stable single-byte wire encoding for the tag. Embedded inside
69    /// [`HandleOpaque`] so cross-backend migration tooling can detect
70    /// which backend minted a given handle without parsing the rest of
71    /// the payload (see [`crate::handle_codec`]).
72    ///
73    /// Wire byte allocations:
74    /// * `0x01` — [`BackendTag::Valkey`]
75    /// * `0x02` — [`BackendTag::Postgres`] (RFC-v0.7 Wave 1c)
76    /// * `0x03` — [`BackendTag::Sqlite`] (RFC-023 Phase 2a.1.5)
77    pub const fn wire_byte(self) -> u8 {
78        match self {
79            BackendTag::Valkey => 0x01,
80            BackendTag::Postgres => 0x02,
81            BackendTag::Sqlite => 0x03,
82        }
83    }
84
85    /// Inverse of [`Self::wire_byte`].
86    pub const fn from_wire_byte(b: u8) -> Option<Self> {
87        match b {
88            0x01 => Some(BackendTag::Valkey),
89            0x02 => Some(BackendTag::Postgres),
90            0x03 => Some(BackendTag::Sqlite),
91            _ => None,
92        }
93    }
94}
95
96/// Lifecycle kind carried inside a [`Handle`]. Backends validate `kind`
97/// on entry to each op and return `EngineError::State` on mismatch.
98///
99/// Replaces round-1's compile-time `Handle` / `ResumeHandle` /
100/// `SuspendToken` type split (RFC-012 §4.1).
101#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
102#[non_exhaustive]
103pub enum HandleKind {
104    /// Fresh claim — returned by `claim` / `claim_from_grant`.
105    Fresh,
106    /// Resumed from resume grant — returned by `claim_from_resume_grant`
107    /// (renamed from `claim_from_reclaim` per RFC-024 PR-B+C).
108    Resumed,
109    /// Suspended — returned by `suspend`. Terminal for the lease;
110    /// resumption mints a new Handle via `claim_from_resume_grant`.
111    Suspended,
112    /// Reclaimed from a lease-reclaim grant — returned by the new
113    /// `reclaim_execution` trait method (RFC-024). Distinct from
114    /// [`HandleKind::Resumed`] (suspend/resume path) and
115    /// [`HandleKind::Fresh`] (first claim). The backing lifecycle
116    /// creates a new attempt row and bumps the reclaim counter.
117    Reclaimed,
118}
119
120/// Backend-private opaque payload carried inside a [`Handle`].
121///
122/// Encodes backend-specific state (on Valkey: exec id, attempt id,
123/// lease id, lease epoch, capability binding, partition). Consumers do
124/// not construct or inspect the bytes — they are produced by the
125/// backend on claim/resume and consumed by the backend on each op.
126///
127/// `Box<[u8]>` chosen over `bytes::Bytes` (RFC-012 §7.17) to avoid a
128/// public-type transitive dep on the `bytes` crate.
129#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
130pub struct HandleOpaque(Box<[u8]>);
131
132impl HandleOpaque {
133    /// Construct from backend-owned bytes. Only backend impls call this.
134    pub fn new(bytes: Box<[u8]>) -> Self {
135        Self(bytes)
136    }
137
138    /// Borrow the underlying bytes (backend-internal use).
139    pub fn as_bytes(&self) -> &[u8] {
140        &self.0
141    }
142}
143
144/// Opaque attempt cookie held by the worker for the duration of an
145/// attempt. Produced by `claim` / `claim_from_resume_grant` / `suspend`;
146/// borrowed by every op (renew, progress, append_frame, complete, fail,
147/// cancel, suspend, delay, wait_children, observe_signals, report_usage).
148///
149/// See RFC-012 §4.1 for the round-4 design — terminal ops borrow rather
150/// than consume so callers can retry after a transport error.
151#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
152#[non_exhaustive]
153pub struct Handle {
154    pub backend: BackendTag,
155    pub kind: HandleKind,
156    pub opaque: HandleOpaque,
157}
158
159impl Handle {
160    /// Construct a new Handle. Called by backend impls only; consumer
161    /// code receives Handles from `claim` / `suspend` / `claim_from_resume_grant`.
162    pub fn new(backend: BackendTag, kind: HandleKind, opaque: HandleOpaque) -> Self {
163        Self {
164            backend,
165            kind,
166            opaque,
167        }
168    }
169}
170
171/// Serde default for a `HandleKind::Fresh` stub handle on
172/// [`crate::contracts::ClaimedExecution::handle`]. Used when
173/// deserializing a legacy payload that predates the v0.12 PR-5.5 field
174/// addition; live backends populate the field directly.
175pub fn stub_handle_fresh() -> Handle {
176    Handle::new(
177        BackendTag::Valkey,
178        HandleKind::Fresh,
179        HandleOpaque::new(Box::new([])),
180    )
181}
182
183/// Serde default for a `HandleKind::Resumed` stub handle on
184/// [`crate::contracts::ClaimedResumedExecution::handle`].
185pub fn stub_handle_resumed() -> Handle {
186    Handle::new(
187        BackendTag::Valkey,
188        HandleKind::Resumed,
189        HandleOpaque::new(Box::new([])),
190    )
191}
192
193// ── §3.3.0 Claim / lifecycle supporting types ──────────────────────────
194
/// The capability tokens a worker advertises to the scheduler and to
/// `claim`.
///
/// Today these live as a `Vec<String>` on `WorkerConfig`; wrapping them
/// in a named newtype lets trait signatures mention capabilities without
/// fixing the concrete container shape.
///
/// Whether this becomes a bitfield or stays stringly-typed is the §7.2
/// open question; Stage 0 keeps the round-2 lean (newtype over
/// `Vec<String>`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct CapabilitySet {
    /// Capability tokens, in the order they were supplied.
    pub tokens: Vec<String>,
}

impl CapabilitySet {
    /// Collects any iterable of string-like capability tokens into a set,
    /// converting each item to an owned `String`.
    pub fn new<I, S>(tokens: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let mut collected = Vec::new();
        collected.extend(tokens.into_iter().map(|token| token.into()));
        CapabilitySet { tokens: collected }
    }

    /// Returns `true` exactly when the set holds no tokens.
    pub fn is_empty(&self) -> bool {
        self.tokens.as_slice().is_empty()
    }
}
226
227/// Policy hints for `claim`. Carries the worker identity the backend
228/// needs to invoke `ff_claim_execution` (v0.7 Wave 2) plus the
229/// blocking-wait bound.
230///
231/// **Wave 2 extension:** `worker_id` + `worker_instance_id` +
232/// `lease_ttl_ms` are now required so the Valkey backend's `claim`
233/// impl can issue the claim FCALL without a side-channel identity
234/// lookup. The constructor change is a pre-1.0 breaking change,
235/// tracked in the CHANGELOG.
236#[derive(Clone, Debug, PartialEq, Eq)]
237#[non_exhaustive]
238pub struct ClaimPolicy {
239    /// Maximum blocking wait. `None` means backend-default (today:
240    /// non-blocking / immediate return).
241    pub max_wait: Option<Duration>,
242    /// Worker identity (stable across process restarts).
243    pub worker_id: WorkerId,
244    /// Worker instance identity (unique per process).
245    pub worker_instance_id: WorkerInstanceId,
246    /// Lease TTL in milliseconds for the claim about to be minted.
247    pub lease_ttl_ms: u32,
248}
249
250impl ClaimPolicy {
251    /// Build a claim policy. `max_wait = None` means non-blocking /
252    /// immediate return. `lease_ttl_ms` is the TTL the backend
253    /// installs on the minted lease.
254    pub fn new(
255        worker_id: WorkerId,
256        worker_instance_id: WorkerInstanceId,
257        lease_ttl_ms: u32,
258        max_wait: Option<Duration>,
259    ) -> Self {
260        Self {
261            max_wait,
262            worker_id,
263            worker_instance_id,
264            lease_ttl_ms,
265        }
266    }
267}
268
/// Coarse classification of a frame passed to `append_frame`. Mirrors the
/// `frame_type` ARGV of the Lua-side `ff_append_frame`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FrameKind {
    /// Progress output / log line visible to operators.
    Stdout,
    /// Error / warning output visible to operators.
    Stderr,
    /// Structured event carrying a JSON payload.
    Event,
    /// Binary / opaque payload.
    Blob,
}
283
284// ── RFC-015 §1–§6: Stream-durability modes ──────────────────────────────
285
/// Patch format [`StreamMode::DurableSummary`] uses when applying each
/// frame's payload to the server-side rolling summary document.
///
/// v0.6 ships exactly one variant, `JsonMergePatch` (RFC 7396). The enum
/// is `#[non_exhaustive]` so that the `PatchKind::StringAppend` variant
/// flagged in RFC-015 §11 can be added later without a breaking change.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PatchKind {
    /// JSON Merge Patch as defined by RFC 7396. The locked choice for
    /// v0.6 (RFC-015 §3.2).
    JsonMergePatch,
}
299
300/// Per-call durability mode for [`EngineBackend::append_frame`].
301///
302/// RFC-015 §1. Mode is **per-call** — workers routinely mix frame types
303/// (tokens + progress + final summary) in a single attempt and the right
304/// durability differs per frame type. See RFC-015 §5 for the caveat on
305/// mixed-mode streams.
306///
307/// `#[non_exhaustive]`: new modes land additively (the v0.6 PR deliberately
308/// excludes `KeepAllDeltas`, dropped by owner adjudication; a future
309/// mode for e.g. per-frame-replicated streams would land here).
310///
311/// ## Doc-comment contract on [`Self::Durable`]
312///
313/// If the same stream also receives [`Self::BestEffortLive`] frames,
314/// `Durable` frames are subject to the best-effort MAXLEN trim (see
315/// RFC-015 §5). Callers that need strict retention for `Durable` frames
316/// alongside best-effort telemetry must not mix modes on one stream or
317/// should place the durable frames on a sibling stream.
318#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
319#[non_exhaustive]
320pub enum StreamMode {
321    /// Current default — every frame XADDs as a durable entry.
322    #[default]
323    Durable,
324    /// Server-side rolling-summary collapse. Each frame's payload is a
325    /// delta/patch applied atomically to a summary Hash (per
326    /// [`PatchKind`]); the delta is also XADDed to the stream with
327    /// `mode=summary` fields so live tailers continue to observe change
328    /// events. RFC-015 §3.
329    DurableSummary { patch_kind: PatchKind },
330    /// Short-lived frame. XADDed with `mode=best_effort`; per-stream
331    /// MAXLEN rolls it off and the stream key gets a TTL refresh only
332    /// when the stream has never held a durable frame (RFC-015 §4.1).
333    ///
334    /// The per-stream MAXLEN is computed dynamically in Lua from an
335    /// EMA of observed append rate (RFC-015 §4.2). See
336    /// [`BestEffortLiveConfig`] for the tunable knobs.
337    BestEffortLive { config: BestEffortLiveConfig },
338}
339
/// Configuration knobs for [`StreamMode::BestEffortLive`] — RFC-015
/// §4.2 dynamic MAXLEN sizing.
///
/// Defaults derived from the Phase 0 benchmark + RFC §4.1/§4.3 final
/// design:
///   - `ttl_ms` — caller-supplied visibility target.
///   - `maxlen_floor = 64` — RFC §4.1 round-2 default, used for
///     low-rate streams where the EMA formula would under-size.
///   - `maxlen_ceiling = 16_384` — cap on per-stream retained entries
///     (raised from the original §4.2 draft of 2048 after Phase 0
///     showed 200–4000 Hz LLM-token bursts saturate any lower clamp).
///   - `ema_alpha = 0.2` — RFC §4.3 gate value. Weights the latest
///     per-append instantaneous rate at 20 %, decays prior samples
///     at 80 % each append.
///
/// # Equality and hashing
///
/// `ema_alpha` is an `f64`, so `PartialEq`, `Eq` and `Hash` are written
/// by hand over one canonical key for α instead of being derived. Under
/// that key every NaN equals every other NaN and `-0.0` equals `0.0`;
/// all other values compare exactly as `f64 ==` does. This keeps
/// equality reflexive (required by `Eq`) and guarantees that equal
/// configs hash identically (required by `Hash`).
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct BestEffortLiveConfig {
    /// Caller-supplied visibility target, in milliseconds.
    pub ttl_ms: u32,
    /// Lower clamp on the dynamically computed MAXLEN.
    pub maxlen_floor: u32,
    /// Upper clamp on the dynamically computed MAXLEN.
    pub maxlen_ceiling: u32,
    /// EMA smoothing factor α applied to the observed append rate.
    pub ema_alpha: f64,
}

impl BestEffortLiveConfig {
    /// Construct with the given `ttl_ms` and defaults for the other
    /// knobs. Alias for [`Self::with_ttl`] — provided so external
    /// consumers have a conventional `new` constructor against this
    /// `#[non_exhaustive]` struct.
    pub fn new(ttl_ms: u32) -> Self {
        Self::with_ttl(ttl_ms)
    }

    /// Construct with the given `ttl_ms` and defaults for the other
    /// knobs. Matches the shorthand used by
    /// [`StreamMode::best_effort_live`].
    pub fn with_ttl(ttl_ms: u32) -> Self {
        Self {
            ttl_ms,
            maxlen_floor: 64,
            maxlen_ceiling: 16_384,
            ema_alpha: 0.2,
        }
    }

    /// Builder: override [`Self::maxlen_floor`]. Chainable.
    pub fn with_maxlen_floor(mut self, floor: u32) -> Self {
        self.maxlen_floor = floor;
        self
    }

    /// Builder: override [`Self::maxlen_ceiling`]. Chainable.
    pub fn with_maxlen_ceiling(mut self, ceiling: u32) -> Self {
        self.maxlen_ceiling = ceiling;
        self
    }

    /// Builder: override [`Self::ema_alpha`]. Chainable. Callers are
    /// expected to keep α in `(0.0, 1.0]`; the Lua side clamps to that
    /// range defensively.
    pub fn with_ema_alpha(mut self, alpha: f64) -> Self {
        self.ema_alpha = alpha;
        self
    }

    /// Canonical bit pattern of `ema_alpha`, shared by `PartialEq` and
    /// `Hash` so the two can never disagree.
    fn ema_alpha_key(&self) -> u64 {
        let alpha = self.ema_alpha;
        if alpha.is_nan() {
            // NaN has many bit patterns; collapse them to one.
            f64::NAN.to_bits()
        } else if alpha == 0.0 {
            // `-0.0 == 0.0` under IEEE 754 but the bits differ.
            0.0_f64.to_bits()
        } else {
            alpha.to_bits()
        }
    }
}

// A derived `PartialEq` would compare `ema_alpha` with `==`, which makes a
// NaN-valued config unequal to itself (unsound next to `Eq`) and lets
// `0.0` / `-0.0` compare equal while hashing differently. Comparing the
// canonical key keeps `PartialEq`, `Eq` and `Hash` mutually consistent.
impl PartialEq for BestEffortLiveConfig {
    fn eq(&self, other: &Self) -> bool {
        self.ttl_ms == other.ttl_ms
            && self.maxlen_floor == other.maxlen_floor
            && self.maxlen_ceiling == other.maxlen_ceiling
            && self.ema_alpha_key() == other.ema_alpha_key()
    }
}

impl Eq for BestEffortLiveConfig {}

impl std::hash::Hash for BestEffortLiveConfig {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.ttl_ms.hash(state);
        self.maxlen_floor.hash(state);
        self.maxlen_ceiling.hash(state);
        self.ema_alpha_key().hash(state);
    }
}
415
416impl StreamMode {
417    /// The v0.6 default — [`Self::Durable`]. Provided for symmetry with
418    /// the other constructors (the enum is `#[non_exhaustive]` so
419    /// cross-crate consumers cannot construct variants by name).
420    pub fn durable() -> Self {
421        StreamMode::Durable
422    }
423
424    /// [`Self::DurableSummary`] with [`PatchKind::JsonMergePatch`] —
425    /// the only supported `PatchKind` in v0.6.
426    pub fn durable_summary() -> Self {
427        StreamMode::DurableSummary {
428            patch_kind: PatchKind::JsonMergePatch,
429        }
430    }
431
432    /// [`Self::BestEffortLive`] with the caller-supplied `ttl_ms` and
433    /// default [`BestEffortLiveConfig`] knobs.
434    /// RFC-015 §4 guidance: `5_000..=30_000` ms. Below ~1000 ms a live
435    /// tailer may not connect in time; above ~60_000 ms the memory
436    /// argument against plain [`Self::Durable`] weakens.
437    pub fn best_effort_live(ttl_ms: u32) -> Self {
438        StreamMode::BestEffortLive {
439            config: BestEffortLiveConfig::with_ttl(ttl_ms),
440        }
441    }
442
443    /// [`Self::BestEffortLive`] with a fully-specified
444    /// [`BestEffortLiveConfig`]. Use for per-workload tuning of α, the
445    /// MAXLEN clamp, or the TTL — defaults are wired from
446    /// [`BestEffortLiveConfig::with_ttl`].
447    pub fn best_effort_live_with_config(config: BestEffortLiveConfig) -> Self {
448        StreamMode::BestEffortLive { config }
449    }
450
451    /// Stable wire-level token for this mode, written to the XADD entry
452    /// `mode` field (RFC-015 §6.1). Tail filters compare against these
453    /// string values server-side.
454    pub fn wire_str(&self) -> &'static str {
455        match self {
456            StreamMode::Durable => "durable",
457            StreamMode::DurableSummary { .. } => "summary",
458            StreamMode::BestEffortLive { .. } => "best_effort",
459        }
460    }
461}
462
/// Visibility filter applied when tailing a stream (RFC-015 §6).
///
/// The default, [`Self::All`], keeps v1 behaviour. Opting in to
/// [`Self::ExcludeBestEffort`] drops `BestEffortLive` frames on the
/// server side (the XADD `mode` field is a cheap field check in
/// `ff_read_attempt_stream` / `xread_block`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum TailVisibility {
    /// Default. Every XADD entry in the stream is returned, whatever its
    /// mode.
    #[default]
    All,
    /// Only frames appended under [`StreamMode::Durable`] or
    /// [`StreamMode::DurableSummary`] are returned (i.e.
    /// [`StreamMode::BestEffortLive`] frames are filtered out). The name
    /// is meant to be self-describing: the filter excludes best-effort
    /// frames, it is not "only Durable" — `DurableSummary` deltas are
    /// included because they have a durable backing (the summary Hash).
    ExcludeBestEffort,
}

impl TailVisibility {
    /// Returns the stable wire-level token passed as an ARGV to the Lua
    /// tail/read implementations. The empty string `""` stands for the
    /// default, `All` (no filter).
    pub fn wire_str(&self) -> &'static str {
        match *self {
            Self::ExcludeBestEffort => "exclude_best_effort",
            Self::All => "",
        }
    }
}
495
/// Byte-exact sentinel string that [`StreamMode::DurableSummary`] +
/// [`PatchKind::JsonMergePatch`] callers send to set a field to JSON
/// `null` (RFC-015 §3.2).
///
/// Under RFC 7396 a `null` in a merge patch means "delete the key". A
/// caller that wants a literal JSON `null` leaf therefore sends this
/// sentinel string as the scalar leaf value, and the Lua apply-path
/// rewrites it to JSON `null` once the merge is done.
///
/// # Round-trip invariant
///
/// The summary document returned by a `read_summary` call NEVER contains
/// the sentinel string. A `null` observed there always means "this field
/// is explicitly null." See [`SummaryDocument`].
pub const SUMMARY_NULL_SENTINEL: &str = "__ff_null__";
511
512/// Materialised rolling summary document returned by a summary-read
513/// call (RFC-015 §6.3).
514///
515/// `document` is the caller-owned JSON value; scalar leaves that the
516/// caller wrote through the [`SUMMARY_NULL_SENTINEL`] contract appear
517/// here as JSON `null` (not as the sentinel string — the round-trip
518/// invariant erases the sentinel on read).
519///
520/// `#[non_exhaustive]` keeps future additive fields (e.g. a compacted
521/// delta cursor, a schema-version tag) additive. Construct via
522/// [`Self::new`].
523#[derive(Clone, Debug, PartialEq, Eq)]
524#[non_exhaustive]
525pub struct SummaryDocument {
526    /// The rolling summary as JSON bytes (UTF-8, encoded by the
527    /// server-side applier after merge). Stored as `Vec<u8>` instead
528    /// of `serde_json::Value` to keep `ff_core::backend` free of a
529    /// public `serde_json` type dependency.
530    pub document_json: Vec<u8>,
531    /// Monotonic version bumped on every delta applied. `0` is never
532    /// observed — the first applied delta returns `1`.
533    pub version: u64,
534    /// Which [`PatchKind`] was used to build the document.
535    pub patch_kind: PatchKind,
536    /// Unix millis of the most recent delta application.
537    pub last_updated_ms: u64,
538    /// Unix millis of the first delta applied to this attempt.
539    pub first_applied_ms: u64,
540}
541
542impl SummaryDocument {
543    /// Build a summary snapshot. Used by backends parsing the Hash
544    /// fields; external consumers receive these from `read_summary`.
545    pub fn new(
546        document_json: Vec<u8>,
547        version: u64,
548        patch_kind: PatchKind,
549        last_updated_ms: u64,
550        first_applied_ms: u64,
551    ) -> Self {
552        Self {
553            document_json,
554            version,
555            patch_kind,
556            last_updated_ms,
557            first_applied_ms,
558        }
559    }
560}
561
562/// Single stream frame appended via `append_frame` (RFC-012 §3.3.0).
563/// Today's FCALL takes the byte payload + frame_type + optional seq as
564/// discrete ARGV; Stage 0 collects them into a named type for trait
565/// signatures.
566///
567/// **Round-7 follow-up (PR #145 → #146):** extended with
568/// `frame_type: String` (the SDK-public free-form classifier — values
569/// like `"delta"`, `"log"`, `"agent_step"`, `"summary_token"`,
570/// `"transcribe_line"`, `"progress"` — distinct from the coarse
571/// [`FrameKind`] enum) and `correlation_id: Option<String>` (the
572/// wire-level `correlation_id` ARGV, surfaced at the SDK as
573/// `metadata: Option<&str>`). Adding these lets
574/// `ClaimedTask::append_frame` forward through the trait without
575/// wire-parity regression.
576///
577/// `frame_type` is free-form and is what the backend writes into the
578/// Lua-side `frame_type` ARGV. [`FrameKind`] remains for typed
579/// classification at the trait surface; when callers populate only
580/// `kind`, the backend falls back to a stable encoding of the enum
581/// variant (see `frame_kind_to_str` in `ff-backend-valkey`).
582#[derive(Clone, Debug, PartialEq, Eq)]
583#[non_exhaustive]
584pub struct Frame {
585    pub bytes: Vec<u8>,
586    pub kind: FrameKind,
587    /// Optional monotonic sequence. Set by the caller when the stream
588    /// protocol is sequence-bound; `None` lets the backend assign.
589    pub seq: Option<u64>,
590    /// Free-form classifier written to the Lua-side `frame_type` ARGV.
591    /// Empty string means "defer to [`FrameKind`]" — the backend
592    /// substitutes the enum-variant encoding.
593    pub frame_type: String,
594    /// Optional correlation id (wire `correlation_id` ARGV). `None`
595    /// encodes as the empty string on the wire.
596    pub correlation_id: Option<String>,
597    /// Durability mode for this frame (RFC-015 §1). Defaults to
598    /// [`StreamMode::Durable`] for v1-caller parity — the field was
599    /// added additively so pre-RFC-015 construction via
600    /// [`Self::new`] / [`Self::with_seq`] sees `Durable` without code
601    /// change.
602    pub mode: StreamMode,
603}
604
605impl Frame {
606    /// Construct a frame. `seq` defaults to `None` (backend-assigned);
607    /// `frame_type` defaults to empty (backend falls back to
608    /// `FrameKind` encoding); `correlation_id` defaults to `None`.
609    /// Callers that need an explicit sequence use [`Frame::with_seq`];
610    /// callers on the SDK forwarder path populate `frame_type` +
611    /// `correlation_id` via [`Frame::with_frame_type`] /
612    /// [`Frame::with_correlation_id`].
613    pub fn new(bytes: Vec<u8>, kind: FrameKind) -> Self {
614        Self {
615            bytes,
616            kind,
617            seq: None,
618            frame_type: String::new(),
619            correlation_id: None,
620            mode: StreamMode::Durable,
621        }
622    }
623
624    /// Construct a frame with an explicit monotonic sequence.
625    pub fn with_seq(bytes: Vec<u8>, kind: FrameKind, seq: u64) -> Self {
626        Self {
627            bytes,
628            kind,
629            seq: Some(seq),
630            frame_type: String::new(),
631            correlation_id: None,
632            mode: StreamMode::Durable,
633        }
634    }
635
636    /// Builder-style setter for the RFC-015 durability [`StreamMode`].
637    /// Defaults to [`StreamMode::Durable`] (v1 parity) when unset.
638    pub fn with_mode(mut self, mode: StreamMode) -> Self {
639        self.mode = mode;
640        self
641    }
642
643    /// Builder-style setter for the free-form `frame_type` classifier.
644    pub fn with_frame_type(mut self, frame_type: impl Into<String>) -> Self {
645        self.frame_type = frame_type.into();
646        self
647    }
648
649    /// Builder-style setter for the optional `correlation_id`.
650    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
651        self.correlation_id = Some(correlation_id.into());
652        self
653    }
654}
655
656// ── §3.3.0 Suspend / waitpoint types ────────────────────────────────────
657
/// Matcher mode of a waitpoint (mirrors today's suspend/close matcher
/// kinds — signal name, correlation id, etc.).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum WaitpointKind {
    /// Matches on the signal name.
    SignalName,
    /// Matches on the correlation id.
    CorrelationId,
    /// Generic external-signal waitpoint (external delivery).
    External,
}
670
671/// HMAC token that binds a waitpoint to its mint-time identity. Wire
672/// shape `kid:40hex`.
673///
674/// Wraps [`crate::types::WaitpointToken`] so bearer-credential Debug /
675/// Display redaction (`WaitpointToken("kid1:<REDACTED:len=40>")`) flows
676/// through automatically — no derived formatter can leak the raw
677/// digest when a [`WaitpointSpec`] is debug-printed via
678/// `tracing::debug!(spec=?spec)`.
679///
680/// Newtype-wrapping (vs. a `pub use` alias) keeps trait signatures
681/// naming the waitpoint-bound HMAC role explicitly.
682#[derive(Clone, PartialEq, Eq, Hash)]
683pub struct WaitpointHmac(WaitpointToken);
684
685impl WaitpointHmac {
686    pub fn new(token: impl Into<String>) -> Self {
687        Self(WaitpointToken::from(token.into()))
688    }
689
690    /// Borrow the wrapped token. The wrapped type's `Debug`/`Display`
691    /// redact — call sites that need the raw digest must explicitly
692    /// call [`WaitpointToken::as_str`].
693    pub fn token(&self) -> &WaitpointToken {
694        &self.0
695    }
696
697    /// Borrow the raw `kid:40hex` string. Prefer [`Self::token`] for
698    /// non-redacted call sites; this method exists only for transport
699    /// / FCALL ARGV construction where the raw wire bytes are required.
700    pub fn as_str(&self) -> &str {
701        self.0.as_str()
702    }
703}
704
705// Forward Debug / Display to the wrapped WaitpointToken so the
706// redaction guarantees on that type extend here. Derived Debug would
707// expose the raw string field and defeat the wrap.
708impl std::fmt::Debug for WaitpointHmac {
709    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
710        write!(f, "WaitpointHmac({:?})", self.0)
711    }
712}
713
714/// Handle returned by `create_waitpoint` — the id of the newly-minted
715/// pending waitpoint plus its HMAC token. Signals targeted at the
716/// waitpoint must present the token; a later `suspend` call transitions
717/// the waitpoint from `pending` to `active` (RFC-012 §R7.2.2).
718///
719/// `WaitpointHmac` redacts on `Debug`/`Display`, so deriving `Debug`
720/// here cannot leak the raw digest.
721#[derive(Clone, Debug, PartialEq, Eq)]
722#[non_exhaustive]
723pub struct PendingWaitpoint {
724    pub waitpoint_id: crate::types::WaitpointId,
725    pub hmac_token: WaitpointHmac,
726}
727
728impl PendingWaitpoint {
729    pub fn new(waitpoint_id: crate::types::WaitpointId, hmac_token: WaitpointHmac) -> Self {
730        Self {
731            waitpoint_id,
732            hmac_token,
733        }
734    }
735}
736
737/// One waitpoint inside a suspend request. `suspend` takes a
738/// `Vec<WaitpointSpec>`; the resume condition (`any` / `all`) lives on
739/// the enclosing suspend args in the Phase-1 contract.
740#[derive(Clone, Debug, PartialEq, Eq)]
741#[non_exhaustive]
742pub struct WaitpointSpec {
743    pub kind: WaitpointKind,
744    pub matcher: Vec<u8>,
745    pub hmac_token: WaitpointHmac,
746}
747
748impl WaitpointSpec {
749    pub fn new(kind: WaitpointKind, matcher: Vec<u8>, hmac_token: WaitpointHmac) -> Self {
750        Self {
751            kind,
752            matcher,
753            hmac_token,
754        }
755    }
756}
757
758// ── §3.3.0 Failure classification types ─────────────────────────────────
759
/// A human-readable failure description with optional structured detail.
/// Supersedes today's ad-hoc string argument to `fail`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct FailureReason {
    /// Human-readable description of the failure.
    pub message: String,
    /// Optional structured detail, as raw bytes.
    pub detail: Option<Vec<u8>>,
}

impl FailureReason {
    /// Creates a reason carrying only a message; `detail` is `None`.
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        FailureReason {
            message,
            detail: None,
        }
    }

    /// Creates a reason carrying both a message and structured detail
    /// bytes.
    pub fn with_detail(message: impl Into<String>, detail: Vec<u8>) -> Self {
        let mut reason = FailureReason::new(message);
        reason.detail = Some(detail);
        reason
    }
}
784
/// Classification of a failure; it decides the retry disposition on the
/// Lua side and mirrors the Lua-side classification codes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FailureClass {
    /// Transient error that may be retried.
    Transient,
    /// Permanent error — never retried.
    Permanent,
    /// Crash / process death, inferred from lease expiry.
    InfraCrash,
    /// Timeout at the attempt or operation level.
    Timeout,
    /// Cooperative cancellation by an operator or by cancel_flow.
    Cancelled,
}
801
802// ── §3.3.0 Usage / budget types ─────────────────────────────────────────
803
/// Usage report for `report_usage`. Mirrors today's
/// `ff_report_usage_and_check` ARGV: token-counts, wall-time, custom
/// dimensions.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub struct UsageDimensions {
    /// Input tokens consumed (LLM-shaped usage). `0` if not applicable.
    pub input_tokens: u64,
    /// Output tokens produced (LLM-shaped usage).
    pub output_tokens: u64,
    /// Wall-clock duration, in milliseconds, attributable to this
    /// report. `None` for pure token-count reports.
    pub wall_ms: Option<u64>,
    /// Arbitrary caller-defined dimensions. Use `BTreeMap` for stable
    /// iteration order (important for dedup-key derivation on some
    /// budget schemes).
    pub custom: BTreeMap<String, u64>,
    /// Optional caller-supplied idempotency key. When set, the backend
    /// rejects a repeat application of the same key with
    /// `ReportUsageResult::AlreadyApplied` rather than double-counting
    /// (RFC-012 §R7.4; Lua `ff_report_usage_and_check` threads this as
    /// the trailing ARGV). `None` / empty string disables dedup.
    pub dedup_key: Option<String>,
}

impl UsageDimensions {
    /// Create an empty usage report (all dimensions zero / `None`, no
    /// custom dimensions). Equivalent to [`Self::default`].
    ///
    /// Provided for external-crate consumers: `UsageDimensions` is
    /// `#[non_exhaustive]`, so struct-literal and functional-update
    /// construction are unavailable across crate boundaries. Start
    /// from `new()` and chain `with_*` setters to build a report.
    /// There is no setter for `custom`; it is a public field and can
    /// be populated directly on the built value.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the input-token count dimension. Consumes and returns
    /// `self` for chaining; discarding the result drops the update.
    #[must_use]
    pub fn with_input_tokens(mut self, tokens: u64) -> Self {
        self.input_tokens = tokens;
        self
    }

    /// Set the output-token count dimension. Consumes and returns
    /// `self` for chaining; discarding the result drops the update.
    #[must_use]
    pub fn with_output_tokens(mut self, tokens: u64) -> Self {
        self.output_tokens = tokens;
        self
    }

    /// Set the wall-clock duration dimension, in milliseconds.
    /// Consumes and returns `self` for chaining; discarding the result
    /// drops the update.
    #[must_use]
    pub fn with_wall_ms(mut self, ms: u64) -> Self {
        self.wall_ms = Some(ms);
        self
    }

    /// Set the optional caller-supplied idempotency key. When set,
    /// the backend rejects a repeat application of the same key with
    /// `ReportUsageResult::AlreadyApplied` rather than double-counting
    /// (RFC-012 §R7.4). The key is stored as given, so an empty string
    /// becomes `Some("")`. Consumes and returns `self` for chaining.
    #[must_use]
    pub fn with_dedup_key(mut self, key: impl Into<String>) -> Self {
        self.dedup_key = Some(key.into());
        self
    }
}
870
871// ── §3.3.0 Resume / lease types ────────────────────────────────────────
872
873/// Opaque cookie returned by the reclaim scanner; consumed by
874/// [`crate::engine_backend::EngineBackend::claim_from_resume_grant`]
875/// to mint a resumed Handle.
876///
877/// Wraps [`ResumeGrant`] today (the scanner's existing product).
878/// Kept as a newtype so trait signatures name the resume-bound role
879/// explicitly and so the wrapped shape can evolve without breaking the
880/// trait.
881///
882/// **Naming history (RFC-024).** This type was historically called
883/// `ReclaimToken`. Its semantic is resume-after-suspend (it wraps a
884/// `ResumeGrant` and feeds `ff_claim_resumed_execution`), so
885/// RFC-024 Rev 2 renamed it to `ResumeToken`. The transitional
886/// `ReclaimToken` alias lived for one PR and was dropped in PR-B+C;
887/// downstream call sites migrate to `ResumeToken`.
888#[derive(Clone, Debug, PartialEq, Eq)]
889#[non_exhaustive]
890pub struct ResumeToken {
891    pub grant: ResumeGrant,
892    /// Worker identity that will claim the resumed execution.
893    /// Wave 2 additive extension — mirrors the `ClaimPolicy`
894    /// shape since `claim_from_resume_grant` does not take a
895    /// `ClaimPolicy`.
896    pub worker_id: WorkerId,
897    /// Worker instance identity.
898    pub worker_instance_id: WorkerInstanceId,
899    /// Lease TTL (ms) for the resumed attempt's fresh lease.
900    pub lease_ttl_ms: u32,
901}
902
903impl ResumeToken {
904    pub fn new(
905        grant: ResumeGrant,
906        worker_id: WorkerId,
907        worker_instance_id: WorkerInstanceId,
908        lease_ttl_ms: u32,
909    ) -> Self {
910        Self {
911            grant,
912            worker_id,
913            worker_instance_id,
914            lease_ttl_ms,
915        }
916    }
917}
918
/// What a successful `renew` call hands back.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct LeaseRenewal {
    /// The lease's new expiry (monotonic ms).
    pub expires_at_ms: u64,
    /// The lease epoch once the renewal has been applied. Monotonic
    /// non-decreasing.
    pub lease_epoch: u64,
}

impl LeaseRenewal {
    /// Pair a new expiry with the post-renewal lease epoch. Both values
    /// are stored as given; no validation is performed here.
    pub fn new(expires_at_ms: u64, lease_epoch: u64) -> Self {
        Self {
            expires_at_ms,
            lease_epoch,
        }
    }
}
937
938// ── §3.3.0 Cancel-flow supporting types ─────────────────────────────────
939
/// Policy for `cancel_flow`: what happens to the flow's members.
///
/// Today the policy is encoded as a `String` on `CancelFlowArgs`;
/// Stage 0 extracts the policy shape as a typed enum
/// (RFC-012 §3.3.0).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CancelFlowPolicy {
    /// Cancel the flow record only; member executions are left alone.
    FlowOnly,
    /// Cancel the flow together with every non-terminal member
    /// execution.
    CancelAll,
    /// Cancel the flow and each member currently in `Pending` /
    /// `Blocked` / `Eligible`; `Running` executions are left alone to
    /// drain.
    CancelPending,
}
955
/// How long the caller of `cancel_flow` is prepared to wait.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CancelFlowWait {
    /// Return once the flow-state flip is done; member cancellations
    /// are dispatched asynchronously.
    NoWait,
    /// Block until member cancellations complete, for at most the
    /// carried timeout.
    WaitTimeout(Duration),
    /// Block until member cancellations complete, with no deadline.
    WaitIndefinite,
}
968
969// ── §3.3.0 Completion stream payload ────────────────────────────────────
970
971/// One completion event delivered through the `CompletionStream`
972/// (RFC-012 §4.3). Also the payload type for issue #90's subscription
973/// API. Stage 0 authorises the type; issue #90 fixes the wire shape.
974///
975/// `flow_id` was added in issue #90 so DAG-dependency routing
976/// (`dispatch_dependency_resolution`) has the partition-routable flow
977/// handle without reparsing the Lua-emitted JSON downstream.
978/// `#[non_exhaustive]` keeps future field additions additive; use
979/// [`CompletionPayload::new`] and [`CompletionPayload::with_flow_id`]
980/// for construction.
981///
982/// `payload_bytes` / `produced_at_ms` are authorised but not yet
983/// populated by the Valkey Lua emitters — consumers on the
984/// `CompletionStream` read `execution_id` + `flow_id` today and must
985/// tolerate `payload_bytes = None` / `produced_at_ms = 0`.
986#[derive(Clone, Debug, PartialEq, Eq)]
987#[non_exhaustive]
988pub struct CompletionPayload {
989    pub execution_id: crate::types::ExecutionId,
990    pub outcome: String,
991    pub payload_bytes: Option<Vec<u8>>,
992    pub produced_at_ms: TimestampMs,
993    /// Flow handle for partition routing. Added in issue #90 (#90);
994    /// `None` for emitters that don't yet surface it.
995    pub flow_id: Option<crate::types::FlowId>,
996}
997
998impl CompletionPayload {
999    pub fn new(
1000        execution_id: crate::types::ExecutionId,
1001        outcome: impl Into<String>,
1002        payload_bytes: Option<Vec<u8>>,
1003        produced_at_ms: TimestampMs,
1004    ) -> Self {
1005        Self {
1006            execution_id,
1007            outcome: outcome.into(),
1008            payload_bytes,
1009            produced_at_ms,
1010            flow_id: None,
1011        }
1012    }
1013
1014    /// Attach a flow handle to the payload. Additive builder so adding
1015    /// `flow_id` didn't require a breaking change to [`Self::new`].
1016    #[must_use]
1017    pub fn with_flow_id(mut self, flow_id: crate::types::FlowId) -> Self {
1018        self.flow_id = Some(flow_id);
1019        self
1020    }
1021}
1022
1023// ── §3.3.0 ResumeSignal (crate move from ff-sdk::task) ──────────────────
1024
1025/// Signal that satisfied a waitpoint matcher and is therefore part of
1026/// the reason an execution resumed. Returned by `observe_signals`
1027/// (RFC-012 §3.1.2) and by `ClaimedTask::resume_signals` in ff-sdk.
1028///
1029/// Moved in Stage 0 from `ff_sdk::task`; `ff_sdk::ResumeSignal` remains
1030/// re-exported through the 0.4.x window (removal scheduled for 0.5.0).
1031///
1032/// Returned only for signals whose matcher slot in the waitpoint's
1033/// resume condition is marked satisfied. Pre-buffered-but-unmatched
1034/// signals are not included.
1035///
1036/// Note: NOT `#[non_exhaustive]` to preserve struct-literal compatibility
1037/// with ff-sdk call sites that constructed `ResumeSignal { .. }` before
1038/// the Stage 0 crate move.
1039#[derive(Clone, Debug, PartialEq, Eq)]
1040pub struct ResumeSignal {
1041    pub signal_id: crate::types::SignalId,
1042    pub signal_name: String,
1043    pub signal_category: String,
1044    pub source_type: String,
1045    pub source_identity: String,
1046    pub correlation_id: String,
1047    /// Valkey-server `now_ms` timestamp at which `ff_deliver_signal`
1048    /// accepted this signal. `0` if the stored `accepted_at` field is
1049    /// missing or non-numeric (a Lua-side defect — not expected at
1050    /// runtime).
1051    pub accepted_at: TimestampMs,
1052    /// Raw payload bytes, if the signal was delivered with one. `None`
1053    /// for signals delivered without a payload.
1054    pub payload: Option<Vec<u8>>,
1055}
1056
1057// ── Stage 1a: FailOutcome move ──────────────────────────────────────────
1058
1059/// Outcome of a `fail()` call.
1060///
1061/// **RFC-012 Stage 1a:** moved from `ff_sdk::task::FailOutcome` to
1062/// `ff_core::backend::FailOutcome` so it is nameable by the
1063/// `EngineBackend` trait signature. `ff_sdk::task` retains a
1064/// `pub use` shim preserving the `ff_sdk::FailOutcome` path.
1065///
1066/// Not `#[non_exhaustive]` because existing consumers (ff-test,
1067/// ff-readiness-tests) construct and match this enum exhaustively;
1068/// the shape has been stable since the `fail()` API landed and any
1069/// additive growth would be a follow-up RFC's deliberate break.
1070#[derive(Clone, Debug, PartialEq, Eq)]
1071pub enum FailOutcome {
1072    /// Retry was scheduled — execution is in delayed backoff.
1073    RetryScheduled {
1074        delay_until: crate::types::TimestampMs,
1075    },
1076    /// No retries left — execution is terminal failed.
1077    TerminalFailed,
1078}
1079
1080// ── Issue #281: PrepareOutcome (boot-prep trait-method return) ────────
1081
/// Result of an [`EngineBackend::prepare`](crate::engine_backend::EngineBackend::prepare)
/// call, i.e. one-time backend-specific boot preparation.
///
/// Issue #281 moves cairn's `ensure_library` retry loop upstream, so
/// consumers can boot any backend uniformly via
/// `backend.prepare().await?` without knowing whether it is Valkey
/// (FUNCTION LOAD) or Postgres (migrations are out-of-band per
/// RFC-v0.7 Wave 0 Q12, so `NoOp`). The enum is `#[non_exhaustive]` so
/// future backends can add variants (e.g. `Skipped { reason }`)
/// additively.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum PrepareOutcome {
    /// The backend had nothing to do: either it is genuinely a no-op
    /// (Postgres: migrations applied out-of-band) or the requested
    /// prep work was already in place and idempotent.
    ///
    /// Note for Valkey: when the library is already at the expected
    /// version, the Valkey impl collapses that "already-present" case
    /// into `Applied` to keep a single success variant. Consumers that
    /// want to distinguish can parse `description`.
    NoOp,
    /// The backend ran preparation work (e.g. Valkey FUNCTION LOAD
    /// REPLACE). `description` is a human-readable summary suitable
    /// for `info!`-level log lines like
    /// `"FUNCTION LOAD (flowfabric lib v<N>)"`; its shape is not
    /// machine-parseable and MAY change across versions.
    Applied {
        /// Human-readable summary of what was prepared.
        description: String,
    },
}

impl PrepareOutcome {
    /// Shorthand for building the `Applied` variant from anything that
    /// converts into a `String`.
    pub fn applied(description: impl Into<String>) -> Self {
        let description = description.into();
        Self::Applied { description }
    }
}
1121
1122// ── RFC-012 §R7: AppendFrameOutcome move ────────────────────────────────
1123
/// Outcome of an `append_frame()` call.
///
/// **RFC-012 §R7.2.1:** moved from `ff_sdk::task::AppendFrameOutcome`
/// to `ff_core::backend::AppendFrameOutcome` so the
/// `EngineBackend::append_frame` trait return can name it.
/// `ff_sdk::task` keeps a `pub use` shim, preserving the
/// `ff_sdk::task::AppendFrameOutcome` path through 0.4.x.
///
/// The derive set is the `FailOutcome` precedent
/// (`Clone, Debug, PartialEq, Eq`) plus `Default`.
///
/// **Exhaustiveness history.** At the time of the RFC-012 move the
/// type was left exhaustive: construction was internal to the backend
/// (parser in `ff-backend-valkey`) and no external constructors were
/// anticipated (consumer-shape evidence per §R7.2.1 / MN3). RFC-015 §9
/// later made it `#[non_exhaustive]` so the new
/// `summary_version: Option<u64>` field (populated only for
/// [`StreamMode::DurableSummary`] appends) could land additively and
/// future per-mode outcome fields can follow. Construct values via
/// [`Self::new`] + the chainable setters; cross-crate consumers cannot
/// use struct-literal construction.
///
/// `stream_id: String` is a stable shape commitment: a future typed
/// `StreamId` newtype would be its own breaking change
/// (§R7.5.6 / MD2).
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub struct AppendFrameOutcome {
    /// Valkey Stream entry ID assigned to this frame
    /// (e.g. `1234567890-0`).
    pub stream_id: String,
    /// Total number of frames in the stream after this append.
    pub frame_count: u64,
    /// Rolling summary `version` after the delta applied. Populated
    /// only for [`StreamMode::DurableSummary`] appends (RFC-015 §3.3
    /// step 6); `None` for [`StreamMode::Durable`] /
    /// [`StreamMode::BestEffortLive`] appends. Callers that need
    /// "total deltas applied" read this field.
    pub summary_version: Option<u64>,
}

impl AppendFrameOutcome {
    /// Build an outcome from the mandatory `stream_id` / `frame_count`
    /// pair. `summary_version` starts as `None`; chain
    /// [`Self::with_summary_version`] for
    /// [`StreamMode::DurableSummary`] appends.
    pub fn new(stream_id: impl Into<String>, frame_count: u64) -> Self {
        let stream_id = stream_id.into();
        Self {
            stream_id,
            frame_count,
            summary_version: None,
        }
    }

    /// Attach a rolling summary version (RFC-015 §9), leaving the other
    /// fields untouched.
    #[must_use]
    pub fn with_summary_version(self, version: u64) -> Self {
        Self {
            summary_version: Some(version),
            ..self
        }
    }
}
1182
1183// ── Stage 1a: BackendConfig + sub-types ─────────────────────────────────
1184
/// Timing settings shared across backend connections.
///
/// The struct is `#[non_exhaustive]` per project convention, because
/// connection tunables grow as new backends land. The default is the
/// Phase-1 Valkey client's out-of-box shape (no explicit timeout, no
/// explicit pool cap; inherits ferriskey defaults).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendTimeouts {
    /// Timeout applied per request. `None` ⇒ backend default.
    pub request: Option<Duration>,
}

impl BackendTimeouts {
    /// Construct a [`BackendTimeouts`] whose every field holds the
    /// backend-default sentinel (`None`). Same value as
    /// [`Self::default`]; it exists so external consumers have an
    /// explicit constructor for this `#[non_exhaustive]` struct.
    pub fn new() -> Self {
        Self { request: None }
    }
}
1207
/// Retry policy shared across backend connections.
///
/// The shape follows ferriskey's `ConnectionRetryStrategy` (see
/// `ferriskey/src/client/types.rs:151`), which makes Stage 1c's Valkey
/// wiring a direct pass-through: nothing ferriskey already provides is
/// reimplemented here. The Postgres backend (future) interprets the
/// same fields under its own retry semantics, or maps `None` to its
/// own defaults.
///
/// Every field is an `Option<u32>`. `None` ⇒ backend default (for
/// Valkey this means `ConnectionRetryStrategy::default()`); `Some(v)`
/// ⇒ `v` is passed straight through.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendRetry {
    /// Base of the exponent in the backoff curve. `None` ⇒ backend
    /// default.
    pub exponent_base: Option<u32>,
    /// Multiplicative factor applied to each backoff step. `None` ⇒
    /// backend default.
    pub factor: Option<u32>,
    /// Upper bound on retry attempts for transient transport errors.
    /// `None` ⇒ backend default.
    pub number_of_retries: Option<u32>,
    /// Jitter, expressed as a percentage of the computed backoff.
    /// `None` ⇒ backend default.
    pub jitter_percent: Option<u32>,
}

impl BackendRetry {
    /// Construct a [`BackendRetry`] whose every field holds the
    /// backend-default sentinel (`None`). Same value as
    /// [`Self::default`]; it exists so external consumers have an
    /// explicit constructor for this `#[non_exhaustive]` struct.
    pub fn new() -> Self {
        Self {
            exponent_base: None,
            factor: None,
            number_of_retries: None,
            jitter_percent: None,
        }
    }
}
1244
/// Connection parameters specific to the Valkey backend.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ValkeyConnection {
    /// Hostname of the Valkey server.
    pub host: String,
    /// Port of the Valkey server.
    pub port: u16,
    /// Whether TLS is enabled for the Valkey connection.
    pub tls: bool,
    /// Whether Valkey cluster mode is enabled.
    pub cluster: bool,
}

impl ValkeyConnection {
    /// Build connection parameters for `host:port` with TLS and
    /// cluster mode both switched off. The `tls` / `cluster` fields are
    /// public and can be changed on the returned value.
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        let host = host.into();
        Self {
            host,
            port,
            tls: false,
            cluster: false,
        }
    }
}
1269
/// Connection parameters specific to the Postgres backend
/// (RFC-v0.7 Wave 0).
///
/// `url` is a standard libpq/sqlx connection string
/// (`postgres://user:pass@host:port/db`). Pool sizing and the acquire
/// timeout live on this struct instead of on [`BackendTimeouts`] /
/// [`BackendRetry`] because they map directly to
/// `sqlx::postgres::PgPoolOptions` and have no Valkey equivalent.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct PostgresConnection {
    /// Connection URL for Postgres.
    pub url: String,
    /// Upper bound on the number of pool connections.
    pub max_connections: u32,
    /// Lower bound on the number of idle pool connections.
    pub min_connections: u32,
    /// Timeout applied to each pool acquire.
    pub acquire_timeout: Duration,
}

impl PostgresConnection {
    /// Build a Postgres connection from a URL, using defaults for pool
    /// sizing that match sqlx's out-of-box defaults: max=10, min=0,
    /// acquire=30s.
    pub fn new(url: impl Into<String>) -> Self {
        const DEFAULT_MAX_CONNECTIONS: u32 = 10;
        const DEFAULT_MIN_CONNECTIONS: u32 = 0;
        const DEFAULT_ACQUIRE_TIMEOUT: Duration = Duration::from_secs(30);

        Self {
            url: url.into(),
            max_connections: DEFAULT_MAX_CONNECTIONS,
            min_connections: DEFAULT_MIN_CONNECTIONS,
            acquire_timeout: DEFAULT_ACQUIRE_TIMEOUT,
        }
    }
}
1303
1304/// Discriminated union over per-backend connection shapes. Stage 1a
1305/// shipped the Valkey arm; RFC-v0.7 Wave 0 adds the Postgres arm
1306/// additively under the `#[non_exhaustive]` guard.
1307#[derive(Clone, Debug, PartialEq, Eq)]
1308#[non_exhaustive]
1309pub enum BackendConnection {
1310    Valkey(ValkeyConnection),
1311    Postgres(PostgresConnection),
1312}
1313
1314/// Configuration passed to `ValkeyBackend::connect` (and, later, to
1315/// other backend `connect` constructors). Carries the connection
1316/// details + shared timing/retry policy. Replaces the Valkey-specific
1317/// fields today on `WorkerConfig` (RFC-012 §5.1 migration plan).
1318///
1319/// `BackendConfig` is the replacement target for `WorkerConfig`'s
1320/// `host` / `port` / `tls` / `cluster` fields. The full migration
1321/// lands across Stage 1a (type introduction) and Stage 1c
1322/// (`WorkerConfig` forwarding); worker-policy fields (lease TTL,
1323/// claim poll interval, capability set) stay on `WorkerConfig`.
1324#[derive(Clone, Debug, PartialEq, Eq)]
1325#[non_exhaustive]
1326pub struct BackendConfig {
1327    pub connection: BackendConnection,
1328    pub timeouts: BackendTimeouts,
1329    pub retry: BackendRetry,
1330}
1331
1332impl BackendConfig {
1333    /// Construct a [`BackendConfig`] directly from a
1334    /// [`BackendConnection`] variant, with default timeouts + retry.
1335    /// Provided so external consumers have an explicit constructor
1336    /// against this `#[non_exhaustive]` struct; for the common paths
1337    /// prefer [`Self::valkey`] / [`Self::postgres`].
1338    pub fn new(connection: BackendConnection) -> Self {
1339        Self {
1340            connection,
1341            timeouts: BackendTimeouts::default(),
1342            retry: BackendRetry::default(),
1343        }
1344    }
1345
1346    /// Build a Valkey BackendConfig from host+port. Other fields take
1347    /// backend defaults.
1348    pub fn valkey(host: impl Into<String>, port: u16) -> Self {
1349        Self {
1350            connection: BackendConnection::Valkey(ValkeyConnection::new(host, port)),
1351            timeouts: BackendTimeouts::default(),
1352            retry: BackendRetry::default(),
1353        }
1354    }
1355
1356    /// Build a Postgres BackendConfig from a URL. Other fields take
1357    /// backend defaults. RFC-v0.7 Wave 0.
1358    pub fn postgres(url: impl Into<String>) -> Self {
1359        Self {
1360            connection: BackendConnection::Postgres(PostgresConnection::new(url)),
1361            timeouts: BackendTimeouts::default(),
1362            retry: BackendRetry::default(),
1363        }
1364    }
1365}
1366
1367// ── Issue #122: ScannerFilter ───────────────────────────────────────────
1368
1369/// Per-consumer filter applied by FlowFabric's background scanners and
1370/// completion subscribers so multiple FlowFabric instances sharing a
1371/// single Valkey keyspace can operate on disjoint subsets of
1372/// executions without mutual interference (issue #122).
1373///
1374/// Sibling of [`CompletionPayload`]: the former is a scan *output*
1375/// shape, this is the *input* predicate scanners and completion
1376/// subscribers consult per candidate.
1377///
1378/// # Fields & backing storage
1379///
1380/// * `namespace` — matches against the `namespace` field on
1381///   `exec_core` (Valkey hash `ff:exec:{fp:N}:<eid>:core`). Cost:
1382///   one HGET per candidate when set.
1383/// * `instance_tag` — matches against an entry in the execution's
1384///   user-supplied tags hash at the canonical key
1385///   **`ff:exec:{p}:<eid>:tags`** (where `{p}` is the partition
1386///   hash-tag, e.g. `{fp:42}`). Written by the Lua function
1387///   `ff_create_execution` (see `lua/execution.lua`) and
1388///   `ff_set_execution_tags`. The tuple is `(tag_key, tag_value)`:
1389///   the HGET targets `tag_key` on the tags hash and compares the
1390///   returned string (if any) byte-for-byte against `tag_value`.
1391///   Cost: one HGET per candidate when set.
1392///
1393/// When both are set, scanners check `namespace` first (short-circuit
1394/// on mismatch) then `instance_tag`, for a maximum of 2 HGETs per
1395/// candidate.
1396///
1397/// # Semantics
1398///
1399/// * [`Self::is_noop`] returns true when both fields are `None` —
1400///   the filter accepts every candidate. Used by the
1401///   `subscribe_completions_filtered` default body to fall back to
1402///   the unfiltered subscription.
1403/// * [`Self::matches`] is the tight in-memory predicate once the
1404///   HGET values have been fetched. Scanners fetch the fields
1405///   lazily (namespace first) and pass the results in; the helper
1406///   returns false as soon as one component mismatches.
1407///
1408/// # Scope
1409///
1410/// Today the filter is consulted by execution-shaped scanners
1411/// (lease_expiry, attempt_timeout, execution_deadline,
1412/// suspension_timeout, pending_wp_expiry, delayed_promoter,
1413/// dependency_reconciler, cancel_reconciler, unblock,
1414/// index_reconciler, retention_trimmer) and by completion
1415/// subscribers. Non-execution scanners (budget_reconciler,
1416/// budget_reset, quota_reconciler, flow_projector) accept a filter
1417/// for API uniformity but do not apply it — their iteration domains
1418/// (budgets, quotas, flows) are not keyed by the
1419/// per-execution namespace / instance_tag shape.
1420///
1421/// `#[non_exhaustive]` so future dimensions (e.g. `lane_id`,
1422/// `worker_instance`) can land additively.
1423#[derive(Clone, Debug, Default, PartialEq, Eq)]
1424#[non_exhaustive]
1425pub struct ScannerFilter {
1426    /// Tenant / workspace scope. Matches against the `namespace`
1427    /// field on `exec_core`.
1428    pub namespace: Option<Namespace>,
1429    /// Instance-scoped tag predicate `(tag_key, tag_value)`. Matches
1430    /// against an entry in `ff:exec:{p}:<eid>:tags` (the tags hash
1431    /// written by `ff_create_execution`).
1432    pub instance_tag: Option<(String, String)>,
1433}
1434
1435impl ScannerFilter {
1436    /// Shared no-op filter — useful as the default for the
1437    /// `Scanner::filter()` trait method so implementors that don't
1438    /// override can hand back a `&'static` reference without
1439    /// allocating per call.
1440    pub const NOOP: Self = Self {
1441        namespace: None,
1442        instance_tag: None,
1443    };
1444
1445    /// Create an empty filter (equivalent to [`Self::NOOP`]).
1446    ///
1447    /// Provided for external-crate consumers: `ScannerFilter` is
1448    /// `#[non_exhaustive]`, so struct-literal and functional-update
1449    /// construction are unavailable across crate boundaries. Start
1450    /// from `new()` and chain `with_*` setters to build a filter.
1451    pub fn new() -> Self {
1452        Self::default()
1453    }
1454
1455    /// Set the tenant-scope namespace filter dimension. Consumes
1456    /// and returns `self` for chaining.
1457    ///
1458    /// Accepts anything that converts into [`Namespace`] so callers
1459    /// can pass `&str` / `String` directly without the
1460    /// `Namespace::new(...)` ceremony (the conversion is infallible;
1461    /// [`Namespace`] has `From<&str>` and `From<String>` via the
1462    /// crate's `string_id!` macro).
1463    pub fn with_namespace(mut self, ns: impl Into<Namespace>) -> Self {
1464        self.namespace = Some(ns.into());
1465        self
1466    }
1467
1468    /// Set the exact-match exec-tag filter dimension. Consumes and
1469    /// returns `self` for chaining. At filter time, scanners read
1470    /// the `ff:exec:{p:N}:<eid>:tags` hash and compare the value at
1471    /// `key` byte-for-byte against `value`.
1472    pub fn with_instance_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
1473        self.instance_tag = Some((key.into(), value.into()));
1474        self
1475    }
1476
1477    /// True iff the filter has no dimensions set — every candidate
1478    /// passes. Callers use this to short-circuit filtered subscribe
1479    /// paths back to the unfiltered ones.
1480    pub fn is_noop(&self) -> bool {
1481        self.namespace.is_none() && self.instance_tag.is_none()
1482    }
1483
1484    /// Post-HGET in-memory match check.
1485    ///
1486    /// `core_namespace` should be the `namespace` field read from
1487    /// `exec_core` (None if the HGET returned nil / was skipped).
1488    /// `tag_value` should be the HGET result for the configured
1489    /// `instance_tag.0` key on the execution's tags hash (None if
1490    /// the HGET returned nil / was skipped).
1491    ///
1492    /// When a filter dimension is `None` the corresponding argument
1493    /// is ignored — callers may pass `None` to skip the HGET and
1494    /// save a round-trip.
1495    pub fn matches(
1496        &self,
1497        core_namespace: Option<&Namespace>,
1498        tag_value: Option<&str>,
1499    ) -> bool {
1500        if let Some(ref want) = self.namespace {
1501            match core_namespace {
1502                Some(have) if have == want => {}
1503                _ => return false,
1504            }
1505        }
1506        if let Some((_, ref want_value)) = self.instance_tag {
1507            match tag_value {
1508                Some(have) if have == want_value.as_str() => {}
1509                _ => return false,
1510            }
1511        }
1512        true
1513    }
1514}
1515
1516// ── Tests ───────────────────────────────────────────────────────────────
1517
#[cfg(test)]
mod tests {
    // Smoke tests for the backend supporting types: derive coverage
    // (`Clone` / `PartialEq` / `Debug` / `Copy`), constructor and builder
    // wiring, and the `ScannerFilter` matching rules.
    use super::*;
    use crate::partition::{Partition, PartitionFamily};
    use crate::types::{ExecutionId, LaneId, SignalId};

    /// `BackendTag` is `Copy` (still usable after `let b = a`), comparable,
    /// and `Valkey` debug-formats as the bare variant name.
    #[test]
    fn backend_tag_derives() {
        let a = BackendTag::Valkey;
        let b = a;
        assert_eq!(a, b);
        assert_eq!(format!("{a:?}"), "Valkey");
    }

    /// Every `HandleKind` variant listed here is `Copy`, comparable and
    /// debug-formattable.
    #[test]
    fn handle_kind_derives() {
        for k in [HandleKind::Fresh, HandleKind::Resumed, HandleKind::Suspended] {
            let c = k;
            assert_eq!(k, c);
            // Debug formatter reachable
            let _ = format!("{k:?}");
        }
    }

    /// Bytes handed to `HandleOpaque::new` are readable back via
    /// `as_bytes`, and a clone compares equal to the original.
    #[test]
    fn handle_opaque_roundtrips() {
        let bytes: Box<[u8]> = Box::new([1u8, 2, 3, 4]);
        // `bytes` is not needed afterwards, so move it instead of cloning.
        let o = HandleOpaque::new(bytes);
        assert_eq!(o.as_bytes(), &[1u8, 2, 3, 4]);
        assert_eq!(o, o.clone());
        let _ = format!("{o:?}");
    }

    /// `Handle::new` stores the backend tag and kind it was given, and a
    /// clone compares equal.
    #[test]
    fn handle_composes() {
        let h = Handle::new(
            BackendTag::Valkey,
            HandleKind::Fresh,
            HandleOpaque::new(Box::new([0u8; 4])),
        );
        assert_eq!(h.backend, BackendTag::Valkey);
        assert_eq!(h.kind, HandleKind::Fresh);
        assert_eq!(h.clone(), h);
    }

    /// Two `CapabilitySet`s built from the same items are equal; the
    /// default set is empty while a populated one is not.
    #[test]
    fn capability_set_derives() {
        let c1 = CapabilitySet::new(["gpu", "cuda"]);
        let c2 = CapabilitySet::new(["gpu", "cuda"]);
        assert_eq!(c1, c2);
        assert!(!c1.is_empty());
        assert!(CapabilitySet::default().is_empty());
        let _ = format!("{c1:?}");
    }

    /// `ClaimPolicy::new` stores each argument in the matching field,
    /// for both a bounded wait and `max_wait = None`.
    #[test]
    fn claim_policy_derives() {
        let p = ClaimPolicy::new(
            WorkerId::new("w"),
            WorkerInstanceId::new("w-1"),
            30_000,
            Some(Duration::from_millis(500)),
        );
        assert_eq!(p.max_wait, Some(Duration::from_millis(500)));
        assert_eq!(p.lease_ttl_ms, 30_000);
        assert_eq!(p.worker_id.as_str(), "w");
        assert_eq!(p.worker_instance_id.as_str(), "w-1");
        assert_eq!(p.clone(), p);
        let immediate = ClaimPolicy::new(
            WorkerId::new("w"),
            WorkerInstanceId::new("w-1"),
            30_000,
            None,
        );
        assert_eq!(immediate.max_wait, None);
    }

    /// A `Frame` built by struct literal is cloneable, comparable and
    /// debug-formattable; distinct `FrameKind` variants compare unequal.
    #[test]
    fn frame_and_kind_derive() {
        let f = Frame {
            bytes: b"hello".to_vec(),
            kind: FrameKind::Stdout,
            seq: Some(3),
            frame_type: "delta".to_owned(),
            correlation_id: Some("req-42".to_owned()),
            mode: StreamMode::Durable,
        };
        assert_eq!(f.clone(), f);
        assert_eq!(f.kind, FrameKind::Stdout);
        assert_eq!(f.frame_type, "delta");
        assert_eq!(f.correlation_id.as_deref(), Some("req-42"));
        assert_ne!(FrameKind::Stderr, FrameKind::Event);
        let _ = format!("{f:?}");
    }

    /// The `with_*` builders set `frame_type` / `correlation_id`; a bare
    /// `Frame::new` leaves them at `""` / `None` and `seq` at `None`.
    #[test]
    fn frame_builders_populate_extended_fields() {
        let f = Frame::new(b"payload".to_vec(), FrameKind::Event)
            .with_frame_type("agent_step")
            .with_correlation_id("corr-1");
        assert_eq!(f.frame_type, "agent_step");
        assert_eq!(f.correlation_id.as_deref(), Some("corr-1"));
        assert_eq!(f.seq, None);

        let bare = Frame::new(b"p".to_vec(), FrameKind::Event);
        assert_eq!(bare.frame_type, "");
        assert_eq!(bare.correlation_id, None);
    }

    /// `WaitpointSpec` is cloneable and comparable; `WaitpointHmac::new`
    /// accepts both `&str` and `String` and exposes the value via `as_str`.
    #[test]
    fn waitpoint_spec_derives() {
        let spec = WaitpointSpec {
            kind: WaitpointKind::SignalName,
            matcher: b"approved".to_vec(),
            hmac_token: WaitpointHmac::new("kid1:deadbeef"),
        };
        assert_eq!(spec.clone(), spec);
        assert_eq!(spec.hmac_token.as_str(), "kid1:deadbeef");
        assert_eq!(
            WaitpointHmac::new("a"),
            WaitpointHmac::new(String::from("a"))
        );
    }

    /// `FailureReason::new` leaves `detail` empty, `with_detail` fills it;
    /// distinct `FailureClass` variants compare unequal.
    #[test]
    fn failure_reason_and_class() {
        let r1 = FailureReason::new("boom");
        let r2 = FailureReason::with_detail("boom", b"stack".to_vec());
        assert_eq!(r1.message, "boom");
        assert!(r1.detail.is_none());
        assert_eq!(r2.detail.as_deref(), Some(&b"stack"[..]));
        assert_eq!(r1.clone(), r1);
        assert_ne!(FailureClass::Transient, FailureClass::Permanent);
    }

    /// A fully-populated `UsageDimensions` equals its clone; the default
    /// value has zero `input_tokens` and no `dedup_key`.
    #[test]
    fn usage_dimensions_default_and_eq() {
        let u = UsageDimensions {
            input_tokens: 10,
            output_tokens: 20,
            wall_ms: Some(150),
            custom: BTreeMap::from([("net_bytes".to_string(), 42)]),
            dedup_key: Some("k1".into()),
        };
        assert_eq!(u.clone(), u);
        assert_eq!(UsageDimensions::default().input_tokens, 0);
        assert_eq!(UsageDimensions::default().dedup_key, None);
    }

    /// The chained `with_*` setters populate their fields and leave
    /// `custom` empty.
    #[test]
    fn usage_dimensions_builder_chain() {
        let u = UsageDimensions::new()
            .with_input_tokens(10)
            .with_output_tokens(20)
            .with_wall_ms(150)
            .with_dedup_key("k1");
        assert_eq!(u.input_tokens, 10);
        assert_eq!(u.output_tokens, 20);
        assert_eq!(u.wall_ms, Some(150));
        assert_eq!(u.dedup_key.as_deref(), Some("k1"));
        assert!(u.custom.is_empty());
        // `new()` is equivalent to `default()`.
        assert_eq!(UsageDimensions::new(), UsageDimensions::default());
    }

    /// `ResumeToken::new` carries the `ResumeGrant` plus the worker
    /// identity and lease TTL it was constructed with.
    #[test]
    fn resume_token_wraps_grant() {
        let grant = ResumeGrant {
            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
            partition_key: crate::partition::PartitionKey::from(&Partition {
                family: PartitionFamily::Flow,
                index: 0,
            }),
            grant_key: "gkey".into(),
            expires_at_ms: 123,
            lane_id: LaneId::new("default"),
        };
        let t = ResumeToken::new(
            grant.clone(),
            WorkerId::new("w"),
            WorkerInstanceId::new("w-1"),
            30_000,
        );
        assert_eq!(t.grant, grant);
        assert_eq!(t.worker_id.as_str(), "w");
        assert_eq!(t.worker_instance_id.as_str(), "w-1");
        assert_eq!(t.lease_ttl_ms, 30_000);
        assert_eq!(t.clone(), t);
    }

    /// `LeaseRenewal` is `Copy`: the original stays usable after being
    /// assigned to a second binding.
    #[test]
    fn lease_renewal_is_copy() {
        let r = LeaseRenewal {
            expires_at_ms: 100,
            lease_epoch: 2,
        };
        let s = r; // Copy
        assert_eq!(r, s);
    }

    /// `CancelFlowPolicy` / `CancelFlowWait` are comparable and distinct
    /// variants compare unequal.
    #[test]
    fn cancel_flow_policy_and_wait() {
        assert_ne!(CancelFlowPolicy::FlowOnly, CancelFlowPolicy::CancelAll);
        let w = CancelFlowWait::WaitTimeout(Duration::from_secs(1));
        assert_eq!(w, w);
        assert_ne!(CancelFlowWait::NoWait, CancelFlowWait::WaitIndefinite);
    }

    /// `CompletionPayload` is cloneable, comparable and debug-formattable.
    #[test]
    fn completion_payload_derives() {
        let c = CompletionPayload {
            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
            outcome: "success".into(),
            payload_bytes: Some(b"ok".to_vec()),
            produced_at_ms: TimestampMs::from_millis(1234),
            flow_id: None,
        };
        assert_eq!(c.clone(), c);
        let _ = format!("{c:?}");
    }

    /// `ResumeSignal` can be built by struct literal from this module and
    /// is cloneable, comparable and debug-formattable.
    #[test]
    fn resume_signal_moved_and_derives() {
        let s = ResumeSignal {
            signal_id: SignalId::new(),
            signal_name: "approve".into(),
            signal_category: "decision".into(),
            source_type: "user".into(),
            source_identity: "u1".into(),
            correlation_id: "c1".into(),
            accepted_at: TimestampMs::from_millis(10),
            payload: None,
        };
        assert_eq!(s.clone(), s);
        let _ = format!("{s:?}");
    }

    /// The two `FailOutcome` variants compare unequal, and a variant
    /// equals its own clone.
    #[test]
    fn fail_outcome_variants() {
        let retry = FailOutcome::RetryScheduled {
            delay_until: TimestampMs::from_millis(42),
        };
        let terminal = FailOutcome::TerminalFailed;
        assert_ne!(retry, terminal);
        assert_eq!(retry.clone(), retry);
    }

    /// The default `ScannerFilter` is the `NOOP` filter and matches every
    /// candidate.
    #[test]
    fn scanner_filter_noop_and_default() {
        let f = ScannerFilter::default();
        assert!(f.is_noop());
        assert_eq!(f, ScannerFilter::NOOP);
        // A no-op filter matches any candidate, including ones that
        // produced no HGET results.
        assert!(f.matches(None, None));
        assert!(f.matches(Some(&Namespace::new("t1")), Some("v")));
    }

    /// A namespace-only filter matches only candidates carrying that
    /// exact namespace.
    #[test]
    fn scanner_filter_namespace_match() {
        let f = ScannerFilter {
            namespace: Some(Namespace::new("tenant-a")),
            instance_tag: None,
        };
        assert!(!f.is_noop());
        assert!(f.matches(Some(&Namespace::new("tenant-a")), None));
        assert!(!f.matches(Some(&Namespace::new("tenant-b")), None));
        // Missing core namespace ⇒ no match.
        assert!(!f.matches(None, None));
    }

    /// An instance-tag-only filter matches only candidates whose tag
    /// value equals the wanted value; a missing tag does not match.
    #[test]
    fn scanner_filter_instance_tag_match() {
        let f = ScannerFilter {
            namespace: None,
            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
        };
        assert!(f.matches(None, Some("i-1")));
        assert!(!f.matches(None, Some("i-2")));
        assert!(!f.matches(None, None));
    }

    /// With both dimensions set, a candidate must satisfy the namespace
    /// AND the instance tag to match.
    #[test]
    fn scanner_filter_both_dimensions() {
        let f = ScannerFilter {
            namespace: Some(Namespace::new("tenant-a")),
            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
        };
        assert!(f.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
        assert!(!f.matches(Some(&Namespace::new("tenant-a")), Some("i-2")));
        assert!(!f.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
        assert!(!f.matches(None, Some("i-1")));
    }

    /// The public constructor and chainable setters produce filters that
    /// behave like the struct-literal ones above.
    #[test]
    fn scanner_filter_builder_construction() {
        // Simulates external-crate usage: only public constructors
        // and chainable setters (no struct-literal access to the
        // `#[non_exhaustive]` fields).
        let empty = ScannerFilter::new();
        assert!(empty.is_noop());
        assert_eq!(empty, ScannerFilter::NOOP);

        let ns_only = ScannerFilter::new().with_namespace(Namespace::new("tenant-a"));
        assert!(!ns_only.is_noop());
        assert!(ns_only.matches(Some(&Namespace::new("tenant-a")), None));

        let tag_only = ScannerFilter::new().with_instance_tag("cairn.instance_id", "i-1");
        assert!(!tag_only.is_noop());
        assert!(tag_only.matches(None, Some("i-1")));

        let both = ScannerFilter::new()
            .with_namespace(Namespace::new("tenant-a"))
            .with_instance_tag("cairn.instance_id", "i-1");
        assert!(both.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
        assert!(!both.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
    }

    // ── RFC-015 Stream-durability-mode types ──

    /// Each `StreamMode` constructor yields the expected variant, wire
    /// string and (for best-effort) config values; the default mode is
    /// `Durable`.
    #[test]
    fn stream_mode_constructors_and_wire_str() {
        assert_eq!(StreamMode::durable().wire_str(), "durable");
        assert_eq!(StreamMode::durable(), StreamMode::Durable);

        let s = StreamMode::durable_summary();
        assert_eq!(s.wire_str(), "summary");
        match s {
            StreamMode::DurableSummary { patch_kind } => {
                assert_eq!(patch_kind, PatchKind::JsonMergePatch);
            }
            // Report the actual variant so a failure is diagnosable.
            other => panic!("expected DurableSummary, got {other:?}"),
        }

        let b = StreamMode::best_effort_live(15_000);
        assert_eq!(b.wire_str(), "best_effort");
        match b {
            StreamMode::BestEffortLive { config } => {
                assert_eq!(config.ttl_ms, 15_000);
                assert_eq!(config.maxlen_floor, 64);
                assert_eq!(config.maxlen_ceiling, 16_384);
                // Float field: compare with a tolerance, not `==`.
                assert!((config.ema_alpha - 0.2).abs() < 1e-9);
            }
            other => panic!("expected BestEffortLive, got {other:?}"),
        }

        let cfg = BestEffortLiveConfig::with_ttl(10_000)
            .with_maxlen_floor(128)
            .with_maxlen_ceiling(8_192)
            .with_ema_alpha(0.3);
        let b2 = StreamMode::best_effort_live_with_config(cfg);
        match b2 {
            StreamMode::BestEffortLive { config } => {
                assert_eq!(config.ttl_ms, 10_000);
                assert_eq!(config.maxlen_floor, 128);
                assert_eq!(config.maxlen_ceiling, 8_192);
                assert!((config.ema_alpha - 0.3).abs() < 1e-9);
            }
            other => panic!("expected BestEffortLive, got {other:?}"),
        }

        assert_eq!(StreamMode::default(), StreamMode::Durable);
    }

    /// `TailVisibility` defaults to `All`, whose wire string is empty;
    /// `ExcludeBestEffort` has its own wire string.
    #[test]
    fn tail_visibility_default_and_wire() {
        assert_eq!(TailVisibility::default(), TailVisibility::All);
        assert_eq!(TailVisibility::All.wire_str(), "");
        assert_eq!(
            TailVisibility::ExcludeBestEffort.wire_str(),
            "exclude_best_effort"
        );
    }

    /// `AppendFrameOutcome::new` leaves `summary_version` unset;
    /// `with_summary_version` fills it.
    #[test]
    fn append_frame_outcome_summary_version_builder() {
        let base = AppendFrameOutcome::new("1713-0", 3);
        assert_eq!(base.stream_id, "1713-0");
        assert_eq!(base.frame_count, 3);
        assert_eq!(base.summary_version, None);

        let with_v = AppendFrameOutcome::new("1713-0", 3).with_summary_version(7);
        assert_eq!(with_v.summary_version, Some(7));
        assert_eq!(with_v.clone(), with_v);
    }

    /// RFC-015 §3.2 null-sentinel round-trip invariant.
    ///
    /// The sentinel (`"__ff_null__"`) is the byte-exact string callers
    /// use to encode "set this leaf to JSON null" in a JSON Merge Patch
    /// frame (because RFC 7396 otherwise treats `null` as delete-key).
    /// The invariant: on read, the summary document NEVER contains the
    /// sentinel string — it is always rewritten to JSON null.
    ///
    /// The full end-to-end invariant is exercised against the Lua
    /// applier in `crates/ff-test/tests/engine_backend_stream_modes.rs`
    /// (integration). Here we just pin the byte sequence + the
    /// type-level constant so any accidental edit of the sentinel
    /// string fails the unit build before it reaches an integration
    /// run.
    #[test]
    fn summary_null_sentinel_byte_exact() {
        assert_eq!(SUMMARY_NULL_SENTINEL, "__ff_null__");
        assert_eq!(SUMMARY_NULL_SENTINEL.len(), 11);
        assert!(SUMMARY_NULL_SENTINEL.is_ascii());
        // No leading/trailing whitespace may sneak into the sentinel.
        assert_eq!(SUMMARY_NULL_SENTINEL.trim(), SUMMARY_NULL_SENTINEL);
    }

    /// `SummaryDocument::new` stores the version, patch kind and
    /// first-applied timestamp it was given, and a clone compares equal.
    #[test]
    fn summary_document_constructor() {
        let doc = SummaryDocument::new(
            br#"{"tokens":3}"#.to_vec(),
            2,
            PatchKind::JsonMergePatch,
            1_700_000_100,
            1_700_000_000,
        );
        assert_eq!(doc.version, 2);
        assert_eq!(doc.patch_kind, PatchKind::JsonMergePatch);
        assert_eq!(doc.first_applied_ms, 1_700_000_000);
        assert_eq!(doc.clone(), doc);
    }

    /// `Frame::new` defaults to `StreamMode::Durable`; `with_mode`
    /// overrides it.
    #[test]
    fn frame_builder_sets_mode() {
        let f = Frame::new(b"p".to_vec(), FrameKind::Event);
        assert_eq!(f.mode, StreamMode::Durable);

        let f = Frame::new(b"p".to_vec(), FrameKind::Event)
            .with_mode(StreamMode::durable_summary());
        match f.mode {
            StreamMode::DurableSummary { patch_kind } => {
                assert_eq!(patch_kind, PatchKind::JsonMergePatch);
            }
            other => panic!("expected DurableSummary, got {other:?}"),
        }
    }

    /// `BackendConfig::valkey` yields a Valkey connection with the given
    /// host/port, TLS and cluster off, and default timeouts/retry.
    #[test]
    fn backend_config_valkey_ctor() {
        let c = BackendConfig::valkey("host.local", 6379);
        // Wave 0 (#230) added `BackendConnection::Postgres`, so the
        // match is no longer irrefutable. `let-else` keeps the test
        // focused on the Valkey arm without a full `match`.
        let BackendConnection::Valkey(v) = &c.connection else {
            panic!("expected Valkey connection, got Postgres");
        };
        assert_eq!(v.host, "host.local");
        assert_eq!(v.port, 6379);
        assert!(!v.tls);
        assert!(!v.cluster);
        assert_eq!(c.timeouts, BackendTimeouts::default());
        assert_eq!(c.retry, BackendRetry::default());
        assert_eq!(c.clone(), c);
    }
}