Skip to main content

ff_core/
stream_events.rs

1//! RFC-019 Stage C — Typed stream events.
2//!
3//! Stage A/B shipped the cursor machinery + backend adapters emitting
4//! `StreamEvent { payload: Bytes, … }`, forcing every consumer to
5//! parse a backend-shaped byte blob (Valkey: NUL-delimited field map,
6//! Postgres: `serde_json`). This module promotes each family to a
7//! typed enum so consumers `match` instead of parsing.
8//!
9//! One enum + one subscription alias per family, paralleling the
10//! four-family allow-list in [`crate::stream_subscribe::StreamFamily`]:
11//!
12//! | Family           | Event enum                 | Subscription alias           |
13//! |------------------|----------------------------|------------------------------|
14//! | LeaseHistory     | [`LeaseHistoryEvent`]      | [`LeaseHistorySubscription`] |
15//! | Completion       | [`CompletionEvent`]        | [`CompletionSubscription`]   |
16//! | SignalDelivery   | [`SignalDeliveryEvent`]    | [`SignalDeliverySubscription`] |
17//! | InstanceTags     | [`InstanceTagEvent`]       | [`InstanceTagSubscription`]  |
18//!
19//! Each event carries the same inline-hot fields the Stage A
20//! `StreamEvent` envelope carried (`cursor`, `execution_id` where
21//! applicable, `timestamp`) plus family-specific fields promoted out
22//! of the old opaque payload.
23//!
24//! `#[non_exhaustive]` is applied to every variant-bearing enum and
25//! every event struct so new variants / fields land additively. The
26//! owner-adjudicated v0.9 four-family allow-list stays in force — new
27//! families require an RFC amendment.
28
29use std::pin::Pin;
30
31use tokio_stream::Stream;
32
33use crate::engine_error::EngineError;
34use crate::stream_subscribe::StreamCursor;
35use crate::types::{ExecutionId, LeaseId, SignalId, TimestampMs, WaitpointId, WorkerInstanceId};
36
// Maintainer note: every `#[non_exhaustive]` struct below MUST pair
// with a public constructor so external backend crates (ff-backend-
// valkey, ff-backend-postgres) can still build instances, because
// the attribute forbids struct-literal construction outside this
// crate. Adding a new field is then additive for call sites that use
// the constructor, provided the constructor signature is left alone
// and the new field is set through a separate builder method.
// Callers that build via struct literal would break, which is
// exactly what `non_exhaustive` is there to prevent.
43
44// ─── LeaseHistory ─────────────────────────────────────────────────
45
46/// Per-event payload of `subscribe_lease_history`.
47///
48/// Each variant carries the fence triple fields the Lua producer
49/// writes (`lease_id`, plus attempt/worker correlation where
50/// available) and the monotonic `at` timestamp derived from the
51/// backend's native stream id / event id.
52///
53/// `revoked_by` is `String` today for forward compatibility; we may
54/// promote to a typed enum post-v0.10 once the revocation-source
55/// taxonomy settles. Known producers emit `"operator"`,
56/// `"reconciler"`, or `"backend"` today.
57#[non_exhaustive]
58#[derive(Clone, Debug, PartialEq, Eq)]
59pub enum LeaseHistoryEvent {
60    Acquired {
61        cursor: StreamCursor,
62        execution_id: ExecutionId,
63        /// Valkey populates this from the Lua producer; Postgres
64        /// outbox may carry `None` today because the attempt row
65        /// identity is `(lease_epoch, attempt_index, execution_id)`
66        /// rather than a stable uuid (see `lease_event::emit`).
67        lease_id: Option<LeaseId>,
68        worker_instance_id: Option<WorkerInstanceId>,
69        at: TimestampMs,
70    },
71    Renewed {
72        cursor: StreamCursor,
73        execution_id: ExecutionId,
74        /// Same availability caveat as `Acquired::lease_id`.
75        lease_id: Option<LeaseId>,
76        worker_instance_id: Option<WorkerInstanceId>,
77        at: TimestampMs,
78    },
79    Expired {
80        cursor: StreamCursor,
81        execution_id: ExecutionId,
82        lease_id: Option<LeaseId>,
83        prev_owner: Option<WorkerInstanceId>,
84        at: TimestampMs,
85    },
86    Reclaimed {
87        cursor: StreamCursor,
88        execution_id: ExecutionId,
89        /// Same availability caveat as `Acquired::lease_id`.
90        new_lease_id: Option<LeaseId>,
91        new_owner: Option<WorkerInstanceId>,
92        at: TimestampMs,
93    },
94    Revoked {
95        cursor: StreamCursor,
96        execution_id: ExecutionId,
97        lease_id: Option<LeaseId>,
98        /// String today for forward compat; may promote to typed
99        /// enum post-v0.10 once taxonomy settles. Known values:
100        /// `"operator"`, `"reconciler"`, `"backend"`.
101        revoked_by: String,
102        at: TimestampMs,
103    },
104}
105
106impl LeaseHistoryEvent {
107    /// Position cursor to persist + replay from.
108    pub fn cursor(&self) -> &StreamCursor {
109        match self {
110            Self::Acquired { cursor, .. }
111            | Self::Renewed { cursor, .. }
112            | Self::Expired { cursor, .. }
113            | Self::Reclaimed { cursor, .. }
114            | Self::Revoked { cursor, .. } => cursor,
115        }
116    }
117
118    /// Execution the event pertains to.
119    pub fn execution_id(&self) -> &ExecutionId {
120        match self {
121            Self::Acquired { execution_id, .. }
122            | Self::Renewed { execution_id, .. }
123            | Self::Expired { execution_id, .. }
124            | Self::Reclaimed { execution_id, .. }
125            | Self::Revoked { execution_id, .. } => execution_id,
126        }
127    }
128
129    /// Monotonic backend-stamped time of the event.
130    pub fn at(&self) -> TimestampMs {
131        match self {
132            Self::Acquired { at, .. }
133            | Self::Renewed { at, .. }
134            | Self::Expired { at, .. }
135            | Self::Reclaimed { at, .. }
136            | Self::Revoked { at, .. } => *at,
137        }
138    }
139}
140
/// Stream of typed lease-history events.
///
/// A pinned, boxed, `Send` [`Stream`] trait object. Each item is a
/// `Result` holding either a [`LeaseHistoryEvent`] or an
/// [`EngineError`].
pub type LeaseHistorySubscription =
    Pin<Box<dyn Stream<Item = Result<LeaseHistoryEvent, EngineError>> + Send>>;
144
145// ─── Completion ──────────────────────────────────────────────────
146
/// How a completion event classifies the execution's outcome.
///
/// Marked `#[non_exhaustive]` so that further terminal states (for
/// example a `Suspended` terminal split out from `Cancelled`) can be
/// added without a breaking change.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CompletionOutcome {
    Success,
    Failure,
    Cancelled,
    /// An outcome surfaced by the backend that this crate version
    /// does not know. The raw string is kept so operator tooling can
    /// still log it.
    Other(String),
}

impl CompletionOutcome {
    /// Convert a wire-format outcome string into its typed form.
    ///
    /// Matching is exact. Unrecognised strings become
    /// [`CompletionOutcome::Other`] instead of an error: completion
    /// events are advisory, and a producer that starts emitting a new
    /// variant must not crash older subscribers.
    pub fn from_wire(s: &str) -> Self {
        let known = match s {
            "success" => Some(Self::Success),
            "failure" => Some(Self::Failure),
            "cancelled" => Some(Self::Cancelled),
            _ => None,
        };
        // Anything outside the known set is carried through verbatim.
        known.unwrap_or_else(|| Self::Other(s.to_owned()))
    }
}
177
178/// Per-event payload of `subscribe_completion`.
179///
180/// Completion events are terminal state transitions surfaced to
181/// downstream DAG consumers. Postgres carries a durable event-id
182/// cursor; Valkey Stage B rides pubsub + always yields
183/// [`StreamCursor::empty`] (no durable replay).
184#[non_exhaustive]
185#[derive(Clone, Debug)]
186pub struct CompletionEvent {
187    pub cursor: StreamCursor,
188    pub execution_id: ExecutionId,
189    pub outcome: CompletionOutcome,
190    pub at: TimestampMs,
191}
192
193impl CompletionEvent {
194    /// Construct a `CompletionEvent`. Backend adapters go through
195    /// this constructor instead of struct-literal syntax so future
196    /// additive fields land as builder methods without breaking
197    /// call sites.
198    pub fn new(
199        cursor: StreamCursor,
200        execution_id: ExecutionId,
201        outcome: CompletionOutcome,
202        at: TimestampMs,
203    ) -> Self {
204        Self {
205            cursor,
206            execution_id,
207            outcome,
208            at,
209        }
210    }
211}
212
/// Stream of typed completion events.
///
/// A pinned, boxed, `Send` [`Stream`] trait object. Each item is a
/// `Result` holding either a [`CompletionEvent`] or an
/// [`EngineError`].
pub type CompletionSubscription =
    Pin<Box<dyn Stream<Item = Result<CompletionEvent, EngineError>> + Send>>;
216
217// ─── SignalDelivery ──────────────────────────────────────────────
218
/// What a signal delivery did to its target waitpoint.
///
/// Marked `#[non_exhaustive]` so later effects (e.g. `Throttled`,
/// `Coalesced`) can be added without a breaking change.
#[non_exhaustive]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SignalDeliveryEffect {
    /// The signal was delivered and moved its waitpoint to satisfied.
    Satisfied,
    /// The signal was buffered against a pending waitpoint.
    Buffered,
    /// The signal was deduplicated against an earlier delivery that
    /// used the same idempotency key.
    Deduped,
    /// An effect the backend surfaced that this crate version does
    /// not recognise. The raw string is preserved for observability.
    Other(String),
}

impl SignalDeliveryEffect {
    /// Convert a wire-format effect string into its typed form.
    ///
    /// Matching is exact; any string outside the known set is
    /// returned as [`SignalDeliveryEffect::Other`] holding the input
    /// unchanged.
    pub fn from_wire(s: &str) -> Self {
        let known = match s {
            "satisfied" => Some(Self::Satisfied),
            "buffered" => Some(Self::Buffered),
            "deduped" => Some(Self::Deduped),
            _ => None,
        };
        // Unknown effects are carried through verbatim.
        known.unwrap_or_else(|| Self::Other(s.to_owned()))
    }
}
249
250/// Per-event payload of `subscribe_signal_delivery`.
251#[non_exhaustive]
252#[derive(Clone, Debug)]
253pub struct SignalDeliveryEvent {
254    pub cursor: StreamCursor,
255    pub execution_id: ExecutionId,
256    pub signal_id: SignalId,
257    pub waitpoint_id: Option<WaitpointId>,
258    pub source_identity: Option<String>,
259    pub effect: SignalDeliveryEffect,
260    pub at: TimestampMs,
261}
262
263impl SignalDeliveryEvent {
264    /// Construct a `SignalDeliveryEvent`. Backend adapters use this
265    /// constructor so future additive fields are non-breaking.
266    pub fn new(
267        cursor: StreamCursor,
268        execution_id: ExecutionId,
269        signal_id: SignalId,
270        waitpoint_id: Option<WaitpointId>,
271        source_identity: Option<String>,
272        effect: SignalDeliveryEffect,
273        at: TimestampMs,
274    ) -> Self {
275        Self {
276            cursor,
277            execution_id,
278            signal_id,
279            waitpoint_id,
280            source_identity,
281            effect,
282            at,
283        }
284    }
285}
286
/// Stream of typed signal-delivery events.
///
/// A pinned, boxed, `Send` [`Stream`] trait object. Each item is a
/// `Result` holding either a [`SignalDeliveryEvent`] or an
/// [`EngineError`].
pub type SignalDeliverySubscription =
    Pin<Box<dyn Stream<Item = Result<SignalDeliveryEvent, EngineError>> + Send>>;
290
291// ─── InstanceTags ────────────────────────────────────────────────
292
293/// Per-event payload of `subscribe_instance_tags`.
294///
295/// Producer wiring is deferred (see #311); trait-level surface exists
296/// for API uniformity across the four families. Every backend's
297/// default impl returns `EngineError::Unavailable`.
298#[non_exhaustive]
299#[derive(Clone, Debug, PartialEq, Eq)]
300pub enum InstanceTagEvent {
301    /// Tag attached to an instance-scoped grouping.
302    Attached {
303        cursor: StreamCursor,
304        execution_id: ExecutionId,
305        tag: String,
306        at: TimestampMs,
307    },
308    /// Tag cleared from an instance-scoped grouping.
309    Cleared {
310        cursor: StreamCursor,
311        execution_id: ExecutionId,
312        tag: String,
313        at: TimestampMs,
314    },
315}
316
/// Stream of typed instance-tag events.
///
/// A pinned, boxed, `Send` [`Stream`] trait object. Each item is a
/// `Result` holding either an [`InstanceTagEvent`] or an
/// [`EngineError`].
pub type InstanceTagSubscription =
    Pin<Box<dyn Stream<Item = Result<InstanceTagEvent, EngineError>> + Send>>;
320
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp fixture shared by the event tests.
    const AT: TimestampMs = TimestampMs(1_700_000_000_000);

    /// Cursor fixture: the 17-byte value the accessor test has always used.
    fn sample_cursor() -> StreamCursor {
        StreamCursor::new(vec![0x01, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 42])
    }

    /// Execution-id fixture parsed from a fixed, well-formed string.
    fn sample_execution_id() -> ExecutionId {
        ExecutionId::parse("{fp:0}:11111111-1111-4111-8111-111111111111").unwrap()
    }

    #[test]
    fn completion_outcome_wire_round_trip() {
        assert_eq!(CompletionOutcome::from_wire("success"), CompletionOutcome::Success);
        assert_eq!(CompletionOutcome::from_wire("failure"), CompletionOutcome::Failure);
        assert_eq!(CompletionOutcome::from_wire("cancelled"), CompletionOutcome::Cancelled);
        // Unknown wire strings must survive verbatim inside `Other`.
        match CompletionOutcome::from_wire("timed_out") {
            CompletionOutcome::Other(s) => assert_eq!(s, "timed_out"),
            other => panic!("expected Other, got {other:?}"),
        }
    }

    #[test]
    fn signal_delivery_effect_wire_round_trip() {
        assert_eq!(
            SignalDeliveryEffect::from_wire("satisfied"),
            SignalDeliveryEffect::Satisfied
        );
        assert_eq!(
            SignalDeliveryEffect::from_wire("buffered"),
            SignalDeliveryEffect::Buffered
        );
        assert_eq!(
            SignalDeliveryEffect::from_wire("deduped"),
            SignalDeliveryEffect::Deduped
        );
        // Unknown wire strings must survive verbatim inside `Other`.
        match SignalDeliveryEffect::from_wire("throttled") {
            SignalDeliveryEffect::Other(s) => assert_eq!(s, "throttled"),
            other => panic!("expected Other, got {other:?}"),
        }
    }

    /// Exercises every variant so each or-pattern arm of the three
    /// accessors is covered, not only `Expired`.
    #[test]
    fn lease_history_event_accessors() {
        let cursor = sample_cursor();
        let exec = sample_execution_id();
        let events = [
            LeaseHistoryEvent::Acquired {
                cursor: cursor.clone(),
                execution_id: exec.clone(),
                lease_id: None,
                worker_instance_id: None,
                at: AT,
            },
            LeaseHistoryEvent::Renewed {
                cursor: cursor.clone(),
                execution_id: exec.clone(),
                lease_id: None,
                worker_instance_id: None,
                at: AT,
            },
            LeaseHistoryEvent::Expired {
                cursor: cursor.clone(),
                execution_id: exec.clone(),
                lease_id: None,
                prev_owner: None,
                at: AT,
            },
            LeaseHistoryEvent::Reclaimed {
                cursor: cursor.clone(),
                execution_id: exec.clone(),
                new_lease_id: None,
                new_owner: None,
                at: AT,
            },
            LeaseHistoryEvent::Revoked {
                cursor: cursor.clone(),
                execution_id: exec.clone(),
                lease_id: None,
                revoked_by: "operator".to_string(),
                at: AT,
            },
        ];
        for ev in &events {
            assert_eq!(ev.cursor(), &cursor, "cursor mismatch for {ev:?}");
            assert_eq!(ev.execution_id(), &exec, "execution_id mismatch for {ev:?}");
            assert_eq!(ev.at(), AT, "timestamp mismatch for {ev:?}");
        }
    }

    /// The constructor must store each argument in its same-named field.
    #[test]
    fn completion_event_new_preserves_fields() {
        let cursor = sample_cursor();
        let exec = sample_execution_id();
        let ev = CompletionEvent::new(
            cursor.clone(),
            exec.clone(),
            CompletionOutcome::Other("timed_out".to_string()),
            AT,
        );
        assert_eq!(ev.cursor, cursor);
        assert_eq!(ev.execution_id, exec);
        assert_eq!(ev.outcome, CompletionOutcome::Other("timed_out".to_string()));
        assert_eq!(ev.at, AT);
    }
}