ff_core/stream_events.rs
1//! RFC-019 Stage C — Typed stream events.
2//!
3//! Stage A/B shipped the cursor machinery + backend adapters emitting
4//! `StreamEvent { payload: Bytes, … }`, forcing every consumer to
5//! parse a backend-shaped byte blob (Valkey: NUL-delimited field map,
6//! Postgres: `serde_json`). This module promotes each family to a
7//! typed enum so consumers `match` instead of parsing.
8//!
9//! One enum + one subscription alias per family, paralleling the
10//! four-family allow-list in [`crate::stream_subscribe::StreamFamily`]:
11//!
12//! | Family | Event enum | Subscription alias |
13//! |------------------|----------------------------|------------------------------|
14//! | LeaseHistory | [`LeaseHistoryEvent`] | [`LeaseHistorySubscription`] |
15//! | Completion | [`CompletionEvent`] | [`CompletionSubscription`] |
16//! | SignalDelivery | [`SignalDeliveryEvent`] | [`SignalDeliverySubscription`] |
17//! | InstanceTags | [`InstanceTagEvent`] | [`InstanceTagSubscription`] |
18//!
19//! Each event carries the same inline-hot fields the Stage A
20//! `StreamEvent` envelope carried (`cursor`, `execution_id` where
21//! applicable, `timestamp`) plus family-specific fields promoted out
22//! of the old opaque payload.
23//!
24//! `#[non_exhaustive]` is applied to every variant-bearing enum and
25//! every event struct so new variants / fields land additively. The
26//! owner-adjudicated v0.9 four-family allow-list stays in force — new
27//! families require an RFC amendment.
28
29use std::pin::Pin;
30
31use tokio_stream::Stream;
32
33use crate::engine_error::EngineError;
34use crate::stream_subscribe::StreamCursor;
35use crate::types::{ExecutionId, LeaseId, SignalId, TimestampMs, WaitpointId, WorkerInstanceId};
36
37// Rustdoc reminder: every `#[non_exhaustive]` struct below MUST pair
38// with a public constructor so external backend crates (ff-backend-
39// valkey, ff-backend-postgres) can still build instances. Adding a
40// new field to a non_exhaustive struct is additive at call sites that
41// use the constructor; callers that build via struct literal would
42// break, which is exactly what `non_exhaustive` is there to prevent.
43
44// ─── LeaseHistory ─────────────────────────────────────────────────
45
46/// Per-event payload of `subscribe_lease_history`.
47///
48/// Each variant carries the fence triple fields the Lua producer
49/// writes (`lease_id`, plus attempt/worker correlation where
50/// available) and the monotonic `at` timestamp derived from the
51/// backend's native stream id / event id.
52///
53/// `revoked_by` is `String` today for forward compatibility; we may
54/// promote to a typed enum post-v0.10 once the revocation-source
55/// taxonomy settles. Known producers emit `"operator"`,
56/// `"reconciler"`, or `"backend"` today.
57#[non_exhaustive]
58#[derive(Clone, Debug, PartialEq, Eq)]
59pub enum LeaseHistoryEvent {
60 Acquired {
61 cursor: StreamCursor,
62 execution_id: ExecutionId,
63 /// Valkey populates this from the Lua producer; Postgres
64 /// outbox may carry `None` today because the attempt row
65 /// identity is `(lease_epoch, attempt_index, execution_id)`
66 /// rather than a stable uuid (see `lease_event::emit`).
67 lease_id: Option<LeaseId>,
68 worker_instance_id: Option<WorkerInstanceId>,
69 at: TimestampMs,
70 },
71 Renewed {
72 cursor: StreamCursor,
73 execution_id: ExecutionId,
74 /// Same availability caveat as `Acquired::lease_id`.
75 lease_id: Option<LeaseId>,
76 worker_instance_id: Option<WorkerInstanceId>,
77 at: TimestampMs,
78 },
79 Expired {
80 cursor: StreamCursor,
81 execution_id: ExecutionId,
82 lease_id: Option<LeaseId>,
83 prev_owner: Option<WorkerInstanceId>,
84 at: TimestampMs,
85 },
86 Reclaimed {
87 cursor: StreamCursor,
88 execution_id: ExecutionId,
89 /// Same availability caveat as `Acquired::lease_id`.
90 new_lease_id: Option<LeaseId>,
91 new_owner: Option<WorkerInstanceId>,
92 at: TimestampMs,
93 },
94 Revoked {
95 cursor: StreamCursor,
96 execution_id: ExecutionId,
97 lease_id: Option<LeaseId>,
98 /// String today for forward compat; may promote to typed
99 /// enum post-v0.10 once taxonomy settles. Known values:
100 /// `"operator"`, `"reconciler"`, `"backend"`.
101 revoked_by: String,
102 at: TimestampMs,
103 },
104}
105
106impl LeaseHistoryEvent {
107 /// Position cursor to persist + replay from.
108 pub fn cursor(&self) -> &StreamCursor {
109 match self {
110 Self::Acquired { cursor, .. }
111 | Self::Renewed { cursor, .. }
112 | Self::Expired { cursor, .. }
113 | Self::Reclaimed { cursor, .. }
114 | Self::Revoked { cursor, .. } => cursor,
115 }
116 }
117
118 /// Execution the event pertains to.
119 pub fn execution_id(&self) -> &ExecutionId {
120 match self {
121 Self::Acquired { execution_id, .. }
122 | Self::Renewed { execution_id, .. }
123 | Self::Expired { execution_id, .. }
124 | Self::Reclaimed { execution_id, .. }
125 | Self::Revoked { execution_id, .. } => execution_id,
126 }
127 }
128
129 /// Monotonic backend-stamped time of the event.
130 pub fn at(&self) -> TimestampMs {
131 match self {
132 Self::Acquired { at, .. }
133 | Self::Renewed { at, .. }
134 | Self::Expired { at, .. }
135 | Self::Reclaimed { at, .. }
136 | Self::Revoked { at, .. } => *at,
137 }
138 }
139}
140
141/// Stream of typed lease-history events.
142pub type LeaseHistorySubscription =
143 Pin<Box<dyn Stream<Item = Result<LeaseHistoryEvent, EngineError>> + Send>>;
144
145// ─── Completion ──────────────────────────────────────────────────
146
147/// Outcome classification for a completion event.
148///
149/// `#[non_exhaustive]` so future terminal states (e.g. `Suspended`
150/// promoted to a terminal distinct from `Cancelled`) land additively.
151#[non_exhaustive]
152#[derive(Clone, Debug, PartialEq, Eq)]
153pub enum CompletionOutcome {
154 Success,
155 Failure,
156 Cancelled,
157 /// Outcome the backend surfaced but this crate version does not
158 /// recognise. The raw string is retained so operator tooling can
159 /// still log it.
160 Other(String),
161}
162
163impl CompletionOutcome {
164 /// Map a wire-string outcome to the typed enum. Unknown strings
165 /// fall through to [`CompletionOutcome::Other`] rather than an
166 /// error — completion events are advisory and a new producer
167 /// variant must not crash old subscribers.
168 pub fn from_wire(s: &str) -> Self {
169 match s {
170 "success" => Self::Success,
171 "failure" => Self::Failure,
172 "cancelled" => Self::Cancelled,
173 other => Self::Other(other.to_string()),
174 }
175 }
176}
177
178/// Per-event payload of `subscribe_completion`.
179///
180/// Completion events are terminal state transitions surfaced to
181/// downstream DAG consumers. Postgres carries a durable event-id
182/// cursor; Valkey Stage B rides pubsub + always yields
183/// [`StreamCursor::empty`] (no durable replay).
184#[non_exhaustive]
185#[derive(Clone, Debug)]
186pub struct CompletionEvent {
187 pub cursor: StreamCursor,
188 pub execution_id: ExecutionId,
189 pub outcome: CompletionOutcome,
190 pub at: TimestampMs,
191}
192
193impl CompletionEvent {
194 /// Construct a `CompletionEvent`. Backend adapters go through
195 /// this constructor instead of struct-literal syntax so future
196 /// additive fields land as builder methods without breaking
197 /// call sites.
198 pub fn new(
199 cursor: StreamCursor,
200 execution_id: ExecutionId,
201 outcome: CompletionOutcome,
202 at: TimestampMs,
203 ) -> Self {
204 Self {
205 cursor,
206 execution_id,
207 outcome,
208 at,
209 }
210 }
211}
212
213/// Stream of typed completion events.
214pub type CompletionSubscription =
215 Pin<Box<dyn Stream<Item = Result<CompletionEvent, EngineError>> + Send>>;
216
217// ─── SignalDelivery ──────────────────────────────────────────────
218
219/// Effect of a signal delivery on the target waitpoint.
220///
221/// `#[non_exhaustive]` for future effects (e.g. `Throttled`,
222/// `Coalesced`).
223#[non_exhaustive]
224#[derive(Clone, Debug, PartialEq, Eq)]
225pub enum SignalDeliveryEffect {
226 /// The signal was delivered and its waitpoint transitioned to
227 /// satisfied.
228 Satisfied,
229 /// The signal was buffered against a pending waitpoint.
230 Buffered,
231 /// The signal was deduped against an earlier delivery with the
232 /// same idempotency key.
233 Deduped,
234 /// Effect surfaced by the backend but not recognised by this
235 /// crate version. Raw string preserved for observability.
236 Other(String),
237}
238
239impl SignalDeliveryEffect {
240 pub fn from_wire(s: &str) -> Self {
241 match s {
242 "satisfied" => Self::Satisfied,
243 "buffered" => Self::Buffered,
244 "deduped" => Self::Deduped,
245 other => Self::Other(other.to_string()),
246 }
247 }
248}
249
250/// Per-event payload of `subscribe_signal_delivery`.
251#[non_exhaustive]
252#[derive(Clone, Debug)]
253pub struct SignalDeliveryEvent {
254 pub cursor: StreamCursor,
255 pub execution_id: ExecutionId,
256 pub signal_id: SignalId,
257 pub waitpoint_id: Option<WaitpointId>,
258 pub source_identity: Option<String>,
259 pub effect: SignalDeliveryEffect,
260 pub at: TimestampMs,
261}
262
263impl SignalDeliveryEvent {
264 /// Construct a `SignalDeliveryEvent`. Backend adapters use this
265 /// constructor so future additive fields are non-breaking.
266 pub fn new(
267 cursor: StreamCursor,
268 execution_id: ExecutionId,
269 signal_id: SignalId,
270 waitpoint_id: Option<WaitpointId>,
271 source_identity: Option<String>,
272 effect: SignalDeliveryEffect,
273 at: TimestampMs,
274 ) -> Self {
275 Self {
276 cursor,
277 execution_id,
278 signal_id,
279 waitpoint_id,
280 source_identity,
281 effect,
282 at,
283 }
284 }
285}
286
287/// Stream of typed signal-delivery events.
288pub type SignalDeliverySubscription =
289 Pin<Box<dyn Stream<Item = Result<SignalDeliveryEvent, EngineError>> + Send>>;
290
291// ─── InstanceTags ────────────────────────────────────────────────
292
293/// Per-event payload of `subscribe_instance_tags`.
294///
295/// Producer wiring is deferred (see #311); trait-level surface exists
296/// for API uniformity across the four families. Every backend's
297/// default impl returns `EngineError::Unavailable`.
298#[non_exhaustive]
299#[derive(Clone, Debug, PartialEq, Eq)]
300pub enum InstanceTagEvent {
301 /// Tag attached to an instance-scoped grouping.
302 Attached {
303 cursor: StreamCursor,
304 execution_id: ExecutionId,
305 tag: String,
306 at: TimestampMs,
307 },
308 /// Tag cleared from an instance-scoped grouping.
309 Cleared {
310 cursor: StreamCursor,
311 execution_id: ExecutionId,
312 tag: String,
313 at: TimestampMs,
314 },
315}
316
317/// Stream of typed instance-tag events.
318pub type InstanceTagSubscription =
319 Pin<Box<dyn Stream<Item = Result<InstanceTagEvent, EngineError>> + Send>>;
320
321#[cfg(test)]
322mod tests {
323 use super::*;
324
325 #[test]
326 fn completion_outcome_wire_round_trip() {
327 assert_eq!(CompletionOutcome::from_wire("success"), CompletionOutcome::Success);
328 assert_eq!(CompletionOutcome::from_wire("failure"), CompletionOutcome::Failure);
329 assert_eq!(CompletionOutcome::from_wire("cancelled"), CompletionOutcome::Cancelled);
330 match CompletionOutcome::from_wire("timed_out") {
331 CompletionOutcome::Other(s) => assert_eq!(s, "timed_out"),
332 other => panic!("expected Other, got {other:?}"),
333 }
334 }
335
336 #[test]
337 fn signal_delivery_effect_wire_round_trip() {
338 assert_eq!(
339 SignalDeliveryEffect::from_wire("satisfied"),
340 SignalDeliveryEffect::Satisfied
341 );
342 assert_eq!(
343 SignalDeliveryEffect::from_wire("buffered"),
344 SignalDeliveryEffect::Buffered
345 );
346 assert_eq!(
347 SignalDeliveryEffect::from_wire("deduped"),
348 SignalDeliveryEffect::Deduped
349 );
350 match SignalDeliveryEffect::from_wire("throttled") {
351 SignalDeliveryEffect::Other(s) => assert_eq!(s, "throttled"),
352 other => panic!("expected Other, got {other:?}"),
353 }
354 }
355
356 #[test]
357 fn lease_history_event_accessors() {
358 use crate::types::ExecutionId;
359 let cursor = StreamCursor::new(vec![0x01, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 42]);
360 let exec = ExecutionId::parse("{fp:0}:11111111-1111-4111-8111-111111111111").unwrap();
361 let ev = LeaseHistoryEvent::Expired {
362 cursor: cursor.clone(),
363 execution_id: exec.clone(),
364 lease_id: None,
365 prev_owner: None,
366 at: TimestampMs(1_700_000_000_000),
367 };
368 assert_eq!(ev.cursor(), &cursor);
369 assert_eq!(ev.execution_id(), &exec);
370 assert_eq!(ev.at(), TimestampMs(1_700_000_000_000));
371 }
372}