ff_core/stream_subscribe.rs
1//! RFC-019 Stage A — cross-backend stream-cursor subscription surface.
2//!
3//! `EngineBackend::subscribe_{lease_history,completion,signal_delivery,
4//! instance_tags}` return a [`StreamSubscription`] — a pinned
5//! `tokio_stream::Stream` of `Result<StreamEvent, EngineError>`. The
6//! cursor is an opaque byte blob the consumer persists across crashes
7//! and hands back on resume; the first byte identifies the backend
8//! family + version so cursors stay stable across backend upgrades.
9//!
//! This module defines the wire types and the cursor byte codecs only —
//! no trait default is here; defaults live on the `EngineBackend` trait in
12//! [`crate::engine_backend`]. Real impls live in `ff-backend-valkey`
13//! (Valkey `XREAD BLOCK`-backed lease_history) and `ff-backend-postgres`
14//! (`LISTEN/NOTIFY`-backed completion) per RFC-019 §Implementation Plan.
15//!
16//! Four-family allow-list (RFC-019 §Open Questions #5, owner-
17//! adjudicated 2026-04-24): new families require an RFC amendment.
18
19use std::pin::Pin;
20
21use bytes::Bytes;
22use tokio_stream::Stream;
23
24use crate::engine_error::EngineError;
25use crate::types::{ExecutionId, TimestampMs};
26
27/// Opaque, backend-versioned cursor. Consumers persist the bytes, hand
28/// them back on resume. The first byte MUST encode a backend-family +
29/// version prefix so cursors stay stable across backend upgrades
30/// (Valkey: `0x01`, Postgres: `0x02`, …). The remainder is backend-
31/// specific.
32///
33/// [`StreamCursor::empty`] is the "start from tail / now" sentinel
34/// every backend accepts. Cursor byte order is owner-adjudicated
35/// opaque — consumers MUST NOT parse the bytes.
36#[derive(Clone, Debug, Eq, PartialEq)]
37pub struct StreamCursor(pub Bytes);
38
39impl StreamCursor {
40 /// Wrap arbitrary cursor bytes — typically only called by backend
41 /// impls that just encoded a fresh cursor from a backend-native
42 /// position (Valkey stream id, Postgres event_id).
43 pub fn new(raw: impl Into<Bytes>) -> Self {
44 Self(raw.into())
45 }
46
47 /// Borrow the raw cursor bytes for persistence. Consumers treat
48 /// the slice as opaque.
49 pub fn as_bytes(&self) -> &[u8] {
50 &self.0
51 }
52
53 /// Sentinel "start from tail" cursor. Backends interpret an empty
54 /// cursor as "subscribe from now" (Valkey: XREAD `$`; Postgres:
55 /// LISTEN from the current `max(event_id)`).
56 pub fn empty() -> Self {
57 Self(Bytes::new())
58 }
59}
60
/// The event families on the v0.9 allow-list (RFC-019 §Open
/// Questions #5).
///
/// The enum is `#[non_exhaustive]` so that families added in v0.10+
/// do not break consumer `match` blocks. That is not a generic escape
/// hatch: per the owner-adjudicated stance, every new family requires
/// an RFC amendment.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamFamily {
    /// Family served by `EngineBackend::subscribe_lease_history`.
    LeaseHistory,
    /// Family served by `EngineBackend::subscribe_completion`.
    Completion,
    /// Family served by `EngineBackend::subscribe_signal_delivery`.
    SignalDelivery,
    /// Family served by `EngineBackend::subscribe_instance_tags`.
    InstanceTags,
}
74
75/// Per-event payload.
76///
77/// - `family` identifies the event shape.
78/// - `cursor` is the position this event occupies in the stream;
79/// consumers persist it + hand it back on reconnect so replay begins
80/// strictly after this event.
81/// - `execution_id`, `attempt_index`, `timestamp` are inline hot fields
82/// (RFC-019 §Open Questions #4, owner-adjudicated inline) so common
83/// consumers do not need a follow-up `describe_execution` RPC.
84/// - `payload` is the family-specific binary event body. Schema is
85/// the backend's (not stabilised in Stage A — consumers parse via
86/// family-specific helpers Stage B will ship).
87///
88/// `#[non_exhaustive]` so future inline metadata (e.g. `namespace`)
89/// adds without a breaking change.
90#[non_exhaustive]
91#[derive(Clone, Debug)]
92pub struct StreamEvent {
93 pub family: StreamFamily,
94 pub cursor: StreamCursor,
95 pub execution_id: Option<ExecutionId>,
96 pub attempt_index: Option<u32>,
97 pub timestamp: TimestampMs,
98 pub payload: Bytes,
99}
100
101impl StreamEvent {
102 /// Construct a minimal `StreamEvent`. The struct is
103 /// `#[non_exhaustive]` so external crates cannot build via a
104 /// literal — backends go through this constructor + builder.
105 /// Optional inline metadata is added via `with_execution_id` /
106 /// `with_attempt_index` so call-site shape stays stable across
107 /// future field additions.
108 pub fn new(
109 family: StreamFamily,
110 cursor: StreamCursor,
111 timestamp: TimestampMs,
112 payload: Bytes,
113 ) -> Self {
114 Self {
115 family,
116 cursor,
117 execution_id: None,
118 attempt_index: None,
119 timestamp,
120 payload,
121 }
122 }
123
124 #[must_use]
125 pub fn with_execution_id(mut self, id: ExecutionId) -> Self {
126 self.execution_id = Some(id);
127 self
128 }
129
130 #[must_use]
131 pub fn with_attempt_index(mut self, idx: u32) -> Self {
132 self.attempt_index = Some(idx);
133 self
134 }
135}
136
137/// Shape of a subscription stream.
138///
139/// Errors surface as `Err` items (not panics / stream-end); the
140/// terminal `Err(EngineError::StreamDisconnected { cursor })` is the
141/// owner-adjudicated disconnect contract (RFC-019 §Open Questions #2):
142/// the consumer reconnects by re-calling the relevant `subscribe_*`
143/// method with the cursor they observed. Non-terminal errors may be
144/// followed by further events.
145pub type StreamSubscription =
146 Pin<Box<dyn Stream<Item = Result<StreamEvent, EngineError>> + Send>>;
147
// ─── Cursor codecs (Valkey + Postgres) ───
//
// Valkey layout:   `0x01` ++ ms_be(8) ++ seq_be(8). 17 bytes total.
// Postgres layout: `0x02` ++ event_id_be(8). 9 bytes total.
// Empty `StreamCursor` (zero-length) means "start from tail" for both
// backends (Valkey: tail from `$`, the current end).
//
// These helpers live in ff-core rather than ff-backend-valkey because
// the wire shape is part of the RFC-019 contract — a Postgres cursor
// can never decode as a Valkey cursor (different first byte) and
// consumers migrating between backends discard + restart cleanly.
157
/// Leading byte that tags a cursor as belonging to the Valkey backend
/// family (RFC-019 §Backend Semantics Appendix).
pub const VALKEY_CURSOR_PREFIX: u8 = 0x01;
160
/// Leading byte that tags a cursor as belonging to the Postgres
/// backend family.
pub const POSTGRES_CURSOR_PREFIX: u8 = 0x02;
163
164/// Encode a Valkey stream id `(ms, seq)` into a cursor.
165pub fn encode_valkey_cursor(ms: u64, seq: u64) -> StreamCursor {
166 let mut buf = Vec::with_capacity(17);
167 buf.push(VALKEY_CURSOR_PREFIX);
168 buf.extend_from_slice(&ms.to_be_bytes());
169 buf.extend_from_slice(&seq.to_be_bytes());
170 StreamCursor::new(buf)
171}
172
173/// Decode a Valkey cursor back to `(ms, seq)`. Returns `None` for the
174/// empty / tail-from-now cursor. Returns `Err` for malformed bytes
175/// (wrong prefix, truncated).
176pub fn decode_valkey_cursor(cursor: &StreamCursor) -> Result<Option<(u64, u64)>, &'static str> {
177 let bytes = cursor.as_bytes();
178 if bytes.is_empty() {
179 return Ok(None);
180 }
181 if bytes.len() != 17 || bytes[0] != VALKEY_CURSOR_PREFIX {
182 return Err("stream_subscribe: cursor does not belong to the Valkey backend");
183 }
184 let mut ms = [0u8; 8];
185 let mut seq = [0u8; 8];
186 ms.copy_from_slice(&bytes[1..9]);
187 seq.copy_from_slice(&bytes[9..17]);
188 Ok(Some((u64::from_be_bytes(ms), u64::from_be_bytes(seq))))
189}
190
191/// Encode a Postgres `event_id` (i64, from `ff_completion_event`) into
192/// a cursor.
193pub fn encode_postgres_event_cursor(event_id: i64) -> StreamCursor {
194 let mut buf = Vec::with_capacity(9);
195 buf.push(POSTGRES_CURSOR_PREFIX);
196 buf.extend_from_slice(&event_id.to_be_bytes());
197 StreamCursor::new(buf)
198}
199
200/// Decode a Postgres event cursor back to `event_id`. `None` =
201/// start-from-tail.
202pub fn decode_postgres_event_cursor(cursor: &StreamCursor) -> Result<Option<i64>, &'static str> {
203 let bytes = cursor.as_bytes();
204 if bytes.is_empty() {
205 return Ok(None);
206 }
207 if bytes.len() != 9 || bytes[0] != POSTGRES_CURSOR_PREFIX {
208 return Err("stream_subscribe: cursor does not belong to the Postgres backend");
209 }
210 let mut event_id = [0u8; 8];
211 event_id.copy_from_slice(&bytes[1..9]);
212 Ok(Some(i64::from_be_bytes(event_id)))
213}
214
#[cfg(test)]
mod tests {
    //! Unit tests for the cursor codecs.
    //!
    //! Besides the basic round-trips, these pin the wire layout and
    //! exercise each rejection condition (length, prefix) on its own:
    //! a cross-backend cursor differs in BOTH length and prefix, so it
    //! cannot tell whether each individual check is present.

    use super::*;

    #[test]
    fn valkey_cursor_roundtrip() {
        let c = encode_valkey_cursor(1_700_000_000_000, 42);
        assert_eq!(c.as_bytes()[0], VALKEY_CURSOR_PREFIX);
        let (ms, seq) = decode_valkey_cursor(&c).unwrap().unwrap();
        assert_eq!(ms, 1_700_000_000_000);
        assert_eq!(seq, 42);
    }

    #[test]
    fn valkey_cursor_roundtrip_extremes() {
        for &(ms, seq) in &[(0u64, 0u64), (u64::MAX, u64::MAX), (0, u64::MAX), (u64::MAX, 0)] {
            let c = encode_valkey_cursor(ms, seq);
            assert_eq!(c.as_bytes().len(), 17);
            assert_eq!(decode_valkey_cursor(&c), Ok(Some((ms, seq))));
        }
    }

    #[test]
    fn valkey_cursor_wire_layout() {
        // prefix ++ ms (big-endian) ++ seq (big-endian)
        let c = encode_valkey_cursor(1, 2);
        let expected: [u8; 17] = [
            VALKEY_CURSOR_PREFIX,
            0, 0, 0, 0, 0, 0, 0, 1,
            0, 0, 0, 0, 0, 0, 0, 2,
        ];
        assert_eq!(c.as_bytes(), &expected[..]);
    }

    #[test]
    fn valkey_empty_is_tail() {
        let c = StreamCursor::empty();
        assert!(decode_valkey_cursor(&c).unwrap().is_none());
    }

    #[test]
    fn valkey_rejects_postgres_cursor() {
        let c = encode_postgres_event_cursor(7);
        assert!(decode_valkey_cursor(&c).is_err());
    }

    #[test]
    fn valkey_rejects_wrong_length() {
        let full = encode_valkey_cursor(1, 1);

        // Correct prefix, one byte short.
        let truncated = StreamCursor::new(full.as_bytes()[..16].to_vec());
        assert!(decode_valkey_cursor(&truncated).is_err());

        // Correct prefix, one byte too many.
        let mut long = full.as_bytes().to_vec();
        long.push(0);
        assert!(decode_valkey_cursor(&StreamCursor::new(long)).is_err());

        // Prefix byte alone.
        let prefix_only = StreamCursor::new(vec![VALKEY_CURSOR_PREFIX]);
        assert!(decode_valkey_cursor(&prefix_only).is_err());
    }

    #[test]
    fn valkey_rejects_wrong_prefix_at_valid_length() {
        // 17 bytes, so only the prefix check can reject this.
        let mut raw = encode_valkey_cursor(1, 1).as_bytes().to_vec();
        raw[0] = POSTGRES_CURSOR_PREFIX;
        assert!(decode_valkey_cursor(&StreamCursor::new(raw)).is_err());
    }

    #[test]
    fn postgres_cursor_roundtrip() {
        let c = encode_postgres_event_cursor(12345);
        assert_eq!(c.as_bytes()[0], POSTGRES_CURSOR_PREFIX);
        assert_eq!(decode_postgres_event_cursor(&c).unwrap(), Some(12345));
    }

    #[test]
    fn postgres_cursor_roundtrip_extremes() {
        for &event_id in &[i64::MIN, -1, 0, 1, i64::MAX] {
            let c = encode_postgres_event_cursor(event_id);
            assert_eq!(c.as_bytes().len(), 9);
            assert_eq!(decode_postgres_event_cursor(&c), Ok(Some(event_id)));
        }
    }

    #[test]
    fn postgres_cursor_wire_layout() {
        // prefix ++ event_id (big-endian)
        let c = encode_postgres_event_cursor(1);
        let expected: [u8; 9] = [POSTGRES_CURSOR_PREFIX, 0, 0, 0, 0, 0, 0, 0, 1];
        assert_eq!(c.as_bytes(), &expected[..]);
    }

    #[test]
    fn postgres_empty_is_tail() {
        let c = StreamCursor::empty();
        assert_eq!(decode_postgres_event_cursor(&c), Ok(None));
    }

    #[test]
    fn postgres_rejects_valkey_cursor() {
        let c = encode_valkey_cursor(1, 1);
        assert!(decode_postgres_event_cursor(&c).is_err());
    }

    #[test]
    fn postgres_rejects_wrong_length() {
        let full = encode_postgres_event_cursor(1);

        // Correct prefix, one byte short.
        let truncated = StreamCursor::new(full.as_bytes()[..8].to_vec());
        assert!(decode_postgres_event_cursor(&truncated).is_err());

        // Correct prefix, one byte too many.
        let mut long = full.as_bytes().to_vec();
        long.push(0);
        assert!(decode_postgres_event_cursor(&StreamCursor::new(long)).is_err());
    }

    #[test]
    fn postgres_rejects_wrong_prefix_at_valid_length() {
        // 9 bytes, so only the prefix check can reject this.
        let mut raw = encode_postgres_event_cursor(1).as_bytes().to_vec();
        raw[0] = VALKEY_CURSOR_PREFIX;
        assert!(decode_postgres_event_cursor(&StreamCursor::new(raw)).is_err());
    }
}