Skip to main content

ff_core/
backend.rs

1//! Backend-trait supporting types (RFC-012 Stage 0).
2//!
3//! This module carries the public types referenced by the `EngineBackend`
4//! trait signatures in RFC-012 §3.3. The trait itself lands in Stage 1
5//! (issue #89 follow-up); Stage 0 is strictly type-plumbing and the
6//! `ResumeSignal` crate move.
7//!
8//! Public structs/enums whose fields or variants are expected to grow
9//! are marked `#[non_exhaustive]` per project convention — consumers
10//! must write `_`-terminated matches and use the provided constructors
11//! rather than struct literals. Exceptions:
12//!
13//! * Opaque single-field wrapper newtypes ([`HandleOpaque`],
14//!   [`WaitpointHmac`]) hide their inner field and need no non-exhaustive
15//!   annotation — the wrapped value is unreachable from outside.
16//! * [`ResumeSignal`] is intentionally NOT `#[non_exhaustive]` so the
17//!   ff-sdk crate-move (Stage 0) preserves struct-literal compatibility
18//!   at its existing call site.
19//!
20//! See `rfcs/RFC-012-engine-backend-trait.md` §3.3.0 for the authoritative
21//! type inventory and §4.1-§4.2 for the `Handle` / `EngineError` shapes.
22
23use crate::contracts::ReclaimGrant;
24use crate::types::{TimestampMs, WaitpointToken, WorkerId, WorkerInstanceId};
25
26// DX (HHH v0.3.4 re-smoke): `Namespace` lives in `ff_core::types` but
27// is used on `BackendConfig` + `ScannerFilter` (both defined in this
28// module). Re-export here so consumers already scoped to
29// `ff_core::backend::*` can grab it without a second `use` line
30// crossing into `ff_core::types`. Also brings `Namespace` into local
31// scope for the definitions below.
32pub use crate::types::Namespace;
33
34// RFC-013 Stage 1d — re-export the typed suspend trait surface so
35// external crates using `ff_core::backend::*` can reach the new types
36// without a second `use ff_core::contracts` line. Keeps the trait
37// naming surface coherent (`PendingWaitpoint`, `WaitpointSpec`, and
38// the new `SuspendArgs` / `SuspendOutcome` all live at the same path).
39pub use crate::contracts::{
40    CompositeBody, IdempotencyKey, ResumeCondition, ResumePolicy, ResumeTarget, SignalMatcher,
41    SuspendArgs, SuspendOutcome, SuspendOutcomeDetails, SuspensionReasonCode,
42    SuspensionRequester, TimeoutBehavior, WaitpointBinding,
43};
44use std::collections::BTreeMap;
45use std::time::Duration;
46
47// ── §4.1 Handle trio ────────────────────────────────────────────────────
48
/// Discriminator stored in every [`Handle`] that records which backend
/// implementation produced it, so ops can be dispatched to the matching
/// backend at runtime.
///
/// `#[non_exhaustive]`: further backends are added as new variants when
/// their impls come online (e.g. `Postgres` in Stage 2, possibly other
/// backends later), so downstream matches need a `_` arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum BackendTag {
    /// Valkey implementation (FCALL-backed).
    Valkey,
    /// Postgres implementation (RFC-v0.7 Wave 1c).
    Postgres,
}

impl BackendTag {
    /// Wire byte reserved for [`BackendTag::Valkey`].
    const VALKEY_WIRE: u8 = 0x01;
    /// Wire byte reserved for [`BackendTag::Postgres`].
    const POSTGRES_WIRE: u8 = 0x02;

    /// Returns the stable single-byte wire encoding of this tag.
    ///
    /// The byte is embedded inside [`HandleOpaque`] so cross-backend
    /// migration tooling can tell which backend minted a handle without
    /// decoding the remainder of the payload (see
    /// [`crate::handle_codec`]).
    pub const fn wire_byte(self) -> u8 {
        match self {
            Self::Valkey => Self::VALKEY_WIRE,
            Self::Postgres => Self::POSTGRES_WIRE,
        }
    }

    /// Decodes a wire byte produced by [`Self::wire_byte`].
    ///
    /// Returns `None` for any byte that is not assigned to a variant.
    pub const fn from_wire_byte(b: u8) -> Option<Self> {
        match b {
            Self::VALKEY_WIRE => Some(Self::Valkey),
            Self::POSTGRES_WIRE => Some(Self::Postgres),
            _ => None,
        }
    }
}
85
/// Lifecycle state recorded inside a [`Handle`].
///
/// Backends check `kind` when an op is entered and answer with
/// `EngineError::State` if it does not fit the op.
///
/// This runtime tag takes the place of round-1's compile-time split into
/// `Handle` / `ResumeHandle` / `SuspendToken` (RFC-012 §4.1).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum HandleKind {
    /// A fresh claim, as returned by `claim` / `claim_from_grant`.
    Fresh,
    /// A claim resumed from reclaim, as returned by `claim_from_reclaim`.
    Resumed,
    /// A suspended attempt, as returned by `suspend`. This is terminal
    /// for the lease; resuming mints a new Handle through
    /// `claim_from_reclaim`.
    Suspended,
}
102
/// Opaque, backend-private payload stored inside a [`Handle`].
///
/// The bytes encode backend-specific state (for Valkey: exec id, attempt
/// id, lease id, lease epoch, capability binding, partition). Consumers
/// neither build nor inspect them: the backend produces the payload on
/// claim/resume and reads it back on every op.
///
/// `Box<[u8]>` was picked over `bytes::Bytes` (RFC-012 §7.17) so the
/// public type does not pull in a transitive dependency on the `bytes`
/// crate.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct HandleOpaque(Box<[u8]>);

impl HandleOpaque {
    /// Wraps backend-owned bytes. Intended to be called by backend impls
    /// only.
    pub fn new(bytes: Box<[u8]>) -> Self {
        HandleOpaque(bytes)
    }

    /// Returns a view of the wrapped bytes (for backend-internal use).
    pub fn as_bytes(&self) -> &[u8] {
        let HandleOpaque(payload) = self;
        payload
    }
}
126
127/// Opaque attempt cookie held by the worker for the duration of an
128/// attempt. Produced by `claim` / `claim_from_reclaim` / `suspend`;
129/// borrowed by every op (renew, progress, append_frame, complete, fail,
130/// cancel, suspend, delay, wait_children, observe_signals, report_usage).
131///
132/// See RFC-012 §4.1 for the round-4 design — terminal ops borrow rather
133/// than consume so callers can retry after a transport error.
134#[derive(Clone, Debug, PartialEq, Eq, Hash)]
135#[non_exhaustive]
136pub struct Handle {
137    pub backend: BackendTag,
138    pub kind: HandleKind,
139    pub opaque: HandleOpaque,
140}
141
142impl Handle {
143    /// Construct a new Handle. Called by backend impls only; consumer
144    /// code receives Handles from `claim` / `suspend` / `claim_from_reclaim`.
145    pub fn new(backend: BackendTag, kind: HandleKind, opaque: HandleOpaque) -> Self {
146        Self {
147            backend,
148            kind,
149            opaque,
150        }
151    }
152}
153
154// ── §3.3.0 Claim / lifecycle supporting types ──────────────────────────
155
/// The set of capability tokens a worker advertises to the scheduler and
/// to `claim`.
///
/// Today these live as a `Vec<String>` on `WorkerConfig`; wrapping them
/// in a named newtype lets trait signatures refer to capabilities
/// without committing to a particular container shape.
///
/// Whether this should be a bitfield or stay stringly-typed is the §7.2
/// open question; Stage 0 keeps the round-2 lean (a newtype over
/// `Vec<String>`).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct CapabilitySet {
    pub tokens: Vec<String>,
}

impl CapabilitySet {
    /// Collects any iterable of string-like capability tokens into a set,
    /// keeping the iteration order.
    pub fn new<I, S>(tokens: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let tokens: Vec<String> = tokens.into_iter().map(|token| token.into()).collect();
        CapabilitySet { tokens }
    }

    /// Returns `true` exactly when no tokens are present.
    pub fn is_empty(&self) -> bool {
        self.tokens.as_slice().is_empty()
    }
}
187
188/// Policy hints for `claim`. Carries the worker identity the backend
189/// needs to invoke `ff_claim_execution` (v0.7 Wave 2) plus the
190/// blocking-wait bound.
191///
192/// **Wave 2 extension:** `worker_id` + `worker_instance_id` +
193/// `lease_ttl_ms` are now required so the Valkey backend's `claim`
194/// impl can issue the claim FCALL without a side-channel identity
195/// lookup. The constructor change is a pre-1.0 breaking change,
196/// tracked in the CHANGELOG.
197#[derive(Clone, Debug, PartialEq, Eq)]
198#[non_exhaustive]
199pub struct ClaimPolicy {
200    /// Maximum blocking wait. `None` means backend-default (today:
201    /// non-blocking / immediate return).
202    pub max_wait: Option<Duration>,
203    /// Worker identity (stable across process restarts).
204    pub worker_id: WorkerId,
205    /// Worker instance identity (unique per process).
206    pub worker_instance_id: WorkerInstanceId,
207    /// Lease TTL in milliseconds for the claim about to be minted.
208    pub lease_ttl_ms: u32,
209}
210
211impl ClaimPolicy {
212    /// Build a claim policy. `max_wait = None` means non-blocking /
213    /// immediate return. `lease_ttl_ms` is the TTL the backend
214    /// installs on the minted lease.
215    pub fn new(
216        worker_id: WorkerId,
217        worker_instance_id: WorkerInstanceId,
218        lease_ttl_ms: u32,
219        max_wait: Option<Duration>,
220    ) -> Self {
221        Self {
222            max_wait,
223            worker_id,
224            worker_instance_id,
225            lease_ttl_ms,
226        }
227    }
228}
229
/// Coarse classification of a frame passed to `append_frame`; it mirrors
/// the `frame_type` ARGV of the Lua-side `ff_append_frame`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FrameKind {
    /// Progress output / log line visible to operators.
    Stdout,
    /// Error / warning output visible to operators.
    Stderr,
    /// A structured event carrying a JSON payload.
    Event,
    /// A binary / opaque payload.
    Blob,
}
244
245// ── RFC-015 §1–§6: Stream-durability modes ──────────────────────────────
246
/// Patch format that [`StreamMode::DurableSummary`] uses when applying a
/// frame's payload to the server-side rolling summary document.
///
/// v0.6 ships exactly one variant, `JsonMergePatch` (RFC 7396). The enum
/// is `#[non_exhaustive]` so that the `PatchKind::StringAppend` variant
/// flagged for the future in RFC-015 §11 can be added without a breaking
/// change.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum PatchKind {
    /// JSON Merge Patch as specified by RFC 7396 — the locked choice for
    /// v0.6 (RFC-015 §3.2).
    JsonMergePatch,
}
260
261/// Per-call durability mode for [`EngineBackend::append_frame`].
262///
263/// RFC-015 §1. Mode is **per-call** — workers routinely mix frame types
264/// (tokens + progress + final summary) in a single attempt and the right
265/// durability differs per frame type. See RFC-015 §5 for the caveat on
266/// mixed-mode streams.
267///
268/// `#[non_exhaustive]`: new modes land additively (the v0.6 PR deliberately
269/// excludes `KeepAllDeltas`, dropped by owner adjudication; a future
270/// mode for e.g. per-frame-replicated streams would land here).
271///
272/// ## Doc-comment contract on [`Self::Durable`]
273///
274/// If the same stream also receives [`Self::BestEffortLive`] frames,
275/// `Durable` frames are subject to the best-effort MAXLEN trim (see
276/// RFC-015 §5). Callers that need strict retention for `Durable` frames
277/// alongside best-effort telemetry must not mix modes on one stream or
278/// should place the durable frames on a sibling stream.
279#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
280#[non_exhaustive]
281pub enum StreamMode {
282    /// Current default — every frame XADDs as a durable entry.
283    #[default]
284    Durable,
285    /// Server-side rolling-summary collapse. Each frame's payload is a
286    /// delta/patch applied atomically to a summary Hash (per
287    /// [`PatchKind`]); the delta is also XADDed to the stream with
288    /// `mode=summary` fields so live tailers continue to observe change
289    /// events. RFC-015 §3.
290    DurableSummary { patch_kind: PatchKind },
291    /// Short-lived frame. XADDed with `mode=best_effort`; per-stream
292    /// MAXLEN rolls it off and the stream key gets a TTL refresh only
293    /// when the stream has never held a durable frame (RFC-015 §4.1).
294    ///
295    /// The per-stream MAXLEN is computed dynamically in Lua from an
296    /// EMA of observed append rate (RFC-015 §4.2). See
297    /// [`BestEffortLiveConfig`] for the tunable knobs.
298    BestEffortLive { config: BestEffortLiveConfig },
299}
300
/// Configuration knobs for [`StreamMode::BestEffortLive`] — RFC-015
/// §4.2 dynamic MAXLEN sizing.
///
/// Defaults derived from the Phase 0 benchmark + RFC §4.1/§4.3 final
/// design:
///   - `ttl_ms` — caller-supplied visibility target.
///   - `maxlen_floor = 64` — RFC §4.1 round-2 default, used for
///     low-rate streams where the EMA formula would under-size.
///   - `maxlen_ceiling = 16_384` — cap on per-stream retained entries
///     (raised from the original §4.2 draft of 2048 after Phase 0
///     showed 200–4000 Hz LLM-token bursts saturate any lower clamp).
///   - `ema_alpha = 0.2` — RFC §4.3 gate value. Weights the latest
///     per-append instantaneous rate at 20 %, decays prior samples
///     at 80 % each append.
///
/// # Equality and hashing
///
/// `PartialEq`, `Eq` and `Hash` are implemented by hand because of the
/// `f64` field. All three treat `ema_alpha` through one canonical key:
/// `0.0` and `-0.0` are the same value, and every NaN is the same value.
/// That keeps equality reflexive (required by `Eq`) and guarantees that
/// equal configs hash identically (required by `Hash`), so the type — and
/// [`StreamMode`], which embeds it — is safe to use as a map/set key.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub struct BestEffortLiveConfig {
    pub ttl_ms: u32,
    pub maxlen_floor: u32,
    pub maxlen_ceiling: u32,
    pub ema_alpha: f64,
}

impl BestEffortLiveConfig {
    /// Construct with the given `ttl_ms` and defaults for the other
    /// knobs. Matches the shorthand used by
    /// [`StreamMode::best_effort_live`].
    pub fn with_ttl(ttl_ms: u32) -> Self {
        Self {
            ttl_ms,
            maxlen_floor: 64,
            maxlen_ceiling: 16_384,
            ema_alpha: 0.2,
        }
    }

    /// Builder: override [`Self::maxlen_floor`]. Chainable.
    pub fn with_maxlen_floor(mut self, floor: u32) -> Self {
        self.maxlen_floor = floor;
        self
    }

    /// Builder: override [`Self::maxlen_ceiling`]. Chainable.
    pub fn with_maxlen_ceiling(mut self, ceiling: u32) -> Self {
        self.maxlen_ceiling = ceiling;
        self
    }

    /// Builder: override [`Self::ema_alpha`]. Chainable. Callers are
    /// expected to keep α in `(0.0, 1.0]`; the Lua side clamps to that
    /// range defensively.
    pub fn with_ema_alpha(mut self, alpha: f64) -> Self {
        self.ema_alpha = alpha;
        self
    }

    /// Canonical bit pattern of `ema_alpha`, shared by `PartialEq` and
    /// `Hash` so the two can never disagree.
    ///
    /// For every non-NaN, non-zero value this is just `to_bits()`, which
    /// matches IEEE `==`. The two special cases are folded explicitly:
    /// `-0.0` maps to the bits of `+0.0` (they compare equal under `==`),
    /// and every NaN payload maps to the bits of `f64::NAN` (so a config
    /// holding NaN is still equal to itself).
    fn ema_alpha_key(&self) -> u64 {
        let alpha = self.ema_alpha;
        if alpha.is_nan() {
            f64::NAN.to_bits()
        } else if alpha == 0.0 {
            0.0_f64.to_bits()
        } else {
            alpha.to_bits()
        }
    }
}

// `f64` has no total equality, so `PartialEq`/`Eq`/`Hash` cannot be
// derived soundly. Deriving `PartialEq` while hashing `to_bits()` breaks
// the `Hash` contract for ±0.0 (equal values, different hashes) and NaN
// breaks `Eq` reflexivity. All three impls therefore go through
// `ema_alpha_key`.
impl PartialEq for BestEffortLiveConfig {
    fn eq(&self, other: &Self) -> bool {
        self.ttl_ms == other.ttl_ms
            && self.maxlen_floor == other.maxlen_floor
            && self.maxlen_ceiling == other.maxlen_ceiling
            && self.ema_alpha_key() == other.ema_alpha_key()
    }
}

impl Eq for BestEffortLiveConfig {}

impl std::hash::Hash for BestEffortLiveConfig {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.ttl_ms.hash(state);
        self.maxlen_floor.hash(state);
        self.maxlen_ceiling.hash(state);
        self.ema_alpha_key().hash(state);
    }
}
368
369impl StreamMode {
370    /// The v0.6 default — [`Self::Durable`]. Provided for symmetry with
371    /// the other constructors (the enum is `#[non_exhaustive]` so
372    /// cross-crate consumers cannot construct variants by name).
373    pub fn durable() -> Self {
374        StreamMode::Durable
375    }
376
377    /// [`Self::DurableSummary`] with [`PatchKind::JsonMergePatch`] —
378    /// the only supported `PatchKind` in v0.6.
379    pub fn durable_summary() -> Self {
380        StreamMode::DurableSummary {
381            patch_kind: PatchKind::JsonMergePatch,
382        }
383    }
384
385    /// [`Self::BestEffortLive`] with the caller-supplied `ttl_ms` and
386    /// default [`BestEffortLiveConfig`] knobs.
387    /// RFC-015 §4 guidance: `5_000..=30_000` ms. Below ~1000 ms a live
388    /// tailer may not connect in time; above ~60_000 ms the memory
389    /// argument against plain [`Self::Durable`] weakens.
390    pub fn best_effort_live(ttl_ms: u32) -> Self {
391        StreamMode::BestEffortLive {
392            config: BestEffortLiveConfig::with_ttl(ttl_ms),
393        }
394    }
395
396    /// [`Self::BestEffortLive`] with a fully-specified
397    /// [`BestEffortLiveConfig`]. Use for per-workload tuning of α, the
398    /// MAXLEN clamp, or the TTL — defaults are wired from
399    /// [`BestEffortLiveConfig::with_ttl`].
400    pub fn best_effort_live_with_config(config: BestEffortLiveConfig) -> Self {
401        StreamMode::BestEffortLive { config }
402    }
403
404    /// Stable wire-level token for this mode, written to the XADD entry
405    /// `mode` field (RFC-015 §6.1). Tail filters compare against these
406    /// string values server-side.
407    pub fn wire_str(&self) -> &'static str {
408        match self {
409            StreamMode::Durable => "durable",
410            StreamMode::DurableSummary { .. } => "summary",
411            StreamMode::BestEffortLive { .. } => "best_effort",
412        }
413    }
414}
415
/// Visibility filter applied when tailing a stream (RFC-015 §6).
///
/// The default, [`Self::All`], keeps v1 behaviour. Opting in to
/// [`Self::ExcludeBestEffort`] drops `BestEffortLive` frames on the
/// server side (the XADD `mode` field is a cheap field check in
/// `ff_read_attempt_stream` / `xread_block`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum TailVisibility {
    /// The default: every XADD entry in the stream is returned, whatever
    /// its mode.
    #[default]
    All,
    /// Only frames appended under [`StreamMode::Durable`] or
    /// [`StreamMode::DurableSummary`] are returned; frames appended under
    /// [`StreamMode::BestEffortLive`] are filtered out. The name is meant
    /// to be self-describing: the filter excludes best-effort frames, it
    /// is not "only Durable" — `DurableSummary` deltas stay included
    /// because they have a durable backing (the summary Hash).
    ExcludeBestEffort,
}

impl TailVisibility {
    /// Returns the stable wire-level token that is pushed as an ARGV to
    /// the Lua tail/read implementations. The empty string `""` stands
    /// for the default, `All` (no filter).
    pub fn wire_str(&self) -> &'static str {
        match *self {
            Self::All => "",
            Self::ExcludeBestEffort => "exclude_best_effort",
        }
    }
}
448
/// Byte-exact sentinel string with which [`StreamMode::DurableSummary`] +
/// [`PatchKind::JsonMergePatch`] callers set a field to JSON `null`
/// (RFC-015 §3.2).
///
/// Under RFC 7396 a `null` in a patch means "delete the key". A caller
/// that really wants a literal JSON `null` leaf therefore sends this
/// sentinel string as the scalar leaf value, and the Lua apply-path
/// rewrites the sentinel to JSON `null` once the merge is done.
///
/// # Round-trip invariant
///
/// The summary document, as handed back by a `read_summary` call, NEVER
/// contains the sentinel string. Any `null` that is observed always means
/// "this field is explicitly null." See [`SummaryDocument`].
pub const SUMMARY_NULL_SENTINEL: &str = "__ff_null__";
464
465/// Materialised rolling summary document returned by a summary-read
466/// call (RFC-015 §6.3).
467///
468/// `document` is the caller-owned JSON value; scalar leaves that the
469/// caller wrote through the [`SUMMARY_NULL_SENTINEL`] contract appear
470/// here as JSON `null` (not as the sentinel string — the round-trip
471/// invariant erases the sentinel on read).
472///
473/// `#[non_exhaustive]` keeps future additive fields (e.g. a compacted
474/// delta cursor, a schema-version tag) additive. Construct via
475/// [`Self::new`].
476#[derive(Clone, Debug, PartialEq, Eq)]
477#[non_exhaustive]
478pub struct SummaryDocument {
479    /// The rolling summary as JSON bytes (UTF-8, encoded by the
480    /// server-side applier after merge). Stored as `Vec<u8>` instead
481    /// of `serde_json::Value` to keep `ff_core::backend` free of a
482    /// public `serde_json` type dependency.
483    pub document_json: Vec<u8>,
484    /// Monotonic version bumped on every delta applied. `0` is never
485    /// observed — the first applied delta returns `1`.
486    pub version: u64,
487    /// Which [`PatchKind`] was used to build the document.
488    pub patch_kind: PatchKind,
489    /// Unix millis of the most recent delta application.
490    pub last_updated_ms: u64,
491    /// Unix millis of the first delta applied to this attempt.
492    pub first_applied_ms: u64,
493}
494
495impl SummaryDocument {
496    /// Build a summary snapshot. Used by backends parsing the Hash
497    /// fields; external consumers receive these from `read_summary`.
498    pub fn new(
499        document_json: Vec<u8>,
500        version: u64,
501        patch_kind: PatchKind,
502        last_updated_ms: u64,
503        first_applied_ms: u64,
504    ) -> Self {
505        Self {
506            document_json,
507            version,
508            patch_kind,
509            last_updated_ms,
510            first_applied_ms,
511        }
512    }
513}
514
515/// Single stream frame appended via `append_frame` (RFC-012 §3.3.0).
516/// Today's FCALL takes the byte payload + frame_type + optional seq as
517/// discrete ARGV; Stage 0 collects them into a named type for trait
518/// signatures.
519///
520/// **Round-7 follow-up (PR #145 → #146):** extended with
521/// `frame_type: String` (the SDK-public free-form classifier — values
522/// like `"delta"`, `"log"`, `"agent_step"`, `"summary_token"`,
523/// `"transcribe_line"`, `"progress"` — distinct from the coarse
524/// [`FrameKind`] enum) and `correlation_id: Option<String>` (the
525/// wire-level `correlation_id` ARGV, surfaced at the SDK as
526/// `metadata: Option<&str>`). Adding these lets
527/// `ClaimedTask::append_frame` forward through the trait without
528/// wire-parity regression.
529///
530/// `frame_type` is free-form and is what the backend writes into the
531/// Lua-side `frame_type` ARGV. [`FrameKind`] remains for typed
532/// classification at the trait surface; when callers populate only
533/// `kind`, the backend falls back to a stable encoding of the enum
534/// variant (see `frame_kind_to_str` in `ff-backend-valkey`).
535#[derive(Clone, Debug, PartialEq, Eq)]
536#[non_exhaustive]
537pub struct Frame {
538    pub bytes: Vec<u8>,
539    pub kind: FrameKind,
540    /// Optional monotonic sequence. Set by the caller when the stream
541    /// protocol is sequence-bound; `None` lets the backend assign.
542    pub seq: Option<u64>,
543    /// Free-form classifier written to the Lua-side `frame_type` ARGV.
544    /// Empty string means "defer to [`FrameKind`]" — the backend
545    /// substitutes the enum-variant encoding.
546    pub frame_type: String,
547    /// Optional correlation id (wire `correlation_id` ARGV). `None`
548    /// encodes as the empty string on the wire.
549    pub correlation_id: Option<String>,
550    /// Durability mode for this frame (RFC-015 §1). Defaults to
551    /// [`StreamMode::Durable`] for v1-caller parity — the field was
552    /// added additively so pre-RFC-015 construction via
553    /// [`Self::new`] / [`Self::with_seq`] sees `Durable` without code
554    /// change.
555    pub mode: StreamMode,
556}
557
558impl Frame {
559    /// Construct a frame. `seq` defaults to `None` (backend-assigned);
560    /// `frame_type` defaults to empty (backend falls back to
561    /// `FrameKind` encoding); `correlation_id` defaults to `None`.
562    /// Callers that need an explicit sequence use [`Frame::with_seq`];
563    /// callers on the SDK forwarder path populate `frame_type` +
564    /// `correlation_id` via [`Frame::with_frame_type`] /
565    /// [`Frame::with_correlation_id`].
566    pub fn new(bytes: Vec<u8>, kind: FrameKind) -> Self {
567        Self {
568            bytes,
569            kind,
570            seq: None,
571            frame_type: String::new(),
572            correlation_id: None,
573            mode: StreamMode::Durable,
574        }
575    }
576
577    /// Construct a frame with an explicit monotonic sequence.
578    pub fn with_seq(bytes: Vec<u8>, kind: FrameKind, seq: u64) -> Self {
579        Self {
580            bytes,
581            kind,
582            seq: Some(seq),
583            frame_type: String::new(),
584            correlation_id: None,
585            mode: StreamMode::Durable,
586        }
587    }
588
589    /// Builder-style setter for the RFC-015 durability [`StreamMode`].
590    /// Defaults to [`StreamMode::Durable`] (v1 parity) when unset.
591    pub fn with_mode(mut self, mode: StreamMode) -> Self {
592        self.mode = mode;
593        self
594    }
595
596    /// Builder-style setter for the free-form `frame_type` classifier.
597    pub fn with_frame_type(mut self, frame_type: impl Into<String>) -> Self {
598        self.frame_type = frame_type.into();
599        self
600    }
601
602    /// Builder-style setter for the optional `correlation_id`.
603    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
604        self.correlation_id = Some(correlation_id.into());
605        self
606    }
607}
608
609// ── §3.3.0 Suspend / waitpoint types ────────────────────────────────────
610
/// Matcher mode of a waitpoint; mirrors the matcher kinds of today's
/// suspend/close (signal name, correlation id, etc.).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum WaitpointKind {
    /// The waitpoint matches on the signal name.
    SignalName,
    /// The waitpoint matches on the correlation id.
    CorrelationId,
    /// A generic external-signal waitpoint (external delivery).
    External,
}
623
624/// HMAC token that binds a waitpoint to its mint-time identity. Wire
625/// shape `kid:40hex`.
626///
627/// Wraps [`crate::types::WaitpointToken`] so bearer-credential Debug /
628/// Display redaction (`WaitpointToken("kid1:<REDACTED:len=40>")`) flows
629/// through automatically — no derived formatter can leak the raw
630/// digest when a [`WaitpointSpec`] is debug-printed via
631/// `tracing::debug!(spec=?spec)`.
632///
633/// Newtype-wrapping (vs. a `pub use` alias) keeps trait signatures
634/// naming the waitpoint-bound HMAC role explicitly.
635#[derive(Clone, PartialEq, Eq, Hash)]
636pub struct WaitpointHmac(WaitpointToken);
637
638impl WaitpointHmac {
639    pub fn new(token: impl Into<String>) -> Self {
640        Self(WaitpointToken::from(token.into()))
641    }
642
643    /// Borrow the wrapped token. The wrapped type's `Debug`/`Display`
644    /// redact — call sites that need the raw digest must explicitly
645    /// call [`WaitpointToken::as_str`].
646    pub fn token(&self) -> &WaitpointToken {
647        &self.0
648    }
649
650    /// Borrow the raw `kid:40hex` string. Prefer [`Self::token`] for
651    /// non-redacted call sites; this method exists only for transport
652    /// / FCALL ARGV construction where the raw wire bytes are required.
653    pub fn as_str(&self) -> &str {
654        self.0.as_str()
655    }
656}
657
658// Forward Debug / Display to the wrapped WaitpointToken so the
659// redaction guarantees on that type extend here. Derived Debug would
660// expose the raw string field and defeat the wrap.
661impl std::fmt::Debug for WaitpointHmac {
662    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
663        write!(f, "WaitpointHmac({:?})", self.0)
664    }
665}
666
667/// Handle returned by `create_waitpoint` — the id of the newly-minted
668/// pending waitpoint plus its HMAC token. Signals targeted at the
669/// waitpoint must present the token; a later `suspend` call transitions
670/// the waitpoint from `pending` to `active` (RFC-012 §R7.2.2).
671///
672/// `WaitpointHmac` redacts on `Debug`/`Display`, so deriving `Debug`
673/// here cannot leak the raw digest.
674#[derive(Clone, Debug, PartialEq, Eq)]
675#[non_exhaustive]
676pub struct PendingWaitpoint {
677    pub waitpoint_id: crate::types::WaitpointId,
678    pub hmac_token: WaitpointHmac,
679}
680
681impl PendingWaitpoint {
682    pub fn new(waitpoint_id: crate::types::WaitpointId, hmac_token: WaitpointHmac) -> Self {
683        Self {
684            waitpoint_id,
685            hmac_token,
686        }
687    }
688}
689
690/// One waitpoint inside a suspend request. `suspend` takes a
691/// `Vec<WaitpointSpec>`; the resume condition (`any` / `all`) lives on
692/// the enclosing suspend args in the Phase-1 contract.
693#[derive(Clone, Debug, PartialEq, Eq)]
694#[non_exhaustive]
695pub struct WaitpointSpec {
696    pub kind: WaitpointKind,
697    pub matcher: Vec<u8>,
698    pub hmac_token: WaitpointHmac,
699}
700
701impl WaitpointSpec {
702    pub fn new(kind: WaitpointKind, matcher: Vec<u8>, hmac_token: WaitpointHmac) -> Self {
703        Self {
704            kind,
705            matcher,
706            hmac_token,
707        }
708    }
709}
710
711// ── §3.3.0 Failure classification types ─────────────────────────────────
712
/// A human-readable description of a failure, optionally accompanied by
/// structured detail. It replaces the ad-hoc string argument that `fail`
/// takes today.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct FailureReason {
    pub message: String,
    pub detail: Option<Vec<u8>>,
}

impl FailureReason {
    /// Creates a reason that has a message and no structured detail.
    pub fn new(message: impl Into<String>) -> Self {
        FailureReason {
            message: message.into(),
            detail: None,
        }
    }

    /// Creates a reason that has a message plus structured detail bytes.
    pub fn with_detail(message: impl Into<String>, detail: Vec<u8>) -> Self {
        let mut reason = Self::new(message);
        reason.detail = Some(detail);
        reason
    }
}
737
/// Classification of a failure; it decides the retry disposition on the
/// Lua side and mirrors the Lua-side classification codes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum FailureClass {
    /// A transient error that may be retried.
    Transient,
    /// A permanent error; it is not retried.
    Permanent,
    /// A crash / process death, inferred from lease expiry.
    InfraCrash,
    /// A timeout at the attempt level or the operation level.
    Timeout,
    /// A cooperative cancellation, by an operator or by cancel_flow.
    Cancelled,
}
754
755// ── §3.3.0 Usage / budget types ─────────────────────────────────────────
756
/// The usage report handed to `report_usage`. It mirrors the ARGV of
/// today's `ff_report_usage_and_check`: token counts, wall time and
/// custom dimensions.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub struct UsageDimensions {
    /// Number of input tokens consumed (LLM-shaped usage); `0` when not
    /// applicable.
    pub input_tokens: u64,
    /// Number of output tokens produced (LLM-shaped usage).
    pub output_tokens: u64,
    /// Wall-clock duration in milliseconds attributable to this report;
    /// `None` for reports that carry token counts only.
    pub wall_ms: Option<u64>,
    /// Arbitrary dimensions defined by the caller. A `BTreeMap` is used
    /// because its iteration order is stable (which matters for
    /// dedup-key derivation on some budget schemes).
    pub custom: BTreeMap<String, u64>,
    /// Optional idempotency key supplied by the caller. If set, the
    /// backend answers a repeated application of the same key with
    /// `ReportUsageResult::AlreadyApplied` instead of double-counting
    /// (RFC-012 §R7.4; the Lua `ff_report_usage_and_check` threads it
    /// through as the trailing ARGV). `None` / an empty string disables
    /// dedup.
    pub dedup_key: Option<String>,
}

impl UsageDimensions {
    /// Returns an empty usage report: every dimension is zero / `None`.
    ///
    /// This exists for consumers in other crates. `UsageDimensions` is
    /// `#[non_exhaustive]`, which rules out struct-literal and
    /// functional-update construction across crate boundaries, so a
    /// report is built by starting from `new()` and chaining `with_*`
    /// setters.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the input-token count. Takes `self` by value and returns it
    /// so calls can be chained.
    pub fn with_input_tokens(self, tokens: u64) -> Self {
        Self {
            input_tokens: tokens,
            ..self
        }
    }

    /// Sets the output-token count. Takes `self` by value and returns it
    /// so calls can be chained.
    pub fn with_output_tokens(self, tokens: u64) -> Self {
        Self {
            output_tokens: tokens,
            ..self
        }
    }

    /// Sets the wall-clock duration, given in milliseconds. Takes `self`
    /// by value and returns it so calls can be chained.
    pub fn with_wall_ms(self, ms: u64) -> Self {
        Self {
            wall_ms: Some(ms),
            ..self
        }
    }

    /// Sets the optional caller-supplied idempotency key. With a key in
    /// place the backend answers a repeated application of that key
    /// with `ReportUsageResult::AlreadyApplied` instead of
    /// double-counting (RFC-012 §R7.4). Takes `self` by value and
    /// returns it so calls can be chained.
    pub fn with_dedup_key(self, key: impl Into<String>) -> Self {
        Self {
            dedup_key: Some(key.into()),
            ..self
        }
    }
}
823
824// ── §3.3.0 Reclaim / lease types ────────────────────────────────────────
825
826/// Opaque cookie returned by the reclaim scanner; consumed by
827/// `claim_from_reclaim` to mint a resumed Handle.
828///
829/// Wraps [`ReclaimGrant`] today (the scanner's existing product).
830/// Kept as a newtype so trait signatures name the reclaim-bound role
831/// explicitly and so the wrapped shape can evolve without breaking the
832/// trait.
833#[derive(Clone, Debug, PartialEq, Eq)]
834#[non_exhaustive]
835pub struct ReclaimToken {
836    pub grant: ReclaimGrant,
837    /// Worker identity that will claim the resumed execution.
838    /// Wave 2 additive extension — mirrors the `ClaimPolicy`
839    /// shape since `claim_from_reclaim` does not take a `ClaimPolicy`.
840    pub worker_id: WorkerId,
841    /// Worker instance identity.
842    pub worker_instance_id: WorkerInstanceId,
843    /// Lease TTL (ms) for the resumed attempt's fresh lease.
844    pub lease_ttl_ms: u32,
845}
846
847impl ReclaimToken {
848    pub fn new(
849        grant: ReclaimGrant,
850        worker_id: WorkerId,
851        worker_instance_id: WorkerInstanceId,
852        lease_ttl_ms: u32,
853    ) -> Self {
854        Self {
855            grant,
856            worker_id,
857            worker_instance_id,
858            lease_ttl_ms,
859        }
860    }
861}
862
/// What a successful `renew` call hands back.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub struct LeaseRenewal {
    /// Expiry of the renewed lease (monotonic ms).
    pub expires_at_ms: u64,
    /// Lease epoch as of this renewal. Monotonic non-decreasing.
    pub lease_epoch: u64,
}

impl LeaseRenewal {
    /// Pair a new expiry (ms) with the post-renewal lease epoch.
    pub fn new(expires_at_ms: u64, lease_epoch: u64) -> Self {
        LeaseRenewal {
            lease_epoch,
            expires_at_ms,
        }
    }
}
881
882// ── §3.3.0 Cancel-flow supporting types ─────────────────────────────────
883
/// Cancel-flow policy — what to do with the flow's members. Today
/// encoded as a `String` on `CancelFlowArgs`; Stage 0 extracts the
/// policy shape as a typed enum (RFC-012 §3.3.0).
///
/// `#[non_exhaustive]`: consumers in other crates must end their
/// matches with a `_` arm so further policies can be added without a
/// breaking change.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CancelFlowPolicy {
    /// Cancel only the flow record; leave members alone.
    FlowOnly,
    /// Cancel the flow and every non-terminal member execution.
    CancelAll,
    /// Cancel the flow and every member currently in `Pending` /
    /// `Blocked` / `Eligible` — leave `Running` executions alone to
    /// drain.
    CancelPending,
}
899
/// Caller wait posture for `cancel_flow`: whether, and for how long,
/// the caller blocks on member cancellations.
///
/// `#[non_exhaustive]`: consumers in other crates must end their
/// matches with a `_` arm.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum CancelFlowWait {
    /// Return after the flow-state flip; member cancellations dispatch
    /// asynchronously.
    NoWait,
    /// Block until member cancellations complete, up to the timeout
    /// carried as this variant's `Duration`.
    WaitTimeout(Duration),
    /// Block until member cancellations complete, no deadline.
    WaitIndefinite,
}
912
913// ── §3.3.0 Completion stream payload ────────────────────────────────────
914
915/// One completion event delivered through the `CompletionStream`
916/// (RFC-012 §4.3). Also the payload type for issue #90's subscription
917/// API. Stage 0 authorises the type; issue #90 fixes the wire shape.
918///
919/// `flow_id` was added in issue #90 so DAG-dependency routing
920/// (`dispatch_dependency_resolution`) has the partition-routable flow
921/// handle without reparsing the Lua-emitted JSON downstream.
922/// `#[non_exhaustive]` keeps future field additions additive; use
923/// [`CompletionPayload::new`] and [`CompletionPayload::with_flow_id`]
924/// for construction.
925///
926/// `payload_bytes` / `produced_at_ms` are authorised but not yet
927/// populated by the Valkey Lua emitters — consumers on the
928/// `CompletionStream` read `execution_id` + `flow_id` today and must
929/// tolerate `payload_bytes = None` / `produced_at_ms = 0`.
930#[derive(Clone, Debug, PartialEq, Eq)]
931#[non_exhaustive]
932pub struct CompletionPayload {
933    pub execution_id: crate::types::ExecutionId,
934    pub outcome: String,
935    pub payload_bytes: Option<Vec<u8>>,
936    pub produced_at_ms: TimestampMs,
937    /// Flow handle for partition routing. Added in issue #90 (#90);
938    /// `None` for emitters that don't yet surface it.
939    pub flow_id: Option<crate::types::FlowId>,
940}
941
942impl CompletionPayload {
943    pub fn new(
944        execution_id: crate::types::ExecutionId,
945        outcome: impl Into<String>,
946        payload_bytes: Option<Vec<u8>>,
947        produced_at_ms: TimestampMs,
948    ) -> Self {
949        Self {
950            execution_id,
951            outcome: outcome.into(),
952            payload_bytes,
953            produced_at_ms,
954            flow_id: None,
955        }
956    }
957
958    /// Attach a flow handle to the payload. Additive builder so adding
959    /// `flow_id` didn't require a breaking change to [`Self::new`].
960    #[must_use]
961    pub fn with_flow_id(mut self, flow_id: crate::types::FlowId) -> Self {
962        self.flow_id = Some(flow_id);
963        self
964    }
965}
966
967// ── §3.3.0 ResumeSignal (crate move from ff-sdk::task) ──────────────────
968
/// Signal that satisfied a waitpoint matcher and is therefore part of
/// the reason an execution resumed. Returned by `observe_signals`
/// (RFC-012 §3.1.2) and by `ClaimedTask::resume_signals` in ff-sdk.
///
/// Moved in Stage 0 from `ff_sdk::task`; `ff_sdk::ResumeSignal` remains
/// re-exported through the 0.4.x window (removal scheduled for 0.5.0).
///
/// Returned only for signals whose matcher slot in the waitpoint's
/// resume condition is marked satisfied. Pre-buffered-but-unmatched
/// signals are not included.
///
/// Note: NOT `#[non_exhaustive]` to preserve struct-literal compatibility
/// with ff-sdk call sites that constructed `ResumeSignal { .. }` before
/// the Stage 0 crate move.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResumeSignal {
    /// Identifier of the signal.
    pub signal_id: crate::types::SignalId,
    // NOTE(review): the five string fields below carry no contract in
    // this block; presumably they mirror the stored signal record
    // verbatim — confirm against `ff_deliver_signal`.
    pub signal_name: String,
    pub signal_category: String,
    pub source_type: String,
    pub source_identity: String,
    pub correlation_id: String,
    /// Valkey-server `now_ms` timestamp at which `ff_deliver_signal`
    /// accepted this signal. `0` if the stored `accepted_at` field is
    /// missing or non-numeric (a Lua-side defect — not expected at
    /// runtime).
    pub accepted_at: TimestampMs,
    /// Raw payload bytes, if the signal was delivered with one. `None`
    /// for signals delivered without a payload.
    pub payload: Option<Vec<u8>>,
}
1000
1001// ── Stage 1a: FailOutcome move ──────────────────────────────────────────
1002
/// Outcome of a `fail()` call.
///
/// **RFC-012 Stage 1a:** moved from `ff_sdk::task::FailOutcome` to
/// `ff_core::backend::FailOutcome` so it is nameable by the
/// `EngineBackend` trait signature. `ff_sdk::task` retains a
/// `pub use` shim preserving the `ff_sdk::FailOutcome` path.
///
/// Not `#[non_exhaustive]` because existing consumers (ff-test,
/// ff-readiness-tests) construct and match this enum exhaustively;
/// the shape has been stable since the `fail()` API landed and any
/// additive growth would be a follow-up RFC's deliberate break.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FailOutcome {
    /// Retry was scheduled — execution is in delayed backoff.
    RetryScheduled {
        /// Timestamp attached to the scheduled retry.
        // NOTE(review): presumably the instant the delayed backoff
        // ends — confirm against the `fail()` implementation.
        delay_until: crate::types::TimestampMs,
    },
    /// No retries left — execution is terminal failed.
    TerminalFailed,
}
1023
1024// ── Issue #281: PrepareOutcome (boot-prep trait-method return) ────────
1025
/// Result of an [`EngineBackend::prepare`](crate::engine_backend::EngineBackend::prepare)
/// call, i.e. the backend's one-time, backend-specific boot
/// preparation.
///
/// Issue #281 moved cairn's `ensure_library` retry loop upstream so
/// that every backend boots the same way —
/// `backend.prepare().await?` — regardless of whether it is Valkey
/// (FUNCTION LOAD) or Postgres (migrations are out-of-band per
/// RFC-v0.7 Wave 0 Q12, so `NoOp`). The enum is `#[non_exhaustive]`
/// so future backends can add variants (e.g. `Skipped { reason }`)
/// additively.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum PrepareOutcome {
    /// The backend had nothing to do: either preparation is genuinely
    /// a no-op (Postgres: migrations applied out-of-band), or the
    /// requested work was already in place and idempotent.
    ///
    /// Valkey caveat: when the library is already at the expected
    /// version, the Valkey impl reports `Applied` rather than `NoOp`,
    /// keeping a single success variant. Consumers that want to tell
    /// the two apart can parse `description`.
    NoOp,
    /// The backend performed preparation work (e.g. Valkey FUNCTION
    /// LOAD REPLACE). `description` is a human-readable summary meant
    /// for `info!`-level log lines such as
    /// `"FUNCTION LOAD (flowfabric lib v<N>)"`; its shape is not
    /// machine-parseable and MAY change across versions.
    Applied {
        /// Human-readable summary of what was prepared.
        description: String,
    },
}

impl PrepareOutcome {
    /// Convenience constructor for [`PrepareOutcome::Applied`]; accepts
    /// anything convertible into the `String` description.
    pub fn applied(description: impl Into<String>) -> Self {
        let description: String = description.into();
        PrepareOutcome::Applied { description }
    }
}
1065
1066// ── RFC-012 §R7: AppendFrameOutcome move ────────────────────────────────
1067
/// What an `append_frame()` call returns.
///
/// **RFC-012 §R7.2.1:** relocated from
/// `ff_sdk::task::AppendFrameOutcome` to
/// `ff_core::backend::AppendFrameOutcome` so the
/// `EngineBackend::append_frame` trait return can name it.
/// `ff_sdk::task` keeps a `pub use` shim so the
/// `ff_sdk::task::AppendFrameOutcome` path survives through 0.4.x.
///
/// History of the type's shape:
///
/// * At the §R7.2.1 move the derive set followed the `FailOutcome`
///   precedent (`Clone, Debug, PartialEq, Eq`) and the type was left
///   exhaustive, since construction was internal to the backend
///   (parser in `ff-backend-valkey`) and no external constructors were
///   anticipated (consumer-shape evidence per §R7.2.1 / MN3).
/// * RFC-015 §9 then made it `#[non_exhaustive]`, so that
///   `summary_version: Option<u64>` (populated only for
///   [`StreamMode::DurableSummary`] appends) could land additively and
///   further per-mode outcome fields can follow. Cross-crate consumers
///   therefore cannot use struct literals: construct via [`Self::new`]
///   plus the chainable setters.
///
/// `stream_id: String` is a stable shape commitment — a future typed
/// `StreamId` newtype would be its own breaking change (§R7.5.6 / MD2).
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub struct AppendFrameOutcome {
    /// Valkey Stream entry ID given to this frame (e.g. `1234567890-0`).
    pub stream_id: String,
    /// Number of frames in the stream once this append has landed.
    pub frame_count: u64,
    /// Rolling summary `version` after the delta was applied. Set only
    /// for [`StreamMode::DurableSummary`] appends (RFC-015 §3.3 step 6)
    /// and `None` for [`StreamMode::Durable`] /
    /// [`StreamMode::BestEffortLive`] appends. Callers that need
    /// "total deltas applied" read this field.
    pub summary_version: Option<u64>,
}

impl AppendFrameOutcome {
    /// Construct an outcome from the two mandatory fields, `stream_id`
    /// and `frame_count`. `summary_version` starts as `None`; chain
    /// [`Self::with_summary_version`] for
    /// [`StreamMode::DurableSummary`] appends.
    pub fn new(stream_id: impl Into<String>, frame_count: u64) -> Self {
        let stream_id: String = stream_id.into();
        AppendFrameOutcome {
            stream_id,
            frame_count,
            ..Default::default()
        }
    }

    /// Return the outcome with a rolling summary version attached
    /// (RFC-015 §9).
    #[must_use]
    pub fn with_summary_version(self, version: u64) -> Self {
        Self {
            summary_version: Some(version),
            ..self
        }
    }
}
1126
1127// ── Stage 1a: BackendConfig + sub-types ─────────────────────────────────
1128
/// Pool timing shared across backend connections.
///
/// The struct is `#[non_exhaustive]` per project convention —
/// connection tunables grow as new backends land. Default is the
/// Phase-1 Valkey client's out-of-box shape (no explicit timeout, no
/// explicit pool cap; inherits ferriskey defaults); with the derived
/// `Default`, `request` is `None`.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendTimeouts {
    /// Per-request timeout. `None` ⇒ backend default.
    pub request: Option<Duration>,
}
1141
/// Retry policy shared across backend connections.
///
/// Matches ferriskey's `ConnectionRetryStrategy` shape (see
/// `ferriskey/src/client/types.rs:151`) so Stage 1c's Valkey wiring is a
/// direct pass-through — we don't reimplement what ferriskey already
/// provides. The Postgres backend (future) interprets the same fields
/// under its own retry semantics, or maps `None` to its own defaults.
///
/// Each field is `Option<u32>`: `None` ⇒ backend default (for Valkey,
/// this means `ConnectionRetryStrategy::default()`); `Some(v)` ⇒ pass
/// `v` straight through. The derived `Default` leaves every field
/// `None`, i.e. "use the backend's defaults throughout".
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct BackendRetry {
    /// Exponent base for the backoff curve. `None` ⇒ backend default.
    pub exponent_base: Option<u32>,
    /// Multiplicative factor applied to each backoff step. `None` ⇒
    /// backend default.
    pub factor: Option<u32>,
    /// Maximum number of retry attempts on transient transport errors.
    /// `None` ⇒ backend default.
    pub number_of_retries: Option<u32>,
    /// Jitter as a percentage of the computed backoff. `None` ⇒ backend
    /// default.
    pub jitter_percent: Option<u32>,
}
1168
/// Connection parameters specific to the Valkey backend.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct ValkeyConnection {
    /// Hostname of the Valkey server.
    pub host: String,
    /// Port of the Valkey server.
    pub port: u16,
    /// Whether the connection uses TLS.
    pub tls: bool,
    /// Whether Valkey cluster mode is enabled.
    pub cluster: bool,
}

impl ValkeyConnection {
    /// Describe a connection to `host:port` with TLS and cluster mode
    /// both off; flip the public `tls` / `cluster` fields afterwards to
    /// enable them.
    pub fn new(host: impl Into<String>, port: u16) -> Self {
        let host: String = host.into();
        ValkeyConnection {
            cluster: false,
            tls: false,
            port,
            host,
        }
    }
}
1193
/// Connection parameters specific to the Postgres backend
/// (RFC-v0.7 Wave 0).
///
/// `url` is an ordinary libpq/sqlx connection string
/// (`postgres://user:pass@host:port/db`). Pool sizing and the acquire
/// timeout live here instead of on [`BackendTimeouts`] /
/// [`BackendRetry`] because they map one-to-one onto
/// `sqlx::postgres::PgPoolOptions` and have no Valkey counterpart.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub struct PostgresConnection {
    /// Connection URL for the Postgres server.
    pub url: String,
    /// Upper bound on the number of pool connections.
    pub max_connections: u32,
    /// Lower bound on the number of idle pool connections.
    pub min_connections: u32,
    /// Timeout applied to each pool acquire.
    pub acquire_timeout: Duration,
}

impl PostgresConnection {
    /// Describe a Postgres connection to `url`, with pool sizing set to
    /// sqlx's out-of-box defaults: max=10, min=0, acquire=30s.
    pub fn new(url: impl Into<String>) -> Self {
        // Pool defaults, named so the sqlx correspondence is explicit.
        const DEFAULT_MAX_CONNECTIONS: u32 = 10;
        const DEFAULT_MIN_CONNECTIONS: u32 = 0;
        const DEFAULT_ACQUIRE_TIMEOUT: Duration = Duration::from_secs(30);

        let url: String = url.into();
        PostgresConnection {
            url,
            max_connections: DEFAULT_MAX_CONNECTIONS,
            min_connections: DEFAULT_MIN_CONNECTIONS,
            acquire_timeout: DEFAULT_ACQUIRE_TIMEOUT,
        }
    }
}
1227
/// Discriminated union over per-backend connection shapes. Stage 1a
/// shipped the Valkey arm; RFC-v0.7 Wave 0 adds the Postgres arm
/// additively under the `#[non_exhaustive]` guard.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum BackendConnection {
    /// Connect to a Valkey server described by [`ValkeyConnection`].
    Valkey(ValkeyConnection),
    /// Connect to a Postgres server described by [`PostgresConnection`].
    Postgres(PostgresConnection),
}
1237
1238/// Configuration passed to `ValkeyBackend::connect` (and, later, to
1239/// other backend `connect` constructors). Carries the connection
1240/// details + shared timing/retry policy. Replaces the Valkey-specific
1241/// fields today on `WorkerConfig` (RFC-012 §5.1 migration plan).
1242///
1243/// `BackendConfig` is the replacement target for `WorkerConfig`'s
1244/// `host` / `port` / `tls` / `cluster` fields. The full migration
1245/// lands across Stage 1a (type introduction) and Stage 1c
1246/// (`WorkerConfig` forwarding); worker-policy fields (lease TTL,
1247/// claim poll interval, capability set) stay on `WorkerConfig`.
1248#[derive(Clone, Debug, PartialEq, Eq)]
1249#[non_exhaustive]
1250pub struct BackendConfig {
1251    pub connection: BackendConnection,
1252    pub timeouts: BackendTimeouts,
1253    pub retry: BackendRetry,
1254}
1255
1256impl BackendConfig {
1257    /// Build a Valkey BackendConfig from host+port. Other fields take
1258    /// backend defaults.
1259    pub fn valkey(host: impl Into<String>, port: u16) -> Self {
1260        Self {
1261            connection: BackendConnection::Valkey(ValkeyConnection::new(host, port)),
1262            timeouts: BackendTimeouts::default(),
1263            retry: BackendRetry::default(),
1264        }
1265    }
1266
1267    /// Build a Postgres BackendConfig from a URL. Other fields take
1268    /// backend defaults. RFC-v0.7 Wave 0.
1269    pub fn postgres(url: impl Into<String>) -> Self {
1270        Self {
1271            connection: BackendConnection::Postgres(PostgresConnection::new(url)),
1272            timeouts: BackendTimeouts::default(),
1273            retry: BackendRetry::default(),
1274        }
1275    }
1276}
1277
1278// ── Issue #122: ScannerFilter ───────────────────────────────────────────
1279
/// Per-consumer filter applied by FlowFabric's background scanners and
/// completion subscribers so multiple FlowFabric instances sharing a
/// single Valkey keyspace can operate on disjoint subsets of
/// executions without mutual interference (issue #122).
///
/// Sibling of [`CompletionPayload`]: the former is a scan *output*
/// shape, this is the *input* predicate scanners and completion
/// subscribers consult per candidate.
///
/// # Fields & backing storage
///
/// * `namespace` — matches against the `namespace` field on
///   `exec_core` (Valkey hash `ff:exec:{fp:N}:<eid>:core`). Cost:
///   one HGET per candidate when set.
/// * `instance_tag` — matches against an entry in the execution's
///   user-supplied tags hash at the canonical key
///   **`ff:exec:{p}:<eid>:tags`** (where `{p}` is the partition
///   hash-tag, e.g. `{fp:42}`). Written by the Lua function
///   `ff_create_execution` (see `lua/execution.lua`) and
///   `ff_set_execution_tags`. The tuple is `(tag_key, tag_value)`:
///   the HGET targets `tag_key` on the tags hash and compares the
///   returned string (if any) byte-for-byte against `tag_value`.
///   Cost: one HGET per candidate when set.
///
/// When both are set, scanners check `namespace` first (short-circuit
/// on mismatch) then `instance_tag`, for a maximum of 2 HGETs per
/// candidate.
///
/// # Semantics
///
/// * [`Self::is_noop`] returns true when both fields are `None` —
///   the filter accepts every candidate. Used by the
///   `subscribe_completions_filtered` default body to fall back to
///   the unfiltered subscription.
/// * [`Self::matches`] is the tight in-memory predicate once the
///   HGET values have been fetched. Scanners fetch the fields
///   lazily (namespace first) and pass the results in; the helper
///   returns false as soon as one component mismatches.
///
/// # Scope
///
/// Today the filter is consulted by execution-shaped scanners
/// (lease_expiry, attempt_timeout, execution_deadline,
/// suspension_timeout, pending_wp_expiry, delayed_promoter,
/// dependency_reconciler, cancel_reconciler, unblock,
/// index_reconciler, retention_trimmer) and by completion
/// subscribers. Non-execution scanners (budget_reconciler,
/// budget_reset, quota_reconciler, flow_projector) accept a filter
/// for API uniformity but do not apply it — their iteration domains
/// (budgets, quotas, flows) are not keyed by the
/// per-execution namespace / instance_tag shape.
///
/// # Construction
///
/// `#[non_exhaustive]` so future dimensions (e.g. `lane_id`,
/// `worker_instance`) can land additively. The derived `Default` has
/// both dimensions `None`, i.e. a filter that accepts every candidate.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[non_exhaustive]
pub struct ScannerFilter {
    /// Tenant / workspace scope. Matches against the `namespace`
    /// field on `exec_core`. `None` leaves this dimension unfiltered.
    pub namespace: Option<Namespace>,
    /// Instance-scoped tag predicate `(tag_key, tag_value)`. Matches
    /// against an entry in `ff:exec:{p}:<eid>:tags` (the tags hash
    /// written by `ff_create_execution`). `None` leaves this dimension
    /// unfiltered.
    pub instance_tag: Option<(String, String)>,
}
1345
1346impl ScannerFilter {
1347    /// Shared no-op filter — useful as the default for the
1348    /// `Scanner::filter()` trait method so implementors that don't
1349    /// override can hand back a `&'static` reference without
1350    /// allocating per call.
1351    pub const NOOP: Self = Self {
1352        namespace: None,
1353        instance_tag: None,
1354    };
1355
1356    /// Create an empty filter (equivalent to [`Self::NOOP`]).
1357    ///
1358    /// Provided for external-crate consumers: `ScannerFilter` is
1359    /// `#[non_exhaustive]`, so struct-literal and functional-update
1360    /// construction are unavailable across crate boundaries. Start
1361    /// from `new()` and chain `with_*` setters to build a filter.
1362    pub fn new() -> Self {
1363        Self::default()
1364    }
1365
1366    /// Set the tenant-scope namespace filter dimension. Consumes
1367    /// and returns `self` for chaining.
1368    ///
1369    /// Accepts anything that converts into [`Namespace`] so callers
1370    /// can pass `&str` / `String` directly without the
1371    /// `Namespace::new(...)` ceremony (the conversion is infallible;
1372    /// [`Namespace`] has `From<&str>` and `From<String>` via the
1373    /// crate's `string_id!` macro).
1374    pub fn with_namespace(mut self, ns: impl Into<Namespace>) -> Self {
1375        self.namespace = Some(ns.into());
1376        self
1377    }
1378
1379    /// Set the exact-match exec-tag filter dimension. Consumes and
1380    /// returns `self` for chaining. At filter time, scanners read
1381    /// the `ff:exec:{p:N}:<eid>:tags` hash and compare the value at
1382    /// `key` byte-for-byte against `value`.
1383    pub fn with_instance_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
1384        self.instance_tag = Some((key.into(), value.into()));
1385        self
1386    }
1387
1388    /// True iff the filter has no dimensions set — every candidate
1389    /// passes. Callers use this to short-circuit filtered subscribe
1390    /// paths back to the unfiltered ones.
1391    pub fn is_noop(&self) -> bool {
1392        self.namespace.is_none() && self.instance_tag.is_none()
1393    }
1394
1395    /// Post-HGET in-memory match check.
1396    ///
1397    /// `core_namespace` should be the `namespace` field read from
1398    /// `exec_core` (None if the HGET returned nil / was skipped).
1399    /// `tag_value` should be the HGET result for the configured
1400    /// `instance_tag.0` key on the execution's tags hash (None if
1401    /// the HGET returned nil / was skipped).
1402    ///
1403    /// When a filter dimension is `None` the corresponding argument
1404    /// is ignored — callers may pass `None` to skip the HGET and
1405    /// save a round-trip.
1406    pub fn matches(
1407        &self,
1408        core_namespace: Option<&Namespace>,
1409        tag_value: Option<&str>,
1410    ) -> bool {
1411        if let Some(ref want) = self.namespace {
1412            match core_namespace {
1413                Some(have) if have == want => {}
1414                _ => return false,
1415            }
1416        }
1417        if let Some((_, ref want_value)) = self.instance_tag {
1418            match tag_value {
1419                Some(have) if have == want_value.as_str() => {}
1420                _ => return false,
1421            }
1422        }
1423        true
1424    }
1425}
1426
1427// ── Tests ───────────────────────────────────────────────────────────────
1428
1429#[cfg(test)]
1430mod tests {
1431    use super::*;
1432    use crate::partition::{Partition, PartitionFamily};
1433    use crate::types::{ExecutionId, LaneId, SignalId};
1434
1435    #[test]
1436    fn backend_tag_derives() {
1437        let a = BackendTag::Valkey;
1438        let b = a;
1439        assert_eq!(a, b);
1440        assert_eq!(format!("{a:?}"), "Valkey");
1441    }
1442
1443    #[test]
1444    fn handle_kind_derives() {
1445        for k in [HandleKind::Fresh, HandleKind::Resumed, HandleKind::Suspended] {
1446            let c = k;
1447            assert_eq!(k, c);
1448            // Debug formatter reachable
1449            let _ = format!("{k:?}");
1450        }
1451    }
1452
1453    #[test]
1454    fn handle_opaque_roundtrips() {
1455        let bytes: Box<[u8]> = Box::new([1u8, 2, 3, 4]);
1456        let o = HandleOpaque::new(bytes.clone());
1457        assert_eq!(o.as_bytes(), &[1u8, 2, 3, 4]);
1458        assert_eq!(o, o.clone());
1459        let _ = format!("{o:?}");
1460    }
1461
1462    #[test]
1463    fn handle_composes() {
1464        let h = Handle::new(
1465            BackendTag::Valkey,
1466            HandleKind::Fresh,
1467            HandleOpaque::new(Box::new([0u8; 4])),
1468        );
1469        assert_eq!(h.backend, BackendTag::Valkey);
1470        assert_eq!(h.kind, HandleKind::Fresh);
1471        assert_eq!(h.clone(), h);
1472    }
1473
1474    #[test]
1475    fn capability_set_derives() {
1476        let c1 = CapabilitySet::new(["gpu", "cuda"]);
1477        let c2 = CapabilitySet::new(["gpu", "cuda"]);
1478        assert_eq!(c1, c2);
1479        assert!(!c1.is_empty());
1480        assert!(CapabilitySet::default().is_empty());
1481        let _ = format!("{c1:?}");
1482    }
1483
1484    #[test]
1485    fn claim_policy_derives() {
1486        let p = ClaimPolicy::new(
1487            WorkerId::new("w"),
1488            WorkerInstanceId::new("w-1"),
1489            30_000,
1490            Some(Duration::from_millis(500)),
1491        );
1492        assert_eq!(p.max_wait, Some(Duration::from_millis(500)));
1493        assert_eq!(p.lease_ttl_ms, 30_000);
1494        assert_eq!(p.worker_id.as_str(), "w");
1495        assert_eq!(p.worker_instance_id.as_str(), "w-1");
1496        assert_eq!(p.clone(), p);
1497        let immediate = ClaimPolicy::new(
1498            WorkerId::new("w"),
1499            WorkerInstanceId::new("w-1"),
1500            30_000,
1501            None,
1502        );
1503        assert_eq!(immediate.max_wait, None);
1504    }
1505
1506    #[test]
1507    fn frame_and_kind_derive() {
1508        let f = Frame {
1509            bytes: b"hello".to_vec(),
1510            kind: FrameKind::Stdout,
1511            seq: Some(3),
1512            frame_type: "delta".to_owned(),
1513            correlation_id: Some("req-42".to_owned()),
1514            mode: StreamMode::Durable,
1515        };
1516        assert_eq!(f.clone(), f);
1517        assert_eq!(f.kind, FrameKind::Stdout);
1518        assert_eq!(f.frame_type, "delta");
1519        assert_eq!(f.correlation_id.as_deref(), Some("req-42"));
1520        assert_ne!(FrameKind::Stderr, FrameKind::Event);
1521        let _ = format!("{f:?}");
1522    }
1523
1524    #[test]
1525    fn frame_builders_populate_extended_fields() {
1526        let f = Frame::new(b"payload".to_vec(), FrameKind::Event)
1527            .with_frame_type("agent_step")
1528            .with_correlation_id("corr-1");
1529        assert_eq!(f.frame_type, "agent_step");
1530        assert_eq!(f.correlation_id.as_deref(), Some("corr-1"));
1531        assert_eq!(f.seq, None);
1532
1533        let bare = Frame::new(b"p".to_vec(), FrameKind::Event);
1534        assert_eq!(bare.frame_type, "");
1535        assert_eq!(bare.correlation_id, None);
1536    }
1537
1538    #[test]
1539    fn waitpoint_spec_derives() {
1540        let spec = WaitpointSpec {
1541            kind: WaitpointKind::SignalName,
1542            matcher: b"approved".to_vec(),
1543            hmac_token: WaitpointHmac::new("kid1:deadbeef"),
1544        };
1545        assert_eq!(spec.clone(), spec);
1546        assert_eq!(spec.hmac_token.as_str(), "kid1:deadbeef");
1547        assert_eq!(
1548            WaitpointHmac::new("a"),
1549            WaitpointHmac::new(String::from("a"))
1550        );
1551    }
1552
1553    #[test]
1554    fn failure_reason_and_class() {
1555        let r1 = FailureReason::new("boom");
1556        let r2 = FailureReason::with_detail("boom", b"stack".to_vec());
1557        assert_eq!(r1.message, "boom");
1558        assert!(r1.detail.is_none());
1559        assert_eq!(r2.detail.as_deref(), Some(&b"stack"[..]));
1560        assert_eq!(r1.clone(), r1);
1561        assert_ne!(FailureClass::Transient, FailureClass::Permanent);
1562    }
1563
1564    #[test]
1565    fn usage_dimensions_default_and_eq() {
1566        let u = UsageDimensions {
1567            input_tokens: 10,
1568            output_tokens: 20,
1569            wall_ms: Some(150),
1570            custom: BTreeMap::from([("net_bytes".to_string(), 42)]),
1571            dedup_key: Some("k1".into()),
1572        };
1573        assert_eq!(u.clone(), u);
1574        assert_eq!(UsageDimensions::default().input_tokens, 0);
1575        assert_eq!(UsageDimensions::default().dedup_key, None);
1576    }
1577
1578    #[test]
1579    fn usage_dimensions_builder_chain() {
1580        let u = UsageDimensions::new()
1581            .with_input_tokens(10)
1582            .with_output_tokens(20)
1583            .with_wall_ms(150)
1584            .with_dedup_key("k1");
1585        assert_eq!(u.input_tokens, 10);
1586        assert_eq!(u.output_tokens, 20);
1587        assert_eq!(u.wall_ms, Some(150));
1588        assert_eq!(u.dedup_key.as_deref(), Some("k1"));
1589        assert!(u.custom.is_empty());
1590        // `new()` is equivalent to `default()`.
1591        assert_eq!(UsageDimensions::new(), UsageDimensions::default());
1592    }
1593
1594    #[test]
1595    fn reclaim_token_wraps_grant() {
1596        let grant = ReclaimGrant {
1597            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
1598            partition_key: crate::partition::PartitionKey::from(&Partition {
1599                family: PartitionFamily::Flow,
1600                index: 0,
1601            }),
1602            grant_key: "gkey".into(),
1603            expires_at_ms: 123,
1604            lane_id: LaneId::new("default"),
1605        };
1606        let t = ReclaimToken::new(
1607            grant.clone(),
1608            WorkerId::new("w"),
1609            WorkerInstanceId::new("w-1"),
1610            30_000,
1611        );
1612        assert_eq!(t.grant, grant);
1613        assert_eq!(t.worker_id.as_str(), "w");
1614        assert_eq!(t.worker_instance_id.as_str(), "w-1");
1615        assert_eq!(t.lease_ttl_ms, 30_000);
1616        assert_eq!(t.clone(), t);
1617    }
1618
1619    #[test]
1620    fn lease_renewal_is_copy() {
1621        let r = LeaseRenewal {
1622            expires_at_ms: 100,
1623            lease_epoch: 2,
1624        };
1625        let s = r; // Copy
1626        assert_eq!(r, s);
1627    }
1628
1629    #[test]
1630    fn cancel_flow_policy_and_wait() {
1631        assert_ne!(CancelFlowPolicy::FlowOnly, CancelFlowPolicy::CancelAll);
1632        let w = CancelFlowWait::WaitTimeout(Duration::from_secs(1));
1633        assert_eq!(w, w);
1634        assert_ne!(CancelFlowWait::NoWait, CancelFlowWait::WaitIndefinite);
1635    }
1636
1637    #[test]
1638    fn completion_payload_derives() {
1639        let c = CompletionPayload {
1640            execution_id: ExecutionId::solo(&LaneId::new("default"), &Default::default()),
1641            outcome: "success".into(),
1642            payload_bytes: Some(b"ok".to_vec()),
1643            produced_at_ms: TimestampMs::from_millis(1234),
1644            flow_id: None,
1645        };
1646        assert_eq!(c.clone(), c);
1647        let _ = format!("{c:?}");
1648    }
1649
1650    #[test]
1651    fn resume_signal_moved_and_derives() {
1652        let s = ResumeSignal {
1653            signal_id: SignalId::new(),
1654            signal_name: "approve".into(),
1655            signal_category: "decision".into(),
1656            source_type: "user".into(),
1657            source_identity: "u1".into(),
1658            correlation_id: "c1".into(),
1659            accepted_at: TimestampMs::from_millis(10),
1660            payload: None,
1661        };
1662        assert_eq!(s.clone(), s);
1663        let _ = format!("{s:?}");
1664    }
1665
1666    #[test]
1667    fn fail_outcome_variants() {
1668        let retry = FailOutcome::RetryScheduled {
1669            delay_until: TimestampMs::from_millis(42),
1670        };
1671        let terminal = FailOutcome::TerminalFailed;
1672        assert_ne!(retry, terminal);
1673        assert_eq!(retry.clone(), retry);
1674    }
1675
1676    #[test]
1677    fn scanner_filter_noop_and_default() {
1678        let f = ScannerFilter::default();
1679        assert!(f.is_noop());
1680        assert_eq!(f, ScannerFilter::NOOP);
1681        // A no-op filter matches any candidate, including ones that
1682        // produced no HGET results.
1683        assert!(f.matches(None, None));
1684        assert!(f.matches(Some(&Namespace::new("t1")), Some("v")));
1685    }
1686
1687    #[test]
1688    fn scanner_filter_namespace_match() {
1689        let f = ScannerFilter {
1690            namespace: Some(Namespace::new("tenant-a")),
1691            instance_tag: None,
1692        };
1693        assert!(!f.is_noop());
1694        assert!(f.matches(Some(&Namespace::new("tenant-a")), None));
1695        assert!(!f.matches(Some(&Namespace::new("tenant-b")), None));
1696        // Missing core namespace ⇒ no match.
1697        assert!(!f.matches(None, None));
1698    }
1699
1700    #[test]
1701    fn scanner_filter_instance_tag_match() {
1702        let f = ScannerFilter {
1703            namespace: None,
1704            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
1705        };
1706        assert!(f.matches(None, Some("i-1")));
1707        assert!(!f.matches(None, Some("i-2")));
1708        assert!(!f.matches(None, None));
1709    }
1710
1711    #[test]
1712    fn scanner_filter_both_dimensions() {
1713        let f = ScannerFilter {
1714            namespace: Some(Namespace::new("tenant-a")),
1715            instance_tag: Some(("cairn.instance_id".into(), "i-1".into())),
1716        };
1717        assert!(f.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
1718        assert!(!f.matches(Some(&Namespace::new("tenant-a")), Some("i-2")));
1719        assert!(!f.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
1720        assert!(!f.matches(None, Some("i-1")));
1721    }
1722
1723    #[test]
1724    fn scanner_filter_builder_construction() {
1725        // Simulates external-crate usage: only public constructors
1726        // and chainable setters (no struct-literal access to the
1727        // `#[non_exhaustive]` fields).
1728        let empty = ScannerFilter::new();
1729        assert!(empty.is_noop());
1730        assert_eq!(empty, ScannerFilter::NOOP);
1731
1732        let ns_only = ScannerFilter::new().with_namespace(Namespace::new("tenant-a"));
1733        assert!(!ns_only.is_noop());
1734        assert!(ns_only.matches(Some(&Namespace::new("tenant-a")), None));
1735
1736        let tag_only = ScannerFilter::new().with_instance_tag("cairn.instance_id", "i-1");
1737        assert!(!tag_only.is_noop());
1738        assert!(tag_only.matches(None, Some("i-1")));
1739
1740        let both = ScannerFilter::new()
1741            .with_namespace(Namespace::new("tenant-a"))
1742            .with_instance_tag("cairn.instance_id", "i-1");
1743        assert!(both.matches(Some(&Namespace::new("tenant-a")), Some("i-1")));
1744        assert!(!both.matches(Some(&Namespace::new("tenant-b")), Some("i-1")));
1745    }
1746
1747    // ── RFC-015 Stream-durability-mode types ──
1748
1749    #[test]
1750    fn stream_mode_constructors_and_wire_str() {
1751        assert_eq!(StreamMode::durable().wire_str(), "durable");
1752        assert_eq!(StreamMode::durable(), StreamMode::Durable);
1753
1754        let s = StreamMode::durable_summary();
1755        assert_eq!(s.wire_str(), "summary");
1756        match s {
1757            StreamMode::DurableSummary { patch_kind } => {
1758                assert_eq!(patch_kind, PatchKind::JsonMergePatch);
1759            }
1760            _ => panic!("expected DurableSummary"),
1761        }
1762
1763        let b = StreamMode::best_effort_live(15_000);
1764        assert_eq!(b.wire_str(), "best_effort");
1765        match b {
1766            StreamMode::BestEffortLive { config } => {
1767                assert_eq!(config.ttl_ms, 15_000);
1768                assert_eq!(config.maxlen_floor, 64);
1769                assert_eq!(config.maxlen_ceiling, 16_384);
1770                assert!((config.ema_alpha - 0.2).abs() < 1e-9);
1771            }
1772            _ => panic!("expected BestEffortLive"),
1773        }
1774
1775        let cfg = BestEffortLiveConfig::with_ttl(10_000)
1776            .with_maxlen_floor(128)
1777            .with_maxlen_ceiling(8_192)
1778            .with_ema_alpha(0.3);
1779        let b2 = StreamMode::best_effort_live_with_config(cfg);
1780        match b2 {
1781            StreamMode::BestEffortLive { config } => {
1782                assert_eq!(config.ttl_ms, 10_000);
1783                assert_eq!(config.maxlen_floor, 128);
1784                assert_eq!(config.maxlen_ceiling, 8_192);
1785                assert!((config.ema_alpha - 0.3).abs() < 1e-9);
1786            }
1787            _ => panic!("expected BestEffortLive"),
1788        }
1789
1790        assert_eq!(StreamMode::default(), StreamMode::Durable);
1791    }
1792
1793    #[test]
1794    fn tail_visibility_default_and_wire() {
1795        assert_eq!(TailVisibility::default(), TailVisibility::All);
1796        assert_eq!(TailVisibility::All.wire_str(), "");
1797        assert_eq!(
1798            TailVisibility::ExcludeBestEffort.wire_str(),
1799            "exclude_best_effort"
1800        );
1801    }
1802
1803    #[test]
1804    fn append_frame_outcome_summary_version_builder() {
1805        let base = AppendFrameOutcome::new("1713-0", 3);
1806        assert_eq!(base.stream_id, "1713-0");
1807        assert_eq!(base.frame_count, 3);
1808        assert_eq!(base.summary_version, None);
1809
1810        let with_v = AppendFrameOutcome::new("1713-0", 3).with_summary_version(7);
1811        assert_eq!(with_v.summary_version, Some(7));
1812        assert_eq!(with_v.clone(), with_v);
1813    }
1814
1815    /// RFC-015 §3.2 null-sentinel round-trip invariant.
1816    ///
1817    /// The sentinel (`"__ff_null__"`) is the byte-exact string callers
1818    /// use to encode "set this leaf to JSON null" in a JSON Merge Patch
1819    /// frame (because RFC 7396 otherwise treats `null` as delete-key).
1820    /// The invariant: on read, the summary document NEVER contains the
1821    /// sentinel string — it is always rewritten to JSON null.
1822    ///
1823    /// The full end-to-end invariant is exercised against the Lua
1824    /// applier in `crates/ff-test/tests/engine_backend_stream_modes.rs`
1825    /// (integration). Here we just pin the byte sequence + the
1826    /// type-level constant so any accidental edit of the sentinel
1827    /// string fails the unit build before it reaches an integration
1828    /// run.
1829    #[test]
1830    fn summary_null_sentinel_byte_exact() {
1831        assert_eq!(SUMMARY_NULL_SENTINEL, "__ff_null__");
1832        assert_eq!(SUMMARY_NULL_SENTINEL.len(), 11);
1833        assert!(SUMMARY_NULL_SENTINEL.bytes().all(|b| b.is_ascii()));
1834        assert_eq!(SUMMARY_NULL_SENTINEL.trim(), SUMMARY_NULL_SENTINEL);
1835    }
1836
1837    #[test]
1838    fn summary_document_constructor() {
1839        let doc = SummaryDocument::new(
1840            br#"{"tokens":3}"#.to_vec(),
1841            2,
1842            PatchKind::JsonMergePatch,
1843            1_700_000_100,
1844            1_700_000_000,
1845        );
1846        assert_eq!(doc.version, 2);
1847        assert_eq!(doc.patch_kind, PatchKind::JsonMergePatch);
1848        assert_eq!(doc.first_applied_ms, 1_700_000_000);
1849        assert_eq!(doc.clone(), doc);
1850    }
1851
1852    #[test]
1853    fn frame_builder_sets_mode() {
1854        let f = Frame::new(b"p".to_vec(), FrameKind::Event);
1855        assert_eq!(f.mode, StreamMode::Durable);
1856
1857        let f = Frame::new(b"p".to_vec(), FrameKind::Event)
1858            .with_mode(StreamMode::durable_summary());
1859        match f.mode {
1860            StreamMode::DurableSummary { patch_kind } => {
1861                assert_eq!(patch_kind, PatchKind::JsonMergePatch);
1862            }
1863            other => panic!("expected DurableSummary, got {other:?}"),
1864        }
1865    }
1866
1867    #[test]
1868    fn backend_config_valkey_ctor() {
1869        let c = BackendConfig::valkey("host.local", 6379);
1870        // Wave 0 (#230) added `BackendConnection::Postgres`, so the
1871        // match is no longer irrefutable. `let-else` keeps the test
1872        // focused on the Valkey arm without a full `match`.
1873        let BackendConnection::Valkey(v) = &c.connection else {
1874            panic!("expected Valkey connection, got Postgres");
1875        };
1876        assert_eq!(v.host, "host.local");
1877        assert_eq!(v.port, 6379);
1878        assert!(!v.tls);
1879        assert!(!v.cluster);
1880        assert_eq!(c.timeouts, BackendTimeouts::default());
1881        assert_eq!(c.retry, BackendRetry::default());
1882        assert_eq!(c.clone(), c);
1883    }
1884}